fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Package index contains logic for building Zoekt indexes. NOTE: this package is not considered
16// part of the public API, and it is not recommended to rely on it in external code.
17package index
18
19import (
20 "crypto/sha1"
21 "flag"
22 "fmt"
23 "log"
24 "net/url"
25 "os"
26 "os/exec"
27 "path"
28 "path/filepath"
29 "reflect"
30 "runtime"
31 "runtime/pprof"
32 "sort"
33 "strconv"
34 "strings"
35 "sync"
36 "time"
37
38 "github.com/bmatcuk/doublestar"
39 "github.com/dustin/go-humanize"
40 "github.com/go-enry/go-enry/v2"
41 "github.com/rs/xid"
42 "golang.org/x/sys/unix"
43
44 "maps"
45
46 "github.com/sourcegraph/zoekt"
47 "github.com/sourcegraph/zoekt/internal/ctags"
48 "github.com/sourcegraph/zoekt/internal/tenant"
49)
50
var (
	// DefaultDir is the fallback index directory: ".zoekt" inside the
	// directory named by $HOME. When HOME is unset this is the relative
	// path ".zoekt".
	DefaultDir = filepath.Join(os.Getenv("HOME"), ".zoekt")
)
52
53// Branch describes a single branch version.
54type Branch struct {
55 Name string
56 Version string
57}
58
59// Options sets options for the index building.
60type Options struct {
61 // IndexDir is a directory that holds *.zoekt index files.
62 IndexDir string
63
64 // ShardPrefixOverride sets the prefix for shards name
65 ShardPrefixOverride string
66
67 // SizeMax is the maximum file size
68 SizeMax int
69
70 // Parallelism is the maximum number of shards to index in parallel
71 Parallelism int
72
73 // ShardMax sets the maximum corpus size for a single shard
74 ShardMax int
75
76 // TrigramMax sets the maximum number of distinct trigrams per document.
77 TrigramMax int
78
79 // RepositoryDescription holds names and URLs for the repository.
80 RepositoryDescription zoekt.Repository
81
82 // SubRepositories is a path => sub repository map.
83 SubRepositories map[string]*zoekt.Repository
84
85 // DisableCTags disables the generation of ctags metadata.
86 DisableCTags bool
87
88 // CtagsPath is the path to the ctags binary to run, or empty
89 // if a valid binary couldn't be found.
90 CTagsPath string
91
92 // Same as CTagsPath but for scip-ctags
93 ScipCTagsPath string
94
95 // If set, ctags must succeed.
96 CTagsMustSucceed bool
97
98 // LargeFiles is a slice of glob patterns, including ** for any number
99 // of directories, where matching file paths should be indexed
100 // regardless of their size. The full pattern syntax is here:
101 // https://github.com/bmatcuk/doublestar/tree/v1#patterns.
102 LargeFiles []string
103
104 // IsDelta is true if this run contains only the changed documents since the
105 // last run.
106 IsDelta bool
107
108 // changedOrRemovedFiles is a list of file paths that have been changed or removed
109 // since the last indexing job for this repository. These files will be tombstoned
110 // in the older shards for this repository.
111 changedOrRemovedFiles []string
112
113 LanguageMap ctags.LanguageMap
114
115 // ShardMerging is true if builder should respect compound shards. This is a
116 // Sourcegraph specific option.
117 ShardMerging bool
118
119 // HeapProfileTriggerBytes is the heap allocation in bytes that will trigger a memory profile. If 0, no memory profile
120 // will be triggered. Note this trigger looks at total heap allocation (which includes both inuse and garbage objects).
121 //
122 // Profiles will be written to files named `index-memory.prof.n` in the index directory. No more than 10 files are written.
123 //
124 // Note: heap checking is "best effort", and it's possible for the process to OOM without triggering the heap profile.
125 HeapProfileTriggerBytes uint64
126}
127
128// HashOptions contains only the options in Options that upon modification leads to IndexState of IndexStateMismatch during the next index building.
129type HashOptions struct {
130 sizeMax int
131 disableCTags bool
132 ctagsPath string
133 cTagsMustSucceed bool
134 largeFiles []string
135}
136
137func (o *Options) HashOptions() HashOptions {
138 return HashOptions{
139 sizeMax: o.SizeMax,
140 disableCTags: o.DisableCTags,
141 ctagsPath: o.CTagsPath,
142 cTagsMustSucceed: o.CTagsMustSucceed,
143 largeFiles: o.LargeFiles,
144 }
145}
146
147func (o *Options) GetHash() string {
148 h := o.HashOptions()
149 hasher := sha1.New()
150
151 hasher.Write([]byte(h.ctagsPath))
152 hasher.Write(fmt.Appendf(nil, "%t", h.cTagsMustSucceed))
153 hasher.Write(fmt.Appendf(nil, "%d", h.sizeMax))
154 hasher.Write(fmt.Appendf(nil, "%q", h.largeFiles))
155 hasher.Write(fmt.Appendf(nil, "%t", h.disableCTags))
156
157 return fmt.Sprintf("%x", hasher.Sum(nil))
158}
159
160type largeFilesFlag struct{ *Options }
161
162func (f largeFilesFlag) String() string {
163 // From flag.Value documentation:
164 //
165 // The flag package may call the String method with a zero-valued receiver,
166 // such as a nil pointer.
167 if f.Options == nil {
168 return ""
169 }
170 s := append([]string{""}, f.LargeFiles...)
171 return strings.Join(s, "-large_file ")
172}
173
174func (f largeFilesFlag) Set(value string) error {
175 f.LargeFiles = append(f.LargeFiles, value)
176 return nil
177}
178
179// Flags adds flags for build options to fs. It is the "inverse" of Args.
180func (o *Options) Flags(fs *flag.FlagSet) {
181 x := *o
182 x.SetDefaults()
183 fs.IntVar(&o.SizeMax, "file_limit", x.SizeMax, "maximum file size")
184 fs.IntVar(&o.TrigramMax, "max_trigram_count", x.TrigramMax, "maximum number of trigrams per document")
185 fs.IntVar(&o.ShardMax, "shard_limit", x.ShardMax, "maximum corpus size for a shard")
186 fs.IntVar(&o.Parallelism, "parallelism", x.Parallelism, "maximum number of parallel indexing processes.")
187 fs.StringVar(&o.IndexDir, "index", x.IndexDir, "directory for search indices")
188 fs.StringVar(&o.ShardPrefixOverride, "shard_prefix_override", x.ShardPrefixOverride, "prefix for shard name")
189 fs.BoolVar(&o.CTagsMustSucceed, "require_ctags", x.CTagsMustSucceed, "If set, ctags calls must succeed.")
190 fs.Var(largeFilesFlag{o}, "large_file", "A glob pattern where matching files are to be index regardless of their size. You can add multiple patterns by setting this more than once.")
191
192 // Sourcegraph specific
193 fs.BoolVar(&o.DisableCTags, "disable_ctags", x.DisableCTags, "If set, ctags will not be called.")
194 fs.BoolVar(&o.ShardMerging, "shard_merging", x.ShardMerging, "If set, builder will respect compound shards.")
195}
196
197// Args generates command line arguments for o. It is the "inverse" of Flags.
198func (o *Options) Args() []string {
199 var args []string
200
201 if o.SizeMax != 0 {
202 args = append(args, "-file_limit", strconv.Itoa(o.SizeMax))
203 }
204
205 if o.TrigramMax != 0 {
206 args = append(args, "-max_trigram_count", strconv.Itoa(o.TrigramMax))
207 }
208
209 if o.ShardMax != 0 {
210 args = append(args, "-shard_limit", strconv.Itoa(o.ShardMax))
211 }
212
213 if o.Parallelism != 0 {
214 args = append(args, "-parallelism", strconv.Itoa(o.Parallelism))
215 }
216
217 if o.IndexDir != "" {
218 args = append(args, "-index", o.IndexDir)
219 }
220
221 if o.ShardPrefixOverride != "" {
222 args = append(args, "-shard_prefix_override", o.ShardPrefixOverride)
223 }
224
225 if o.CTagsMustSucceed {
226 args = append(args, "-require_ctags")
227 }
228
229 for _, a := range o.LargeFiles {
230 args = append(args, "-large_file", a)
231 }
232
233 // Sourcegraph specific
234 if o.DisableCTags {
235 args = append(args, "-disable_ctags")
236 }
237
238 if o.ShardMerging {
239 args = append(args, "-shard_merging")
240 }
241
242 return args
243}
244
245// Builder manages (parallel) creation of uniformly sized shards. The
246// builder buffers up documents until it collects enough documents and
247// then builds a shard and writes.
248type Builder struct {
249 opts Options
250 throttle chan int
251
252 nextShardNum int
253 todo []*Document
254 docChecker DocChecker
255 size int
256
257 parserBins ctags.ParserBinMap
258 building sync.WaitGroup
259
260 errMu sync.Mutex
261 buildError error
262
263 // temp name => final name for finished shards. We only rename
264 // them once all shards succeed to avoid Frankstein corpuses.
265 finishedShards map[string]string
266
267 // indexTime is set by tests for doing reproducible builds.
268 indexTime time.Time
269
270 // heapProfileMu is used to ensure that only one memory profile is written at a time
271 heapProfileMu sync.Mutex
272 heapProfileNum int
273
274 // a sortable 20 chars long id.
275 id string
276
277 finishCalled bool
278
279 // postingsPool reuses postingsBuilder instances across shard builds,
280 // retaining their map and slice allocations to avoid repeated
281 // memclr/madvise overhead.
282 postingsPool sync.Pool
283}
284
285type finishedShard struct {
286 temp, final string
287}
288
289func checkCTags() string {
290 if ctags := os.Getenv("CTAGS_COMMAND"); ctags != "" {
291 return ctags
292 }
293
294 if ctags, err := exec.LookPath("universal-ctags"); err == nil {
295 return ctags
296 }
297
298 return ""
299}
300
301func checkScipCTags() string {
302 if ctags := os.Getenv("SCIP_CTAGS_COMMAND"); ctags != "" {
303 return ctags
304 }
305
306 if ctags, err := exec.LookPath("scip-ctags"); err == nil {
307 return ctags
308 }
309
310 return ""
311}
312
313// SetDefaults sets reasonable default options.
314func (o *Options) SetDefaults() {
315 if o.CTagsPath == "" && !o.DisableCTags {
316 o.CTagsPath = checkCTags()
317 }
318
319 if o.ScipCTagsPath == "" && !o.DisableCTags {
320 o.ScipCTagsPath = checkScipCTags()
321 }
322
323 if o.Parallelism == 0 {
324 o.Parallelism = 4
325 }
326 if o.SizeMax == 0 {
327 o.SizeMax = 2 << 20
328 }
329 if o.ShardMax == 0 {
330 o.ShardMax = 100 << 20
331 }
332 if o.TrigramMax == 0 {
333 o.TrigramMax = 20000
334 }
335
336 if o.RepositoryDescription.Name == "" && o.RepositoryDescription.URL != "" {
337 parsed, _ := url.Parse(o.RepositoryDescription.URL)
338 if parsed != nil {
339 o.RepositoryDescription.Name = filepath.Join(parsed.Host, parsed.Path)
340 }
341 }
342}
343
344// ShardName returns the name the given index shard.
345func (o *Options) shardName(n int) string {
346 return o.shardNameVersion(IndexFormatVersion, n)
347}
348
349func (o *Options) shardNameVersion(version, n int) string {
350 prefix := o.ShardPrefixOverride // ShardPrefixOverride takes precedence to support custom shard naming strategies
351
352 if prefix == "" {
353 // Sourcegraph specific: We use IDs in shard names on multi-tenant
354 // instances to prevent conflicts.
355 if tenant.UseIDBasedShardNames() {
356 prefix = fmt.Sprintf("%09d_%09d", o.RepositoryDescription.TenantID, o.RepositoryDescription.ID)
357 } else {
358 prefix = o.RepositoryDescription.Name
359 }
360 }
361
362 return shardName(o.IndexDir, prefix, version, n)
363}
364
365type IndexState string
366
367const (
368 IndexStateMissing IndexState = "missing"
369 IndexStateCorrupt IndexState = "corrupt"
370 IndexStateVersion IndexState = "version-mismatch"
371 IndexStateOption IndexState = "option-mismatch"
372 IndexStateMeta IndexState = "meta-mismatch"
373 IndexStateContent IndexState = "content-mismatch"
374 IndexStateEqual IndexState = "equal"
375)
376
377var readVersions = []struct {
378 IndexFormatVersion int
379 FeatureVersion int
380}{{
381 IndexFormatVersion: IndexFormatVersion,
382 FeatureVersion: FeatureVersion,
383}, {
384 IndexFormatVersion: NextIndexFormatVersion,
385 FeatureVersion: FeatureVersion,
386}}
387
388// IncrementalSkipIndexing returns true if the index present on disk matches
389// the build options.
390func (o *Options) IncrementalSkipIndexing() bool {
391 state, _ := o.IndexState()
392 return state == IndexStateEqual
393}
394
395// IndexState checks how the index present on disk compares to the build
396// options and returns the IndexState and the name of the first shard.
397func (o *Options) IndexState() (IndexState, string) {
398 // Open the latest version we support that is on disk.
399 fn := o.findShard()
400 if fn == "" {
401 return IndexStateMissing, fn
402 }
403
404 repos, index, err := ReadMetadataPathAlive(fn)
405 if os.IsNotExist(err) {
406 return IndexStateMissing, fn
407 } else if err != nil {
408 return IndexStateCorrupt, fn
409 }
410
411 for _, v := range readVersions {
412 if v.IndexFormatVersion == index.IndexFormatVersion && v.FeatureVersion != index.IndexFeatureVersion {
413 return IndexStateVersion, fn
414 }
415 }
416
417 var repo *zoekt.Repository
418 for _, cand := range repos {
419 if cand.Name == o.RepositoryDescription.Name {
420 repo = cand
421 break
422 }
423 }
424
425 if repo == nil {
426 return IndexStateCorrupt, fn
427 }
428
429 if repo.IndexOptions != o.GetHash() {
430 return IndexStateOption, fn
431 }
432
433 if !reflect.DeepEqual(repo.Branches, o.RepositoryDescription.Branches) {
434 return IndexStateContent, fn
435 }
436
437 // We can mutate repo since it lives in the scope of this function call.
438 if updated, err := repo.MergeMutable(&o.RepositoryDescription); err != nil {
439 // non-nil err means we are trying to update an immutable field =>
440 // reindex content.
441 log.Printf("warn: immutable field changed, requires re-index: %s", err)
442 return IndexStateContent, fn
443 } else if updated {
444 return IndexStateMeta, fn
445 }
446
447 return IndexStateEqual, fn
448}
449
450// FindRepositoryMetadata returns the index metadata for the repository
451// specified in the options. 'ok' is false if the repository's metadata
452// couldn't be found or if an error occurred.
453func (o *Options) FindRepositoryMetadata() (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
454 shard := o.findShard()
455 if shard == "" {
456 return nil, nil, false, nil
457 }
458
459 repositories, metadata, err := ReadMetadataPathAlive(shard)
460 if err != nil {
461 return nil, nil, false, fmt.Errorf("reading metadata for shard %q: %w", shard, err)
462 }
463
464 ID := o.RepositoryDescription.ID
465 for _, r := range repositories {
466 // compound shards contain multiple repositories, so we
467 // have to pick only the one we're looking for
468 if r.ID == ID {
469 return r, metadata, true, nil
470 }
471 }
472
473 // If we're here, then we're somehow in a state where we found a matching
474 // shard that's missing the repository metadata we're looking for. This
475 // should never happen.
476 name := o.RepositoryDescription.Name
477 return nil, nil, false, fmt.Errorf("matching shard %q doesn't contain metadata for repo id %d (%q)", shard, ID, name)
478}
479
480func (o *Options) findShard() string {
481 for _, v := range readVersions {
482 fn := o.shardNameVersion(v.IndexFormatVersion, 0)
483 if _, err := os.Stat(fn); err == nil {
484 return fn
485 }
486 }
487
488 // Brute force finding the shard in compound shards. We should only hit this
489 // code path for repositories that don't exist yet or are in compound shards.
490 return o.findCompoundShard()
491}
492
493func (o *Options) findCompoundShard() string {
494 compoundShards, err := filepath.Glob(path.Join(o.IndexDir, "compound-*.zoekt"))
495 if err != nil {
496 return ""
497 }
498 for _, fn := range compoundShards {
499 if containsRepo(fn, o.RepositoryDescription.ID) {
500 return fn
501 }
502 }
503
504 return ""
505}
506
507func (o *Options) FindAllShards() []string {
508 for _, v := range readVersions {
509 fn := o.shardNameVersion(v.IndexFormatVersion, 0)
510 if _, err := os.Stat(fn); err == nil {
511 shards := []string{fn}
512 for i := 1; ; i++ {
513 fn := o.shardNameVersion(v.IndexFormatVersion, i)
514 if _, err := os.Stat(fn); err != nil {
515 return shards
516 }
517 shards = append(shards, fn)
518 }
519 }
520 }
521
522 // lazily fallback to findShard which will look for a compound shard.
523 if fn := o.findShard(); fn != "" {
524 return []string{fn}
525 }
526
527 return nil
528}
529
530// IgnoreSizeMax determines whether the max size should be ignored.
531func (o *Options) IgnoreSizeMax(name string) bool {
532 // A pattern match will override preceding pattern matches.
533 for i := len(o.LargeFiles) - 1; i >= 0; i-- {
534 pattern := strings.TrimSpace(o.LargeFiles[i])
535 negated, validatedPattern := checkIsNegatePattern(pattern)
536
537 if m, _ := doublestar.PathMatch(validatedPattern, name); m {
538 if negated {
539 return false
540 } else {
541 return true
542 }
543 }
544 }
545
546 return false
547}
548
549func checkIsNegatePattern(pattern string) (bool, string) {
550 negate := "!"
551
552 // if negated then strip prefix meta character which identifies negated filter pattern
553 if strings.HasPrefix(pattern, negate) {
554 return true, pattern[len(negate):]
555 }
556
557 return false, pattern
558}
559
560// NewBuilder creates a new Builder instance.
561func NewBuilder(opts Options) (*Builder, error) {
562 opts.SetDefaults()
563 if opts.RepositoryDescription.Name == "" {
564 return nil, fmt.Errorf("builder: must set Name")
565 }
566
567 b := &Builder{
568 opts: opts,
569 throttle: make(chan int, opts.Parallelism),
570 finishedShards: map[string]string{},
571 }
572
573 parserBins, err := ctags.NewParserBinMap(
574 b.opts.CTagsPath,
575 b.opts.ScipCTagsPath,
576 opts.LanguageMap,
577 b.opts.CTagsMustSucceed,
578 )
579 if err != nil {
580 return nil, err
581 }
582
583 b.parserBins = parserBins
584
585 if opts.IsDelta {
586 // Delta shards build on top of previously existing shards.
587 // As a consequence, the shardNum for delta shards starts from
588 // the number following the most recently generated shard - not 0.
589 //
590 // Using this numbering scheme allows all the shards to be
591 // discovered as a set.
592 shards := b.opts.FindAllShards()
593 b.nextShardNum = len(shards) // shards are zero indexed, so len() provides the next number after the last one
594 }
595
596 if _, err := b.newShardBuilder(); err != nil {
597 return nil, err
598 }
599
600 now := time.Now()
601 b.indexTime = now
602 b.id = xid.NewWithTime(now).String()
603
604 return b, nil
605}
606
607// AddFile is a convenience wrapper for the Add method
608func (b *Builder) AddFile(name string, content []byte) error {
609 return b.Add(Document{Name: name, Content: content})
610}
611
612func (b *Builder) Add(doc Document) error {
613 if b.finishCalled {
614 return nil
615 }
616
617 allowLargeFile := b.opts.IgnoreSizeMax(doc.Name)
618 if len(doc.Content) > b.opts.SizeMax && !allowLargeFile {
619 // We could pass the document on to the shardbuilder, but if
620 // we pass through a part of the source tree with binary/large
621 // files, the corresponding shard would be mostly empty, so
622 // insert a reason here too.
623 doc.SkipReason = SkipReasonTooLarge
624 } else if skip := b.docChecker.Check(doc.Content, b.opts.TrigramMax, allowLargeFile); skip != SkipReasonNone {
625 doc.SkipReason = skip
626 }
627
628 b.todo = append(b.todo, &doc)
629
630 if doc.SkipReason == SkipReasonNone {
631 b.size += len(doc.Name) + len(doc.Content)
632 } else {
633 b.size += len(doc.Name)
634 // Drop the content if we are skipping the document. Skipped content is not counted towards the
635 // shard size limit, so otherwise we might buffer too much data in memory before flushing.
636 doc.Content = nil
637 }
638
639 if b.size > b.opts.ShardMax {
640 return b.flush()
641 }
642
643 return nil
644}
645
646// MarkFileAsChangedOrRemoved indicates that the file specified by the given path
647// has been changed or removed since the last indexing job for this repository.
648//
649// If this build is a delta build, these files will be tombstoned in the older shards for this repository.
650func (b *Builder) MarkFileAsChangedOrRemoved(path string) {
651 b.opts.changedOrRemovedFiles = append(b.opts.changedOrRemovedFiles, path)
652}
653
654// Finish creates a last shard from the buffered documents, and clears
655// stale shards from previous runs. This should always be called, also
656// in failure cases, to ensure cleanup.
657//
658// It is safe to call Finish() multiple times.
659func (b *Builder) Finish() error {
660 if b.finishCalled {
661 return b.buildError
662 }
663
664 b.finishCalled = true
665
666 b.flush()
667 b.building.Wait()
668
669 if b.buildError != nil {
670 for tmp := range b.finishedShards {
671 log.Printf("Builder.Finish %s", tmp)
672 os.Remove(tmp)
673 }
674 b.finishedShards = map[string]string{}
675 return b.buildError
676 }
677
678 // map of temporary -> final names for all updated shards + shard metadata files
679 artifactPaths := make(map[string]string)
680 maps.Copy(artifactPaths, b.finishedShards)
681
682 oldShards := b.opts.FindAllShards()
683
684 if b.opts.IsDelta {
685 // Delta shard builds need to update FileTombstone and branch commit information for all
686 // existing shards
687 for _, shard := range oldShards {
688 repositories, _, err := ReadMetadataPathAlive(shard)
689 if err != nil {
690 return fmt.Errorf("reading metadata from shard %q: %w", shard, err)
691 }
692
693 if len(repositories) > 1 {
694 return fmt.Errorf("delta shard builds don't support repositories contained in compound shards (shard %q)", shard)
695 }
696
697 if len(repositories) == 0 {
698 return fmt.Errorf("failed to update repository metadata for shard %q - shard contains no repositories", shard)
699 }
700
701 repository := repositories[0]
702 if repository.ID != b.opts.RepositoryDescription.ID {
703 return fmt.Errorf("shard %q doesn't contain repository ID %d (%q)", shard, b.opts.RepositoryDescription.ID, b.opts.RepositoryDescription.Name)
704 }
705
706 if len(b.opts.changedOrRemovedFiles) > 0 && repository.FileTombstones == nil {
707 repository.FileTombstones = make(map[string]struct{}, len(b.opts.changedOrRemovedFiles))
708 }
709
710 for _, f := range b.opts.changedOrRemovedFiles {
711 repository.FileTombstones[f] = struct{}{}
712 }
713
714 if !BranchNamesEqual(repository.Branches, b.opts.RepositoryDescription.Branches) {
715 return deltaBranchSetError{
716 shardName: shard,
717 old: repository.Branches,
718 new: b.opts.RepositoryDescription.Branches,
719 }
720 }
721
722 if b.opts.GetHash() != repository.IndexOptions {
723 return &deltaIndexOptionsMismatchError{
724 shardName: shard,
725 newOptions: b.opts.HashOptions(),
726 }
727 }
728
729 repository.Branches = b.opts.RepositoryDescription.Branches
730
731 repository.LatestCommitDate = b.opts.RepositoryDescription.LatestCommitDate
732
733 repository.Metadata = b.opts.RepositoryDescription.Metadata
734
735 tempPath, finalPath, err := JsonMarshalRepoMetaTemp(shard, repository)
736 if err != nil {
737 return fmt.Errorf("writing repository metadta for shard %q: %w", shard, err)
738 }
739
740 artifactPaths[tempPath] = finalPath
741 }
742 }
743
744 // We mark finished shards as empty when we successfully finish. Return now
745 // to allow call sites to call Finish idempotently.
746 if len(artifactPaths) == 0 {
747 return b.buildError
748 }
749
750 // Collect a map of the old shards on disk. For each new shard we replace we
751 // delete it from toDelete. Anything remaining in toDelete will be removed
752 // after we have renamed everything into place.
753
754 var toDelete map[string]struct{}
755 if !b.opts.IsDelta {
756 // Non-delta shard builds delete all existing shards before they write out
757 // new ones.
758 // By contrast, delta shard builds work by stacking changes on top of existing shards.
759 // So, we skip populating the toDelete map if we're building delta shards.
760
761 toDelete = make(map[string]struct{})
762 for _, name := range oldShards {
763 paths, err := IndexFilePaths(name)
764 if err != nil {
765 b.buildError = fmt.Errorf("failed to find old paths for %s: %w", name, err)
766 }
767 for _, p := range paths {
768 toDelete[p] = struct{}{}
769 }
770 }
771 }
772
773 for tmp, final := range artifactPaths {
774 if err := os.Rename(tmp, final); err != nil {
775 b.buildError = err
776 continue
777 }
778
779 delete(toDelete, final)
780 }
781
782 b.finishedShards = map[string]string{}
783
784 for p := range toDelete {
785 // Don't delete compound shards, set tombstones instead.
786 if b.opts.ShardMerging && strings.HasPrefix(filepath.Base(p), "compound-") {
787 if !strings.HasSuffix(p, ".zoekt") {
788 continue
789 }
790 err := SetTombstone(p, b.opts.RepositoryDescription.ID)
791 b.buildError = err
792 continue
793 }
794 log.Printf("removing old shard file: %s", p)
795 if err := os.Remove(p); err != nil {
796 b.buildError = err
797 }
798 }
799
800 return b.buildError
801}
802
803// BranchNamesEqual compares the given zoekt.RepositoryBranch slices, and returns true
804// iff both slices specify the same set of branch names in the same order.
805func BranchNamesEqual(a, b []zoekt.RepositoryBranch) bool {
806 if len(a) != len(b) {
807 return false
808 }
809
810 for i := range a {
811 x, y := a[i], b[i]
812 if x.Name != y.Name {
813 return false
814 }
815 }
816
817 return true
818}
819
820func (b *Builder) flush() error {
821 todo := b.todo
822 b.todo = nil
823 b.size = 0
824 b.errMu.Lock()
825 defer b.errMu.Unlock()
826 if b.buildError != nil {
827 return b.buildError
828 }
829
830 hasShard := b.nextShardNum > 0
831 if len(todo) == 0 && hasShard {
832 return nil
833 }
834
835 shard := b.nextShardNum
836 b.nextShardNum++
837
838 if b.opts.Parallelism > 1 {
839 b.building.Add(1)
840 b.throttle <- 1
841 go func() {
842 done, err := b.buildShard(todo, shard)
843 <-b.throttle
844
845 b.errMu.Lock()
846 defer b.errMu.Unlock()
847 if err != nil && b.buildError == nil {
848 b.buildError = err
849 }
850 if err == nil {
851 b.finishedShards[done.temp] = done.final
852 }
853 b.building.Done()
854 }()
855 } else {
856 // No goroutines when we're not parallel. This
857 // simplifies memory profiling.
858 done, err := b.buildShard(todo, shard)
859 b.buildError = err
860 if err == nil {
861 b.finishedShards[done.temp] = done.final
862 }
863
864 return b.buildError
865 }
866
867 return nil
868}
869
870// map [0,inf) to [0,1) monotonically
871func squashRange(j int) float64 {
872 x := float64(j)
873 return x / (1 + x)
874}
875
876type rankedDoc struct {
877 *Document
878 rank []float64
879}
880
881// rank returns a vector of scores which is used at index-time to sort documents
882// before writing them to disk. The order of documents in the shard is important
883// at query time, because earlier documents receive a boost at query time and
884// have a higher chance of being searched before limits kick in.
885func rank(d *Document, origIdx int) []float64 {
886 skipped := 0.0
887 if d.SkipReason != SkipReasonNone {
888 skipped = 1.0
889 }
890
891 generated := 0.0
892 if enry.IsGenerated(d.Name, d.Content) {
893 generated = 1.0
894 }
895
896 vendor := 0.0
897 if enry.IsVendor(d.Name) {
898 vendor = 1.0
899 }
900
901 test := 0.0
902 if enry.IsTest(d.Name) {
903 test = 1.0
904 }
905
906 // Smaller is earlier (=better).
907 return []float64{
908 // Always place skipped docs last
909 skipped,
910
911 // Prefer docs that are not generated
912 generated,
913
914 // Prefer docs that are not vendored
915 vendor,
916
917 // Prefer docs that are not tests
918 test,
919
920 // With short names
921 squashRange(len(d.Name)),
922
923 // With many symbols
924 1.0 - squashRange(len(d.Symbols)),
925
926 // With short content
927 squashRange(len(d.Content)),
928
929 // That is present is as many branches as possible
930 1.0 - squashRange(len(d.Branches)),
931
932 // Preserve original ordering.
933 squashRange(origIdx),
934 }
935}
936
937func sortDocuments(todo []*Document) {
938 rs := make([]rankedDoc, 0, len(todo))
939 for i, t := range todo {
940 rd := rankedDoc{t, rank(t, i)}
941 rs = append(rs, rd)
942 }
943 sort.Slice(rs, func(i, j int) bool {
944 r1 := rs[i].rank
945 r2 := rs[j].rank
946 for i := range r1 {
947 if r1[i] < r2[i] {
948 return true
949 }
950 if r1[i] > r2[i] {
951 return false
952 }
953 }
954
955 return false
956 })
957 for i := range todo {
958 todo[i] = rs[i].Document
959 }
960}
961
962func (b *Builder) buildShard(todo []*Document, nextShardNum int) (*finishedShard, error) {
963 if !b.opts.DisableCTags && (b.opts.CTagsPath != "" || b.opts.ScipCTagsPath != "") {
964 err := parseSymbols(todo, b.opts.LanguageMap, b.parserBins)
965 if b.opts.CTagsMustSucceed && err != nil {
966 return nil, err
967 }
968 if err != nil {
969 log.Printf("ignoring universal:%s or scip:%s error: %v", b.opts.CTagsPath, b.opts.ScipCTagsPath, err)
970 }
971 }
972
973 name := b.opts.shardName(nextShardNum)
974
975 shardBuilder, err := b.newShardBuilder()
976 if err != nil {
977 return nil, err
978 }
979
980 sortDocuments(todo)
981
982 for idx, t := range todo {
983 if err := shardBuilder.Add(*t); err != nil {
984 return nil, err
985 }
986
987 if idx%10_000 == 0 {
988 b.CheckMemoryUsage()
989 }
990 }
991
992 result, err := b.writeShard(name, shardBuilder)
993 b.returnPostingsBuilders(shardBuilder)
994 return result, err
995}
996
997// CheckMemoryUsage checks the memory usage of the process and writes a memory profile if the heap usage exceeds the
998// configured threshold. NOTE: this method is expensive and should only be used for debugging.
999func (b *Builder) CheckMemoryUsage() {
1000 // Don't check memory if heap profiling is disabled, or we've already written 10 profiles
1001 if b.opts.HeapProfileTriggerBytes <= 0 || b.heapProfileNum >= 10 {
1002 return
1003 }
1004
1005 var m runtime.MemStats
1006 runtime.ReadMemStats(&m)
1007
1008 if m.HeapAlloc > b.opts.HeapProfileTriggerBytes && b.heapProfileMu.TryLock() {
1009 defer b.heapProfileMu.Unlock()
1010
1011 log.Printf("writing memory profile, allocated heap: %s", humanize.Bytes(m.HeapAlloc))
1012 name := filepath.Join(b.opts.IndexDir, fmt.Sprintf("indexmemory.prof.%d", b.heapProfileNum))
1013 f, err := os.Create(name)
1014 if err != nil {
1015 log.Printf("failed to create memory profile file: %v", err)
1016 return
1017 }
1018
1019 err = pprof.WriteHeapProfile(f)
1020 if err != nil {
1021 log.Printf("failed to write memory profile: %v", err)
1022 }
1023
1024 b.heapProfileNum++
1025 }
1026}
1027
1028func (b *Builder) getPostingsBuilder() *postingsBuilder {
1029 if pb, ok := b.postingsPool.Get().(*postingsBuilder); ok {
1030 pb.reset()
1031 return pb
1032 }
1033 return newPostingsBuilder(b.opts.ShardMax)
1034}
1035
1036// returnPostingsBuilders returns both postings builders from sb to the
1037// pool and nils the fields so any subsequent misuse crashes obviously.
1038func (b *Builder) returnPostingsBuilders(sb *ShardBuilder) {
1039 if sb.contentPostings != nil {
1040 b.postingsPool.Put(sb.contentPostings)
1041 sb.contentPostings = nil
1042 }
1043 if sb.namePostings != nil {
1044 b.postingsPool.Put(sb.namePostings)
1045 sb.namePostings = nil
1046 }
1047}
1048
1049func (b *Builder) newShardBuilder() (*ShardBuilder, error) {
1050 desc := b.opts.RepositoryDescription
1051 desc.HasSymbols = !b.opts.DisableCTags && b.opts.CTagsPath != ""
1052 desc.SubRepoMap = b.opts.SubRepositories
1053 desc.IndexOptions = b.opts.GetHash()
1054
1055 content := b.getPostingsBuilder()
1056 name := b.getPostingsBuilder()
1057 shardBuilder := newShardBuilderWithPostings(content, name)
1058 if err := shardBuilder.setRepository(&desc); err != nil {
1059 return nil, err
1060 }
1061 shardBuilder.IndexTime = b.indexTime
1062 shardBuilder.ID = b.id
1063 return shardBuilder, nil
1064}
1065
1066func (b *Builder) writeShard(fn string, ib *ShardBuilder) (*finishedShard, error) {
1067 dir := filepath.Dir(fn)
1068 if err := os.MkdirAll(dir, 0o700); err != nil {
1069 return nil, err
1070 }
1071
1072 f, err := os.CreateTemp(dir, filepath.Base(fn)+".*.tmp")
1073 if err != nil {
1074 return nil, err
1075 }
1076 if runtime.GOOS != "windows" {
1077 if err := f.Chmod(0o666 &^ umask); err != nil {
1078 return nil, err
1079 }
1080 }
1081
1082 defer f.Close()
1083 if err := ib.Write(f); err != nil {
1084 return nil, err
1085 }
1086 fi, err := f.Stat()
1087 if err != nil {
1088 return nil, err
1089 }
1090 if err := f.Close(); err != nil {
1091 return nil, err
1092 }
1093
1094 log.Printf("finished shard %s: %d index bytes (overhead %3.1f), %d files processed \n",
1095 fn,
1096 fi.Size(),
1097 float64(fi.Size())/float64(ib.ContentSize()+1),
1098 ib.NumFiles())
1099
1100 return &finishedShard{f.Name(), fn}, nil
1101}
1102
1103type deltaBranchSetError struct {
1104 shardName string
1105 old, new []zoekt.RepositoryBranch
1106}
1107
1108func (e deltaBranchSetError) Error() string {
1109 return fmt.Sprintf("repository metadata in shard %q contains a different set of branch names than what was requested, which is unsupported in a delta shard build. old: %+v, new: %+v", e.shardName, e.old, e.new)
1110}
1111
1112type deltaIndexOptionsMismatchError struct {
1113 shardName string
1114 newOptions HashOptions
1115}
1116
1117func (e *deltaIndexOptionsMismatchError) Error() string {
1118 return fmt.Sprintf("one or more index options for shard %q do not match Builder's index options. These index option updates are incompatible with delta build. New index options: %+v", e.shardName, e.newOptions)
1119}
1120
1121// umask holds the Umask of the current process
1122var umask os.FileMode
1123
1124func init() {
1125 umask = os.FileMode(unix.Umask(0))
1126 unix.Umask(int(umask))
1127}