fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Command zoekt-indexserver starts a service that periodically reindexes repositories. It follows
16// a "pull-based" design, where it reaches out to code hosts to fetch new data.
17package main
18
19import (
20 "bytes"
21 "context"
22 "flag"
23 "fmt"
24 "log"
25 "math"
26 "os"
27 "os/exec"
28 "path/filepath"
29 "runtime"
30 "strings"
31 "time"
32
33 "github.com/sourcegraph/zoekt/index"
34 "github.com/sourcegraph/zoekt/internal/gitindex"
35)
36
// day is the duration of twenty-four hours; it lets multi-day defaults such
// as the log retention period be written as a multiple of days.
const day = 24 * time.Hour
38
// loggedRun executes cmd with its standard output and standard error captured
// in memory. The argument list is logged before the command starts, and a
// failure is logged together with everything captured on both streams.
//
// The captured stdout and stderr bytes are returned whether or not the command
// succeeded; the run error itself is only logged, never returned.
func loggedRun(cmd *exec.Cmd) (out, err []byte) {
	var stdout, stderr bytes.Buffer
	cmd.Stdout, cmd.Stderr = &stdout, &stderr

	log.Printf("run %v", cmd.Args)
	runErr := cmd.Run()
	if runErr != nil {
		log.Printf("command %s failed: %v\nOUT: %s\nERR: %s",
			cmd.Args, runErr, stdout.String(), stderr.String())
	}

	return stdout.Bytes(), stderr.Bytes()
}
53
// Options holds the index server's settings. Most fields are filled from
// command-line flags by defineFlags; cpuCount and indexFlags are derived from
// the others by validate.
type Options struct {
	// cpuFraction is the fraction of the available cores to use for
	// indexing. validate requires 0 < cpuFraction <= 1.
	cpuFraction float64
	// cpuCount is the indexing parallelism computed by validate from
	// cpuFraction; it is always at least 1.
	cpuCount int
	// fetchInterval is how often repositories are fetched.
	fetchInterval time.Duration
	// mirrorInterval is how often new repositories are looked for and cloned.
	mirrorInterval time.Duration
	// indexFlagsStr is the raw, whitespace-separated list of extra flags for
	// zoekt-git-index as given on the command line.
	indexFlagsStr string
	// indexFlags is indexFlagsStr split into individual arguments by validate.
	indexFlags []string
	// mirrorConfigFile names the JSON file holding the mirror configuration.
	mirrorConfigFile string
	// maxLogAge is the age after which index logs are deleted.
	maxLogAge time.Duration
	// indexTimeout is the maximum run time of a single index job.
	indexTimeout time.Duration
}

// validate checks the flag-provided settings and fills in the derived fields
// cpuCount and indexFlags. It terminates the process via log.Fatal when
// cpuFraction is not in the range (0, 1].
func (o *Options) validate() {
	// The negated form also rejects NaN, which would slip through a pair of
	// "<= 0 || > 1" comparisons because every comparison with NaN is false.
	if !(o.cpuFraction > 0.0 && o.cpuFraction <= 1.0) {
		log.Fatal("cpu_fraction must be between 0.0 and 1.0")
	}

	// Round down, but never go below a single worker.
	o.cpuCount = int(math.Trunc(float64(runtime.GOMAXPROCS(0)) * o.cpuFraction))
	if o.cpuCount < 1 {
		o.cpuCount = 1
	}

	// strings.Fields, unlike strings.Split(s, " "), does not produce empty
	// strings for leading, trailing or repeated whitespace. An empty string
	// would otherwise be handed to zoekt-git-index as a bogus argument.
	if fields := strings.Fields(o.indexFlagsStr); len(fields) > 0 {
		o.indexFlags = fields
	}
}
79
80func (o *Options) defineFlags() {
81 flag.DurationVar(&o.indexTimeout, "index_timeout", time.Hour, "kill index job after this much time")
82 flag.DurationVar(&o.maxLogAge, "max_log_age", 3*day, "recycle index logs after this much time")
83 flag.DurationVar(&o.fetchInterval, "fetch_interval", time.Hour, "run fetches this often")
84 flag.StringVar(&o.mirrorConfigFile, "mirror_config",
85 "", "JSON file holding mirror configuration.")
86
87 flag.DurationVar(&o.mirrorInterval, "mirror_duration", 24*time.Hour, "find and clone new repos at this frequency.")
88 flag.Float64Var(&o.cpuFraction, "cpu_fraction", 0.25,
89 "use this fraction of the cores for indexing.")
90 flag.StringVar(&o.indexFlagsStr, "git_index_flags", "", "space separated list of flags passed through to zoekt-git-index (e.g. -git_index_flags='-symbols=false -submodules=false'")
91}
92
93// periodicFetch runs git-fetch every once in a while. Results are
94// posted on pendingRepos.
95func periodicFetch(repoDir, indexDir string, opts *Options, pendingRepos chan<- string) {
96 t := time.NewTicker(opts.fetchInterval)
97 for {
98 repos, err := gitindex.FindGitRepos(repoDir)
99 if err != nil {
100 log.Println(err)
101 continue
102 }
103 if len(repos) == 0 {
104 log.Printf("no repos found under %s", repoDir)
105 }
106
107 // TODO: Randomize to make sure quota throttling hits everyone.
108
109 later := map[string]struct{}{}
110 for _, dir := range repos {
111 if ok := fetchGitRepo(dir); !ok {
112 later[dir] = struct{}{}
113 } else {
114 pendingRepos <- dir
115 }
116 }
117
118 for r := range later {
119 pendingRepos <- r
120 }
121
122 <-t.C
123 }
124}
125
// fetchGitRepo runs "git fetch origin --prune" against the git directory dir.
// It reports true only when the command succeeded and printed something,
// which is taken to mean that updates were fetched. A failing command is
// logged together with its combined output and reported as false.
func fetchGitRepo(dir string) bool {
	fetch := exec.Command("git", "--git-dir", dir, "fetch", "origin", "--prune")

	combined, runErr := fetch.CombinedOutput()
	if runErr == nil {
		// A fetch that found no updates prints nothing at all.
		return len(combined) > 0
	}

	log.Printf("command %s failed: %v\nCOMBINED_OUT: %s\n",
		fetch.Args, runErr, string(combined))
	return false
}
140
141// indexPendingRepos consumes the directories on the repos channel and
142// indexes them, sequentially.
143func indexPendingRepos(indexDir, repoDir string, opts *Options, repos <-chan string) {
144 for dir := range repos {
145 indexPendingRepo(dir, indexDir, repoDir, opts)
146
147 // Failures (eg. timeout) will leave temp files
148 // around. We have to clean them, or they will fill up the indexing volume.
149 if failures, err := filepath.Glob(filepath.Join(indexDir, "*.tmp")); err != nil {
150 log.Printf("Glob: %v", err)
151 } else {
152 for _, f := range failures {
153 os.Remove(f)
154 }
155 }
156 }
157}
158
159func indexPendingRepo(dir, indexDir, repoDir string, opts *Options) {
160 ctx, cancel := context.WithTimeout(context.Background(), opts.indexTimeout)
161 defer cancel()
162 args := []string{
163 "-require_ctags",
164 fmt.Sprintf("-parallelism=%d", opts.cpuCount),
165 "-repo_cache", repoDir,
166 "-index", indexDir,
167 "-incremental",
168 }
169 args = append(args, opts.indexFlags...)
170 args = append(args, dir)
171 cmd := exec.CommandContext(ctx, "zoekt-git-index", args...)
172 loggedRun(cmd)
173}
174
// deleteLogs removes every entry directly under logDir whose modification
// time lies more than maxAge in the past. Entries that cannot be stat'ed are
// skipped; entries that cannot be removed are logged and left in place. A
// malformed glob pattern (possible only if logDir contains glob
// metacharacters) terminates the process.
func deleteLogs(logDir string, maxAge time.Duration) {
	fs, err := filepath.Glob(filepath.Join(logDir, "*"))
	if err != nil {
		log.Fatalf("filepath.Glob(%s): %v", logDir, err)
	}

	threshold := time.Now().Add(-maxAge)
	for _, fn := range fs {
		fi, err := os.Lstat(fn)
		if err != nil || !fi.ModTime().Before(threshold) {
			continue
		}
		// A file that vanished in the meantime is fine; anything else (for
		// example a non-empty directory) would otherwise linger unnoticed.
		if err := os.Remove(fn); err != nil && !os.IsNotExist(err) {
			log.Printf("removing old log %s: %v", fn, err)
		}
	}
}
189
190func deleteLogsLoop(logDir string, maxAge time.Duration) {
191 tick := time.NewTicker(maxAge / 100)
192 for {
193 deleteLogs(logDir, maxAge)
194 <-tick.C
195 }
196}
197
198// Delete the shard if its corresponding git repo can't be found.
199func deleteIfOrphan(repoDir string, fn string) error {
200 f, err := os.Open(fn)
201 if err != nil {
202 return nil
203 }
204 defer f.Close()
205
206 ifile, err := index.NewIndexFile(f)
207 if err != nil {
208 return nil
209 }
210 defer ifile.Close()
211
212 repos, _, err := index.ReadMetadata(ifile)
213 if err != nil {
214 return nil
215 }
216
217 // TODO support compound shards in zoekt-indexserver
218 if len(repos) != 1 {
219 return nil
220 }
221 repo := repos[0]
222
223 _, err = os.Stat(repo.Source)
224 if os.IsNotExist(err) {
225 log.Printf("deleting orphan shard %s; source %q not found", fn, repo.Source)
226 return os.Remove(fn)
227 }
228
229 return err
230}
231
232func deleteOrphanIndexes(indexDir, repoDir string, watchInterval time.Duration) {
233 t := time.NewTicker(watchInterval)
234
235 expr := indexDir + "/*"
236 for {
237 fs, err := filepath.Glob(expr)
238 if err != nil {
239 log.Printf("Glob(%q): %v", expr, err)
240 }
241
242 for _, f := range fs {
243 if err := deleteIfOrphan(repoDir, f); err != nil {
244 log.Printf("deleteIfOrphan(%q): %v", f, err)
245 }
246 }
247 <-t.C
248 }
249}
250
251func main() {
252 var opts Options
253 opts.defineFlags()
254 dataDir := flag.String("data_dir",
255 filepath.Join(os.Getenv("HOME"), "zoekt-serving"), "directory holding all data.")
256 indexDir := flag.String("index_dir", "", "directory holding index shards. Defaults to $data_dir/index/")
257 flag.Parse()
258 opts.validate()
259
260 if *dataDir == "" {
261 log.Fatal("must set --data_dir")
262 }
263
264 // Automatically prepend our own path at the front, to minimize
265 // required configuration.
266 if l, err := os.Readlink("/proc/self/exe"); err == nil {
267 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
268 }
269
270 logDir := filepath.Join(*dataDir, "logs")
271 if *indexDir == "" {
272 *indexDir = filepath.Join(*dataDir, "index")
273 }
274 repoDir := filepath.Join(*dataDir, "repos")
275 for _, s := range []string{logDir, *indexDir, repoDir} {
276 if _, err := os.Stat(s); err == nil {
277 continue
278 }
279
280 if err := os.MkdirAll(s, 0o755); err != nil {
281 log.Fatalf("MkdirAll %s: %v", s, err)
282 }
283 }
284
285 _, err := readConfigURL(opts.mirrorConfigFile)
286 if err != nil {
287 log.Fatalf("readConfigURL(%s): %v", opts.mirrorConfigFile, err)
288 }
289
290 pendingRepos := make(chan string, 10)
291 go periodicMirrorFile(repoDir, &opts, pendingRepos)
292 go deleteLogsLoop(logDir, opts.maxLogAge)
293 go deleteOrphanIndexes(*indexDir, repoDir, opts.fetchInterval)
294 go indexPendingRepos(*indexDir, repoDir, &opts, pendingRepos)
295 periodicFetch(repoDir, *indexDir, &opts, pendingRepos)
296}