fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Package index contains logic for building Zoekt indexes. NOTE: this package is not considered
16// part of the public API, and it is not recommended to rely on it in external code.
17package index
18
19import (
20 "crypto/sha1"
21 "flag"
22 "fmt"
23 "log"
24 "net/url"
25 "os"
26 "os/exec"
27 "path"
28 "path/filepath"
29 "reflect"
30 "runtime"
31 "runtime/pprof"
32 "sort"
33 "strconv"
34 "strings"
35 "sync"
36 "time"
37
38 "github.com/bmatcuk/doublestar"
39 "github.com/dustin/go-humanize"
40 "github.com/rs/xid"
41 "golang.org/x/sys/unix"
42
43 "maps"
44
45 "github.com/sourcegraph/zoekt"
46 "github.com/sourcegraph/zoekt/internal/ctags"
47 "github.com/sourcegraph/zoekt/internal/tenant"
48)
49
// DefaultDir is the ".zoekt" directory under the path named by the HOME
// environment variable, as read when the package is initialized.
var DefaultDir = filepath.Join(os.Getenv("HOME"), ".zoekt")
51
// Branch describes a single branch version.
type Branch struct {
	// Name is the name of the branch.
	Name string
	// Version is the version recorded for the branch.
	// NOTE(review): presumably a commit identifier — confirm against callers.
	Version string
}
57
// Options sets options for the index building. Zero-valued numeric limits
// are replaced with defaults by SetDefaults.
type Options struct {
	// IndexDir is a directory that holds *.zoekt index files.
	IndexDir string

	// ShardPrefixOverride sets the prefix for shard names. When non-empty it
	// takes precedence over the repository-derived prefix.
	ShardPrefixOverride string

	// SizeMax is the maximum file size. Larger documents are skipped unless
	// their name matches a LargeFiles pattern.
	SizeMax int

	// Parallelism is the maximum number of shards to index in parallel.
	Parallelism int

	// ShardMax sets the maximum corpus size for a single shard.
	ShardMax int

	// TrigramMax sets the maximum number of distinct trigrams per document.
	TrigramMax int

	// RepositoryDescription holds names and URLs for the repository.
	RepositoryDescription zoekt.Repository

	// SubRepositories is a path => sub repository map.
	SubRepositories map[string]*zoekt.Repository

	// DisableCTags disables the generation of ctags metadata.
	DisableCTags bool

	// CTagsPath is the path to the ctags binary to run, or empty
	// if a valid binary couldn't be found.
	CTagsPath string

	// ScipCTagsPath is the same as CTagsPath but for scip-ctags.
	ScipCTagsPath string

	// If set, ctags must succeed.
	CTagsMustSucceed bool

	// LargeFiles is a slice of glob patterns, including ** for any number
	// of directories, where matching file paths should be indexed
	// regardless of their size. A pattern prefixed with "!" is negated, and
	// later patterns override earlier ones. The full pattern syntax is here:
	// https://github.com/bmatcuk/doublestar/tree/v1#patterns.
	LargeFiles []string

	// IsDelta is true if this run contains only the changed documents since the
	// last run.
	IsDelta bool

	// changedOrRemovedFiles is a list of file paths that have been changed or removed
	// since the last indexing job for this repository. These files will be tombstoned
	// in the older shards for this repository.
	changedOrRemovedFiles []string

	// LanguageMap is handed to the ctags parser setup and to symbol parsing.
	LanguageMap ctags.LanguageMap

	// ShardMerging is true if builder should respect compound shards. This is a
	// Sourcegraph specific option.
	ShardMerging bool

	// HeapProfileTriggerBytes is the heap allocation in bytes that will trigger a memory profile. If 0, no memory profile
	// will be triggered. Note this trigger looks at total heap allocation (which includes both inuse and garbage objects).
	//
	// Profiles will be written to files named `indexmemory.prof.n` in the index directory. No more than 10 files are written.
	//
	// Note: heap checking is "best effort", and it's possible for the process to OOM without triggering the heap profile.
	HeapProfileTriggerBytes uint64
}
126
// HashOptions contains only the options in Options that feed into GetHash.
// Changing any of them changes the hash, which makes IndexState report
// IndexStateOption during the next index build.
type HashOptions struct {
	sizeMax          int      // copy of Options.SizeMax
	disableCTags     bool     // copy of Options.DisableCTags
	ctagsPath        string   // copy of Options.CTagsPath
	cTagsMustSucceed bool     // copy of Options.CTagsMustSucceed
	largeFiles       []string // Options.LargeFiles (shares the backing array)
}
135
136func (o *Options) HashOptions() HashOptions {
137 return HashOptions{
138 sizeMax: o.SizeMax,
139 disableCTags: o.DisableCTags,
140 ctagsPath: o.CTagsPath,
141 cTagsMustSucceed: o.CTagsMustSucceed,
142 largeFiles: o.LargeFiles,
143 }
144}
145
146func (o *Options) GetHash() string {
147 h := o.HashOptions()
148 hasher := sha1.New()
149
150 hasher.Write([]byte(h.ctagsPath))
151 hasher.Write(fmt.Appendf(nil, "%t", h.cTagsMustSucceed))
152 hasher.Write(fmt.Appendf(nil, "%d", h.sizeMax))
153 hasher.Write(fmt.Appendf(nil, "%q", h.largeFiles))
154 hasher.Write(fmt.Appendf(nil, "%t", h.disableCTags))
155
156 return fmt.Sprintf("%x", hasher.Sum(nil))
157}
158
159type largeFilesFlag struct{ *Options }
160
161func (f largeFilesFlag) String() string {
162 // From flag.Value documentation:
163 //
164 // The flag package may call the String method with a zero-valued receiver,
165 // such as a nil pointer.
166 if f.Options == nil {
167 return ""
168 }
169 s := append([]string{""}, f.LargeFiles...)
170 return strings.Join(s, "-large_file ")
171}
172
173func (f largeFilesFlag) Set(value string) error {
174 f.LargeFiles = append(f.LargeFiles, value)
175 return nil
176}
177
178// Flags adds flags for build options to fs. It is the "inverse" of Args.
179func (o *Options) Flags(fs *flag.FlagSet) {
180 x := *o
181 x.SetDefaults()
182 fs.IntVar(&o.SizeMax, "file_limit", x.SizeMax, "maximum file size")
183 fs.IntVar(&o.TrigramMax, "max_trigram_count", x.TrigramMax, "maximum number of trigrams per document")
184 fs.IntVar(&o.ShardMax, "shard_limit", x.ShardMax, "maximum corpus size for a shard")
185 fs.IntVar(&o.Parallelism, "parallelism", x.Parallelism, "maximum number of parallel indexing processes.")
186 fs.StringVar(&o.IndexDir, "index", x.IndexDir, "directory for search indices")
187 fs.StringVar(&o.ShardPrefixOverride, "shard_prefix_override", x.ShardPrefixOverride, "prefix for shard name")
188 fs.BoolVar(&o.CTagsMustSucceed, "require_ctags", x.CTagsMustSucceed, "If set, ctags calls must succeed.")
189 fs.Var(largeFilesFlag{o}, "large_file", "A glob pattern where matching files are to be index regardless of their size. You can add multiple patterns by setting this more than once.")
190
191 // Sourcegraph specific
192 fs.BoolVar(&o.DisableCTags, "disable_ctags", x.DisableCTags, "If set, ctags will not be called.")
193 fs.BoolVar(&o.ShardMerging, "shard_merging", x.ShardMerging, "If set, builder will respect compound shards.")
194}
195
196// Args generates command line arguments for o. It is the "inverse" of Flags.
197func (o *Options) Args() []string {
198 var args []string
199
200 if o.SizeMax != 0 {
201 args = append(args, "-file_limit", strconv.Itoa(o.SizeMax))
202 }
203
204 if o.TrigramMax != 0 {
205 args = append(args, "-max_trigram_count", strconv.Itoa(o.TrigramMax))
206 }
207
208 if o.ShardMax != 0 {
209 args = append(args, "-shard_limit", strconv.Itoa(o.ShardMax))
210 }
211
212 if o.Parallelism != 0 {
213 args = append(args, "-parallelism", strconv.Itoa(o.Parallelism))
214 }
215
216 if o.IndexDir != "" {
217 args = append(args, "-index", o.IndexDir)
218 }
219
220 if o.ShardPrefixOverride != "" {
221 args = append(args, "-shard_prefix_override", o.ShardPrefixOverride)
222 }
223
224 if o.CTagsMustSucceed {
225 args = append(args, "-require_ctags")
226 }
227
228 for _, a := range o.LargeFiles {
229 args = append(args, "-large_file", a)
230 }
231
232 // Sourcegraph specific
233 if o.DisableCTags {
234 args = append(args, "-disable_ctags")
235 }
236
237 if o.ShardMerging {
238 args = append(args, "-shard_merging")
239 }
240
241 return args
242}
243
// Builder manages (parallel) creation of uniformly sized shards. The
// builder buffers up documents until it collects enough documents and
// then builds a shard and writes.
type Builder struct {
	// opts holds the build options after SetDefaults has been applied.
	opts Options
	// throttle bounds the number of concurrent shard builds; NewBuilder
	// creates it with capacity opts.Parallelism.
	throttle chan int

	// nextShardNum is the number given to the next shard that is flushed.
	nextShardNum int
	// todo holds the documents buffered since the last flush.
	todo []*Document
	// docChecker is consulted by Add to decide whether a document is skipped.
	docChecker DocChecker
	// size is the accumulated byte size of buffered names plus the content
	// of documents that are not skipped.
	size int

	// parserBins is the ctags parser configuration passed to symbol parsing.
	parserBins ctags.ParserBinMap
	// building tracks shard builds that run in background goroutines.
	building sync.WaitGroup

	// errMu is held by flush and its build goroutines while they read or
	// write buildError and finishedShards.
	errMu      sync.Mutex
	buildError error

	// temp name => final name for finished shards. We only rename
	// them once all shards succeed to avoid Frankenstein corpuses.
	finishedShards map[string]string

	// indexTime is set by tests for doing reproducible builds. NewBuilder
	// initializes it to the current time.
	indexTime time.Time

	// heapProfileMu is used to ensure that only one memory profile is written at a time
	heapProfileMu sync.Mutex
	// heapProfileNum counts the memory profiles written so far.
	heapProfileNum int

	// a sortable 20 chars long id.
	id string

	// finishCalled records that Finish has run; Add becomes a no-op afterwards.
	finishCalled bool

	// postingsPool reuses postingsBuilder instances across shard builds,
	// retaining their map and slice allocations to avoid repeated
	// memclr/madvise overhead.
	postingsPool sync.Pool
}
283
// finishedShard pairs the temporary file a shard was written to with the
// final name it is renamed to once the whole build succeeds.
type finishedShard struct {
	temp, final string
}
287
// checkCTags locates the universal-ctags binary. A non-empty CTAGS_COMMAND
// environment variable wins; otherwise "universal-ctags" is looked up on the
// PATH. It returns the empty string when neither yields a binary.
func checkCTags() string {
	if fromEnv := os.Getenv("CTAGS_COMMAND"); fromEnv != "" {
		return fromEnv
	}

	found, err := exec.LookPath("universal-ctags")
	if err != nil {
		return ""
	}
	return found
}
299
// checkScipCTags locates the scip-ctags binary. A non-empty
// SCIP_CTAGS_COMMAND environment variable wins; otherwise "scip-ctags" is
// looked up on the PATH. It returns the empty string when neither yields a
// binary.
func checkScipCTags() string {
	if fromEnv := os.Getenv("SCIP_CTAGS_COMMAND"); fromEnv != "" {
		return fromEnv
	}

	found, err := exec.LookPath("scip-ctags")
	if err != nil {
		return ""
	}
	return found
}
311
312// SetDefaults sets reasonable default options.
313func (o *Options) SetDefaults() {
314 if o.CTagsPath == "" && !o.DisableCTags {
315 o.CTagsPath = checkCTags()
316 }
317
318 if o.ScipCTagsPath == "" && !o.DisableCTags {
319 o.ScipCTagsPath = checkScipCTags()
320 }
321
322 if o.Parallelism == 0 {
323 o.Parallelism = 4
324 }
325 if o.SizeMax == 0 {
326 o.SizeMax = 2 << 20
327 }
328 if o.ShardMax == 0 {
329 o.ShardMax = 100 << 20
330 }
331 if o.TrigramMax == 0 {
332 o.TrigramMax = 20000
333 }
334
335 if o.RepositoryDescription.Name == "" && o.RepositoryDescription.URL != "" {
336 parsed, _ := url.Parse(o.RepositoryDescription.URL)
337 if parsed != nil {
338 o.RepositoryDescription.Name = filepath.Join(parsed.Host, parsed.Path)
339 }
340 }
341}
342
343// ShardName returns the name the given index shard.
344func (o *Options) shardName(n int) string {
345 return o.shardNameVersion(IndexFormatVersion, n)
346}
347
348func (o *Options) shardNameVersion(version, n int) string {
349 prefix := o.ShardPrefixOverride // ShardPrefixOverride takes precedence to support custom shard naming strategies
350
351 if prefix == "" {
352 // Sourcegraph specific: We use IDs in shard names on multi-tenant
353 // instances to prevent conflicts.
354 if tenant.UseIDBasedShardNames() {
355 prefix = fmt.Sprintf("%09d_%09d", o.RepositoryDescription.TenantID, o.RepositoryDescription.ID)
356 } else {
357 prefix = o.RepositoryDescription.Name
358 }
359 }
360
361 return shardName(o.IndexDir, prefix, version, n)
362}
363
// IndexState describes how the index found on disk compares to a set of
// build options, as reported by Options.IndexState.
type IndexState string

const (
	// IndexStateMissing: no shard was found, or its file does not exist.
	IndexStateMissing IndexState = "missing"
	// IndexStateCorrupt: the shard metadata could not be read, or the shard
	// does not contain the repository.
	IndexStateCorrupt IndexState = "corrupt"
	// IndexStateVersion: the shard was written with a different feature version.
	IndexStateVersion IndexState = "version-mismatch"
	// IndexStateOption: the hash of the index options differs.
	IndexStateOption IndexState = "option-mismatch"
	// IndexStateMeta: only mutable repository metadata differs.
	IndexStateMeta IndexState = "meta-mismatch"
	// IndexStateContent: the branches differ, or an immutable repository
	// field changed.
	IndexStateContent IndexState = "content-mismatch"
	// IndexStateEqual: the index on disk matches the options.
	IndexStateEqual IndexState = "equal"
)
375
// readVersions lists the index format / feature version pairs that are
// considered when looking at shards on disk. Shard lookup tries the
// IndexFormatVersion of each entry in the order given here.
var readVersions = []struct {
	IndexFormatVersion int
	FeatureVersion     int
}{{
	IndexFormatVersion: IndexFormatVersion,
	FeatureVersion:     FeatureVersion,
}, {
	IndexFormatVersion: NextIndexFormatVersion,
	FeatureVersion:     FeatureVersion,
}}
386
387// IncrementalSkipIndexing returns true if the index present on disk matches
388// the build options.
389func (o *Options) IncrementalSkipIndexing() bool {
390 state, _ := o.IndexState()
391 return state == IndexStateEqual
392}
393
394// IndexState checks how the index present on disk compares to the build
395// options and returns the IndexState and the name of the first shard.
396func (o *Options) IndexState() (IndexState, string) {
397 // Open the latest version we support that is on disk.
398 fn := o.findShard()
399 if fn == "" {
400 return IndexStateMissing, fn
401 }
402
403 repos, index, err := ReadMetadataPathAlive(fn)
404 if os.IsNotExist(err) {
405 return IndexStateMissing, fn
406 } else if err != nil {
407 return IndexStateCorrupt, fn
408 }
409
410 for _, v := range readVersions {
411 if v.IndexFormatVersion == index.IndexFormatVersion && v.FeatureVersion != index.IndexFeatureVersion {
412 return IndexStateVersion, fn
413 }
414 }
415
416 var repo *zoekt.Repository
417 for _, cand := range repos {
418 if cand.Name == o.RepositoryDescription.Name {
419 repo = cand
420 break
421 }
422 }
423
424 if repo == nil {
425 return IndexStateCorrupt, fn
426 }
427
428 if repo.IndexOptions != o.GetHash() {
429 return IndexStateOption, fn
430 }
431
432 if !reflect.DeepEqual(repo.Branches, o.RepositoryDescription.Branches) {
433 return IndexStateContent, fn
434 }
435
436 // We can mutate repo since it lives in the scope of this function call.
437 if updated, err := repo.MergeMutable(&o.RepositoryDescription); err != nil {
438 // non-nil err means we are trying to update an immutable field =>
439 // reindex content.
440 log.Printf("warn: immutable field changed, requires re-index: %s", err)
441 return IndexStateContent, fn
442 } else if updated {
443 return IndexStateMeta, fn
444 }
445
446 return IndexStateEqual, fn
447}
448
449// FindRepositoryMetadata returns the index metadata for the repository
450// specified in the options. 'ok' is false if the repository's metadata
451// couldn't be found or if an error occurred.
452func (o *Options) FindRepositoryMetadata() (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
453 shard := o.findShard()
454 if shard == "" {
455 return nil, nil, false, nil
456 }
457
458 repositories, metadata, err := ReadMetadataPathAlive(shard)
459 if err != nil {
460 return nil, nil, false, fmt.Errorf("reading metadata for shard %q: %w", shard, err)
461 }
462
463 ID := o.RepositoryDescription.ID
464 for _, r := range repositories {
465 // compound shards contain multiple repositories, so we
466 // have to pick only the one we're looking for
467 if r.ID == ID {
468 return r, metadata, true, nil
469 }
470 }
471
472 // If we're here, then we're somehow in a state where we found a matching
473 // shard that's missing the repository metadata we're looking for. This
474 // should never happen.
475 name := o.RepositoryDescription.Name
476 return nil, nil, false, fmt.Errorf("matching shard %q doesn't contain metadata for repo id %d (%q)", shard, ID, name)
477}
478
479func (o *Options) findShard() string {
480 for _, v := range readVersions {
481 fn := o.shardNameVersion(v.IndexFormatVersion, 0)
482 if _, err := os.Stat(fn); err == nil {
483 return fn
484 }
485 }
486
487 // Brute force finding the shard in compound shards. We should only hit this
488 // code path for repositories that don't exist yet or are in compound shards.
489 return o.findCompoundShard()
490}
491
492func (o *Options) findCompoundShard() string {
493 compoundShards, err := filepath.Glob(path.Join(o.IndexDir, "compound-*.zoekt"))
494 if err != nil {
495 return ""
496 }
497 for _, fn := range compoundShards {
498 if containsRepo(fn, o.RepositoryDescription.ID) {
499 return fn
500 }
501 }
502
503 return ""
504}
505
506func (o *Options) FindAllShards() []string {
507 for _, v := range readVersions {
508 fn := o.shardNameVersion(v.IndexFormatVersion, 0)
509 if _, err := os.Stat(fn); err == nil {
510 shards := []string{fn}
511 for i := 1; ; i++ {
512 fn := o.shardNameVersion(v.IndexFormatVersion, i)
513 if _, err := os.Stat(fn); err != nil {
514 return shards
515 }
516 shards = append(shards, fn)
517 }
518 }
519 }
520
521 // lazily fallback to findShard which will look for a compound shard.
522 if fn := o.findShard(); fn != "" {
523 return []string{fn}
524 }
525
526 return nil
527}
528
529// IgnoreSizeMax determines whether the max size should be ignored.
530func (o *Options) IgnoreSizeMax(name string) bool {
531 // A pattern match will override preceding pattern matches.
532 for i := len(o.LargeFiles) - 1; i >= 0; i-- {
533 pattern := strings.TrimSpace(o.LargeFiles[i])
534 negated, validatedPattern := checkIsNegatePattern(pattern)
535
536 if m, _ := doublestar.PathMatch(validatedPattern, name); m {
537 if negated {
538 return false
539 } else {
540 return true
541 }
542 }
543 }
544
545 return false
546}
547
// checkIsNegatePattern reports whether pattern is a negated filter pattern,
// i.e. starts with the "!" meta character. The returned string is pattern
// with that single leading "!" removed, or pattern unchanged if it is not
// negated.
func checkIsNegatePattern(pattern string) (bool, string) {
	rest, negated := strings.CutPrefix(pattern, "!")
	return negated, rest
}
558
559// NewBuilder creates a new Builder instance.
560func NewBuilder(opts Options) (*Builder, error) {
561 opts.SetDefaults()
562 if opts.RepositoryDescription.Name == "" {
563 return nil, fmt.Errorf("builder: must set Name")
564 }
565
566 b := &Builder{
567 opts: opts,
568 throttle: make(chan int, opts.Parallelism),
569 finishedShards: map[string]string{},
570 }
571
572 parserBins, err := ctags.NewParserBinMap(
573 b.opts.CTagsPath,
574 b.opts.ScipCTagsPath,
575 opts.LanguageMap,
576 b.opts.CTagsMustSucceed,
577 )
578 if err != nil {
579 return nil, err
580 }
581
582 b.parserBins = parserBins
583
584 if opts.IsDelta {
585 // Delta shards build on top of previously existing shards.
586 // As a consequence, the shardNum for delta shards starts from
587 // the number following the most recently generated shard - not 0.
588 //
589 // Using this numbering scheme allows all the shards to be
590 // discovered as a set.
591 shards := b.opts.FindAllShards()
592 b.nextShardNum = len(shards) // shards are zero indexed, so len() provides the next number after the last one
593 }
594
595 if _, err := b.newShardBuilder(); err != nil {
596 return nil, err
597 }
598
599 now := time.Now()
600 b.indexTime = now
601 b.id = xid.NewWithTime(now).String()
602
603 return b, nil
604}
605
606// AddFile is a convenience wrapper for the Add method
607func (b *Builder) AddFile(name string, content []byte) error {
608 return b.Add(Document{Name: name, Content: content})
609}
610
611func (b *Builder) Add(doc Document) error {
612 if b.finishCalled {
613 return nil
614 }
615
616 allowLargeFile := b.opts.IgnoreSizeMax(doc.Name)
617 if len(doc.Content) > b.opts.SizeMax && !allowLargeFile {
618 // We could pass the document on to the shardbuilder, but if
619 // we pass through a part of the source tree with binary/large
620 // files, the corresponding shard would be mostly empty, so
621 // insert a reason here too.
622 doc.SkipReason = SkipReasonTooLarge
623 } else if skip := b.docChecker.Check(doc.Content, b.opts.TrigramMax, allowLargeFile); skip != SkipReasonNone {
624 doc.SkipReason = skip
625 }
626
627 // Pre-compute file category and language while content is still
628 // available, before content is dropped for skipped documents.
629 DetermineFileCategory(&doc)
630 DetermineLanguageIfUnknown(&doc)
631
632 b.todo = append(b.todo, &doc)
633
634 if doc.SkipReason == SkipReasonNone {
635 b.size += len(doc.Name) + len(doc.Content)
636 } else {
637 b.size += len(doc.Name)
638 // Drop the content if we are skipping the document. Skipped content is not counted towards the
639 // shard size limit, so otherwise we might buffer too much data in memory before flushing.
640 doc.Content = nil
641 }
642
643 if b.size > b.opts.ShardMax {
644 return b.flush()
645 }
646
647 return nil
648}
649
650// MarkFileAsChangedOrRemoved indicates that the file specified by the given path
651// has been changed or removed since the last indexing job for this repository.
652//
653// If this build is a delta build, these files will be tombstoned in the older shards for this repository.
654func (b *Builder) MarkFileAsChangedOrRemoved(path string) {
655 b.opts.changedOrRemovedFiles = append(b.opts.changedOrRemovedFiles, path)
656}
657
658// Finish creates a last shard from the buffered documents, and clears
659// stale shards from previous runs. This should always be called, also
660// in failure cases, to ensure cleanup.
661//
662// It is safe to call Finish() multiple times.
663func (b *Builder) Finish() error {
664 if b.finishCalled {
665 return b.buildError
666 }
667
668 b.finishCalled = true
669
670 b.flush()
671 b.building.Wait()
672
673 if b.buildError != nil {
674 for tmp := range b.finishedShards {
675 log.Printf("Builder.Finish %s", tmp)
676 os.Remove(tmp)
677 }
678 b.finishedShards = map[string]string{}
679 return b.buildError
680 }
681
682 // map of temporary -> final names for all updated shards + shard metadata files
683 artifactPaths := make(map[string]string)
684 maps.Copy(artifactPaths, b.finishedShards)
685
686 oldShards := b.opts.FindAllShards()
687
688 if b.opts.IsDelta {
689 // Delta shard builds need to update FileTombstone and branch commit information for all
690 // existing shards
691 for _, shard := range oldShards {
692 repositories, _, err := ReadMetadataPathAlive(shard)
693 if err != nil {
694 return fmt.Errorf("reading metadata from shard %q: %w", shard, err)
695 }
696
697 if len(repositories) > 1 {
698 return fmt.Errorf("delta shard builds don't support repositories contained in compound shards (shard %q)", shard)
699 }
700
701 if len(repositories) == 0 {
702 return fmt.Errorf("failed to update repository metadata for shard %q - shard contains no repositories", shard)
703 }
704
705 repository := repositories[0]
706 if repository.ID != b.opts.RepositoryDescription.ID {
707 return fmt.Errorf("shard %q doesn't contain repository ID %d (%q)", shard, b.opts.RepositoryDescription.ID, b.opts.RepositoryDescription.Name)
708 }
709
710 if len(b.opts.changedOrRemovedFiles) > 0 && repository.FileTombstones == nil {
711 repository.FileTombstones = make(map[string]struct{}, len(b.opts.changedOrRemovedFiles))
712 }
713
714 for _, f := range b.opts.changedOrRemovedFiles {
715 repository.FileTombstones[f] = struct{}{}
716 }
717
718 if !BranchNamesEqual(repository.Branches, b.opts.RepositoryDescription.Branches) {
719 return deltaBranchSetError{
720 shardName: shard,
721 old: repository.Branches,
722 new: b.opts.RepositoryDescription.Branches,
723 }
724 }
725
726 if b.opts.GetHash() != repository.IndexOptions {
727 return &deltaIndexOptionsMismatchError{
728 shardName: shard,
729 newOptions: b.opts.HashOptions(),
730 }
731 }
732
733 repository.Branches = b.opts.RepositoryDescription.Branches
734
735 repository.LatestCommitDate = b.opts.RepositoryDescription.LatestCommitDate
736
737 repository.Metadata = b.opts.RepositoryDescription.Metadata
738
739 tempPath, finalPath, err := JsonMarshalRepoMetaTemp(shard, repository)
740 if err != nil {
741 return fmt.Errorf("writing repository metadta for shard %q: %w", shard, err)
742 }
743
744 artifactPaths[tempPath] = finalPath
745 }
746 }
747
748 // We mark finished shards as empty when we successfully finish. Return now
749 // to allow call sites to call Finish idempotently.
750 if len(artifactPaths) == 0 {
751 return b.buildError
752 }
753
754 // Collect a map of the old shards on disk. For each new shard we replace we
755 // delete it from toDelete. Anything remaining in toDelete will be removed
756 // after we have renamed everything into place.
757
758 var toDelete map[string]struct{}
759 if !b.opts.IsDelta {
760 // Non-delta shard builds delete all existing shards before they write out
761 // new ones.
762 // By contrast, delta shard builds work by stacking changes on top of existing shards.
763 // So, we skip populating the toDelete map if we're building delta shards.
764
765 toDelete = make(map[string]struct{})
766 for _, name := range oldShards {
767 paths, err := IndexFilePaths(name)
768 if err != nil {
769 b.buildError = fmt.Errorf("failed to find old paths for %s: %w", name, err)
770 }
771 for _, p := range paths {
772 toDelete[p] = struct{}{}
773 }
774 }
775 }
776
777 for tmp, final := range artifactPaths {
778 if err := os.Rename(tmp, final); err != nil {
779 b.buildError = err
780 continue
781 }
782
783 delete(toDelete, final)
784 }
785
786 b.finishedShards = map[string]string{}
787
788 for p := range toDelete {
789 // Don't delete compound shards, set tombstones instead.
790 if b.opts.ShardMerging && strings.HasPrefix(filepath.Base(p), "compound-") {
791 if !strings.HasSuffix(p, ".zoekt") {
792 continue
793 }
794 err := SetTombstone(p, b.opts.RepositoryDescription.ID)
795 b.buildError = err
796 continue
797 }
798 log.Printf("removing old shard file: %s", p)
799 if err := os.Remove(p); err != nil {
800 b.buildError = err
801 }
802 }
803
804 return b.buildError
805}
806
807// BranchNamesEqual compares the given zoekt.RepositoryBranch slices, and returns true
808// iff both slices specify the same set of branch names in the same order.
809func BranchNamesEqual(a, b []zoekt.RepositoryBranch) bool {
810 if len(a) != len(b) {
811 return false
812 }
813
814 for i := range a {
815 x, y := a[i], b[i]
816 if x.Name != y.Name {
817 return false
818 }
819 }
820
821 return true
822}
823
// flush takes the buffered documents and builds the next shard from them.
// With Parallelism > 1 the build runs in a background goroutine and flush
// returns nil; errors from such builds surface through b.buildError.
// Otherwise the shard is built synchronously and its error is returned.
// If an earlier build already failed, that error is returned and nothing is
// built.
func (b *Builder) flush() error {
	// Take ownership of the buffer and reset the accounting before anything
	// else, so the builder starts collecting the next shard from scratch.
	todo := b.todo
	b.todo = nil
	b.size = 0
	b.errMu.Lock()
	defer b.errMu.Unlock()
	if b.buildError != nil {
		return b.buildError
	}

	// An empty buffer is only built when no shard has been numbered yet
	// (nextShardNum is still 0).
	hasShard := b.nextShardNum > 0
	if len(todo) == 0 && hasShard {
		return nil
	}

	shard := b.nextShardNum
	b.nextShardNum++

	if b.opts.Parallelism > 1 {
		b.building.Add(1)
		// Blocks while the throttle channel is full, i.e. while
		// Parallelism builds are already in flight.
		b.throttle <- 1
		go func() {
			done, err := b.buildShard(todo, shard)
			// Free the throttle slot before taking errMu: the flush call
			// waiting on the throttle above holds errMu.
			<-b.throttle

			b.errMu.Lock()
			defer b.errMu.Unlock()
			// Keep the first error only.
			if err != nil && b.buildError == nil {
				b.buildError = err
			}
			if err == nil {
				b.finishedShards[done.temp] = done.final
			}
			b.building.Done()
		}()
	} else {
		// No goroutines when we're not parallel. This
		// simplifies memory profiling.
		done, err := b.buildShard(todo, shard)
		b.buildError = err
		if err == nil {
			b.finishedShards[done.temp] = done.final
		}

		return b.buildError
	}

	return nil
}
873
// squashRange maps [0,inf) to [0,1) monotonically, computing j/(j+1) in
// floating point.
func squashRange(j int) float64 {
	v := float64(j)
	return v / (v + 1)
}
879
// rankedDoc pairs a document with the score vector computed for it by rank,
// so that documents can be sorted by that vector.
type rankedDoc struct {
	*Document
	rank []float64
}
884
885// rank returns a vector of scores which is used at index-time to sort documents
886// before writing them to disk. The order of documents in the shard is important
887// at query time, because earlier documents receive a boost at query time and
888// have a higher chance of being searched before limits kick in.
889func rank(d *Document, origIdx int) []float64 {
890 skipped := 0.0
891 if d.SkipReason != SkipReasonNone {
892 skipped = 1.0
893 }
894
895 // Use pre-computed Category from DetermineFileCategory.
896 generated := 0.0
897 if d.Category == FileCategoryGenerated {
898 generated = 1.0
899 }
900
901 vendor := 0.0
902 if d.Category == FileCategoryVendored {
903 vendor = 1.0
904 }
905
906 test := 0.0
907 if d.Category == FileCategoryTest {
908 test = 1.0
909 }
910
911 // Smaller is earlier (=better).
912 return []float64{
913 // Always place skipped docs last
914 skipped,
915
916 // Prefer docs that are not generated
917 generated,
918
919 // Prefer docs that are not vendored
920 vendor,
921
922 // Prefer docs that are not tests
923 test,
924
925 // With short names
926 squashRange(len(d.Name)),
927
928 // With many symbols
929 1.0 - squashRange(len(d.Symbols)),
930
931 // With short content
932 squashRange(len(d.Content)),
933
934 // That is present is as many branches as possible
935 1.0 - squashRange(len(d.Branches)),
936
937 // Preserve original ordering.
938 squashRange(origIdx),
939 }
940}
941
942func sortDocuments(todo []*Document) {
943 rs := make([]rankedDoc, 0, len(todo))
944 for i, t := range todo {
945 rd := rankedDoc{t, rank(t, i)}
946 rs = append(rs, rd)
947 }
948 sort.Slice(rs, func(i, j int) bool {
949 r1 := rs[i].rank
950 r2 := rs[j].rank
951 for i := range r1 {
952 if r1[i] < r2[i] {
953 return true
954 }
955 if r1[i] > r2[i] {
956 return false
957 }
958 }
959
960 return false
961 })
962 for i := range todo {
963 todo[i] = rs[i].Document
964 }
965}
966
967func (b *Builder) buildShard(todo []*Document, nextShardNum int) (*finishedShard, error) {
968 if !b.opts.DisableCTags && (b.opts.CTagsPath != "" || b.opts.ScipCTagsPath != "") {
969 err := parseSymbols(todo, b.opts.LanguageMap, b.parserBins)
970 if b.opts.CTagsMustSucceed && err != nil {
971 return nil, err
972 }
973 if err != nil {
974 log.Printf("ignoring universal:%s or scip:%s error: %v", b.opts.CTagsPath, b.opts.ScipCTagsPath, err)
975 }
976 }
977
978 name := b.opts.shardName(nextShardNum)
979
980 shardBuilder, err := b.newShardBuilder()
981 if err != nil {
982 return nil, err
983 }
984
985 sortDocuments(todo)
986
987 for idx, t := range todo {
988 if err := shardBuilder.Add(*t); err != nil {
989 return nil, err
990 }
991
992 if idx%10_000 == 0 {
993 b.CheckMemoryUsage()
994 }
995 }
996
997 result, err := b.writeShard(name, shardBuilder)
998 b.returnPostingsBuilders(shardBuilder)
999 return result, err
1000}
1001
1002// CheckMemoryUsage checks the memory usage of the process and writes a memory profile if the heap usage exceeds the
1003// configured threshold. NOTE: this method is expensive and should only be used for debugging.
1004func (b *Builder) CheckMemoryUsage() {
1005 // Don't check memory if heap profiling is disabled, or we've already written 10 profiles
1006 if b.opts.HeapProfileTriggerBytes <= 0 || b.heapProfileNum >= 10 {
1007 return
1008 }
1009
1010 var m runtime.MemStats
1011 runtime.ReadMemStats(&m)
1012
1013 if m.HeapAlloc > b.opts.HeapProfileTriggerBytes && b.heapProfileMu.TryLock() {
1014 defer b.heapProfileMu.Unlock()
1015
1016 log.Printf("writing memory profile, allocated heap: %s", humanize.Bytes(m.HeapAlloc))
1017 name := filepath.Join(b.opts.IndexDir, fmt.Sprintf("indexmemory.prof.%d", b.heapProfileNum))
1018 f, err := os.Create(name)
1019 if err != nil {
1020 log.Printf("failed to create memory profile file: %v", err)
1021 return
1022 }
1023
1024 err = pprof.WriteHeapProfile(f)
1025 if err != nil {
1026 log.Printf("failed to write memory profile: %v", err)
1027 }
1028
1029 b.heapProfileNum++
1030 }
1031}
1032
1033func (b *Builder) getPostingsBuilder() *postingsBuilder {
1034 if pb, ok := b.postingsPool.Get().(*postingsBuilder); ok {
1035 pb.reset()
1036 return pb
1037 }
1038 return newPostingsBuilder(b.opts.ShardMax)
1039}
1040
1041// returnPostingsBuilders returns both postings builders from sb to the
1042// pool and nils the fields so any subsequent misuse crashes obviously.
1043func (b *Builder) returnPostingsBuilders(sb *ShardBuilder) {
1044 if sb.contentPostings != nil {
1045 b.postingsPool.Put(sb.contentPostings)
1046 sb.contentPostings = nil
1047 }
1048 if sb.namePostings != nil {
1049 b.postingsPool.Put(sb.namePostings)
1050 sb.namePostings = nil
1051 }
1052}
1053
1054func (b *Builder) newShardBuilder() (*ShardBuilder, error) {
1055 desc := b.opts.RepositoryDescription
1056 desc.HasSymbols = !b.opts.DisableCTags && b.opts.CTagsPath != ""
1057 desc.SubRepoMap = b.opts.SubRepositories
1058 desc.IndexOptions = b.opts.GetHash()
1059
1060 content := b.getPostingsBuilder()
1061 name := b.getPostingsBuilder()
1062 shardBuilder := newShardBuilderWithPostings(content, name)
1063 if err := shardBuilder.setRepository(&desc); err != nil {
1064 return nil, err
1065 }
1066 shardBuilder.IndexTime = b.indexTime
1067 shardBuilder.ID = b.id
1068 return shardBuilder, nil
1069}
1070
1071func (b *Builder) writeShard(fn string, ib *ShardBuilder) (*finishedShard, error) {
1072 dir := filepath.Dir(fn)
1073 if err := os.MkdirAll(dir, 0o700); err != nil {
1074 return nil, err
1075 }
1076
1077 f, err := os.CreateTemp(dir, filepath.Base(fn)+".*.tmp")
1078 if err != nil {
1079 return nil, err
1080 }
1081 if runtime.GOOS != "windows" {
1082 if err := f.Chmod(0o666 &^ umask); err != nil {
1083 return nil, err
1084 }
1085 }
1086
1087 defer f.Close()
1088 if err := ib.Write(f); err != nil {
1089 return nil, err
1090 }
1091 fi, err := f.Stat()
1092 if err != nil {
1093 return nil, err
1094 }
1095 if err := f.Close(); err != nil {
1096 return nil, err
1097 }
1098
1099 log.Printf("finished shard %s: %d index bytes (overhead %3.1f), %d files processed \n",
1100 fn,
1101 fi.Size(),
1102 float64(fi.Size())/float64(ib.ContentSize()+1),
1103 ib.NumFiles())
1104
1105 return &finishedShard{f.Name(), fn}, nil
1106}
1107
1108type deltaBranchSetError struct {
1109 shardName string
1110 old, new []zoekt.RepositoryBranch
1111}
1112
1113func (e deltaBranchSetError) Error() string {
1114 return fmt.Sprintf("repository metadata in shard %q contains a different set of branch names than what was requested, which is unsupported in a delta shard build. old: %+v, new: %+v", e.shardName, e.old, e.new)
1115}
1116
1117type deltaIndexOptionsMismatchError struct {
1118 shardName string
1119 newOptions HashOptions
1120}
1121
1122func (e *deltaIndexOptionsMismatchError) Error() string {
1123 return fmt.Sprintf("one or more index options for shard %q do not match Builder's index options. These index option updates are incompatible with delta build. New index options: %+v", e.shardName, e.newOptions)
1124}
1125
1126// umask holds the Umask of the current process
1127var umask os.FileMode
1128
1129func init() {
1130 umask = os.FileMode(unix.Umask(0))
1131 unix.Umask(int(umask))
1132}