// Forked from https://github.com/sourcegraph/zoekt
// Copyright 2016 Google Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
// Command zoekt-indexserver starts a service that periodically reindexes repositories. It follows
// a "pull-based" design, where it reaches out to code hosts to fetch new data.
17package main
18
19import (
20 "bytes"
21 "context"
22 "flag"
23 "fmt"
24 "log"
25 "math"
26 "os"
27 "os/exec"
28 "path/filepath"
29 "runtime"
30 "strings"
31 "time"
32
33 "github.com/sourcegraph/zoekt/index"
34 "github.com/sourcegraph/zoekt/internal/gitindex"
35)
36
// day is a duration of 24 hours, used for day-scale flag defaults such as
// the log retention period.
const day = 24 * time.Hour
38
39func loggedRun(cmd *exec.Cmd) (out, err []byte) {
40 outBuf := &bytes.Buffer{}
41 errBuf := &bytes.Buffer{}
42 cmd.Stdout = outBuf
43 cmd.Stderr = errBuf
44
45 log.Printf("run %v", cmd.Args)
46 if err := cmd.Run(); err != nil {
47 log.Printf("command %s failed: %v\nOUT: %s\nERR: %s",
48 cmd.Args, err, outBuf.String(), errBuf.String())
49 }
50
51 return outBuf.Bytes(), errBuf.Bytes()
52}
53
// Options holds the indexserver configuration. Most fields are bound to
// command-line flags by defineFlags. cpuCount and indexFlags are derived
// from the other fields by validate.
type Options struct {
	// cpuFraction is the fraction of GOMAXPROCS to use for indexing
	// (-cpu_fraction).
	cpuFraction float64
	// cpuCount is the derived indexing parallelism, passed to
	// zoekt-git-index as -parallelism.
	cpuCount int
	// fetchInterval is how often repositories are fetched (-fetch_interval).
	fetchInterval time.Duration
	// mirrorInterval is how often new repositories are found and cloned
	// (-mirror_duration).
	mirrorInterval time.Duration
	// indexFlagsStr is the raw value of -git_index_flags.
	indexFlagsStr string
	// indexFlags is indexFlagsStr split into individual arguments that are
	// passed through to zoekt-git-index.
	indexFlags []string
	// mirrorConfigFile is the JSON mirror configuration file (-mirror_config).
	mirrorConfigFile string
	// maxLogAge is the age after which log files are deleted (-max_log_age).
	maxLogAge time.Duration
	// indexTimeout is the time after which an index job is killed
	// (-index_timeout).
	indexTimeout time.Duration
}

// validate checks the parsed flag values and fills in the derived fields.
// On invalid input it terminates the process via log.Fatal.
//
// cpuCount becomes cpuFraction of GOMAXPROCS, truncated, but never less
// than 1. When indexFlagsStr is non-empty, indexFlags becomes indexFlagsStr
// split on runs of whitespace.
func (o *Options) validate() {
	// The check is written as a negated in-range test so that NaN, which
	// compares false against everything, is rejected as well.
	if !(o.cpuFraction > 0.0 && o.cpuFraction <= 1.0) {
		log.Fatal("cpu_fraction must be between 0.0 and 1.0")
	}
	// The fetch loop and the orphan-shard cleanup both build tickers from
	// fetchInterval, and time.NewTicker panics on a non-positive interval.
	if o.fetchInterval <= 0 {
		log.Fatal("fetch_interval must be positive")
	}

	o.cpuCount = max(int(math.Trunc(float64(runtime.GOMAXPROCS(0))*o.cpuFraction)), 1)

	// strings.Fields, unlike strings.Split(s, " "), does not yield empty
	// elements for repeated, leading or trailing whitespace. Empty
	// elements would reach zoekt-git-index as empty positional arguments.
	if o.indexFlagsStr != "" {
		o.indexFlags = strings.Fields(o.indexFlagsStr)
	}
}
76
77func (o *Options) defineFlags() {
78 flag.DurationVar(&o.indexTimeout, "index_timeout", time.Hour, "kill index job after this much time")
79 flag.DurationVar(&o.maxLogAge, "max_log_age", 3*day, "recycle index logs after this much time")
80 flag.DurationVar(&o.fetchInterval, "fetch_interval", time.Hour, "run fetches this often")
81 flag.StringVar(&o.mirrorConfigFile, "mirror_config",
82 "", "JSON file holding mirror configuration.")
83
84 flag.DurationVar(&o.mirrorInterval, "mirror_duration", 24*time.Hour, "find and clone new repos at this frequency.")
85 flag.Float64Var(&o.cpuFraction, "cpu_fraction", 0.25,
86 "use this fraction of the cores for indexing.")
87 flag.StringVar(&o.indexFlagsStr, "git_index_flags", "", "space separated list of flags passed through to zoekt-git-index (e.g. -git_index_flags='-symbols=false -submodules=false'")
88}
89
90// periodicFetch runs git-fetch every once in a while. Results are
91// posted on pendingRepos.
92func periodicFetch(repoDir, indexDir string, opts *Options, pendingRepos chan<- string) {
93 t := time.NewTicker(opts.fetchInterval)
94 for {
95 repos, err := gitindex.FindGitRepos(repoDir)
96 if err != nil {
97 log.Println(err)
98 continue
99 }
100 if len(repos) == 0 {
101 log.Printf("no repos found under %s", repoDir)
102 }
103
104 // TODO: Randomize to make sure quota throttling hits everyone.
105
106 later := map[string]struct{}{}
107 for _, dir := range repos {
108 if ok := fetchGitRepo(dir); !ok {
109 later[dir] = struct{}{}
110 } else {
111 pendingRepos <- dir
112 }
113 }
114
115 for r := range later {
116 pendingRepos <- r
117 }
118
119 <-t.C
120 }
121}
122
// fetchGitRepo runs "git fetch origin --prune" with --git-dir set to dir.
// It returns true when the fetch succeeded and printed output, which is
// taken to mean something was updated. A failed fetch is logged together
// with its combined output and reported as false.
func fetchGitRepo(dir string) bool {
	cmd := exec.Command("git", "--git-dir", dir, "fetch", "origin", "--prune")

	combined, err := cmd.CombinedOutput()
	if err == nil {
		// git fetch prints nothing when there are no updates, so any
		// output at all signals that something changed.
		return len(combined) > 0
	}

	log.Printf("command %s failed: %v\nCOMBINED_OUT: %s\n",
		cmd.Args, err, string(combined))
	return false
}
137
138// indexPendingRepos consumes the directories on the repos channel and
139// indexes them, sequentially.
140func indexPendingRepos(indexDir, repoDir string, opts *Options, repos <-chan string) {
141 for dir := range repos {
142 indexPendingRepo(dir, indexDir, repoDir, opts)
143
144 // Failures (eg. timeout) will leave temp files
145 // around. We have to clean them, or they will fill up the indexing volume.
146 if failures, err := filepath.Glob(filepath.Join(indexDir, "*.tmp")); err != nil {
147 log.Printf("Glob: %v", err)
148 } else {
149 for _, f := range failures {
150 os.Remove(f)
151 }
152 }
153 }
154}
155
156func indexPendingRepo(dir, indexDir, repoDir string, opts *Options) {
157 ctx, cancel := context.WithTimeout(context.Background(), opts.indexTimeout)
158 defer cancel()
159 args := []string{
160 "-require_ctags",
161 fmt.Sprintf("-parallelism=%d", opts.cpuCount),
162 "-repo_cache", repoDir,
163 "-index", indexDir,
164 "-incremental",
165 }
166 args = append(args, opts.indexFlags...)
167 args = append(args, dir)
168 cmd := exec.CommandContext(ctx, "zoekt-git-index", args...)
169 loggedRun(cmd)
170}
171
// deleteLogs removes each entry directly inside logDir whose modification
// time is more than maxAge in the past. Symbolic links are judged by the
// link itself, not its target. A missing logDir is silently ignored, and
// other listing or removal failures are logged rather than fatal.
//
// The directory is listed with os.ReadDir instead of filepath.Glob, so that
// glob metacharacters in logDir (e.g. '[' or '*') are taken literally.
func deleteLogs(logDir string, maxAge time.Duration) {
	entries, err := os.ReadDir(logDir)
	if err != nil {
		// A directory that does not exist has nothing to clean up.
		if !os.IsNotExist(err) {
			log.Printf("ReadDir(%s): %v", logDir, err)
		}
		return
	}

	threshold := time.Now().Add(-maxAge)
	for _, e := range entries {
		// Info reports on the entry itself, like os.Lstat.
		fi, err := e.Info()
		if err != nil || !fi.ModTime().Before(threshold) {
			continue
		}
		fn := filepath.Join(logDir, e.Name())
		if err := os.Remove(fn); err != nil && !os.IsNotExist(err) {
			log.Printf("Remove(%s): %v", fn, err)
		}
	}
}
186
187func deleteLogsLoop(logDir string, maxAge time.Duration) {
188 tick := time.NewTicker(maxAge / 100)
189 for {
190 deleteLogs(logDir, maxAge)
191 <-tick.C
192 }
193}
194
195// Delete the shard if its corresponding git repo can't be found.
196func deleteIfOrphan(repoDir string, fn string) error {
197 f, err := os.Open(fn)
198 if err != nil {
199 return nil
200 }
201 defer f.Close()
202
203 ifile, err := index.NewIndexFile(f)
204 if err != nil {
205 return nil
206 }
207 defer ifile.Close()
208
209 repos, _, err := index.ReadMetadata(ifile)
210 if err != nil {
211 return nil
212 }
213
214 // TODO support compound shards in zoekt-indexserver
215 if len(repos) != 1 {
216 return nil
217 }
218 repo := repos[0]
219
220 _, err = os.Stat(repo.Source)
221 if os.IsNotExist(err) {
222 log.Printf("deleting orphan shard %s; source %q not found", fn, repo.Source)
223 return os.Remove(fn)
224 }
225
226 return err
227}
228
229func deleteOrphanIndexes(indexDir, repoDir string, watchInterval time.Duration) {
230 t := time.NewTicker(watchInterval)
231
232 expr := indexDir + "/*"
233 for {
234 fs, err := filepath.Glob(expr)
235 if err != nil {
236 log.Printf("Glob(%q): %v", expr, err)
237 }
238
239 for _, f := range fs {
240 if err := deleteIfOrphan(repoDir, f); err != nil {
241 log.Printf("deleteIfOrphan(%q): %v", f, err)
242 }
243 }
244 <-t.C
245 }
246}
247
248func main() {
249 var opts Options
250 opts.defineFlags()
251 dataDir := flag.String("data_dir",
252 filepath.Join(os.Getenv("HOME"), "zoekt-serving"), "directory holding all data.")
253 indexDir := flag.String("index_dir", "", "directory holding index shards. Defaults to $data_dir/index/")
254 flag.Parse()
255 opts.validate()
256
257 if *dataDir == "" {
258 log.Fatal("must set --data_dir")
259 }
260
261 // Automatically prepend our own path at the front, to minimize
262 // required configuration.
263 if l, err := os.Readlink("/proc/self/exe"); err == nil {
264 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
265 }
266
267 logDir := filepath.Join(*dataDir, "logs")
268 if *indexDir == "" {
269 *indexDir = filepath.Join(*dataDir, "index")
270 }
271 repoDir := filepath.Join(*dataDir, "repos")
272 for _, s := range []string{logDir, *indexDir, repoDir} {
273 if _, err := os.Stat(s); err == nil {
274 continue
275 }
276
277 if err := os.MkdirAll(s, 0o755); err != nil {
278 log.Fatalf("MkdirAll %s: %v", s, err)
279 }
280 }
281
282 _, err := readConfigURL(opts.mirrorConfigFile)
283 if err != nil {
284 log.Fatalf("readConfigURL(%s): %v", opts.mirrorConfigFile, err)
285 }
286
287 pendingRepos := make(chan string, 10)
288 go periodicMirrorFile(repoDir, &opts, pendingRepos)
289 go deleteLogsLoop(logDir, opts.maxLogAge)
290 go deleteOrphanIndexes(*indexDir, repoDir, opts.fetchInterval)
291 go indexPendingRepos(*indexDir, repoDir, &opts, pendingRepos)
292 periodicFetch(repoDir, *indexDir, &opts, pendingRepos)
293}