fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

at tngl 31 kB View raw
// Copyright 2016 Google Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package index contains logic for building Zoekt indexes. NOTE: this package is not considered
// part of the public API, and it is not recommended to rely on it in external code.
package index

import (
	"crypto/sha1"
	"flag"
	"fmt"
	"log"
	"net/url"
	"os"
	"os/exec"
	"path"
	"path/filepath"
	"reflect"
	"runtime"
	"runtime/pprof"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/bmatcuk/doublestar"
	"github.com/dustin/go-humanize"
	"github.com/rs/xid"
	"golang.org/x/sys/unix"

	"maps"

	"github.com/sourcegraph/zoekt"
	"github.com/sourcegraph/zoekt/internal/ctags"
	"github.com/sourcegraph/zoekt/internal/tenant"
)

// DefaultDir is the ".zoekt" directory below $HOME. It is computed once, at
// package initialization, from the HOME environment variable.
var DefaultDir = filepath.Join(os.Getenv("HOME"), ".zoekt")

// Branch describes a single branch version.
type Branch struct {
	Name    string
	Version string
}

// Options sets options for the index building.
type Options struct {
	// IndexDir is a directory that holds *.zoekt index files.
	IndexDir string

	// ShardPrefixOverride sets the prefix for shard names. When non-empty it
	// takes precedence over the repository name / ID based prefix.
	ShardPrefixOverride string

	// SizeMax is the maximum file size
	SizeMax int

	// Parallelism is the maximum number of shards to index in parallel
	Parallelism int

	// ShardMax sets the maximum corpus size for a single shard
	ShardMax int

	// TrigramMax sets the maximum number of distinct trigrams per document.
	TrigramMax int

	// RepositoryDescription holds names and URLs for the repository.
	RepositoryDescription zoekt.Repository

	// SubRepositories is a path => sub repository map.
	SubRepositories map[string]*zoekt.Repository

	// DisableCTags disables the generation of ctags metadata.
	DisableCTags bool

	// CTagsPath is the path to the ctags binary to run, or empty
	// if a valid binary couldn't be found.
	CTagsPath string

	// Same as CTagsPath but for scip-ctags
	ScipCTagsPath string

	// If set, ctags must succeed.
	CTagsMustSucceed bool

	// LargeFiles is a slice of glob patterns, including ** for any number
	// of directories, where matching file paths should be indexed
	// regardless of their size. The full pattern syntax is here:
	// https://github.com/bmatcuk/doublestar/tree/v1#patterns.
	LargeFiles []string

	// IsDelta is true if this run contains only the changed documents since the
	// last run.
	IsDelta bool

	// changedOrRemovedFiles is a list of file paths that have been changed or removed
	// since the last indexing job for this repository. These files will be tombstoned
	// in the older shards for this repository.
	changedOrRemovedFiles []string

	// LanguageMap is handed to the ctags parser setup and to symbol parsing
	// when shards are built.
	LanguageMap ctags.LanguageMap

	// ShardMerging is true if builder should respect compound shards. This is a
	// Sourcegraph specific option.
	ShardMerging bool

	// HeapProfileTriggerBytes is the heap allocation in bytes that will trigger a memory profile. If 0, no memory profile
	// will be triggered. Note this trigger looks at total heap allocation (which includes both inuse and garbage objects).
	//
	// Profiles will be written to files named `indexmemory.prof.n` in the index directory. No more than 10 files are written.
	//
	// Note: heap checking is "best effort", and it's possible for the process to OOM without triggering the heap profile.
	HeapProfileTriggerBytes uint64
}

// HashOptions contains only the options in Options that, when modified, lead
// to an IndexState of IndexStateOption during the next index build.
type HashOptions struct {
	sizeMax          int
	disableCTags     bool
	ctagsPath        string
	cTagsMustSucceed bool
	largeFiles       []string
}

// HashOptions returns the subset of o that takes part in the options hash.
// The LargeFiles slice is shared with o, not copied.
func (o *Options) HashOptions() HashOptions {
	return HashOptions{
		sizeMax:          o.SizeMax,
		disableCTags:     o.DisableCTags,
		ctagsPath:        o.CTagsPath,
		cTagsMustSucceed: o.CTagsMustSucceed,
		largeFiles:       o.LargeFiles,
	}
}

// GetHash returns the hex-encoded SHA-1 digest of the fields in HashOptions.
//
// The fields are fed to the hasher in a fixed order and with fixed formatting
// verbs. The result is compared against the hash stored in existing shards, so
// neither the order nor the formatting may change without invalidating them.
func (o *Options) GetHash() string {
	h := o.HashOptions()
	hasher := sha1.New()

	hasher.Write([]byte(h.ctagsPath))
	hasher.Write(fmt.Appendf(nil, "%t", h.cTagsMustSucceed))
	hasher.Write(fmt.Appendf(nil, "%d", h.sizeMax))
	hasher.Write(fmt.Appendf(nil, "%q", h.largeFiles))
	hasher.Write(fmt.Appendf(nil, "%t", h.disableCTags))

	return fmt.Sprintf("%x", hasher.Sum(nil))
}

// largeFilesFlag adapts Options.LargeFiles to the flag.Value interface so the
// -large_file flag can be given more than once.
type largeFilesFlag struct{ *Options }

// String renders the configured patterns, each one preceded by "-large_file ".
func (f largeFilesFlag) String() string {
	// From flag.Value documentation:
	//
	// The flag package may call the String method with a zero-valued receiver,
	// such as a nil pointer.
	if f.Options == nil {
		return ""
	}
	// The leading empty element makes Join emit the separator before the
	// first pattern as well.
	s := append([]string{""}, f.LargeFiles...)
	return strings.Join(s, "-large_file ")
}

// Set appends value to the LargeFiles patterns of the wrapped Options. It
// never fails.
func (f largeFilesFlag) Set(value string) error {
	f.LargeFiles = append(f.LargeFiles, value)
	return nil
}

// Flags adds flags for build options to fs. It is the "inverse" of Args.
func (o *Options) Flags(fs *flag.FlagSet) {
	// Defaults are computed on a copy so that o itself is only modified when
	// the flags are parsed.
	x := *o
	x.SetDefaults()
	fs.IntVar(&o.SizeMax, "file_limit", x.SizeMax, "maximum file size")
	fs.IntVar(&o.TrigramMax, "max_trigram_count", x.TrigramMax, "maximum number of trigrams per document")
	fs.IntVar(&o.ShardMax, "shard_limit", x.ShardMax, "maximum corpus size for a shard")
	fs.IntVar(&o.Parallelism, "parallelism", x.Parallelism, "maximum number of parallel indexing processes.")
	fs.StringVar(&o.IndexDir, "index", x.IndexDir, "directory for search indices")
	fs.StringVar(&o.ShardPrefixOverride, "shard_prefix_override", x.ShardPrefixOverride, "prefix for shard name")
	fs.BoolVar(&o.CTagsMustSucceed, "require_ctags", x.CTagsMustSucceed, "If set, ctags calls must succeed.")
	fs.Var(largeFilesFlag{o}, "large_file", "A glob pattern where matching files are to be index regardless of their size. You can add multiple patterns by setting this more than once.")

	// Sourcegraph specific
	fs.BoolVar(&o.DisableCTags, "disable_ctags", x.DisableCTags, "If set, ctags will not be called.")
	fs.BoolVar(&o.ShardMerging, "shard_merging", x.ShardMerging, "If set, builder will respect compound shards.")
}

// Args generates command line arguments for o. It is the "inverse" of Flags.
197func (o *Options) Args() []string { 198 var args []string 199 200 if o.SizeMax != 0 { 201 args = append(args, "-file_limit", strconv.Itoa(o.SizeMax)) 202 } 203 204 if o.TrigramMax != 0 { 205 args = append(args, "-max_trigram_count", strconv.Itoa(o.TrigramMax)) 206 } 207 208 if o.ShardMax != 0 { 209 args = append(args, "-shard_limit", strconv.Itoa(o.ShardMax)) 210 } 211 212 if o.Parallelism != 0 { 213 args = append(args, "-parallelism", strconv.Itoa(o.Parallelism)) 214 } 215 216 if o.IndexDir != "" { 217 args = append(args, "-index", o.IndexDir) 218 } 219 220 if o.ShardPrefixOverride != "" { 221 args = append(args, "-shard_prefix_override", o.ShardPrefixOverride) 222 } 223 224 if o.CTagsMustSucceed { 225 args = append(args, "-require_ctags") 226 } 227 228 for _, a := range o.LargeFiles { 229 args = append(args, "-large_file", a) 230 } 231 232 // Sourcegraph specific 233 if o.DisableCTags { 234 args = append(args, "-disable_ctags") 235 } 236 237 if o.ShardMerging { 238 args = append(args, "-shard_merging") 239 } 240 241 return args 242} 243 244// Builder manages (parallel) creation of uniformly sized shards. The 245// builder buffers up documents until it collects enough documents and 246// then builds a shard and writes. 247type Builder struct { 248 opts Options 249 throttle chan int 250 251 nextShardNum int 252 todo []*Document 253 docChecker DocChecker 254 size int 255 256 parserBins ctags.ParserBinMap 257 building sync.WaitGroup 258 259 errMu sync.Mutex 260 buildError error 261 262 // temp name => final name for finished shards. We only rename 263 // them once all shards succeed to avoid Frankstein corpuses. 264 finishedShards map[string]string 265 266 // indexTime is set by tests for doing reproducible builds. 267 indexTime time.Time 268 269 // heapProfileMu is used to ensure that only one memory profile is written at a time 270 heapProfileMu sync.Mutex 271 heapProfileNum int 272 273 // a sortable 20 chars long id. 
274 id string 275 276 finishCalled bool 277 278 // postingsPool reuses postingsBuilder instances across shard builds, 279 // retaining their map and slice allocations to avoid repeated 280 // memclr/madvise overhead. 281 postingsPool sync.Pool 282} 283 284type finishedShard struct { 285 temp, final string 286} 287 288func checkCTags() string { 289 if ctags := os.Getenv("CTAGS_COMMAND"); ctags != "" { 290 return ctags 291 } 292 293 if ctags, err := exec.LookPath("universal-ctags"); err == nil { 294 return ctags 295 } 296 297 return "" 298} 299 300func checkScipCTags() string { 301 if ctags := os.Getenv("SCIP_CTAGS_COMMAND"); ctags != "" { 302 return ctags 303 } 304 305 if ctags, err := exec.LookPath("scip-ctags"); err == nil { 306 return ctags 307 } 308 309 return "" 310} 311 312// SetDefaults sets reasonable default options. 313func (o *Options) SetDefaults() { 314 if o.CTagsPath == "" && !o.DisableCTags { 315 o.CTagsPath = checkCTags() 316 } 317 318 if o.ScipCTagsPath == "" && !o.DisableCTags { 319 o.ScipCTagsPath = checkScipCTags() 320 } 321 322 if o.Parallelism == 0 { 323 o.Parallelism = 4 324 } 325 if o.SizeMax == 0 { 326 o.SizeMax = 2 << 20 327 } 328 if o.ShardMax == 0 { 329 o.ShardMax = 100 << 20 330 } 331 if o.TrigramMax == 0 { 332 o.TrigramMax = 20000 333 } 334 335 if o.RepositoryDescription.Name == "" && o.RepositoryDescription.URL != "" { 336 parsed, _ := url.Parse(o.RepositoryDescription.URL) 337 if parsed != nil { 338 o.RepositoryDescription.Name = filepath.Join(parsed.Host, parsed.Path) 339 } 340 } 341} 342 343// ShardName returns the name the given index shard. 
344func (o *Options) shardName(n int) string { 345 return o.shardNameVersion(IndexFormatVersion, n) 346} 347 348func (o *Options) shardNameVersion(version, n int) string { 349 prefix := o.ShardPrefixOverride // ShardPrefixOverride takes precedence to support custom shard naming strategies 350 351 if prefix == "" { 352 // Sourcegraph specific: We use IDs in shard names on multi-tenant 353 // instances to prevent conflicts. 354 if tenant.UseIDBasedShardNames() { 355 prefix = fmt.Sprintf("%09d_%09d", o.RepositoryDescription.TenantID, o.RepositoryDescription.ID) 356 } else { 357 prefix = o.RepositoryDescription.Name 358 } 359 } 360 361 return shardName(o.IndexDir, prefix, version, n) 362} 363 364type IndexState string 365 366const ( 367 IndexStateMissing IndexState = "missing" 368 IndexStateCorrupt IndexState = "corrupt" 369 IndexStateVersion IndexState = "version-mismatch" 370 IndexStateOption IndexState = "option-mismatch" 371 IndexStateMeta IndexState = "meta-mismatch" 372 IndexStateContent IndexState = "content-mismatch" 373 IndexStateEqual IndexState = "equal" 374) 375 376var readVersions = []struct { 377 IndexFormatVersion int 378 FeatureVersion int 379}{{ 380 IndexFormatVersion: IndexFormatVersion, 381 FeatureVersion: FeatureVersion, 382}, { 383 IndexFormatVersion: NextIndexFormatVersion, 384 FeatureVersion: FeatureVersion, 385}} 386 387// IncrementalSkipIndexing returns true if the index present on disk matches 388// the build options. 389func (o *Options) IncrementalSkipIndexing() bool { 390 state, _ := o.IndexState() 391 return state == IndexStateEqual 392} 393 394// IndexState checks how the index present on disk compares to the build 395// options and returns the IndexState and the name of the first shard. 396func (o *Options) IndexState() (IndexState, string) { 397 // Open the latest version we support that is on disk. 
398 fn := o.findShard() 399 if fn == "" { 400 return IndexStateMissing, fn 401 } 402 403 repos, index, err := ReadMetadataPathAlive(fn) 404 if os.IsNotExist(err) { 405 return IndexStateMissing, fn 406 } else if err != nil { 407 return IndexStateCorrupt, fn 408 } 409 410 for _, v := range readVersions { 411 if v.IndexFormatVersion == index.IndexFormatVersion && v.FeatureVersion != index.IndexFeatureVersion { 412 return IndexStateVersion, fn 413 } 414 } 415 416 var repo *zoekt.Repository 417 for _, cand := range repos { 418 if cand.Name == o.RepositoryDescription.Name { 419 repo = cand 420 break 421 } 422 } 423 424 if repo == nil { 425 return IndexStateCorrupt, fn 426 } 427 428 if repo.IndexOptions != o.GetHash() { 429 return IndexStateOption, fn 430 } 431 432 if !reflect.DeepEqual(repo.Branches, o.RepositoryDescription.Branches) { 433 return IndexStateContent, fn 434 } 435 436 // We can mutate repo since it lives in the scope of this function call. 437 if updated, err := repo.MergeMutable(&o.RepositoryDescription); err != nil { 438 // non-nil err means we are trying to update an immutable field => 439 // reindex content. 440 log.Printf("warn: immutable field changed, requires re-index: %s", err) 441 return IndexStateContent, fn 442 } else if updated { 443 return IndexStateMeta, fn 444 } 445 446 return IndexStateEqual, fn 447} 448 449// FindRepositoryMetadata returns the index metadata for the repository 450// specified in the options. 'ok' is false if the repository's metadata 451// couldn't be found or if an error occurred. 
452func (o *Options) FindRepositoryMetadata() (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 453 shard := o.findShard() 454 if shard == "" { 455 return nil, nil, false, nil 456 } 457 458 repositories, metadata, err := ReadMetadataPathAlive(shard) 459 if err != nil { 460 return nil, nil, false, fmt.Errorf("reading metadata for shard %q: %w", shard, err) 461 } 462 463 ID := o.RepositoryDescription.ID 464 for _, r := range repositories { 465 // compound shards contain multiple repositories, so we 466 // have to pick only the one we're looking for 467 if r.ID == ID { 468 return r, metadata, true, nil 469 } 470 } 471 472 // If we're here, then we're somehow in a state where we found a matching 473 // shard that's missing the repository metadata we're looking for. This 474 // should never happen. 475 name := o.RepositoryDescription.Name 476 return nil, nil, false, fmt.Errorf("matching shard %q doesn't contain metadata for repo id %d (%q)", shard, ID, name) 477} 478 479func (o *Options) findShard() string { 480 for _, v := range readVersions { 481 fn := o.shardNameVersion(v.IndexFormatVersion, 0) 482 if _, err := os.Stat(fn); err == nil { 483 return fn 484 } 485 } 486 487 // Brute force finding the shard in compound shards. We should only hit this 488 // code path for repositories that don't exist yet or are in compound shards. 
489 return o.findCompoundShard() 490} 491 492func (o *Options) findCompoundShard() string { 493 compoundShards, err := filepath.Glob(path.Join(o.IndexDir, "compound-*.zoekt")) 494 if err != nil { 495 return "" 496 } 497 for _, fn := range compoundShards { 498 if containsRepo(fn, o.RepositoryDescription.ID) { 499 return fn 500 } 501 } 502 503 return "" 504} 505 506func (o *Options) FindAllShards() []string { 507 for _, v := range readVersions { 508 fn := o.shardNameVersion(v.IndexFormatVersion, 0) 509 if _, err := os.Stat(fn); err == nil { 510 shards := []string{fn} 511 for i := 1; ; i++ { 512 fn := o.shardNameVersion(v.IndexFormatVersion, i) 513 if _, err := os.Stat(fn); err != nil { 514 return shards 515 } 516 shards = append(shards, fn) 517 } 518 } 519 } 520 521 // lazily fallback to findShard which will look for a compound shard. 522 if fn := o.findShard(); fn != "" { 523 return []string{fn} 524 } 525 526 return nil 527} 528 529// IgnoreSizeMax determines whether the max size should be ignored. 530func (o *Options) IgnoreSizeMax(name string) bool { 531 // A pattern match will override preceding pattern matches. 532 for i := len(o.LargeFiles) - 1; i >= 0; i-- { 533 pattern := strings.TrimSpace(o.LargeFiles[i]) 534 negated, validatedPattern := checkIsNegatePattern(pattern) 535 536 if m, _ := doublestar.PathMatch(validatedPattern, name); m { 537 if negated { 538 return false 539 } else { 540 return true 541 } 542 } 543 } 544 545 return false 546} 547 548func checkIsNegatePattern(pattern string) (bool, string) { 549 negate := "!" 550 551 // if negated then strip prefix meta character which identifies negated filter pattern 552 if strings.HasPrefix(pattern, negate) { 553 return true, pattern[len(negate):] 554 } 555 556 return false, pattern 557} 558 559// NewBuilder creates a new Builder instance. 
560func NewBuilder(opts Options) (*Builder, error) { 561 opts.SetDefaults() 562 if opts.RepositoryDescription.Name == "" { 563 return nil, fmt.Errorf("builder: must set Name") 564 } 565 566 b := &Builder{ 567 opts: opts, 568 throttle: make(chan int, opts.Parallelism), 569 finishedShards: map[string]string{}, 570 } 571 572 parserBins, err := ctags.NewParserBinMap( 573 b.opts.CTagsPath, 574 b.opts.ScipCTagsPath, 575 opts.LanguageMap, 576 b.opts.CTagsMustSucceed, 577 ) 578 if err != nil { 579 return nil, err 580 } 581 582 b.parserBins = parserBins 583 584 if opts.IsDelta { 585 // Delta shards build on top of previously existing shards. 586 // As a consequence, the shardNum for delta shards starts from 587 // the number following the most recently generated shard - not 0. 588 // 589 // Using this numbering scheme allows all the shards to be 590 // discovered as a set. 591 shards := b.opts.FindAllShards() 592 b.nextShardNum = len(shards) // shards are zero indexed, so len() provides the next number after the last one 593 } 594 595 if _, err := b.newShardBuilder(); err != nil { 596 return nil, err 597 } 598 599 now := time.Now() 600 b.indexTime = now 601 b.id = xid.NewWithTime(now).String() 602 603 return b, nil 604} 605 606// AddFile is a convenience wrapper for the Add method 607func (b *Builder) AddFile(name string, content []byte) error { 608 return b.Add(Document{Name: name, Content: content}) 609} 610 611func (b *Builder) Add(doc Document) error { 612 if b.finishCalled { 613 return nil 614 } 615 616 allowLargeFile := b.opts.IgnoreSizeMax(doc.Name) 617 if len(doc.Content) > b.opts.SizeMax && !allowLargeFile { 618 // We could pass the document on to the shardbuilder, but if 619 // we pass through a part of the source tree with binary/large 620 // files, the corresponding shard would be mostly empty, so 621 // insert a reason here too. 
622 doc.SkipReason = SkipReasonTooLarge 623 } else if skip := b.docChecker.Check(doc.Content, b.opts.TrigramMax, allowLargeFile); skip != SkipReasonNone { 624 doc.SkipReason = skip 625 } 626 627 // Pre-compute file category and language while content is still 628 // available, before content is dropped for skipped documents. 629 DetermineFileCategory(&doc) 630 DetermineLanguageIfUnknown(&doc) 631 632 b.todo = append(b.todo, &doc) 633 634 if doc.SkipReason == SkipReasonNone { 635 b.size += len(doc.Name) + len(doc.Content) 636 } else { 637 b.size += len(doc.Name) 638 // Drop the content if we are skipping the document. Skipped content is not counted towards the 639 // shard size limit, so otherwise we might buffer too much data in memory before flushing. 640 doc.Content = nil 641 } 642 643 if b.size > b.opts.ShardMax { 644 return b.flush() 645 } 646 647 return nil 648} 649 650// MarkFileAsChangedOrRemoved indicates that the file specified by the given path 651// has been changed or removed since the last indexing job for this repository. 652// 653// If this build is a delta build, these files will be tombstoned in the older shards for this repository. 654func (b *Builder) MarkFileAsChangedOrRemoved(path string) { 655 b.opts.changedOrRemovedFiles = append(b.opts.changedOrRemovedFiles, path) 656} 657 658// Finish creates a last shard from the buffered documents, and clears 659// stale shards from previous runs. This should always be called, also 660// in failure cases, to ensure cleanup. 661// 662// It is safe to call Finish() multiple times. 
663func (b *Builder) Finish() error { 664 if b.finishCalled { 665 return b.buildError 666 } 667 668 b.finishCalled = true 669 670 b.flush() 671 b.building.Wait() 672 673 if b.buildError != nil { 674 for tmp := range b.finishedShards { 675 log.Printf("Builder.Finish %s", tmp) 676 os.Remove(tmp) 677 } 678 b.finishedShards = map[string]string{} 679 return b.buildError 680 } 681 682 // map of temporary -> final names for all updated shards + shard metadata files 683 artifactPaths := make(map[string]string) 684 maps.Copy(artifactPaths, b.finishedShards) 685 686 oldShards := b.opts.FindAllShards() 687 688 if b.opts.IsDelta { 689 // Delta shard builds need to update FileTombstone and branch commit information for all 690 // existing shards 691 for _, shard := range oldShards { 692 repositories, _, err := ReadMetadataPathAlive(shard) 693 if err != nil { 694 return fmt.Errorf("reading metadata from shard %q: %w", shard, err) 695 } 696 697 if len(repositories) > 1 { 698 return fmt.Errorf("delta shard builds don't support repositories contained in compound shards (shard %q)", shard) 699 } 700 701 if len(repositories) == 0 { 702 return fmt.Errorf("failed to update repository metadata for shard %q - shard contains no repositories", shard) 703 } 704 705 repository := repositories[0] 706 if repository.ID != b.opts.RepositoryDescription.ID { 707 return fmt.Errorf("shard %q doesn't contain repository ID %d (%q)", shard, b.opts.RepositoryDescription.ID, b.opts.RepositoryDescription.Name) 708 } 709 710 if len(b.opts.changedOrRemovedFiles) > 0 && repository.FileTombstones == nil { 711 repository.FileTombstones = make(map[string]struct{}, len(b.opts.changedOrRemovedFiles)) 712 } 713 714 for _, f := range b.opts.changedOrRemovedFiles { 715 repository.FileTombstones[f] = struct{}{} 716 } 717 718 if !BranchNamesEqual(repository.Branches, b.opts.RepositoryDescription.Branches) { 719 return deltaBranchSetError{ 720 shardName: shard, 721 old: repository.Branches, 722 new: 
b.opts.RepositoryDescription.Branches, 723 } 724 } 725 726 if b.opts.GetHash() != repository.IndexOptions { 727 return &deltaIndexOptionsMismatchError{ 728 shardName: shard, 729 newOptions: b.opts.HashOptions(), 730 } 731 } 732 733 repository.Branches = b.opts.RepositoryDescription.Branches 734 735 repository.LatestCommitDate = b.opts.RepositoryDescription.LatestCommitDate 736 737 repository.Metadata = b.opts.RepositoryDescription.Metadata 738 739 tempPath, finalPath, err := JsonMarshalRepoMetaTemp(shard, repository) 740 if err != nil { 741 return fmt.Errorf("writing repository metadta for shard %q: %w", shard, err) 742 } 743 744 artifactPaths[tempPath] = finalPath 745 } 746 } 747 748 // We mark finished shards as empty when we successfully finish. Return now 749 // to allow call sites to call Finish idempotently. 750 if len(artifactPaths) == 0 { 751 return b.buildError 752 } 753 754 // Collect a map of the old shards on disk. For each new shard we replace we 755 // delete it from toDelete. Anything remaining in toDelete will be removed 756 // after we have renamed everything into place. 757 758 var toDelete map[string]struct{} 759 if !b.opts.IsDelta { 760 // Non-delta shard builds delete all existing shards before they write out 761 // new ones. 762 // By contrast, delta shard builds work by stacking changes on top of existing shards. 763 // So, we skip populating the toDelete map if we're building delta shards. 
764 765 toDelete = make(map[string]struct{}) 766 for _, name := range oldShards { 767 paths, err := IndexFilePaths(name) 768 if err != nil { 769 b.buildError = fmt.Errorf("failed to find old paths for %s: %w", name, err) 770 } 771 for _, p := range paths { 772 toDelete[p] = struct{}{} 773 } 774 } 775 } 776 777 for tmp, final := range artifactPaths { 778 if err := os.Rename(tmp, final); err != nil { 779 b.buildError = err 780 continue 781 } 782 783 delete(toDelete, final) 784 } 785 786 b.finishedShards = map[string]string{} 787 788 for p := range toDelete { 789 // Don't delete compound shards, set tombstones instead. 790 if b.opts.ShardMerging && strings.HasPrefix(filepath.Base(p), "compound-") { 791 if !strings.HasSuffix(p, ".zoekt") { 792 continue 793 } 794 err := SetTombstone(p, b.opts.RepositoryDescription.ID) 795 b.buildError = err 796 continue 797 } 798 log.Printf("removing old shard file: %s", p) 799 if err := os.Remove(p); err != nil { 800 b.buildError = err 801 } 802 } 803 804 return b.buildError 805} 806 807// BranchNamesEqual compares the given zoekt.RepositoryBranch slices, and returns true 808// iff both slices specify the same set of branch names in the same order. 
809func BranchNamesEqual(a, b []zoekt.RepositoryBranch) bool { 810 if len(a) != len(b) { 811 return false 812 } 813 814 for i := range a { 815 x, y := a[i], b[i] 816 if x.Name != y.Name { 817 return false 818 } 819 } 820 821 return true 822} 823 824func (b *Builder) flush() error { 825 todo := b.todo 826 b.todo = nil 827 b.size = 0 828 b.errMu.Lock() 829 defer b.errMu.Unlock() 830 if b.buildError != nil { 831 return b.buildError 832 } 833 834 hasShard := b.nextShardNum > 0 835 if len(todo) == 0 && hasShard { 836 return nil 837 } 838 839 shard := b.nextShardNum 840 b.nextShardNum++ 841 842 if b.opts.Parallelism > 1 { 843 b.building.Add(1) 844 b.throttle <- 1 845 go func() { 846 done, err := b.buildShard(todo, shard) 847 <-b.throttle 848 849 b.errMu.Lock() 850 defer b.errMu.Unlock() 851 if err != nil && b.buildError == nil { 852 b.buildError = err 853 } 854 if err == nil { 855 b.finishedShards[done.temp] = done.final 856 } 857 b.building.Done() 858 }() 859 } else { 860 // No goroutines when we're not parallel. This 861 // simplifies memory profiling. 862 done, err := b.buildShard(todo, shard) 863 b.buildError = err 864 if err == nil { 865 b.finishedShards[done.temp] = done.final 866 } 867 868 return b.buildError 869 } 870 871 return nil 872} 873 874// map [0,inf) to [0,1) monotonically 875func squashRange(j int) float64 { 876 x := float64(j) 877 return x / (1 + x) 878} 879 880type rankedDoc struct { 881 *Document 882 rank []float64 883} 884 885// rank returns a vector of scores which is used at index-time to sort documents 886// before writing them to disk. The order of documents in the shard is important 887// at query time, because earlier documents receive a boost at query time and 888// have a higher chance of being searched before limits kick in. 889func rank(d *Document, origIdx int) []float64 { 890 skipped := 0.0 891 if d.SkipReason != SkipReasonNone { 892 skipped = 1.0 893 } 894 895 // Use pre-computed Category from DetermineFileCategory. 
896 generated := 0.0 897 if d.Category == FileCategoryGenerated { 898 generated = 1.0 899 } 900 901 vendor := 0.0 902 if d.Category == FileCategoryVendored { 903 vendor = 1.0 904 } 905 906 test := 0.0 907 if d.Category == FileCategoryTest { 908 test = 1.0 909 } 910 911 // Smaller is earlier (=better). 912 return []float64{ 913 // Always place skipped docs last 914 skipped, 915 916 // Prefer docs that are not generated 917 generated, 918 919 // Prefer docs that are not vendored 920 vendor, 921 922 // Prefer docs that are not tests 923 test, 924 925 // With short names 926 squashRange(len(d.Name)), 927 928 // With many symbols 929 1.0 - squashRange(len(d.Symbols)), 930 931 // With short content 932 squashRange(len(d.Content)), 933 934 // That is present is as many branches as possible 935 1.0 - squashRange(len(d.Branches)), 936 937 // Preserve original ordering. 938 squashRange(origIdx), 939 } 940} 941 942func sortDocuments(todo []*Document) { 943 rs := make([]rankedDoc, 0, len(todo)) 944 for i, t := range todo { 945 rd := rankedDoc{t, rank(t, i)} 946 rs = append(rs, rd) 947 } 948 sort.Slice(rs, func(i, j int) bool { 949 r1 := rs[i].rank 950 r2 := rs[j].rank 951 for i := range r1 { 952 if r1[i] < r2[i] { 953 return true 954 } 955 if r1[i] > r2[i] { 956 return false 957 } 958 } 959 960 return false 961 }) 962 for i := range todo { 963 todo[i] = rs[i].Document 964 } 965} 966 967func (b *Builder) buildShard(todo []*Document, nextShardNum int) (*finishedShard, error) { 968 if !b.opts.DisableCTags && (b.opts.CTagsPath != "" || b.opts.ScipCTagsPath != "") { 969 err := parseSymbols(todo, b.opts.LanguageMap, b.parserBins) 970 if b.opts.CTagsMustSucceed && err != nil { 971 return nil, err 972 } 973 if err != nil { 974 log.Printf("ignoring universal:%s or scip:%s error: %v", b.opts.CTagsPath, b.opts.ScipCTagsPath, err) 975 } 976 } 977 978 name := b.opts.shardName(nextShardNum) 979 980 shardBuilder, err := b.newShardBuilder() 981 if err != nil { 982 return nil, err 983 } 984 
985 sortDocuments(todo) 986 987 for idx, t := range todo { 988 if err := shardBuilder.Add(*t); err != nil { 989 return nil, err 990 } 991 992 if idx%10_000 == 0 { 993 b.CheckMemoryUsage() 994 } 995 } 996 997 result, err := b.writeShard(name, shardBuilder) 998 b.returnPostingsBuilders(shardBuilder) 999 return result, err 1000} 1001 1002// CheckMemoryUsage checks the memory usage of the process and writes a memory profile if the heap usage exceeds the 1003// configured threshold. NOTE: this method is expensive and should only be used for debugging. 1004func (b *Builder) CheckMemoryUsage() { 1005 // Don't check memory if heap profiling is disabled, or we've already written 10 profiles 1006 if b.opts.HeapProfileTriggerBytes <= 0 || b.heapProfileNum >= 10 { 1007 return 1008 } 1009 1010 var m runtime.MemStats 1011 runtime.ReadMemStats(&m) 1012 1013 if m.HeapAlloc > b.opts.HeapProfileTriggerBytes && b.heapProfileMu.TryLock() { 1014 defer b.heapProfileMu.Unlock() 1015 1016 log.Printf("writing memory profile, allocated heap: %s", humanize.Bytes(m.HeapAlloc)) 1017 name := filepath.Join(b.opts.IndexDir, fmt.Sprintf("indexmemory.prof.%d", b.heapProfileNum)) 1018 f, err := os.Create(name) 1019 if err != nil { 1020 log.Printf("failed to create memory profile file: %v", err) 1021 return 1022 } 1023 1024 err = pprof.WriteHeapProfile(f) 1025 if err != nil { 1026 log.Printf("failed to write memory profile: %v", err) 1027 } 1028 1029 b.heapProfileNum++ 1030 } 1031} 1032 1033func (b *Builder) getPostingsBuilder() *postingsBuilder { 1034 if pb, ok := b.postingsPool.Get().(*postingsBuilder); ok { 1035 pb.reset() 1036 return pb 1037 } 1038 return newPostingsBuilder(b.opts.ShardMax) 1039} 1040 1041// returnPostingsBuilders returns both postings builders from sb to the 1042// pool and nils the fields so any subsequent misuse crashes obviously. 
1043func (b *Builder) returnPostingsBuilders(sb *ShardBuilder) { 1044 if sb.contentPostings != nil { 1045 b.postingsPool.Put(sb.contentPostings) 1046 sb.contentPostings = nil 1047 } 1048 if sb.namePostings != nil { 1049 b.postingsPool.Put(sb.namePostings) 1050 sb.namePostings = nil 1051 } 1052} 1053 1054func (b *Builder) newShardBuilder() (*ShardBuilder, error) { 1055 desc := b.opts.RepositoryDescription 1056 desc.HasSymbols = !b.opts.DisableCTags && b.opts.CTagsPath != "" 1057 desc.SubRepoMap = b.opts.SubRepositories 1058 desc.IndexOptions = b.opts.GetHash() 1059 1060 content := b.getPostingsBuilder() 1061 name := b.getPostingsBuilder() 1062 shardBuilder := newShardBuilderWithPostings(content, name) 1063 if err := shardBuilder.setRepository(&desc); err != nil { 1064 return nil, err 1065 } 1066 shardBuilder.IndexTime = b.indexTime 1067 shardBuilder.ID = b.id 1068 return shardBuilder, nil 1069} 1070 1071func (b *Builder) writeShard(fn string, ib *ShardBuilder) (*finishedShard, error) { 1072 dir := filepath.Dir(fn) 1073 if err := os.MkdirAll(dir, 0o700); err != nil { 1074 return nil, err 1075 } 1076 1077 f, err := os.CreateTemp(dir, filepath.Base(fn)+".*.tmp") 1078 if err != nil { 1079 return nil, err 1080 } 1081 if runtime.GOOS != "windows" { 1082 if err := f.Chmod(0o666 &^ umask); err != nil { 1083 return nil, err 1084 } 1085 } 1086 1087 defer f.Close() 1088 if err := ib.Write(f); err != nil { 1089 return nil, err 1090 } 1091 fi, err := f.Stat() 1092 if err != nil { 1093 return nil, err 1094 } 1095 if err := f.Close(); err != nil { 1096 return nil, err 1097 } 1098 1099 log.Printf("finished shard %s: %d index bytes (overhead %3.1f), %d files processed \n", 1100 fn, 1101 fi.Size(), 1102 float64(fi.Size())/float64(ib.ContentSize()+1), 1103 ib.NumFiles()) 1104 1105 return &finishedShard{f.Name(), fn}, nil 1106} 1107 1108type deltaBranchSetError struct { 1109 shardName string 1110 old, new []zoekt.RepositoryBranch 1111} 1112 1113func (e deltaBranchSetError) Error() 
string { 1114 return fmt.Sprintf("repository metadata in shard %q contains a different set of branch names than what was requested, which is unsupported in a delta shard build. old: %+v, new: %+v", e.shardName, e.old, e.new) 1115} 1116 1117type deltaIndexOptionsMismatchError struct { 1118 shardName string 1119 newOptions HashOptions 1120} 1121 1122func (e *deltaIndexOptionsMismatchError) Error() string { 1123 return fmt.Sprintf("one or more index options for shard %q do not match Builder's index options. These index option updates are incompatible with delta build. New index options: %+v", e.shardName, e.newOptions) 1124} 1125 1126// umask holds the Umask of the current process 1127var umask os.FileMode 1128 1129func init() { 1130 umask = os.FileMode(unix.Umask(0)) 1131 unix.Umask(int(umask)) 1132}