fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

// Copyright 2016 Google Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package index contains logic for building Zoekt indexes. NOTE: this package is not considered
// part of the public API, and it is not recommended to rely on it in external code.
package index

import (
	"crypto/sha1"
	"flag"
	"fmt"
	"log"
	"net/url"
	"os"
	"os/exec"
	"path"
	"path/filepath"
	"reflect"
	"runtime"
	"runtime/pprof"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/bmatcuk/doublestar"
	"github.com/dustin/go-humanize"
	"github.com/go-enry/go-enry/v2"
	"github.com/rs/xid"
	"golang.org/x/sys/unix"

	"maps"

	"github.com/sourcegraph/zoekt"
	"github.com/sourcegraph/zoekt/internal/ctags"
	"github.com/sourcegraph/zoekt/internal/tenant"
)

// DefaultDir is the ".zoekt" directory below $HOME, with HOME read from the
// environment when the package is initialized.
var DefaultDir = filepath.Join(os.Getenv("HOME"), ".zoekt")

// Branch describes a single branch version.
type Branch struct {
	Name    string
	Version string
}

// Options sets options for the index building.
type Options struct {
	// IndexDir is a directory that holds *.zoekt index files.
	IndexDir string

	// ShardPrefixOverride sets the prefix for shard names. When non-empty it
	// takes precedence over the repository name (or tenant/repository IDs).
	ShardPrefixOverride string

	// SizeMax is the maximum file size
	SizeMax int

	// Parallelism is the maximum number of shards to index in parallel
	Parallelism int

	// ShardMax sets the maximum corpus size for a single shard
	ShardMax int

	// TrigramMax sets the maximum number of distinct trigrams per document.
	TrigramMax int

	// RepositoryDescription holds names and URLs for the repository.
	RepositoryDescription zoekt.Repository

	// SubRepositories is a path => sub repository map.
	SubRepositories map[string]*zoekt.Repository

	// DisableCTags disables the generation of ctags metadata.
	DisableCTags bool

	// CTagsPath is the path to the ctags binary to run, or empty
	// if a valid binary couldn't be found.
	CTagsPath string

	// Same as CTagsPath but for scip-ctags
	ScipCTagsPath string

	// If set, ctags must succeed.
	CTagsMustSucceed bool

	// LargeFiles is a slice of glob patterns, including ** for any number
	// of directories, where matching file paths should be indexed
	// regardless of their size. The full pattern syntax is here:
	// https://github.com/bmatcuk/doublestar/tree/v1#patterns.
	//
	// A pattern prefixed with "!" is a negation, and later patterns override
	// earlier ones (see IgnoreSizeMax).
	LargeFiles []string

	// IsDelta is true if this run contains only the changed documents since the
	// last run.
	IsDelta bool

	// changedOrRemovedFiles is a list of file paths that have been changed or removed
	// since the last indexing job for this repository. These files will be tombstoned
	// in the older shards for this repository.
	changedOrRemovedFiles []string

	// LanguageMap is handed to the ctags parser setup and to symbol parsing.
	LanguageMap ctags.LanguageMap

	// ShardMerging is true if builder should respect compound shards. This is a
	// Sourcegraph specific option.
	ShardMerging bool

	// HeapProfileTriggerBytes is the heap allocation in bytes that will trigger a memory profile. If 0, no memory profile
	// will be triggered. Note this trigger looks at total heap allocation (which includes both inuse and garbage objects).
	//
	// Profiles will be written to files named `indexmemory.prof.n` in the index directory. No more than 10 files are written.
	//
	// Note: heap checking is "best effort", and it's possible for the process to OOM without triggering the heap profile.
	HeapProfileTriggerBytes uint64
}

// HashOptions contains only the options in Options that upon modification leads to IndexState of IndexStateMismatch during the next index building.
type HashOptions struct {
	sizeMax          int
	disableCTags     bool
	ctagsPath        string
	cTagsMustSucceed bool
	largeFiles       []string
}

// HashOptions returns the subset of o that participates in the options hash.
// The returned largeFiles slice shares its backing array with o.LargeFiles.
func (o *Options) HashOptions() HashOptions {
	return HashOptions{
		sizeMax:          o.SizeMax,
		disableCTags:     o.DisableCTags,
		ctagsPath:        o.CTagsPath,
		cTagsMustSucceed: o.CTagsMustSucceed,
		largeFiles:       o.LargeFiles,
	}
}

// GetHash returns the hex-encoded SHA-1 digest of the HashOptions fields.
// The fields are written in a fixed order; changing that order or the
// formatting verbs changes the hash of otherwise identical options.
func (o *Options) GetHash() string {
	h := o.HashOptions()
	hasher := sha1.New()

	hasher.Write([]byte(h.ctagsPath))
	hasher.Write(fmt.Appendf(nil, "%t", h.cTagsMustSucceed))
	hasher.Write(fmt.Appendf(nil, "%d", h.sizeMax))
	hasher.Write(fmt.Appendf(nil, "%q", h.largeFiles))
	hasher.Write(fmt.Appendf(nil, "%t", h.disableCTags))

	return fmt.Sprintf("%x", hasher.Sum(nil))
}

// largeFilesFlag adapts Options.LargeFiles to flag.Value so that the
// -large_file flag can be given more than once.
type largeFilesFlag struct{ *Options }

// String renders the accumulated patterns, each preceded by "-large_file ".
func (f largeFilesFlag) String() string {
	// From flag.Value documentation:
	//
	// The flag package may call the String method with a zero-valued receiver,
	// such as a nil pointer.
	if f.Options == nil {
		return ""
	}
	s := append([]string{""}, f.LargeFiles...)
	return strings.Join(s, "-large_file ")
}

// Set appends value to the LargeFiles of the wrapped Options. It never fails.
func (f largeFilesFlag) Set(value string) error {
	f.LargeFiles = append(f.LargeFiles, value)
	return nil
}

// Flags adds flags for build options to fs. It is the "inverse" of Args.
//
// Flag defaults are taken from a copy of o on which SetDefaults has been
// called, so o itself is only modified once the flags are parsed.
func (o *Options) Flags(fs *flag.FlagSet) {
	x := *o
	x.SetDefaults()
	fs.IntVar(&o.SizeMax, "file_limit", x.SizeMax, "maximum file size")
	fs.IntVar(&o.TrigramMax, "max_trigram_count", x.TrigramMax, "maximum number of trigrams per document")
	fs.IntVar(&o.ShardMax, "shard_limit", x.ShardMax, "maximum corpus size for a shard")
	fs.IntVar(&o.Parallelism, "parallelism", x.Parallelism, "maximum number of parallel indexing processes.")
	fs.StringVar(&o.IndexDir, "index", x.IndexDir, "directory for search indices")
	fs.StringVar(&o.ShardPrefixOverride, "shard_prefix_override", x.ShardPrefixOverride, "prefix for shard name")
	fs.BoolVar(&o.CTagsMustSucceed, "require_ctags", x.CTagsMustSucceed, "If set, ctags calls must succeed.")
	fs.Var(largeFilesFlag{o}, "large_file", "A glob pattern where matching files are to be index regardless of their size. You can add multiple patterns by setting this more than once.")

	// Sourcegraph specific
	fs.BoolVar(&o.DisableCTags, "disable_ctags", x.DisableCTags, "If set, ctags will not be called.")
	fs.BoolVar(&o.ShardMerging, "shard_merging", x.ShardMerging, "If set, builder will respect compound shards.")
}

// Args generates command line arguments for o. It is the "inverse" of Flags.
198func (o *Options) Args() []string { 199 var args []string 200 201 if o.SizeMax != 0 { 202 args = append(args, "-file_limit", strconv.Itoa(o.SizeMax)) 203 } 204 205 if o.TrigramMax != 0 { 206 args = append(args, "-max_trigram_count", strconv.Itoa(o.TrigramMax)) 207 } 208 209 if o.ShardMax != 0 { 210 args = append(args, "-shard_limit", strconv.Itoa(o.ShardMax)) 211 } 212 213 if o.Parallelism != 0 { 214 args = append(args, "-parallelism", strconv.Itoa(o.Parallelism)) 215 } 216 217 if o.IndexDir != "" { 218 args = append(args, "-index", o.IndexDir) 219 } 220 221 if o.ShardPrefixOverride != "" { 222 args = append(args, "-shard_prefix_override", o.ShardPrefixOverride) 223 } 224 225 if o.CTagsMustSucceed { 226 args = append(args, "-require_ctags") 227 } 228 229 for _, a := range o.LargeFiles { 230 args = append(args, "-large_file", a) 231 } 232 233 // Sourcegraph specific 234 if o.DisableCTags { 235 args = append(args, "-disable_ctags") 236 } 237 238 if o.ShardMerging { 239 args = append(args, "-shard_merging") 240 } 241 242 return args 243} 244 245// Builder manages (parallel) creation of uniformly sized shards. The 246// builder buffers up documents until it collects enough documents and 247// then builds a shard and writes. 248type Builder struct { 249 opts Options 250 throttle chan int 251 252 nextShardNum int 253 todo []*Document 254 docChecker DocChecker 255 size int 256 257 parserBins ctags.ParserBinMap 258 building sync.WaitGroup 259 260 errMu sync.Mutex 261 buildError error 262 263 // temp name => final name for finished shards. We only rename 264 // them once all shards succeed to avoid Frankstein corpuses. 265 finishedShards map[string]string 266 267 // indexTime is set by tests for doing reproducible builds. 268 indexTime time.Time 269 270 // heapProfileMu is used to ensure that only one memory profile is written at a time 271 heapProfileMu sync.Mutex 272 heapProfileNum int 273 274 // a sortable 20 chars long id. 
275 id string 276 277 finishCalled bool 278 279 // postingsPool reuses postingsBuilder instances across shard builds, 280 // retaining their map and slice allocations to avoid repeated 281 // memclr/madvise overhead. 282 postingsPool sync.Pool 283} 284 285type finishedShard struct { 286 temp, final string 287} 288 289func checkCTags() string { 290 if ctags := os.Getenv("CTAGS_COMMAND"); ctags != "" { 291 return ctags 292 } 293 294 if ctags, err := exec.LookPath("universal-ctags"); err == nil { 295 return ctags 296 } 297 298 return "" 299} 300 301func checkScipCTags() string { 302 if ctags := os.Getenv("SCIP_CTAGS_COMMAND"); ctags != "" { 303 return ctags 304 } 305 306 if ctags, err := exec.LookPath("scip-ctags"); err == nil { 307 return ctags 308 } 309 310 return "" 311} 312 313// SetDefaults sets reasonable default options. 314func (o *Options) SetDefaults() { 315 if o.CTagsPath == "" && !o.DisableCTags { 316 o.CTagsPath = checkCTags() 317 } 318 319 if o.ScipCTagsPath == "" && !o.DisableCTags { 320 o.ScipCTagsPath = checkScipCTags() 321 } 322 323 if o.Parallelism == 0 { 324 o.Parallelism = 4 325 } 326 if o.SizeMax == 0 { 327 o.SizeMax = 2 << 20 328 } 329 if o.ShardMax == 0 { 330 o.ShardMax = 100 << 20 331 } 332 if o.TrigramMax == 0 { 333 o.TrigramMax = 20000 334 } 335 336 if o.RepositoryDescription.Name == "" && o.RepositoryDescription.URL != "" { 337 parsed, _ := url.Parse(o.RepositoryDescription.URL) 338 if parsed != nil { 339 o.RepositoryDescription.Name = filepath.Join(parsed.Host, parsed.Path) 340 } 341 } 342} 343 344// ShardName returns the name the given index shard. 
345func (o *Options) shardName(n int) string { 346 return o.shardNameVersion(IndexFormatVersion, n) 347} 348 349func (o *Options) shardNameVersion(version, n int) string { 350 prefix := o.ShardPrefixOverride // ShardPrefixOverride takes precedence to support custom shard naming strategies 351 352 if prefix == "" { 353 // Sourcegraph specific: We use IDs in shard names on multi-tenant 354 // instances to prevent conflicts. 355 if tenant.UseIDBasedShardNames() { 356 prefix = fmt.Sprintf("%09d_%09d", o.RepositoryDescription.TenantID, o.RepositoryDescription.ID) 357 } else { 358 prefix = o.RepositoryDescription.Name 359 } 360 } 361 362 return shardName(o.IndexDir, prefix, version, n) 363} 364 365type IndexState string 366 367const ( 368 IndexStateMissing IndexState = "missing" 369 IndexStateCorrupt IndexState = "corrupt" 370 IndexStateVersion IndexState = "version-mismatch" 371 IndexStateOption IndexState = "option-mismatch" 372 IndexStateMeta IndexState = "meta-mismatch" 373 IndexStateContent IndexState = "content-mismatch" 374 IndexStateEqual IndexState = "equal" 375) 376 377var readVersions = []struct { 378 IndexFormatVersion int 379 FeatureVersion int 380}{{ 381 IndexFormatVersion: IndexFormatVersion, 382 FeatureVersion: FeatureVersion, 383}, { 384 IndexFormatVersion: NextIndexFormatVersion, 385 FeatureVersion: FeatureVersion, 386}} 387 388// IncrementalSkipIndexing returns true if the index present on disk matches 389// the build options. 390func (o *Options) IncrementalSkipIndexing() bool { 391 state, _ := o.IndexState() 392 return state == IndexStateEqual 393} 394 395// IndexState checks how the index present on disk compares to the build 396// options and returns the IndexState and the name of the first shard. 397func (o *Options) IndexState() (IndexState, string) { 398 // Open the latest version we support that is on disk. 
399 fn := o.findShard() 400 if fn == "" { 401 return IndexStateMissing, fn 402 } 403 404 repos, index, err := ReadMetadataPathAlive(fn) 405 if os.IsNotExist(err) { 406 return IndexStateMissing, fn 407 } else if err != nil { 408 return IndexStateCorrupt, fn 409 } 410 411 for _, v := range readVersions { 412 if v.IndexFormatVersion == index.IndexFormatVersion && v.FeatureVersion != index.IndexFeatureVersion { 413 return IndexStateVersion, fn 414 } 415 } 416 417 var repo *zoekt.Repository 418 for _, cand := range repos { 419 if cand.Name == o.RepositoryDescription.Name { 420 repo = cand 421 break 422 } 423 } 424 425 if repo == nil { 426 return IndexStateCorrupt, fn 427 } 428 429 if repo.IndexOptions != o.GetHash() { 430 return IndexStateOption, fn 431 } 432 433 if !reflect.DeepEqual(repo.Branches, o.RepositoryDescription.Branches) { 434 return IndexStateContent, fn 435 } 436 437 // We can mutate repo since it lives in the scope of this function call. 438 if updated, err := repo.MergeMutable(&o.RepositoryDescription); err != nil { 439 // non-nil err means we are trying to update an immutable field => 440 // reindex content. 441 log.Printf("warn: immutable field changed, requires re-index: %s", err) 442 return IndexStateContent, fn 443 } else if updated { 444 return IndexStateMeta, fn 445 } 446 447 return IndexStateEqual, fn 448} 449 450// FindRepositoryMetadata returns the index metadata for the repository 451// specified in the options. 'ok' is false if the repository's metadata 452// couldn't be found or if an error occurred. 
453func (o *Options) FindRepositoryMetadata() (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 454 shard := o.findShard() 455 if shard == "" { 456 return nil, nil, false, nil 457 } 458 459 repositories, metadata, err := ReadMetadataPathAlive(shard) 460 if err != nil { 461 return nil, nil, false, fmt.Errorf("reading metadata for shard %q: %w", shard, err) 462 } 463 464 ID := o.RepositoryDescription.ID 465 for _, r := range repositories { 466 // compound shards contain multiple repositories, so we 467 // have to pick only the one we're looking for 468 if r.ID == ID { 469 return r, metadata, true, nil 470 } 471 } 472 473 // If we're here, then we're somehow in a state where we found a matching 474 // shard that's missing the repository metadata we're looking for. This 475 // should never happen. 476 name := o.RepositoryDescription.Name 477 return nil, nil, false, fmt.Errorf("matching shard %q doesn't contain metadata for repo id %d (%q)", shard, ID, name) 478} 479 480func (o *Options) findShard() string { 481 for _, v := range readVersions { 482 fn := o.shardNameVersion(v.IndexFormatVersion, 0) 483 if _, err := os.Stat(fn); err == nil { 484 return fn 485 } 486 } 487 488 // Brute force finding the shard in compound shards. We should only hit this 489 // code path for repositories that don't exist yet or are in compound shards. 
490 return o.findCompoundShard() 491} 492 493func (o *Options) findCompoundShard() string { 494 compoundShards, err := filepath.Glob(path.Join(o.IndexDir, "compound-*.zoekt")) 495 if err != nil { 496 return "" 497 } 498 for _, fn := range compoundShards { 499 if containsRepo(fn, o.RepositoryDescription.ID) { 500 return fn 501 } 502 } 503 504 return "" 505} 506 507func (o *Options) FindAllShards() []string { 508 for _, v := range readVersions { 509 fn := o.shardNameVersion(v.IndexFormatVersion, 0) 510 if _, err := os.Stat(fn); err == nil { 511 shards := []string{fn} 512 for i := 1; ; i++ { 513 fn := o.shardNameVersion(v.IndexFormatVersion, i) 514 if _, err := os.Stat(fn); err != nil { 515 return shards 516 } 517 shards = append(shards, fn) 518 } 519 } 520 } 521 522 // lazily fallback to findShard which will look for a compound shard. 523 if fn := o.findShard(); fn != "" { 524 return []string{fn} 525 } 526 527 return nil 528} 529 530// IgnoreSizeMax determines whether the max size should be ignored. 531func (o *Options) IgnoreSizeMax(name string) bool { 532 // A pattern match will override preceding pattern matches. 533 for i := len(o.LargeFiles) - 1; i >= 0; i-- { 534 pattern := strings.TrimSpace(o.LargeFiles[i]) 535 negated, validatedPattern := checkIsNegatePattern(pattern) 536 537 if m, _ := doublestar.PathMatch(validatedPattern, name); m { 538 if negated { 539 return false 540 } else { 541 return true 542 } 543 } 544 } 545 546 return false 547} 548 549func checkIsNegatePattern(pattern string) (bool, string) { 550 negate := "!" 551 552 // if negated then strip prefix meta character which identifies negated filter pattern 553 if strings.HasPrefix(pattern, negate) { 554 return true, pattern[len(negate):] 555 } 556 557 return false, pattern 558} 559 560// NewBuilder creates a new Builder instance. 
561func NewBuilder(opts Options) (*Builder, error) { 562 opts.SetDefaults() 563 if opts.RepositoryDescription.Name == "" { 564 return nil, fmt.Errorf("builder: must set Name") 565 } 566 567 b := &Builder{ 568 opts: opts, 569 throttle: make(chan int, opts.Parallelism), 570 finishedShards: map[string]string{}, 571 } 572 573 parserBins, err := ctags.NewParserBinMap( 574 b.opts.CTagsPath, 575 b.opts.ScipCTagsPath, 576 opts.LanguageMap, 577 b.opts.CTagsMustSucceed, 578 ) 579 if err != nil { 580 return nil, err 581 } 582 583 b.parserBins = parserBins 584 585 if opts.IsDelta { 586 // Delta shards build on top of previously existing shards. 587 // As a consequence, the shardNum for delta shards starts from 588 // the number following the most recently generated shard - not 0. 589 // 590 // Using this numbering scheme allows all the shards to be 591 // discovered as a set. 592 shards := b.opts.FindAllShards() 593 b.nextShardNum = len(shards) // shards are zero indexed, so len() provides the next number after the last one 594 } 595 596 if _, err := b.newShardBuilder(); err != nil { 597 return nil, err 598 } 599 600 now := time.Now() 601 b.indexTime = now 602 b.id = xid.NewWithTime(now).String() 603 604 return b, nil 605} 606 607// AddFile is a convenience wrapper for the Add method 608func (b *Builder) AddFile(name string, content []byte) error { 609 return b.Add(Document{Name: name, Content: content}) 610} 611 612func (b *Builder) Add(doc Document) error { 613 if b.finishCalled { 614 return nil 615 } 616 617 allowLargeFile := b.opts.IgnoreSizeMax(doc.Name) 618 if len(doc.Content) > b.opts.SizeMax && !allowLargeFile { 619 // We could pass the document on to the shardbuilder, but if 620 // we pass through a part of the source tree with binary/large 621 // files, the corresponding shard would be mostly empty, so 622 // insert a reason here too. 
623 doc.SkipReason = SkipReasonTooLarge 624 } else if skip := b.docChecker.Check(doc.Content, b.opts.TrigramMax, allowLargeFile); skip != SkipReasonNone { 625 doc.SkipReason = skip 626 } 627 628 b.todo = append(b.todo, &doc) 629 630 if doc.SkipReason == SkipReasonNone { 631 b.size += len(doc.Name) + len(doc.Content) 632 } else { 633 b.size += len(doc.Name) 634 // Drop the content if we are skipping the document. Skipped content is not counted towards the 635 // shard size limit, so otherwise we might buffer too much data in memory before flushing. 636 doc.Content = nil 637 } 638 639 if b.size > b.opts.ShardMax { 640 return b.flush() 641 } 642 643 return nil 644} 645 646// MarkFileAsChangedOrRemoved indicates that the file specified by the given path 647// has been changed or removed since the last indexing job for this repository. 648// 649// If this build is a delta build, these files will be tombstoned in the older shards for this repository. 650func (b *Builder) MarkFileAsChangedOrRemoved(path string) { 651 b.opts.changedOrRemovedFiles = append(b.opts.changedOrRemovedFiles, path) 652} 653 654// Finish creates a last shard from the buffered documents, and clears 655// stale shards from previous runs. This should always be called, also 656// in failure cases, to ensure cleanup. 657// 658// It is safe to call Finish() multiple times. 
659func (b *Builder) Finish() error { 660 if b.finishCalled { 661 return b.buildError 662 } 663 664 b.finishCalled = true 665 666 b.flush() 667 b.building.Wait() 668 669 if b.buildError != nil { 670 for tmp := range b.finishedShards { 671 log.Printf("Builder.Finish %s", tmp) 672 os.Remove(tmp) 673 } 674 b.finishedShards = map[string]string{} 675 return b.buildError 676 } 677 678 // map of temporary -> final names for all updated shards + shard metadata files 679 artifactPaths := make(map[string]string) 680 maps.Copy(artifactPaths, b.finishedShards) 681 682 oldShards := b.opts.FindAllShards() 683 684 if b.opts.IsDelta { 685 // Delta shard builds need to update FileTombstone and branch commit information for all 686 // existing shards 687 for _, shard := range oldShards { 688 repositories, _, err := ReadMetadataPathAlive(shard) 689 if err != nil { 690 return fmt.Errorf("reading metadata from shard %q: %w", shard, err) 691 } 692 693 if len(repositories) > 1 { 694 return fmt.Errorf("delta shard builds don't support repositories contained in compound shards (shard %q)", shard) 695 } 696 697 if len(repositories) == 0 { 698 return fmt.Errorf("failed to update repository metadata for shard %q - shard contains no repositories", shard) 699 } 700 701 repository := repositories[0] 702 if repository.ID != b.opts.RepositoryDescription.ID { 703 return fmt.Errorf("shard %q doesn't contain repository ID %d (%q)", shard, b.opts.RepositoryDescription.ID, b.opts.RepositoryDescription.Name) 704 } 705 706 if len(b.opts.changedOrRemovedFiles) > 0 && repository.FileTombstones == nil { 707 repository.FileTombstones = make(map[string]struct{}, len(b.opts.changedOrRemovedFiles)) 708 } 709 710 for _, f := range b.opts.changedOrRemovedFiles { 711 repository.FileTombstones[f] = struct{}{} 712 } 713 714 if !BranchNamesEqual(repository.Branches, b.opts.RepositoryDescription.Branches) { 715 return deltaBranchSetError{ 716 shardName: shard, 717 old: repository.Branches, 718 new: 
b.opts.RepositoryDescription.Branches, 719 } 720 } 721 722 if b.opts.GetHash() != repository.IndexOptions { 723 return &deltaIndexOptionsMismatchError{ 724 shardName: shard, 725 newOptions: b.opts.HashOptions(), 726 } 727 } 728 729 repository.Branches = b.opts.RepositoryDescription.Branches 730 731 repository.LatestCommitDate = b.opts.RepositoryDescription.LatestCommitDate 732 733 repository.Metadata = b.opts.RepositoryDescription.Metadata 734 735 tempPath, finalPath, err := JsonMarshalRepoMetaTemp(shard, repository) 736 if err != nil { 737 return fmt.Errorf("writing repository metadta for shard %q: %w", shard, err) 738 } 739 740 artifactPaths[tempPath] = finalPath 741 } 742 } 743 744 // We mark finished shards as empty when we successfully finish. Return now 745 // to allow call sites to call Finish idempotently. 746 if len(artifactPaths) == 0 { 747 return b.buildError 748 } 749 750 // Collect a map of the old shards on disk. For each new shard we replace we 751 // delete it from toDelete. Anything remaining in toDelete will be removed 752 // after we have renamed everything into place. 753 754 var toDelete map[string]struct{} 755 if !b.opts.IsDelta { 756 // Non-delta shard builds delete all existing shards before they write out 757 // new ones. 758 // By contrast, delta shard builds work by stacking changes on top of existing shards. 759 // So, we skip populating the toDelete map if we're building delta shards. 
760 761 toDelete = make(map[string]struct{}) 762 for _, name := range oldShards { 763 paths, err := IndexFilePaths(name) 764 if err != nil { 765 b.buildError = fmt.Errorf("failed to find old paths for %s: %w", name, err) 766 } 767 for _, p := range paths { 768 toDelete[p] = struct{}{} 769 } 770 } 771 } 772 773 for tmp, final := range artifactPaths { 774 if err := os.Rename(tmp, final); err != nil { 775 b.buildError = err 776 continue 777 } 778 779 delete(toDelete, final) 780 } 781 782 b.finishedShards = map[string]string{} 783 784 for p := range toDelete { 785 // Don't delete compound shards, set tombstones instead. 786 if b.opts.ShardMerging && strings.HasPrefix(filepath.Base(p), "compound-") { 787 if !strings.HasSuffix(p, ".zoekt") { 788 continue 789 } 790 err := SetTombstone(p, b.opts.RepositoryDescription.ID) 791 b.buildError = err 792 continue 793 } 794 log.Printf("removing old shard file: %s", p) 795 if err := os.Remove(p); err != nil { 796 b.buildError = err 797 } 798 } 799 800 return b.buildError 801} 802 803// BranchNamesEqual compares the given zoekt.RepositoryBranch slices, and returns true 804// iff both slices specify the same set of branch names in the same order. 
805func BranchNamesEqual(a, b []zoekt.RepositoryBranch) bool { 806 if len(a) != len(b) { 807 return false 808 } 809 810 for i := range a { 811 x, y := a[i], b[i] 812 if x.Name != y.Name { 813 return false 814 } 815 } 816 817 return true 818} 819 820func (b *Builder) flush() error { 821 todo := b.todo 822 b.todo = nil 823 b.size = 0 824 b.errMu.Lock() 825 defer b.errMu.Unlock() 826 if b.buildError != nil { 827 return b.buildError 828 } 829 830 hasShard := b.nextShardNum > 0 831 if len(todo) == 0 && hasShard { 832 return nil 833 } 834 835 shard := b.nextShardNum 836 b.nextShardNum++ 837 838 if b.opts.Parallelism > 1 { 839 b.building.Add(1) 840 b.throttle <- 1 841 go func() { 842 done, err := b.buildShard(todo, shard) 843 <-b.throttle 844 845 b.errMu.Lock() 846 defer b.errMu.Unlock() 847 if err != nil && b.buildError == nil { 848 b.buildError = err 849 } 850 if err == nil { 851 b.finishedShards[done.temp] = done.final 852 } 853 b.building.Done() 854 }() 855 } else { 856 // No goroutines when we're not parallel. This 857 // simplifies memory profiling. 858 done, err := b.buildShard(todo, shard) 859 b.buildError = err 860 if err == nil { 861 b.finishedShards[done.temp] = done.final 862 } 863 864 return b.buildError 865 } 866 867 return nil 868} 869 870// map [0,inf) to [0,1) monotonically 871func squashRange(j int) float64 { 872 x := float64(j) 873 return x / (1 + x) 874} 875 876type rankedDoc struct { 877 *Document 878 rank []float64 879} 880 881// rank returns a vector of scores which is used at index-time to sort documents 882// before writing them to disk. The order of documents in the shard is important 883// at query time, because earlier documents receive a boost at query time and 884// have a higher chance of being searched before limits kick in. 
885func rank(d *Document, origIdx int) []float64 { 886 skipped := 0.0 887 if d.SkipReason != SkipReasonNone { 888 skipped = 1.0 889 } 890 891 generated := 0.0 892 if enry.IsGenerated(d.Name, d.Content) { 893 generated = 1.0 894 } 895 896 vendor := 0.0 897 if enry.IsVendor(d.Name) { 898 vendor = 1.0 899 } 900 901 test := 0.0 902 if enry.IsTest(d.Name) { 903 test = 1.0 904 } 905 906 // Smaller is earlier (=better). 907 return []float64{ 908 // Always place skipped docs last 909 skipped, 910 911 // Prefer docs that are not generated 912 generated, 913 914 // Prefer docs that are not vendored 915 vendor, 916 917 // Prefer docs that are not tests 918 test, 919 920 // With short names 921 squashRange(len(d.Name)), 922 923 // With many symbols 924 1.0 - squashRange(len(d.Symbols)), 925 926 // With short content 927 squashRange(len(d.Content)), 928 929 // That is present is as many branches as possible 930 1.0 - squashRange(len(d.Branches)), 931 932 // Preserve original ordering. 933 squashRange(origIdx), 934 } 935} 936 937func sortDocuments(todo []*Document) { 938 rs := make([]rankedDoc, 0, len(todo)) 939 for i, t := range todo { 940 rd := rankedDoc{t, rank(t, i)} 941 rs = append(rs, rd) 942 } 943 sort.Slice(rs, func(i, j int) bool { 944 r1 := rs[i].rank 945 r2 := rs[j].rank 946 for i := range r1 { 947 if r1[i] < r2[i] { 948 return true 949 } 950 if r1[i] > r2[i] { 951 return false 952 } 953 } 954 955 return false 956 }) 957 for i := range todo { 958 todo[i] = rs[i].Document 959 } 960} 961 962func (b *Builder) buildShard(todo []*Document, nextShardNum int) (*finishedShard, error) { 963 if !b.opts.DisableCTags && (b.opts.CTagsPath != "" || b.opts.ScipCTagsPath != "") { 964 err := parseSymbols(todo, b.opts.LanguageMap, b.parserBins) 965 if b.opts.CTagsMustSucceed && err != nil { 966 return nil, err 967 } 968 if err != nil { 969 log.Printf("ignoring universal:%s or scip:%s error: %v", b.opts.CTagsPath, b.opts.ScipCTagsPath, err) 970 } 971 } 972 973 name := 
b.opts.shardName(nextShardNum) 974 975 shardBuilder, err := b.newShardBuilder() 976 if err != nil { 977 return nil, err 978 } 979 980 sortDocuments(todo) 981 982 for idx, t := range todo { 983 if err := shardBuilder.Add(*t); err != nil { 984 return nil, err 985 } 986 987 if idx%10_000 == 0 { 988 b.CheckMemoryUsage() 989 } 990 } 991 992 result, err := b.writeShard(name, shardBuilder) 993 b.returnPostingsBuilders(shardBuilder) 994 return result, err 995} 996 997// CheckMemoryUsage checks the memory usage of the process and writes a memory profile if the heap usage exceeds the 998// configured threshold. NOTE: this method is expensive and should only be used for debugging. 999func (b *Builder) CheckMemoryUsage() { 1000 // Don't check memory if heap profiling is disabled, or we've already written 10 profiles 1001 if b.opts.HeapProfileTriggerBytes <= 0 || b.heapProfileNum >= 10 { 1002 return 1003 } 1004 1005 var m runtime.MemStats 1006 runtime.ReadMemStats(&m) 1007 1008 if m.HeapAlloc > b.opts.HeapProfileTriggerBytes && b.heapProfileMu.TryLock() { 1009 defer b.heapProfileMu.Unlock() 1010 1011 log.Printf("writing memory profile, allocated heap: %s", humanize.Bytes(m.HeapAlloc)) 1012 name := filepath.Join(b.opts.IndexDir, fmt.Sprintf("indexmemory.prof.%d", b.heapProfileNum)) 1013 f, err := os.Create(name) 1014 if err != nil { 1015 log.Printf("failed to create memory profile file: %v", err) 1016 return 1017 } 1018 1019 err = pprof.WriteHeapProfile(f) 1020 if err != nil { 1021 log.Printf("failed to write memory profile: %v", err) 1022 } 1023 1024 b.heapProfileNum++ 1025 } 1026} 1027 1028func (b *Builder) getPostingsBuilder() *postingsBuilder { 1029 if pb, ok := b.postingsPool.Get().(*postingsBuilder); ok { 1030 pb.reset() 1031 return pb 1032 } 1033 return newPostingsBuilder(b.opts.ShardMax) 1034} 1035 1036// returnPostingsBuilders returns both postings builders from sb to the 1037// pool and nils the fields so any subsequent misuse crashes obviously. 
1038func (b *Builder) returnPostingsBuilders(sb *ShardBuilder) { 1039 if sb.contentPostings != nil { 1040 b.postingsPool.Put(sb.contentPostings) 1041 sb.contentPostings = nil 1042 } 1043 if sb.namePostings != nil { 1044 b.postingsPool.Put(sb.namePostings) 1045 sb.namePostings = nil 1046 } 1047} 1048 1049func (b *Builder) newShardBuilder() (*ShardBuilder, error) { 1050 desc := b.opts.RepositoryDescription 1051 desc.HasSymbols = !b.opts.DisableCTags && b.opts.CTagsPath != "" 1052 desc.SubRepoMap = b.opts.SubRepositories 1053 desc.IndexOptions = b.opts.GetHash() 1054 1055 content := b.getPostingsBuilder() 1056 name := b.getPostingsBuilder() 1057 shardBuilder := newShardBuilderWithPostings(content, name) 1058 if err := shardBuilder.setRepository(&desc); err != nil { 1059 return nil, err 1060 } 1061 shardBuilder.IndexTime = b.indexTime 1062 shardBuilder.ID = b.id 1063 return shardBuilder, nil 1064} 1065 1066func (b *Builder) writeShard(fn string, ib *ShardBuilder) (*finishedShard, error) { 1067 dir := filepath.Dir(fn) 1068 if err := os.MkdirAll(dir, 0o700); err != nil { 1069 return nil, err 1070 } 1071 1072 f, err := os.CreateTemp(dir, filepath.Base(fn)+".*.tmp") 1073 if err != nil { 1074 return nil, err 1075 } 1076 if runtime.GOOS != "windows" { 1077 if err := f.Chmod(0o666 &^ umask); err != nil { 1078 return nil, err 1079 } 1080 } 1081 1082 defer f.Close() 1083 if err := ib.Write(f); err != nil { 1084 return nil, err 1085 } 1086 fi, err := f.Stat() 1087 if err != nil { 1088 return nil, err 1089 } 1090 if err := f.Close(); err != nil { 1091 return nil, err 1092 } 1093 1094 log.Printf("finished shard %s: %d index bytes (overhead %3.1f), %d files processed \n", 1095 fn, 1096 fi.Size(), 1097 float64(fi.Size())/float64(ib.ContentSize()+1), 1098 ib.NumFiles()) 1099 1100 return &finishedShard{f.Name(), fn}, nil 1101} 1102 1103type deltaBranchSetError struct { 1104 shardName string 1105 old, new []zoekt.RepositoryBranch 1106} 1107 1108func (e deltaBranchSetError) Error() 
string { 1109 return fmt.Sprintf("repository metadata in shard %q contains a different set of branch names than what was requested, which is unsupported in a delta shard build. old: %+v, new: %+v", e.shardName, e.old, e.new) 1110} 1111 1112type deltaIndexOptionsMismatchError struct { 1113 shardName string 1114 newOptions HashOptions 1115} 1116 1117func (e *deltaIndexOptionsMismatchError) Error() string { 1118 return fmt.Sprintf("one or more index options for shard %q do not match Builder's index options. These index option updates are incompatible with delta build. New index options: %+v", e.shardName, e.newOptions) 1119} 1120 1121// umask holds the Umask of the current process 1122var umask os.FileMode 1123 1124func init() { 1125 umask = os.FileMode(unix.Umask(0)) 1126 unix.Umask(int(umask)) 1127}