fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Package index contains logic for building Zoekt indexes. NOTE: this package is not considered 16// part of the public API, and it is not recommended to rely on it in external code. 17package index 18 19import ( 20 "crypto/sha1" 21 "flag" 22 "fmt" 23 "log" 24 "net/url" 25 "os" 26 "os/exec" 27 "path" 28 "path/filepath" 29 "reflect" 30 "runtime" 31 "runtime/pprof" 32 "sort" 33 "strconv" 34 "strings" 35 "sync" 36 "time" 37 38 "github.com/bmatcuk/doublestar" 39 "github.com/dustin/go-humanize" 40 "github.com/go-enry/go-enry/v2" 41 "github.com/rs/xid" 42 "golang.org/x/sys/unix" 43 44 "maps" 45 46 "github.com/sourcegraph/zoekt" 47 "github.com/sourcegraph/zoekt/internal/ctags" 48 "github.com/sourcegraph/zoekt/internal/tenant" 49) 50 51var DefaultDir = filepath.Join(os.Getenv("HOME"), ".zoekt") 52 53// Branch describes a single branch version. 54type Branch struct { 55 Name string 56 Version string 57} 58 59// Options sets options for the index building. 60type Options struct { 61 // IndexDir is a directory that holds *.zoekt index files. 62 IndexDir string 63 64 // SizeMax is the maximum file size 65 SizeMax int 66 67 // Parallelism is the maximum number of shards to index in parallel 68 Parallelism int 69 70 // ShardMax sets the maximum corpus size for a single shard 71 ShardMax int 72 73 // TrigramMax sets the maximum number of distinct trigrams per document. 74 TrigramMax int 75 76 // RepositoryDescription holds names and URLs for the repository. 77 RepositoryDescription zoekt.Repository 78 79 // SubRepositories is a path => sub repository map. 80 SubRepositories map[string]*zoekt.Repository 81 82 // DisableCTags disables the generation of ctags metadata. 83 DisableCTags bool 84 85 // CtagsPath is the path to the ctags binary to run, or empty 86 // if a valid binary couldn't be found. 87 CTagsPath string 88 89 // Same as CTagsPath but for scip-ctags 90 ScipCTagsPath string 91 92 // If set, ctags must succeed. 93 CTagsMustSucceed bool 94 95 // LargeFiles is a slice of glob patterns, including ** for any number 96 // of directories, where matching file paths should be indexed 97 // regardless of their size. The full pattern syntax is here: 98 // https://github.com/bmatcuk/doublestar/tree/v1#patterns. 99 LargeFiles []string 100 101 // IsDelta is true if this run contains only the changed documents since the 102 // last run. 103 IsDelta bool 104 105 // changedOrRemovedFiles is a list of file paths that have been changed or removed 106 // since the last indexing job for this repository. These files will be tombstoned 107 // in the older shards for this repository. 108 changedOrRemovedFiles []string 109 110 LanguageMap ctags.LanguageMap 111 112 // ShardMerging is true if builder should respect compound shards. This is a 113 // Sourcegraph specific option. 114 ShardMerging bool 115 116 // HeapProfileTriggerBytes is the heap allocation in bytes that will trigger a memory profile. If 0, no memory profile 117 // will be triggered. Note this trigger looks at total heap allocation (which includes both inuse and garbage objects). 118 // 119 // Profiles will be written to files named `index-memory.prof.n` in the index directory. No more than 10 files are written. 120 // 121 // Note: heap checking is "best effort", and it's possible for the process to OOM without triggering the heap profile. 122 HeapProfileTriggerBytes uint64 123} 124 125// HashOptions contains only the options in Options that upon modification leads to IndexState of IndexStateMismatch during the next index building. 126type HashOptions struct { 127 sizeMax int 128 disableCTags bool 129 ctagsPath string 130 cTagsMustSucceed bool 131 largeFiles []string 132} 133 134func (o *Options) HashOptions() HashOptions { 135 return HashOptions{ 136 sizeMax: o.SizeMax, 137 disableCTags: o.DisableCTags, 138 ctagsPath: o.CTagsPath, 139 cTagsMustSucceed: o.CTagsMustSucceed, 140 largeFiles: o.LargeFiles, 141 } 142} 143 144func (o *Options) GetHash() string { 145 h := o.HashOptions() 146 hasher := sha1.New() 147 148 hasher.Write([]byte(h.ctagsPath)) 149 hasher.Write(fmt.Appendf(nil, "%t", h.cTagsMustSucceed)) 150 hasher.Write(fmt.Appendf(nil, "%d", h.sizeMax)) 151 hasher.Write(fmt.Appendf(nil, "%q", h.largeFiles)) 152 hasher.Write(fmt.Appendf(nil, "%t", h.disableCTags)) 153 154 return fmt.Sprintf("%x", hasher.Sum(nil)) 155} 156 157type largeFilesFlag struct{ *Options } 158 159func (f largeFilesFlag) String() string { 160 // From flag.Value documentation: 161 // 162 // The flag package may call the String method with a zero-valued receiver, 163 // such as a nil pointer. 164 if f.Options == nil { 165 return "" 166 } 167 s := append([]string{""}, f.LargeFiles...) 168 return strings.Join(s, "-large_file ") 169} 170 171func (f largeFilesFlag) Set(value string) error { 172 f.LargeFiles = append(f.LargeFiles, value) 173 return nil 174} 175 176// Flags adds flags for build options to fs. It is the "inverse" of Args. 177func (o *Options) Flags(fs *flag.FlagSet) { 178 x := *o 179 x.SetDefaults() 180 fs.IntVar(&o.SizeMax, "file_limit", x.SizeMax, "maximum file size") 181 fs.IntVar(&o.TrigramMax, "max_trigram_count", x.TrigramMax, "maximum number of trigrams per document") 182 fs.IntVar(&o.ShardMax, "shard_limit", x.ShardMax, "maximum corpus size for a shard") 183 fs.IntVar(&o.Parallelism, "parallelism", x.Parallelism, "maximum number of parallel indexing processes.") 184 fs.StringVar(&o.IndexDir, "index", x.IndexDir, "directory for search indices") 185 fs.BoolVar(&o.CTagsMustSucceed, "require_ctags", x.CTagsMustSucceed, "If set, ctags calls must succeed.") 186 fs.Var(largeFilesFlag{o}, "large_file", "A glob pattern where matching files are to be index regardless of their size. You can add multiple patterns by setting this more than once.") 187 188 // Sourcegraph specific 189 fs.BoolVar(&o.DisableCTags, "disable_ctags", x.DisableCTags, "If set, ctags will not be called.") 190 fs.BoolVar(&o.ShardMerging, "shard_merging", x.ShardMerging, "If set, builder will respect compound shards.") 191} 192 193// Args generates command line arguments for o. It is the "inverse" of Flags. 194func (o *Options) Args() []string { 195 var args []string 196 197 if o.SizeMax != 0 { 198 args = append(args, "-file_limit", strconv.Itoa(o.SizeMax)) 199 } 200 201 if o.TrigramMax != 0 { 202 args = append(args, "-max_trigram_count", strconv.Itoa(o.TrigramMax)) 203 } 204 205 if o.ShardMax != 0 { 206 args = append(args, "-shard_limit", strconv.Itoa(o.ShardMax)) 207 } 208 209 if o.Parallelism != 0 { 210 args = append(args, "-parallelism", strconv.Itoa(o.Parallelism)) 211 } 212 213 if o.IndexDir != "" { 214 args = append(args, "-index", o.IndexDir) 215 } 216 217 if o.CTagsMustSucceed { 218 args = append(args, "-require_ctags") 219 } 220 221 for _, a := range o.LargeFiles { 222 args = append(args, "-large_file", a) 223 } 224 225 // Sourcegraph specific 226 if o.DisableCTags { 227 args = append(args, "-disable_ctags") 228 } 229 230 if o.ShardMerging { 231 args = append(args, "-shard_merging") 232 } 233 234 return args 235} 236 237// Builder manages (parallel) creation of uniformly sized shards. The 238// builder buffers up documents until it collects enough documents and 239// then builds a shard and writes. 240type Builder struct { 241 opts Options 242 throttle chan int 243 244 nextShardNum int 245 todo []*Document 246 docChecker DocChecker 247 size int 248 249 parserBins ctags.ParserBinMap 250 building sync.WaitGroup 251 252 errMu sync.Mutex 253 buildError error 254 255 // temp name => final name for finished shards. We only rename 256 // them once all shards succeed to avoid Frankstein corpuses. 257 finishedShards map[string]string 258 259 // indexTime is set by tests for doing reproducible builds. 260 indexTime time.Time 261 262 // heapProfileMu is used to ensure that only one memory profile is written at a time 263 heapProfileMu sync.Mutex 264 heapProfileNum int 265 266 // a sortable 20 chars long id. 267 id string 268 269 finishCalled bool 270} 271 272type finishedShard struct { 273 temp, final string 274} 275 276func checkCTags() string { 277 if ctags := os.Getenv("CTAGS_COMMAND"); ctags != "" { 278 return ctags 279 } 280 281 if ctags, err := exec.LookPath("universal-ctags"); err == nil { 282 return ctags 283 } 284 285 return "" 286} 287 288func checkScipCTags() string { 289 if ctags := os.Getenv("SCIP_CTAGS_COMMAND"); ctags != "" { 290 return ctags 291 } 292 293 if ctags, err := exec.LookPath("scip-ctags"); err == nil { 294 return ctags 295 } 296 297 return "" 298} 299 300// SetDefaults sets reasonable default options. 301func (o *Options) SetDefaults() { 302 if o.CTagsPath == "" && !o.DisableCTags { 303 o.CTagsPath = checkCTags() 304 } 305 306 if o.ScipCTagsPath == "" && !o.DisableCTags { 307 o.ScipCTagsPath = checkScipCTags() 308 } 309 310 if o.Parallelism == 0 { 311 o.Parallelism = 4 312 } 313 if o.SizeMax == 0 { 314 o.SizeMax = 2 << 20 315 } 316 if o.ShardMax == 0 { 317 o.ShardMax = 100 << 20 318 } 319 if o.TrigramMax == 0 { 320 o.TrigramMax = 20000 321 } 322 323 if o.RepositoryDescription.Name == "" && o.RepositoryDescription.URL != "" { 324 parsed, _ := url.Parse(o.RepositoryDescription.URL) 325 if parsed != nil { 326 o.RepositoryDescription.Name = filepath.Join(parsed.Host, parsed.Path) 327 } 328 } 329} 330 331// ShardName returns the name the given index shard. 332func (o *Options) shardName(n int) string { 333 return o.shardNameVersion(IndexFormatVersion, n) 334} 335 336func (o *Options) shardNameVersion(version, n int) string { 337 var prefix string 338 339 // Sourcegraph specific: We use IDs in shard names on multi-tenant 340 // instances to prevent conflicts. 341 if tenant.UseIDBasedShardNames() { 342 prefix = fmt.Sprintf("%09d_%09d", o.RepositoryDescription.TenantID, o.RepositoryDescription.ID) 343 } else { 344 prefix = o.RepositoryDescription.Name 345 } 346 347 return shardName(o.IndexDir, prefix, version, n) 348} 349 350type IndexState string 351 352const ( 353 IndexStateMissing IndexState = "missing" 354 IndexStateCorrupt IndexState = "corrupt" 355 IndexStateVersion IndexState = "version-mismatch" 356 IndexStateOption IndexState = "option-mismatch" 357 IndexStateMeta IndexState = "meta-mismatch" 358 IndexStateContent IndexState = "content-mismatch" 359 IndexStateEqual IndexState = "equal" 360) 361 362var readVersions = []struct { 363 IndexFormatVersion int 364 FeatureVersion int 365}{{ 366 IndexFormatVersion: IndexFormatVersion, 367 FeatureVersion: FeatureVersion, 368}, { 369 IndexFormatVersion: NextIndexFormatVersion, 370 FeatureVersion: FeatureVersion, 371}} 372 373// IncrementalSkipIndexing returns true if the index present on disk matches 374// the build options. 375func (o *Options) IncrementalSkipIndexing() bool { 376 state, _ := o.IndexState() 377 return state == IndexStateEqual 378} 379 380// IndexState checks how the index present on disk compares to the build 381// options and returns the IndexState and the name of the first shard. 382func (o *Options) IndexState() (IndexState, string) { 383 // Open the latest version we support that is on disk. 384 fn := o.findShard() 385 if fn == "" { 386 return IndexStateMissing, fn 387 } 388 389 repos, index, err := ReadMetadataPathAlive(fn) 390 if os.IsNotExist(err) { 391 return IndexStateMissing, fn 392 } else if err != nil { 393 return IndexStateCorrupt, fn 394 } 395 396 for _, v := range readVersions { 397 if v.IndexFormatVersion == index.IndexFormatVersion && v.FeatureVersion != index.IndexFeatureVersion { 398 return IndexStateVersion, fn 399 } 400 } 401 402 var repo *zoekt.Repository 403 for _, cand := range repos { 404 if cand.Name == o.RepositoryDescription.Name { 405 repo = cand 406 break 407 } 408 } 409 410 if repo == nil { 411 return IndexStateCorrupt, fn 412 } 413 414 if repo.IndexOptions != o.GetHash() { 415 return IndexStateOption, fn 416 } 417 418 if !reflect.DeepEqual(repo.Branches, o.RepositoryDescription.Branches) { 419 return IndexStateContent, fn 420 } 421 422 // We can mutate repo since it lives in the scope of this function call. 423 if updated, err := repo.MergeMutable(&o.RepositoryDescription); err != nil { 424 // non-nil err means we are trying to update an immutable field => 425 // reindex content. 426 log.Printf("warn: immutable field changed, requires re-index: %s", err) 427 return IndexStateContent, fn 428 } else if updated { 429 return IndexStateMeta, fn 430 } 431 432 return IndexStateEqual, fn 433} 434 435// FindRepositoryMetadata returns the index metadata for the repository 436// specified in the options. 'ok' is false if the repository's metadata 437// couldn't be found or if an error occurred. 438func (o *Options) FindRepositoryMetadata() (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 439 shard := o.findShard() 440 if shard == "" { 441 return nil, nil, false, nil 442 } 443 444 repositories, metadata, err := ReadMetadataPathAlive(shard) 445 if err != nil { 446 return nil, nil, false, fmt.Errorf("reading metadata for shard %q: %w", shard, err) 447 } 448 449 ID := o.RepositoryDescription.ID 450 for _, r := range repositories { 451 // compound shards contain multiple repositories, so we 452 // have to pick only the one we're looking for 453 if r.ID == ID { 454 return r, metadata, true, nil 455 } 456 } 457 458 // If we're here, then we're somehow in a state where we found a matching 459 // shard that's missing the repository metadata we're looking for. This 460 // should never happen. 461 name := o.RepositoryDescription.Name 462 return nil, nil, false, fmt.Errorf("matching shard %q doesn't contain metadata for repo id %d (%q)", shard, ID, name) 463} 464 465func (o *Options) findShard() string { 466 for _, v := range readVersions { 467 fn := o.shardNameVersion(v.IndexFormatVersion, 0) 468 if _, err := os.Stat(fn); err == nil { 469 return fn 470 } 471 } 472 473 // Brute force finding the shard in compound shards. We should only hit this 474 // code path for repositories that don't exist yet or are in compound shards. 475 return o.findCompoundShard() 476} 477 478func (o *Options) findCompoundShard() string { 479 compoundShards, err := filepath.Glob(path.Join(o.IndexDir, "compound-*.zoekt")) 480 if err != nil { 481 return "" 482 } 483 for _, fn := range compoundShards { 484 if containsRepo(fn, o.RepositoryDescription.ID) { 485 return fn 486 } 487 } 488 489 return "" 490} 491 492func (o *Options) FindAllShards() []string { 493 for _, v := range readVersions { 494 fn := o.shardNameVersion(v.IndexFormatVersion, 0) 495 if _, err := os.Stat(fn); err == nil { 496 shards := []string{fn} 497 for i := 1; ; i++ { 498 fn := o.shardNameVersion(v.IndexFormatVersion, i) 499 if _, err := os.Stat(fn); err != nil { 500 return shards 501 } 502 shards = append(shards, fn) 503 } 504 } 505 } 506 507 // lazily fallback to findShard which will look for a compound shard. 508 if fn := o.findShard(); fn != "" { 509 return []string{fn} 510 } 511 512 return nil 513} 514 515// IgnoreSizeMax determines whether the max size should be ignored. 516func (o *Options) IgnoreSizeMax(name string) bool { 517 // A pattern match will override preceding pattern matches. 518 for i := len(o.LargeFiles) - 1; i >= 0; i-- { 519 pattern := strings.TrimSpace(o.LargeFiles[i]) 520 negated, validatedPattern := checkIsNegatePattern(pattern) 521 522 if m, _ := doublestar.PathMatch(validatedPattern, name); m { 523 if negated { 524 return false 525 } else { 526 return true 527 } 528 } 529 } 530 531 return false 532} 533 534func checkIsNegatePattern(pattern string) (bool, string) { 535 negate := "!" 536 537 // if negated then strip prefix meta character which identifies negated filter pattern 538 if strings.HasPrefix(pattern, negate) { 539 return true, pattern[len(negate):] 540 } 541 542 return false, pattern 543} 544 545// NewBuilder creates a new Builder instance. 546func NewBuilder(opts Options) (*Builder, error) { 547 opts.SetDefaults() 548 if opts.RepositoryDescription.Name == "" { 549 return nil, fmt.Errorf("builder: must set Name") 550 } 551 552 b := &Builder{ 553 opts: opts, 554 throttle: make(chan int, opts.Parallelism), 555 finishedShards: map[string]string{}, 556 } 557 558 parserBins, err := ctags.NewParserBinMap( 559 b.opts.CTagsPath, 560 b.opts.ScipCTagsPath, 561 opts.LanguageMap, 562 b.opts.CTagsMustSucceed, 563 ) 564 if err != nil { 565 return nil, err 566 } 567 568 b.parserBins = parserBins 569 570 if opts.IsDelta { 571 // Delta shards build on top of previously existing shards. 572 // As a consequence, the shardNum for delta shards starts from 573 // the number following the most recently generated shard - not 0. 574 // 575 // Using this numbering scheme allows all the shards to be 576 // discovered as a set. 577 shards := b.opts.FindAllShards() 578 b.nextShardNum = len(shards) // shards are zero indexed, so len() provides the next number after the last one 579 } 580 581 if _, err := b.newShardBuilder(); err != nil { 582 return nil, err 583 } 584 585 now := time.Now() 586 b.indexTime = now 587 b.id = xid.NewWithTime(now).String() 588 589 return b, nil 590} 591 592// AddFile is a convenience wrapper for the Add method 593func (b *Builder) AddFile(name string, content []byte) error { 594 return b.Add(Document{Name: name, Content: content}) 595} 596 597func (b *Builder) Add(doc Document) error { 598 if b.finishCalled { 599 return nil 600 } 601 602 allowLargeFile := b.opts.IgnoreSizeMax(doc.Name) 603 if len(doc.Content) > b.opts.SizeMax && !allowLargeFile { 604 // We could pass the document on to the shardbuilder, but if 605 // we pass through a part of the source tree with binary/large 606 // files, the corresponding shard would be mostly empty, so 607 // insert a reason here too. 608 doc.SkipReason = SkipReasonTooLarge 609 } else if skip := b.docChecker.Check(doc.Content, b.opts.TrigramMax, allowLargeFile); skip != SkipReasonNone { 610 doc.SkipReason = skip 611 } 612 613 b.todo = append(b.todo, &doc) 614 615 if doc.SkipReason == SkipReasonNone { 616 b.size += len(doc.Name) + len(doc.Content) 617 } else { 618 b.size += len(doc.Name) 619 // Drop the content if we are skipping the document. Skipped content is not counted towards the 620 // shard size limit, so otherwise we might buffer too much data in memory before flushing. 621 doc.Content = nil 622 } 623 624 if b.size > b.opts.ShardMax { 625 return b.flush() 626 } 627 628 return nil 629} 630 631// MarkFileAsChangedOrRemoved indicates that the file specified by the given path 632// has been changed or removed since the last indexing job for this repository. 633// 634// If this build is a delta build, these files will be tombstoned in the older shards for this repository. 635func (b *Builder) MarkFileAsChangedOrRemoved(path string) { 636 b.opts.changedOrRemovedFiles = append(b.opts.changedOrRemovedFiles, path) 637} 638 639// Finish creates a last shard from the buffered documents, and clears 640// stale shards from previous runs. This should always be called, also 641// in failure cases, to ensure cleanup. 642// 643// It is safe to call Finish() multiple times. 644func (b *Builder) Finish() error { 645 if b.finishCalled { 646 return b.buildError 647 } 648 649 b.finishCalled = true 650 651 b.flush() 652 b.building.Wait() 653 654 if b.buildError != nil { 655 for tmp := range b.finishedShards { 656 log.Printf("Builder.Finish %s", tmp) 657 os.Remove(tmp) 658 } 659 b.finishedShards = map[string]string{} 660 return b.buildError 661 } 662 663 // map of temporary -> final names for all updated shards + shard metadata files 664 artifactPaths := make(map[string]string) 665 maps.Copy(artifactPaths, b.finishedShards) 666 667 oldShards := b.opts.FindAllShards() 668 669 if b.opts.IsDelta { 670 // Delta shard builds need to update FileTombstone and branch commit information for all 671 // existing shards 672 for _, shard := range oldShards { 673 repositories, _, err := ReadMetadataPathAlive(shard) 674 if err != nil { 675 return fmt.Errorf("reading metadata from shard %q: %w", shard, err) 676 } 677 678 if len(repositories) > 1 { 679 return fmt.Errorf("delta shard builds don't support repositories contained in compound shards (shard %q)", shard) 680 } 681 682 if len(repositories) == 0 { 683 return fmt.Errorf("failed to update repository metadata for shard %q - shard contains no repositories", shard) 684 } 685 686 repository := repositories[0] 687 if repository.ID != b.opts.RepositoryDescription.ID { 688 return fmt.Errorf("shard %q doesn't contain repository ID %d (%q)", shard, b.opts.RepositoryDescription.ID, b.opts.RepositoryDescription.Name) 689 } 690 691 if len(b.opts.changedOrRemovedFiles) > 0 && repository.FileTombstones == nil { 692 repository.FileTombstones = make(map[string]struct{}, len(b.opts.changedOrRemovedFiles)) 693 } 694 695 for _, f := range b.opts.changedOrRemovedFiles { 696 repository.FileTombstones[f] = struct{}{} 697 } 698 699 if !BranchNamesEqual(repository.Branches, b.opts.RepositoryDescription.Branches) { 700 return deltaBranchSetError{ 701 shardName: shard, 702 old: repository.Branches, 703 new: b.opts.RepositoryDescription.Branches, 704 } 705 } 706 707 if b.opts.GetHash() != repository.IndexOptions { 708 return &deltaIndexOptionsMismatchError{ 709 shardName: shard, 710 newOptions: b.opts.HashOptions(), 711 } 712 } 713 714 repository.Branches = b.opts.RepositoryDescription.Branches 715 716 repository.LatestCommitDate = b.opts.RepositoryDescription.LatestCommitDate 717 718 repository.Metadata = b.opts.RepositoryDescription.Metadata 719 720 tempPath, finalPath, err := JsonMarshalRepoMetaTemp(shard, repository) 721 if err != nil { 722 return fmt.Errorf("writing repository metadta for shard %q: %w", shard, err) 723 } 724 725 artifactPaths[tempPath] = finalPath 726 } 727 } 728 729 // We mark finished shards as empty when we successfully finish. Return now 730 // to allow call sites to call Finish idempotently. 731 if len(artifactPaths) == 0 { 732 return b.buildError 733 } 734 735 // Collect a map of the old shards on disk. For each new shard we replace we 736 // delete it from toDelete. Anything remaining in toDelete will be removed 737 // after we have renamed everything into place. 738 739 var toDelete map[string]struct{} 740 if !b.opts.IsDelta { 741 // Non-delta shard builds delete all existing shards before they write out 742 // new ones. 743 // By contrast, delta shard builds work by stacking changes on top of existing shards. 744 // So, we skip populating the toDelete map if we're building delta shards. 745 746 toDelete = make(map[string]struct{}) 747 for _, name := range oldShards { 748 paths, err := IndexFilePaths(name) 749 if err != nil { 750 b.buildError = fmt.Errorf("failed to find old paths for %s: %w", name, err) 751 } 752 for _, p := range paths { 753 toDelete[p] = struct{}{} 754 } 755 } 756 } 757 758 for tmp, final := range artifactPaths { 759 if err := os.Rename(tmp, final); err != nil { 760 b.buildError = err 761 continue 762 } 763 764 delete(toDelete, final) 765 } 766 767 b.finishedShards = map[string]string{} 768 769 for p := range toDelete { 770 // Don't delete compound shards, set tombstones instead. 771 if b.opts.ShardMerging && strings.HasPrefix(filepath.Base(p), "compound-") { 772 if !strings.HasSuffix(p, ".zoekt") { 773 continue 774 } 775 err := SetTombstone(p, b.opts.RepositoryDescription.ID) 776 b.buildError = err 777 continue 778 } 779 log.Printf("removing old shard file: %s", p) 780 if err := os.Remove(p); err != nil { 781 b.buildError = err 782 } 783 } 784 785 return b.buildError 786} 787 788// BranchNamesEqual compares the given zoekt.RepositoryBranch slices, and returns true 789// iff both slices specify the same set of branch names in the same order. 790func BranchNamesEqual(a, b []zoekt.RepositoryBranch) bool { 791 if len(a) != len(b) { 792 return false 793 } 794 795 for i := range a { 796 x, y := a[i], b[i] 797 if x.Name != y.Name { 798 return false 799 } 800 } 801 802 return true 803} 804 805func (b *Builder) flush() error { 806 todo := b.todo 807 b.todo = nil 808 b.size = 0 809 b.errMu.Lock() 810 defer b.errMu.Unlock() 811 if b.buildError != nil { 812 return b.buildError 813 } 814 815 hasShard := b.nextShardNum > 0 816 if len(todo) == 0 && hasShard { 817 return nil 818 } 819 820 shard := b.nextShardNum 821 b.nextShardNum++ 822 823 if b.opts.Parallelism > 1 { 824 b.building.Add(1) 825 b.throttle <- 1 826 go func() { 827 done, err := b.buildShard(todo, shard) 828 <-b.throttle 829 830 b.errMu.Lock() 831 defer b.errMu.Unlock() 832 if err != nil && b.buildError == nil { 833 b.buildError = err 834 } 835 if err == nil { 836 b.finishedShards[done.temp] = done.final 837 } 838 b.building.Done() 839 }() 840 } else { 841 // No goroutines when we're not parallel. This 842 // simplifies memory profiling. 843 done, err := b.buildShard(todo, shard) 844 b.buildError = err 845 if err == nil { 846 b.finishedShards[done.temp] = done.final 847 } 848 849 return b.buildError 850 } 851 852 return nil 853} 854 855// map [0,inf) to [0,1) monotonically 856func squashRange(j int) float64 { 857 x := float64(j) 858 return x / (1 + x) 859} 860 861type rankedDoc struct { 862 *Document 863 rank []float64 864} 865 866// rank returns a vector of scores which is used at index-time to sort documents 867// before writing them to disk. The order of documents in the shard is important 868// at query time, because earlier documents receive a boost at query time and 869// have a higher chance of being searched before limits kick in. 870func rank(d *Document, origIdx int) []float64 { 871 skipped := 0.0 872 if d.SkipReason != SkipReasonNone { 873 skipped = 1.0 874 } 875 876 generated := 0.0 877 if enry.IsGenerated(d.Name, d.Content) { 878 generated = 1.0 879 } 880 881 vendor := 0.0 882 if enry.IsVendor(d.Name) { 883 vendor = 1.0 884 } 885 886 test := 0.0 887 if enry.IsTest(d.Name) { 888 test = 1.0 889 } 890 891 // Smaller is earlier (=better). 892 return []float64{ 893 // Always place skipped docs last 894 skipped, 895 896 // Prefer docs that are not generated 897 generated, 898 899 // Prefer docs that are not vendored 900 vendor, 901 902 // Prefer docs that are not tests 903 test, 904 905 // With short names 906 squashRange(len(d.Name)), 907 908 // With many symbols 909 1.0 - squashRange(len(d.Symbols)), 910 911 // With short content 912 squashRange(len(d.Content)), 913 914 // That is present is as many branches as possible 915 1.0 - squashRange(len(d.Branches)), 916 917 // Preserve original ordering. 918 squashRange(origIdx), 919 } 920} 921 922func sortDocuments(todo []*Document) { 923 rs := make([]rankedDoc, 0, len(todo)) 924 for i, t := range todo { 925 rd := rankedDoc{t, rank(t, i)} 926 rs = append(rs, rd) 927 } 928 sort.Slice(rs, func(i, j int) bool { 929 r1 := rs[i].rank 930 r2 := rs[j].rank 931 for i := range r1 { 932 if r1[i] < r2[i] { 933 return true 934 } 935 if r1[i] > r2[i] { 936 return false 937 } 938 } 939 940 return false 941 }) 942 for i := range todo { 943 todo[i] = rs[i].Document 944 } 945} 946 947func (b *Builder) buildShard(todo []*Document, nextShardNum int) (*finishedShard, error) { 948 if !b.opts.DisableCTags && (b.opts.CTagsPath != "" || b.opts.ScipCTagsPath != "") { 949 err := parseSymbols(todo, b.opts.LanguageMap, b.parserBins) 950 if b.opts.CTagsMustSucceed && err != nil { 951 return nil, err 952 } 953 if err != nil { 954 log.Printf("ignoring universal:%s or scip:%s error: %v", b.opts.CTagsPath, b.opts.ScipCTagsPath, err) 955 } 956 } 957 958 name := b.opts.shardName(nextShardNum) 959 960 shardBuilder, err := b.newShardBuilder() 961 if err != nil { 962 return nil, err 963 } 964 965 sortDocuments(todo) 966 967 for idx, t := range todo { 968 if err := shardBuilder.Add(*t); err != nil { 969 return nil, err 970 } 971 972 if idx%10_000 == 0 { 973 b.CheckMemoryUsage() 974 } 975 } 976 977 return b.writeShard(name, shardBuilder) 978} 979 980// CheckMemoryUsage checks the memory usage of the process and writes a memory profile if the heap usage exceeds the 981// configured threshold. NOTE: this method is expensive and should only be used for debugging. 982func (b *Builder) CheckMemoryUsage() { 983 // Don't check memory if heap profiling is disabled, or we've already written 10 profiles 984 if b.opts.HeapProfileTriggerBytes <= 0 || b.heapProfileNum >= 10 { 985 return 986 } 987 988 var m runtime.MemStats 989 runtime.ReadMemStats(&m) 990 991 if m.HeapAlloc > b.opts.HeapProfileTriggerBytes && b.heapProfileMu.TryLock() { 992 defer b.heapProfileMu.Unlock() 993 994 log.Printf("writing memory profile, allocated heap: %s", humanize.Bytes(m.HeapAlloc)) 995 name := filepath.Join(b.opts.IndexDir, fmt.Sprintf("indexmemory.prof.%d", b.heapProfileNum)) 996 f, err := os.Create(name) 997 if err != nil { 998 log.Printf("failed to create memory profile file: %v", err) 999 return 1000 } 1001 1002 err = pprof.WriteHeapProfile(f) 1003 if err != nil { 1004 log.Printf("failed to write memory profile: %v", err) 1005 } 1006 1007 b.heapProfileNum++ 1008 } 1009} 1010 1011func (b *Builder) newShardBuilder() (*ShardBuilder, error) { 1012 desc := b.opts.RepositoryDescription 1013 desc.HasSymbols = !b.opts.DisableCTags && b.opts.CTagsPath != "" 1014 desc.SubRepoMap = b.opts.SubRepositories 1015 desc.IndexOptions = b.opts.GetHash() 1016 1017 shardBuilder, err := NewShardBuilder(&desc) 1018 if err != nil { 1019 return nil, err 1020 } 1021 shardBuilder.IndexTime = b.indexTime 1022 shardBuilder.ID = b.id 1023 return shardBuilder, nil 1024} 1025 1026func (b *Builder) writeShard(fn string, ib *ShardBuilder) (*finishedShard, error) { 1027 dir := filepath.Dir(fn) 1028 if err := os.MkdirAll(dir, 0o700); err != nil { 1029 return nil, err 1030 } 1031 1032 f, err := os.CreateTemp(dir, filepath.Base(fn)+".*.tmp") 1033 if err != nil { 1034 return nil, err 1035 } 1036 if runtime.GOOS != "windows" { 1037 if err := f.Chmod(0o666 &^ umask); err != nil { 1038 return nil, err 1039 } 1040 } 1041 1042 defer f.Close() 1043 if err := ib.Write(f); err != nil { 1044 return nil, err 1045 } 1046 fi, err := f.Stat() 1047 if err != nil { 1048 return nil, err 1049 } 1050 if err := f.Close(); err != nil { 1051 return nil, err 1052 } 1053 1054 log.Printf("finished shard %s: %d index bytes (overhead %3.1f), %d files processed \n", 1055 fn, 1056 fi.Size(), 1057 float64(fi.Size())/float64(ib.ContentSize()+1), 1058 ib.NumFiles()) 1059 1060 return &finishedShard{f.Name(), fn}, nil 1061} 1062 1063type deltaBranchSetError struct { 1064 shardName string 1065 old, new []zoekt.RepositoryBranch 1066} 1067 1068func (e deltaBranchSetError) Error() string { 1069 return fmt.Sprintf("repository metadata in shard %q contains a different set of branch names than what was requested, which is unsupported in a delta shard build. old: %+v, new: %+v", e.shardName, e.old, e.new) 1070} 1071 1072type deltaIndexOptionsMismatchError struct { 1073 shardName string 1074 newOptions HashOptions 1075} 1076 1077func (e *deltaIndexOptionsMismatchError) Error() string { 1078 return fmt.Sprintf("one or more index options for shard %q do not match Builder's index options. These index option updates are incompatible with delta build. New index options: %+v", e.shardName, e.newOptions) 1079} 1080 1081// umask holds the Umask of the current process 1082var umask os.FileMode 1083 1084func init() { 1085 umask = os.FileMode(unix.Umask(0)) 1086 unix.Umask(int(umask)) 1087}