fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// This program manages a zoekt indexing deployment: 16// * recycling logs 17// * periodically fetching new data. 18// * periodically reindexing all git repos. 19 20package main 21 22import ( 23 "bytes" 24 "context" 25 "flag" 26 "fmt" 27 "log" 28 "math" 29 "os" 30 "os/exec" 31 "path/filepath" 32 "runtime" 33 "strings" 34 "time" 35 36 "github.com/sourcegraph/zoekt" 37 "github.com/sourcegraph/zoekt/gitindex" 38) 39 40const day = time.Hour * 24 41 42func loggedRun(cmd *exec.Cmd) (out, err []byte) { 43 outBuf := &bytes.Buffer{} 44 errBuf := &bytes.Buffer{} 45 cmd.Stdout = outBuf 46 cmd.Stderr = errBuf 47 48 log.Printf("run %v", cmd.Args) 49 if err := cmd.Run(); err != nil { 50 log.Printf("command %s failed: %v\nOUT: %s\nERR: %s", 51 cmd.Args, err, outBuf.String(), errBuf.String()) 52 } 53 54 return outBuf.Bytes(), errBuf.Bytes() 55} 56 57type Options struct { 58 cpuFraction float64 59 cpuCount int 60 fetchInterval time.Duration 61 mirrorInterval time.Duration 62 indexFlagsStr string 63 indexFlags []string 64 mirrorConfigFile string 65 maxLogAge time.Duration 66 indexTimeout time.Duration 67} 68 69func (o *Options) validate() { 70 if o.cpuFraction <= 0.0 || o.cpuFraction > 1.0 { 71 log.Fatal("cpu_fraction must be between 0.0 and 1.0") 72 } 73 74 o.cpuCount = int(math.Trunc(float64(runtime.GOMAXPROCS(0)) * o.cpuFraction)) 75 if o.cpuCount < 1 { 76 o.cpuCount = 1 77 } 78 if o.indexFlagsStr != "" { 79 o.indexFlags = strings.Split(o.indexFlagsStr, " ") 80 } 81} 82 83func (o *Options) defineFlags() { 84 flag.DurationVar(&o.indexTimeout, "index_timeout", time.Hour, "kill index job after this much time") 85 flag.DurationVar(&o.maxLogAge, "max_log_age", 3*day, "recycle index logs after this much time") 86 flag.DurationVar(&o.fetchInterval, "fetch_interval", time.Hour, "run fetches this often") 87 flag.StringVar(&o.mirrorConfigFile, "mirror_config", 88 "", "JSON file holding mirror configuration.") 89 90 flag.DurationVar(&o.mirrorInterval, "mirror_duration", 24*time.Hour, "find and clone new repos at this frequency.") 91 flag.Float64Var(&o.cpuFraction, "cpu_fraction", 0.25, 92 "use this fraction of the cores for indexing.") 93 flag.StringVar(&o.indexFlagsStr, "git_index_flags", "", "space separated list of flags passed through to zoekt-git-index (e.g. -git_index_flags='-symbols=false -submodules=false'") 94} 95 96// periodicFetch runs git-fetch every once in a while. Results are 97// posted on pendingRepos. 98func periodicFetch(repoDir, indexDir string, opts *Options, pendingRepos chan<- string) { 99 t := time.NewTicker(opts.fetchInterval) 100 for { 101 repos, err := gitindex.FindGitRepos(repoDir) 102 if err != nil { 103 log.Println(err) 104 continue 105 } 106 if len(repos) == 0 { 107 log.Printf("no repos found under %s", repoDir) 108 } 109 110 // TODO: Randomize to make sure quota throttling hits everyone. 111 112 later := map[string]struct{}{} 113 for _, dir := range repos { 114 if ok := fetchGitRepo(dir); !ok { 115 later[dir] = struct{}{} 116 } else { 117 pendingRepos <- dir 118 } 119 } 120 121 for r := range later { 122 pendingRepos <- r 123 } 124 125 <-t.C 126 } 127} 128 129// fetchGitRepo runs git-fetch, and returns true if there was an 130// update. 131func fetchGitRepo(dir string) bool { 132 cmd := exec.Command("git", "--git-dir", dir, "fetch", "origin", "--prune") 133 134 output, err := cmd.CombinedOutput() 135 if err != nil { 136 log.Printf("command %s failed: %v\nCOMBINED_OUT: %s\n", 137 cmd.Args, err, string(output)) 138 return false 139 } 140 // When fetch found no updates, it prints nothing out 141 return len(output) != 0 142} 143 144// indexPendingRepos consumes the directories on the repos channel and 145// indexes them, sequentially. 146func indexPendingRepos(indexDir, repoDir string, opts *Options, repos <-chan string) { 147 for dir := range repos { 148 indexPendingRepo(dir, indexDir, repoDir, opts) 149 150 // Failures (eg. timeout) will leave temp files 151 // around. We have to clean them, or they will fill up the indexing volume. 152 if failures, err := filepath.Glob(filepath.Join(indexDir, "*.tmp")); err != nil { 153 log.Printf("Glob: %v", err) 154 } else { 155 for _, f := range failures { 156 os.Remove(f) 157 } 158 } 159 } 160} 161 162func indexPendingRepo(dir, indexDir, repoDir string, opts *Options) { 163 ctx, cancel := context.WithTimeout(context.Background(), opts.indexTimeout) 164 defer cancel() 165 args := []string{ 166 "-require_ctags", 167 fmt.Sprintf("-parallelism=%d", opts.cpuCount), 168 "-repo_cache", repoDir, 169 "-index", indexDir, 170 "-incremental", 171 } 172 args = append(args, opts.indexFlags...) 173 args = append(args, dir) 174 cmd := exec.CommandContext(ctx, "zoekt-git-index", args...) 175 loggedRun(cmd) 176} 177 178// deleteLogs deletes old logs. 179func deleteLogs(logDir string, maxAge time.Duration) { 180 fs, err := filepath.Glob(filepath.Join(logDir, "*")) 181 if err != nil { 182 log.Fatalf("filepath.Glob(%s): %v", logDir, err) 183 } 184 185 threshold := time.Now().Add(-maxAge) 186 for _, fn := range fs { 187 if fi, err := os.Lstat(fn); err == nil && fi.ModTime().Before(threshold) { 188 os.Remove(fn) 189 } 190 } 191} 192 193func deleteLogsLoop(logDir string, maxAge time.Duration) { 194 tick := time.NewTicker(maxAge / 100) 195 for { 196 deleteLogs(logDir, maxAge) 197 <-tick.C 198 } 199} 200 201// Delete the shard if its corresponding git repo can't be found. 202func deleteIfOrphan(repoDir string, fn string) error { 203 f, err := os.Open(fn) 204 if err != nil { 205 return nil 206 } 207 defer f.Close() 208 209 ifile, err := zoekt.NewIndexFile(f) 210 if err != nil { 211 return nil 212 } 213 defer ifile.Close() 214 215 repos, _, err := zoekt.ReadMetadata(ifile) 216 if err != nil { 217 return nil 218 } 219 220 // TODO support compound shards in zoekt-indexserver 221 if len(repos) != 1 { 222 return nil 223 } 224 repo := repos[0] 225 226 _, err = os.Stat(repo.Source) 227 if os.IsNotExist(err) { 228 log.Printf("deleting orphan shard %s; source %q not found", fn, repo.Source) 229 return os.Remove(fn) 230 } 231 232 return err 233} 234 235func deleteOrphanIndexes(indexDir, repoDir string, watchInterval time.Duration) { 236 t := time.NewTicker(watchInterval) 237 238 expr := indexDir + "/*" 239 for { 240 fs, err := filepath.Glob(expr) 241 if err != nil { 242 log.Printf("Glob(%q): %v", expr, err) 243 } 244 245 for _, f := range fs { 246 if err := deleteIfOrphan(repoDir, f); err != nil { 247 log.Printf("deleteIfOrphan(%q): %v", f, err) 248 } 249 } 250 <-t.C 251 } 252} 253 254func main() { 255 var opts Options 256 opts.defineFlags() 257 dataDir := flag.String("data_dir", 258 filepath.Join(os.Getenv("HOME"), "zoekt-serving"), "directory holding all data.") 259 indexDir := flag.String("index_dir", "", "directory holding index shards. Defaults to $data_dir/index/") 260 flag.Parse() 261 opts.validate() 262 263 if *dataDir == "" { 264 log.Fatal("must set --data_dir") 265 } 266 267 // Automatically prepend our own path at the front, to minimize 268 // required configuration. 269 if l, err := os.Readlink("/proc/self/exe"); err == nil { 270 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 271 } 272 273 logDir := filepath.Join(*dataDir, "logs") 274 if *indexDir == "" { 275 *indexDir = filepath.Join(*dataDir, "index") 276 } 277 repoDir := filepath.Join(*dataDir, "repos") 278 for _, s := range []string{logDir, *indexDir, repoDir} { 279 if _, err := os.Stat(s); err == nil { 280 continue 281 } 282 283 if err := os.MkdirAll(s, 0o755); err != nil { 284 log.Fatalf("MkdirAll %s: %v", s, err) 285 } 286 } 287 288 _, err := readConfigURL(opts.mirrorConfigFile) 289 if err != nil { 290 log.Fatalf("readConfigURL(%s): %v", opts.mirrorConfigFile, err) 291 } 292 293 pendingRepos := make(chan string, 10) 294 go periodicMirrorFile(repoDir, &opts, pendingRepos) 295 go deleteLogsLoop(logDir, opts.maxLogAge) 296 go deleteOrphanIndexes(*indexDir, repoDir, opts.fetchInterval) 297 go indexPendingRepos(*indexDir, repoDir, &opts, pendingRepos) 298 periodicFetch(repoDir, *indexDir, &opts, pendingRepos) 299}