// Fork of https://github.com/sourcegraph/zoekt.
1package index
2
3import (
4 "crypto/sha1"
5 "fmt"
6 "io"
7 "log"
8 "os"
9 "path/filepath"
10 "runtime"
11 "sort"
12
13 "github.com/sourcegraph/zoekt"
14)
15
16// Merge files into a compound shard in dstDir. Merge returns tmpName and a
17// dstName. It is the responsibility of the caller to delete the input shards and
18// rename the temporary compound shard from tmpName to dstName.
19func Merge(dstDir string, files ...IndexFile) (tmpName, dstName string, _ error) {
20 var ds []*indexData
21 for _, f := range files {
22 searcher, err := NewSearcher(f)
23 if err != nil {
24 return "", "", err
25 }
26 ds = append(ds, searcher.(*indexData))
27 }
28
29 ib, err := merge(ds...)
30 if err != nil {
31 return "", "", err
32 }
33
34 hasher := sha1.New()
35 for _, d := range ds {
36 for i, md := range d.repoMetaData {
37 if d.repoMetaData[i].Tombstone {
38 continue
39 }
40 hasher.Write([]byte(md.Name))
41 hasher.Write([]byte{0})
42 }
43 }
44
45 dstName = filepath.Join(dstDir, fmt.Sprintf("compound-%x_v%d.%05d.zoekt", hasher.Sum(nil), NextIndexFormatVersion, 0))
46 tmpName = dstName + ".tmp"
47 if err := builderWriteAll(tmpName, ib); err != nil {
48 return "", "", err
49 }
50 return tmpName, dstName, nil
51}
52
53func builderWriteAll(fn string, ib *ShardBuilder) error {
54 dir := filepath.Dir(fn)
55 if err := os.MkdirAll(dir, 0o700); err != nil {
56 return err
57 }
58
59 f, err := os.CreateTemp(dir, filepath.Base(fn)+".*.tmp")
60 if err != nil {
61 return err
62 }
63 if runtime.GOOS != "windows" {
64 // umask?
65 if err := f.Chmod(0o666); err != nil {
66 return err
67 }
68 }
69
70 defer f.Close()
71 if err := ib.Write(f); err != nil {
72 return err
73 }
74 fi, err := f.Stat()
75 if err != nil {
76 return err
77 }
78 if err := f.Close(); err != nil {
79 return err
80 }
81
82 if err := os.Rename(f.Name(), fn); err != nil {
83 return err
84 }
85
86 log.Printf("finished shard %s: %d index bytes (overhead %3.1f)", fn, fi.Size(),
87 float64(fi.Size())/float64(ib.ContentSize()+1))
88
89 return nil
90}
91
92func merge(ds ...*indexData) (*ShardBuilder, error) {
93 if len(ds) == 0 {
94 return nil, fmt.Errorf("need 1 or more indexData to merge")
95 }
96
97 sort.Slice(ds, func(i, j int) bool {
98 return ds[i].repoMetaData[0].GetPriority() > ds[j].repoMetaData[0].GetPriority()
99 })
100
101 sb := newShardBuilder(0)
102 sb.indexFormatVersion = NextIndexFormatVersion
103
104 for _, d := range ds {
105 lastRepoID := -1
106 for docID := uint32(0); int(docID) < len(d.fileBranchMasks); docID++ {
107 repoID := int(d.repos[docID])
108
109 if d.repoMetaData[repoID].Tombstone {
110 continue
111 }
112
113 if repoID != lastRepoID {
114 if lastRepoID > repoID {
115 return nil, fmt.Errorf("non-contiguous repo ids in %s for document %d: old=%d current=%d", d.String(), docID, lastRepoID, repoID)
116 }
117 lastRepoID = repoID
118
119 // Initialize repo metadata if it does not already exist.
120 repo := d.repoMetaData[repoID]
121 if repo.Metadata == nil {
122 repo.Metadata = make(map[string]string)
123 }
124
125 // TODO we are losing empty repos on merging since we only get here if
126 // there is an associated document.
127
128 if err := sb.setRepository(&d.repoMetaData[repoID]); err != nil {
129 return nil, err
130 }
131 }
132
133 if err := addDocument(d, sb, repoID, docID); err != nil {
134 return nil, err
135 }
136 }
137 }
138
139 return sb, nil
140}
141
142// Explode takes an input shard and creates 1 simple shard per repository. It is
143// a wrapper around explode that takes care of removing the input shard and
144// renaming the temporary shards.
145func Explode(dstDir string, inputShard string) error {
146 f, err := os.Open(inputShard)
147 if err != nil {
148 return err
149 }
150 defer f.Close()
151
152 indexFile, err := NewIndexFile(f)
153 if err != nil {
154 return err
155 }
156 defer indexFile.Close()
157
158 exploded, err := explode(dstDir, indexFile)
159 defer func() {
160 // best effort removal of tmp files. If os.Remove fails, indexserver will delete
161 // the leftover tmp files during the next cleanup.
162 for tmpFn := range exploded {
163 os.Remove(tmpFn)
164 }
165 }()
166 if err != nil {
167 return fmt.Errorf("zoekt.Explode: %w", err)
168 }
169
170 // remove the input shard first to avoid duplicate indexes. In the worst case,
171 // the process is interrupted just after we delete the compound shard, in which
172 // case we have to reindex the lost repos.
173 paths, err := IndexFilePaths(inputShard)
174 if err != nil {
175 return err
176 }
177 for _, path := range paths {
178 err = os.Remove(path)
179 if err != nil {
180 return err
181 }
182 }
183
184 // best effort rename shards.
185 for tmpFn, dstFn := range exploded {
186 if err := os.Rename(tmpFn, dstFn); err != nil {
187 log.Printf("explode: rename failed: %s", err)
188 }
189 }
190
191 return nil
192}
193
194type shardBuilderFunc func(ib *ShardBuilder)
195
196// explode takes an IndexFile f and creates 1 simple shard per repository
197// contained in f. explode returns a map of tmpName -> dstName. It is the
198// responsibility of the caller to rename the temporary shard(s) and delete the
199// input shard.
200func explode(dstDir string, f IndexFile, ibFuncs ...shardBuilderFunc) (map[string]string, error) {
201 searcher, err := NewSearcher(f)
202 if err != nil {
203 return nil, err
204 }
205 d := searcher.(*indexData)
206
207 shardNames := make(map[string]string, len(d.repoMetaData))
208
209 writeShard := func(ib *ShardBuilder) error {
210 if len(ib.repoList) != 1 {
211 return fmt.Errorf("expected sb to contain exactly 1 repository")
212 }
213 for _, ibFunc := range ibFuncs {
214 ibFunc(ib)
215 }
216
217 opts := Options{
218 IndexDir: dstDir,
219 RepositoryDescription: ib.repoList[0],
220 }
221
222 shardName := opts.shardNameVersion(ib.indexFormatVersion, 0)
223 shardNameTmp := shardName + ".tmp"
224 shardNames[shardNameTmp] = shardName
225 return builderWriteAll(shardNameTmp, ib)
226 }
227
228 var sb *ShardBuilder
229 lastRepoID := -1
230 for docID := uint32(0); int(docID) < len(d.fileBranchMasks); docID++ {
231 repoID := int(d.repos[docID])
232
233 if d.repoMetaData[repoID].Tombstone {
234 continue
235 }
236
237 if repoID != lastRepoID {
238 if lastRepoID > repoID {
239 return shardNames, fmt.Errorf("non-contiguous repo ids in %s for document %d: old=%d current=%d", d.String(), docID, lastRepoID, repoID)
240 }
241 lastRepoID = repoID
242
243 if sb != nil {
244 if err := writeShard(sb); err != nil {
245 return shardNames, err
246 }
247 }
248
249 sb = newShardBuilder(0)
250 sb.indexFormatVersion = IndexFormatVersion
251 if err := sb.setRepository(&d.repoMetaData[repoID]); err != nil {
252 return shardNames, err
253 }
254 }
255
256 err := addDocument(d, sb, repoID, docID)
257 if err != nil {
258 return shardNames, err
259 }
260 }
261
262 if sb != nil {
263 if err := writeShard(sb); err != nil {
264 return shardNames, err
265 }
266 }
267
268 return shardNames, nil
269}
270
271func addDocument(d *indexData, ib *ShardBuilder, repoID int, docID uint32) error {
272 doc := Document{
273 Name: string(d.fileName(docID)),
274 // Content set below since it can return an error
275 // Branches set below since it requires lookups
276 SubRepositoryPath: d.subRepoPaths[repoID][d.subRepos[docID]],
277 Language: d.languageMap[d.getLanguage(docID)],
278 Category: d.getCategory(docID),
279 // SkipReason not set, will be part of content from original indexer.
280 }
281
282 var err error
283 if doc.Content, err = d.readContents(docID); err != nil {
284 return err
285 }
286
287 if doc.Symbols, _, err = d.readDocSections(docID, nil); err != nil {
288 return err
289 }
290
291 doc.SymbolsMetaData = make([]*zoekt.Symbol, len(doc.Symbols))
292 for i := range doc.SymbolsMetaData {
293 doc.SymbolsMetaData[i] = d.symbols.data(d.fileEndSymbol[docID] + uint32(i))
294 }
295
296 // calculate branches
297 {
298 mask := d.fileBranchMasks[docID]
299 id := uint32(1)
300 for mask != 0 {
301 if mask&0x1 != 0 {
302 doc.Branches = append(doc.Branches, d.branchNames[repoID][uint(id)])
303 }
304 id <<= 1
305 mask >>= 1
306 }
307 }
308 return ib.Add(doc)
309}
310
// hashString returns the lowercase hexadecimal SHA-1 digest of s.
//
// It is copied from the builder package to avoid circular imports.
func hashString(s string) string {
	digest := sha1.New()
	// Writes to a hash.Hash never return an error.
	_, _ = io.WriteString(digest, s)
	sum := digest.Sum(nil)
	return fmt.Sprintf("%x", sum)
}