fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// This program manages a zoekt indexing deployment:
16// * recycling logs
17// * periodically fetching new data.
18// * periodically reindexing all git repos.
19
20package main
21
22import (
23 "bytes"
24 "context"
25 "flag"
26 "fmt"
27 "log"
28 "math"
29 "os"
30 "os/exec"
31 "path/filepath"
32 "runtime"
33 "strings"
34 "time"
35
36 "github.com/sourcegraph/zoekt"
37 "github.com/sourcegraph/zoekt/gitindex"
38)
39
// day is the length of one 24-hour day, used for day-granularity durations.
const day = 24 * time.Hour
41
// loggedRun executes cmd with its stdout and stderr captured in memory.
// The command line is logged before the run; if the run fails, the error
// and both captured streams are logged as well. The failure itself is not
// reported to the caller: the two results are always the captured stdout
// and stderr bytes, in that order.
func loggedRun(cmd *exec.Cmd) (out, err []byte) {
	var stdout, stderr bytes.Buffer
	cmd.Stdout, cmd.Stderr = &stdout, &stderr

	log.Printf("run %v", cmd.Args)
	runErr := cmd.Run()
	if runErr != nil {
		log.Printf("command %s failed: %v\nOUT: %s\nERR: %s",
			cmd.Args, runErr, stdout.String(), stderr.String())
	}

	return stdout.Bytes(), stderr.Bytes()
}
56
// Options holds the tunables of the indexserver. Most fields are bound to
// command-line flags; cpuCount and indexFlags are derived by validate.
type Options struct {
	// cpuFraction is the fraction of the cores to use for indexing
	// (flag -cpu_fraction); must be in (0.0, 1.0].
	cpuFraction float64
	// cpuCount is the number of cores derived from cpuFraction by validate.
	cpuCount int
	// fetchInterval is how often fetches are run (flag -fetch_interval).
	fetchInterval time.Duration
	// mirrorInterval is how often new repos are looked for and cloned
	// (flag -mirror_duration).
	mirrorInterval time.Duration
	// indexFlagsStr is the raw, whitespace separated list of extra flags
	// for zoekt-git-index (flag -git_index_flags).
	indexFlagsStr string
	// indexFlags is indexFlagsStr split into individual arguments by
	// validate.
	indexFlags []string
	// mirrorConfigFile names the JSON file holding the mirror
	// configuration (flag -mirror_config).
	mirrorConfigFile string
	// maxLogAge is the age after which index logs are recycled
	// (flag -max_log_age).
	maxLogAge time.Duration
	// indexTimeout is the time after which an index job is killed
	// (flag -index_timeout).
	indexTimeout time.Duration
}

// validate checks the parsed flag values and fills in the derived fields.
//
// It terminates the process via log.Fatal when cpuFraction is NaN or
// outside (0.0, 1.0]. Otherwise it sets cpuCount to that fraction of
// GOMAXPROCS, truncated and clamped to at least 1, and splits
// indexFlagsStr into indexFlags.
func (o *Options) validate() {
	// NaN compares false against everything, so it has to be rejected
	// explicitly or it would slip through the range check.
	if math.IsNaN(o.cpuFraction) || o.cpuFraction <= 0.0 || o.cpuFraction > 1.0 {
		log.Fatal("cpu_fraction must be between 0.0 and 1.0")
	}

	o.cpuCount = int(math.Trunc(float64(runtime.GOMAXPROCS(0)) * o.cpuFraction))
	if o.cpuCount < 1 {
		o.cpuCount = 1
	}
	if o.indexFlagsStr != "" {
		// Fields, unlike Split(s, " "), never yields empty elements for
		// leading, trailing or repeated whitespace; an empty element would
		// be handed to zoekt-git-index as an empty argument.
		o.indexFlags = strings.Fields(o.indexFlagsStr)
	}
}
82
83func (o *Options) defineFlags() {
84 flag.DurationVar(&o.indexTimeout, "index_timeout", time.Hour, "kill index job after this much time")
85 flag.DurationVar(&o.maxLogAge, "max_log_age", 3*day, "recycle index logs after this much time")
86 flag.DurationVar(&o.fetchInterval, "fetch_interval", time.Hour, "run fetches this often")
87 flag.StringVar(&o.mirrorConfigFile, "mirror_config",
88 "", "JSON file holding mirror configuration.")
89
90 flag.DurationVar(&o.mirrorInterval, "mirror_duration", 24*time.Hour, "find and clone new repos at this frequency.")
91 flag.Float64Var(&o.cpuFraction, "cpu_fraction", 0.25,
92 "use this fraction of the cores for indexing.")
93 flag.StringVar(&o.indexFlagsStr, "git_index_flags", "", "space separated list of flags passed through to zoekt-git-index (e.g. -git_index_flags='-symbols=false -submodules=false'")
94}
95
96// periodicFetch runs git-fetch every once in a while. Results are
97// posted on pendingRepos.
98func periodicFetch(repoDir, indexDir string, opts *Options, pendingRepos chan<- string) {
99 t := time.NewTicker(opts.fetchInterval)
100 for {
101 repos, err := gitindex.FindGitRepos(repoDir)
102 if err != nil {
103 log.Println(err)
104 continue
105 }
106 if len(repos) == 0 {
107 log.Printf("no repos found under %s", repoDir)
108 }
109
110 // TODO: Randomize to make sure quota throttling hits everyone.
111
112 later := map[string]struct{}{}
113 for _, dir := range repos {
114 if ok := fetchGitRepo(dir); !ok {
115 later[dir] = struct{}{}
116 } else {
117 pendingRepos <- dir
118 }
119 }
120
121 for r := range later {
122 pendingRepos <- r
123 }
124
125 <-t.C
126 }
127}
128
// fetchGitRepo runs "git fetch origin --prune" against the git directory
// dir. It reports true only when the command succeeded and produced output,
// which is taken to mean that the fetch brought in an update. A failed
// command is logged together with its combined output and reported as false.
func fetchGitRepo(dir string) bool {
	fetch := exec.Command("git", "--git-dir", dir, "fetch", "origin", "--prune")

	combined, fetchErr := fetch.CombinedOutput()
	if fetchErr == nil {
		// A fetch that found no updates prints nothing at all.
		return len(combined) > 0
	}

	log.Printf("command %s failed: %v\nCOMBINED_OUT: %s\n",
		fetch.Args, fetchErr, string(combined))
	return false
}
143
144// indexPendingRepos consumes the directories on the repos channel and
145// indexes them, sequentially.
146func indexPendingRepos(indexDir, repoDir string, opts *Options, repos <-chan string) {
147 for dir := range repos {
148 indexPendingRepo(dir, indexDir, repoDir, opts)
149
150 // Failures (eg. timeout) will leave temp files
151 // around. We have to clean them, or they will fill up the indexing volume.
152 if failures, err := filepath.Glob(filepath.Join(indexDir, "*.tmp")); err != nil {
153 log.Printf("Glob: %v", err)
154 } else {
155 for _, f := range failures {
156 os.Remove(f)
157 }
158 }
159 }
160}
161
162func indexPendingRepo(dir, indexDir, repoDir string, opts *Options) {
163 ctx, cancel := context.WithTimeout(context.Background(), opts.indexTimeout)
164 defer cancel()
165 args := []string{
166 "-require_ctags",
167 fmt.Sprintf("-parallelism=%d", opts.cpuCount),
168 "-repo_cache", repoDir,
169 "-index", indexDir,
170 "-incremental",
171 }
172 args = append(args, opts.indexFlags...)
173 args = append(args, dir)
174 cmd := exec.CommandContext(ctx, "zoekt-git-index", args...)
175 loggedRun(cmd)
176}
177
// deleteLogs deletes old logs: every entry directly inside logDir whose
// modification time lies more than maxAge in the past is removed.
//
// The directory is listed with os.ReadDir rather than a glob built from
// logDir, so a logDir containing glob metacharacters (such as '[') is
// handled correctly. Errors are logged and never terminate the process,
// since this is background housekeeping.
func deleteLogs(logDir string, maxAge time.Duration) {
	entries, err := os.ReadDir(logDir)
	if err != nil {
		log.Printf("deleteLogs: reading %s: %v", logDir, err)
		return
	}

	threshold := time.Now().Add(-maxAge)
	for _, entry := range entries {
		// Info describes the entry itself (not a symlink's target), like
		// Lstat. An entry that vanished in the meantime is skipped.
		fi, err := entry.Info()
		if err != nil || !fi.ModTime().Before(threshold) {
			continue
		}

		fn := filepath.Join(logDir, entry.Name())
		if err := os.Remove(fn); err != nil && !os.IsNotExist(err) {
			log.Printf("deleteLogs: removing %s: %v", fn, err)
		}
	}
}
192
193func deleteLogsLoop(logDir string, maxAge time.Duration) {
194 tick := time.NewTicker(maxAge / 100)
195 for {
196 deleteLogs(logDir, maxAge)
197 <-tick.C
198 }
199}
200
201// Delete the shard if its corresponding git repo can't be found.
202func deleteIfOrphan(repoDir string, fn string) error {
203 f, err := os.Open(fn)
204 if err != nil {
205 return nil
206 }
207 defer f.Close()
208
209 ifile, err := zoekt.NewIndexFile(f)
210 if err != nil {
211 return nil
212 }
213 defer ifile.Close()
214
215 repos, _, err := zoekt.ReadMetadata(ifile)
216 if err != nil {
217 return nil
218 }
219
220 // TODO support compound shards in zoekt-indexserver
221 if len(repos) != 1 {
222 return nil
223 }
224 repo := repos[0]
225
226 _, err = os.Stat(repo.Source)
227 if os.IsNotExist(err) {
228 log.Printf("deleting orphan shard %s; source %q not found", fn, repo.Source)
229 return os.Remove(fn)
230 }
231
232 return err
233}
234
235func deleteOrphanIndexes(indexDir, repoDir string, watchInterval time.Duration) {
236 t := time.NewTicker(watchInterval)
237
238 expr := indexDir + "/*"
239 for {
240 fs, err := filepath.Glob(expr)
241 if err != nil {
242 log.Printf("Glob(%q): %v", expr, err)
243 }
244
245 for _, f := range fs {
246 if err := deleteIfOrphan(repoDir, f); err != nil {
247 log.Printf("deleteIfOrphan(%q): %v", f, err)
248 }
249 }
250 <-t.C
251 }
252}
253
254func main() {
255 var opts Options
256 opts.defineFlags()
257 dataDir := flag.String("data_dir",
258 filepath.Join(os.Getenv("HOME"), "zoekt-serving"), "directory holding all data.")
259 indexDir := flag.String("index_dir", "", "directory holding index shards. Defaults to $data_dir/index/")
260 flag.Parse()
261 opts.validate()
262
263 if *dataDir == "" {
264 log.Fatal("must set --data_dir")
265 }
266
267 // Automatically prepend our own path at the front, to minimize
268 // required configuration.
269 if l, err := os.Readlink("/proc/self/exe"); err == nil {
270 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
271 }
272
273 logDir := filepath.Join(*dataDir, "logs")
274 if *indexDir == "" {
275 *indexDir = filepath.Join(*dataDir, "index")
276 }
277 repoDir := filepath.Join(*dataDir, "repos")
278 for _, s := range []string{logDir, *indexDir, repoDir} {
279 if _, err := os.Stat(s); err == nil {
280 continue
281 }
282
283 if err := os.MkdirAll(s, 0o755); err != nil {
284 log.Fatalf("MkdirAll %s: %v", s, err)
285 }
286 }
287
288 _, err := readConfigURL(opts.mirrorConfigFile)
289 if err != nil {
290 log.Fatalf("readConfigURL(%s): %v", opts.mirrorConfigFile, err)
291 }
292
293 pendingRepos := make(chan string, 10)
294 go periodicMirrorFile(repoDir, &opts, pendingRepos)
295 go deleteLogsLoop(logDir, opts.maxLogAge)
296 go deleteOrphanIndexes(*indexDir, repoDir, opts.fetchInterval)
297 go indexPendingRepos(*indexDir, repoDir, &opts, pendingRepos)
298 periodicFetch(repoDir, *indexDir, &opts, pendingRepos)
299}