fork of https://github.com/sourcegraph/zoekt
1package index
2
3import (
4 "crypto/sha1"
5 "fmt"
6 "io"
7 "log"
8 "os"
9 "path/filepath"
10 "runtime"
11 "sort"
12
13 "github.com/sourcegraph/zoekt"
14)
15
16// Merge files into a compound shard in dstDir. Merge returns tmpName and a
17// dstName. It is the responsibility of the caller to delete the input shards and
18// rename the temporary compound shard from tmpName to dstName.
19func Merge(dstDir string, files ...IndexFile) (tmpName, dstName string, _ error) {
20 var ds []*indexData
21 for _, f := range files {
22 searcher, err := NewSearcher(f)
23 if err != nil {
24 return "", "", err
25 }
26 ds = append(ds, searcher.(*indexData))
27 }
28
29 ib, err := merge(ds...)
30 if err != nil {
31 return "", "", err
32 }
33
34 hasher := sha1.New()
35 for _, d := range ds {
36 for i, md := range d.repoMetaData {
37 if d.repoMetaData[i].Tombstone {
38 continue
39 }
40 hasher.Write([]byte(md.Name))
41 hasher.Write([]byte{0})
42 }
43 }
44
45 dstName = filepath.Join(dstDir, fmt.Sprintf("compound-%x_v%d.%05d.zoekt", hasher.Sum(nil), NextIndexFormatVersion, 0))
46 tmpName = dstName + ".tmp"
47 if err := builderWriteAll(tmpName, ib); err != nil {
48 return "", "", err
49 }
50 return tmpName, dstName, nil
51}
52
53func builderWriteAll(fn string, ib *ShardBuilder) error {
54 dir := filepath.Dir(fn)
55 if err := os.MkdirAll(dir, 0o700); err != nil {
56 return err
57 }
58
59 f, err := os.CreateTemp(dir, filepath.Base(fn)+".*.tmp")
60 if err != nil {
61 return err
62 }
63 if runtime.GOOS != "windows" {
64 // umask?
65 if err := f.Chmod(0o666); err != nil {
66 return err
67 }
68 }
69
70 defer f.Close()
71 if err := ib.Write(f); err != nil {
72 return err
73 }
74 fi, err := f.Stat()
75 if err != nil {
76 return err
77 }
78 if err := f.Close(); err != nil {
79 return err
80 }
81
82 if err := os.Rename(f.Name(), fn); err != nil {
83 return err
84 }
85
86 log.Printf("finished shard %s: %d index bytes (overhead %3.1f)", fn, fi.Size(),
87 float64(fi.Size())/float64(ib.ContentSize()+1))
88
89 return nil
90}
91
92func merge(ds ...*indexData) (*ShardBuilder, error) {
93 if len(ds) == 0 {
94 return nil, fmt.Errorf("need 1 or more indexData to merge")
95 }
96
97 sort.Slice(ds, func(i, j int) bool {
98 return ds[i].repoMetaData[0].GetPriority() > ds[j].repoMetaData[0].GetPriority()
99 })
100
101 sb := newShardBuilder()
102 sb.indexFormatVersion = NextIndexFormatVersion
103
104 for _, d := range ds {
105 lastRepoID := -1
106 for docID := uint32(0); int(docID) < len(d.fileBranchMasks); docID++ {
107 repoID := int(d.repos[docID])
108
109 if d.repoMetaData[repoID].Tombstone {
110 continue
111 }
112
113 if repoID != lastRepoID {
114 if lastRepoID > repoID {
115 return nil, fmt.Errorf("non-contiguous repo ids in %s for document %d: old=%d current=%d", d.String(), docID, lastRepoID, repoID)
116 }
117 lastRepoID = repoID
118
119 // TODO we are losing empty repos on merging since we only get here if
120 // there is an associated document.
121
122 if err := sb.setRepository(&d.repoMetaData[repoID]); err != nil {
123 return nil, err
124 }
125 }
126
127 if err := addDocument(d, sb, repoID, docID); err != nil {
128 return nil, err
129 }
130 }
131 }
132
133 return sb, nil
134}
135
136// Explode takes an input shard and creates 1 simple shard per repository. It is
137// a wrapper around explode that takes care of removing the input shard and
138// renaming the temporary shards.
139func Explode(dstDir string, inputShard string) error {
140 f, err := os.Open(inputShard)
141 if err != nil {
142 return err
143 }
144 defer f.Close()
145
146 indexFile, err := NewIndexFile(f)
147 if err != nil {
148 return err
149 }
150 defer indexFile.Close()
151
152 exploded, err := explode(dstDir, indexFile)
153 defer func() {
154 // best effort removal of tmp files. If os.Remove fails, indexserver will delete
155 // the leftover tmp files during the next cleanup.
156 for tmpFn := range exploded {
157 os.Remove(tmpFn)
158 }
159 }()
160 if err != nil {
161 return fmt.Errorf("zoekt.Explode: %w", err)
162 }
163
164 // remove the input shard first to avoid duplicate indexes. In the worst case,
165 // the process is interrupted just after we delete the compound shard, in which
166 // case we have to reindex the lost repos.
167 paths, err := IndexFilePaths(inputShard)
168 if err != nil {
169 return err
170 }
171 for _, path := range paths {
172 err = os.Remove(path)
173 if err != nil {
174 return err
175 }
176 }
177
178 // best effort rename shards.
179 for tmpFn, dstFn := range exploded {
180 if err := os.Rename(tmpFn, dstFn); err != nil {
181 log.Printf("explode: rename failed: %s", err)
182 }
183 }
184
185 return nil
186}
187
// shardBuilderFunc is a hook that receives a ShardBuilder. explode calls every
// hook it is given on each single-repository builder right before that builder
// is written out.
type shardBuilderFunc func(ib *ShardBuilder)
189
190// explode takes an IndexFile f and creates 1 simple shard per repository
191// contained in f. explode returns a map of tmpName -> dstName. It is the
192// responsibility of the caller to rename the temporary shard(s) and delete the
193// input shard.
194func explode(dstDir string, f IndexFile, ibFuncs ...shardBuilderFunc) (map[string]string, error) {
195 searcher, err := NewSearcher(f)
196 if err != nil {
197 return nil, err
198 }
199 d := searcher.(*indexData)
200
201 shardNames := make(map[string]string, len(d.repoMetaData))
202
203 writeShard := func(ib *ShardBuilder) error {
204 if len(ib.repoList) != 1 {
205 return fmt.Errorf("expected sb to contain exactly 1 repository")
206 }
207 for _, ibFunc := range ibFuncs {
208 ibFunc(ib)
209 }
210
211 opts := Options{
212 IndexDir: dstDir,
213 RepositoryDescription: ib.repoList[0],
214 }
215
216 shardName := opts.shardNameVersion(ib.indexFormatVersion, 0)
217 shardNameTmp := shardName + ".tmp"
218 shardNames[shardNameTmp] = shardName
219 return builderWriteAll(shardNameTmp, ib)
220 }
221
222 var sb *ShardBuilder
223 lastRepoID := -1
224 for docID := uint32(0); int(docID) < len(d.fileBranchMasks); docID++ {
225 repoID := int(d.repos[docID])
226
227 if d.repoMetaData[repoID].Tombstone {
228 continue
229 }
230
231 if repoID != lastRepoID {
232 if lastRepoID > repoID {
233 return shardNames, fmt.Errorf("non-contiguous repo ids in %s for document %d: old=%d current=%d", d.String(), docID, lastRepoID, repoID)
234 }
235 lastRepoID = repoID
236
237 if sb != nil {
238 if err := writeShard(sb); err != nil {
239 return shardNames, err
240 }
241 }
242
243 sb = newShardBuilder()
244 sb.indexFormatVersion = IndexFormatVersion
245 if err := sb.setRepository(&d.repoMetaData[repoID]); err != nil {
246 return shardNames, err
247 }
248 }
249
250 err := addDocument(d, sb, repoID, docID)
251 if err != nil {
252 return shardNames, err
253 }
254 }
255
256 if sb != nil {
257 if err := writeShard(sb); err != nil {
258 return shardNames, err
259 }
260 }
261
262 return shardNames, nil
263}
264
265func addDocument(d *indexData, ib *ShardBuilder, repoID int, docID uint32) error {
266 doc := Document{
267 Name: string(d.fileName(docID)),
268 // Content set below since it can return an error
269 // Branches set below since it requires lookups
270 SubRepositoryPath: d.subRepoPaths[repoID][d.subRepos[docID]],
271 Language: d.languageMap[d.getLanguage(docID)],
272 // SkipReason not set, will be part of content from original indexer.
273 }
274
275 var err error
276 if doc.Content, err = d.readContents(docID); err != nil {
277 return err
278 }
279
280 if doc.Symbols, _, err = d.readDocSections(docID, nil); err != nil {
281 return err
282 }
283
284 doc.SymbolsMetaData = make([]*zoekt.Symbol, len(doc.Symbols))
285 for i := range doc.SymbolsMetaData {
286 doc.SymbolsMetaData[i] = d.symbols.data(d.fileEndSymbol[docID] + uint32(i))
287 }
288
289 // calculate branches
290 {
291 mask := d.fileBranchMasks[docID]
292 id := uint32(1)
293 for mask != 0 {
294 if mask&0x1 != 0 {
295 doc.Branches = append(doc.Branches, d.branchNames[repoID][uint(id)])
296 }
297 id <<= 1
298 mask >>= 1
299 }
300 }
301 return ib.Add(doc)
302}
303
// hashString returns the SHA-1 digest of s as a lowercase hexadecimal string.
//
// copied from builder package to avoid circular imports.
func hashString(s string) string {
	digest := sha1.New()
	// Writes to a hash.Hash never return an error, so the result is ignored.
	_, _ = io.WriteString(digest, s)
	sum := digest.Sum(nil)
	return fmt.Sprintf("%x", sum)
}