fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Command zoekt-indexserver starts a service that periodically reindexes repositories. It follows 16// a "pull-based" design, where it reaches out to code hosts to fetch new data. 17package main 18 19import ( 20 "bytes" 21 "context" 22 "flag" 23 "fmt" 24 "log" 25 "math" 26 "os" 27 "os/exec" 28 "path/filepath" 29 "runtime" 30 "strings" 31 "time" 32 33 "github.com/sourcegraph/zoekt/index" 34 "github.com/sourcegraph/zoekt/internal/gitindex" 35) 36 37const day = time.Hour * 24 38 39func loggedRun(cmd *exec.Cmd) (out, err []byte) { 40 outBuf := &bytes.Buffer{} 41 errBuf := &bytes.Buffer{} 42 cmd.Stdout = outBuf 43 cmd.Stderr = errBuf 44 45 log.Printf("run %v", cmd.Args) 46 if err := cmd.Run(); err != nil { 47 log.Printf("command %s failed: %v\nOUT: %s\nERR: %s", 48 cmd.Args, err, outBuf.String(), errBuf.String()) 49 } 50 51 return outBuf.Bytes(), errBuf.Bytes() 52} 53 54type Options struct { 55 cpuFraction float64 56 cpuCount int 57 fetchInterval time.Duration 58 mirrorInterval time.Duration 59 indexFlagsStr string 60 indexFlags []string 61 mirrorConfigFile string 62 maxLogAge time.Duration 63 indexTimeout time.Duration 64} 65 66func (o *Options) validate() { 67 if o.cpuFraction <= 0.0 || o.cpuFraction > 1.0 { 68 log.Fatal("cpu_fraction must be between 0.0 and 1.0") 69 } 70 71 o.cpuCount = int(math.Trunc(float64(runtime.GOMAXPROCS(0)) * o.cpuFraction)) 72 if o.cpuCount < 1 { 73 o.cpuCount = 1 74 } 75 if o.indexFlagsStr != "" { 76 o.indexFlags = strings.Split(o.indexFlagsStr, " ") 77 } 78} 79 80func (o *Options) defineFlags() { 81 flag.DurationVar(&o.indexTimeout, "index_timeout", time.Hour, "kill index job after this much time") 82 flag.DurationVar(&o.maxLogAge, "max_log_age", 3*day, "recycle index logs after this much time") 83 flag.DurationVar(&o.fetchInterval, "fetch_interval", time.Hour, "run fetches this often") 84 flag.StringVar(&o.mirrorConfigFile, "mirror_config", 85 "", "JSON file holding mirror configuration.") 86 87 flag.DurationVar(&o.mirrorInterval, "mirror_duration", 24*time.Hour, "find and clone new repos at this frequency.") 88 flag.Float64Var(&o.cpuFraction, "cpu_fraction", 0.25, 89 "use this fraction of the cores for indexing.") 90 flag.StringVar(&o.indexFlagsStr, "git_index_flags", "", "space separated list of flags passed through to zoekt-git-index (e.g. -git_index_flags='-symbols=false -submodules=false'") 91} 92 93// periodicFetch runs git-fetch every once in a while. Results are 94// posted on pendingRepos. 95func periodicFetch(repoDir, indexDir string, opts *Options, pendingRepos chan<- string) { 96 t := time.NewTicker(opts.fetchInterval) 97 for { 98 repos, err := gitindex.FindGitRepos(repoDir) 99 if err != nil { 100 log.Println(err) 101 continue 102 } 103 if len(repos) == 0 { 104 log.Printf("no repos found under %s", repoDir) 105 } 106 107 // TODO: Randomize to make sure quota throttling hits everyone. 108 109 later := map[string]struct{}{} 110 for _, dir := range repos { 111 if ok := fetchGitRepo(dir); !ok { 112 later[dir] = struct{}{} 113 } else { 114 pendingRepos <- dir 115 } 116 } 117 118 for r := range later { 119 pendingRepos <- r 120 } 121 122 <-t.C 123 } 124} 125 126// fetchGitRepo runs git-fetch, and returns true if there was an 127// update. 128func fetchGitRepo(dir string) bool { 129 cmd := exec.Command("git", "--git-dir", dir, "fetch", "origin", "--prune") 130 131 output, err := cmd.CombinedOutput() 132 if err != nil { 133 log.Printf("command %s failed: %v\nCOMBINED_OUT: %s\n", 134 cmd.Args, err, string(output)) 135 return false 136 } 137 // When fetch found no updates, it prints nothing out 138 return len(output) != 0 139} 140 141// indexPendingRepos consumes the directories on the repos channel and 142// indexes them, sequentially. 143func indexPendingRepos(indexDir, repoDir string, opts *Options, repos <-chan string) { 144 for dir := range repos { 145 indexPendingRepo(dir, indexDir, repoDir, opts) 146 147 // Failures (eg. timeout) will leave temp files 148 // around. We have to clean them, or they will fill up the indexing volume. 149 if failures, err := filepath.Glob(filepath.Join(indexDir, "*.tmp")); err != nil { 150 log.Printf("Glob: %v", err) 151 } else { 152 for _, f := range failures { 153 os.Remove(f) 154 } 155 } 156 } 157} 158 159func indexPendingRepo(dir, indexDir, repoDir string, opts *Options) { 160 ctx, cancel := context.WithTimeout(context.Background(), opts.indexTimeout) 161 defer cancel() 162 args := []string{ 163 "-require_ctags", 164 fmt.Sprintf("-parallelism=%d", opts.cpuCount), 165 "-repo_cache", repoDir, 166 "-index", indexDir, 167 "-incremental", 168 } 169 args = append(args, opts.indexFlags...) 170 args = append(args, dir) 171 cmd := exec.CommandContext(ctx, "zoekt-git-index", args...) 172 loggedRun(cmd) 173} 174 175// deleteLogs deletes old logs. 176func deleteLogs(logDir string, maxAge time.Duration) { 177 fs, err := filepath.Glob(filepath.Join(logDir, "*")) 178 if err != nil { 179 log.Fatalf("filepath.Glob(%s): %v", logDir, err) 180 } 181 182 threshold := time.Now().Add(-maxAge) 183 for _, fn := range fs { 184 if fi, err := os.Lstat(fn); err == nil && fi.ModTime().Before(threshold) { 185 os.Remove(fn) 186 } 187 } 188} 189 190func deleteLogsLoop(logDir string, maxAge time.Duration) { 191 tick := time.NewTicker(maxAge / 100) 192 for { 193 deleteLogs(logDir, maxAge) 194 <-tick.C 195 } 196} 197 198// Delete the shard if its corresponding git repo can't be found. 199func deleteIfOrphan(repoDir string, fn string) error { 200 f, err := os.Open(fn) 201 if err != nil { 202 return nil 203 } 204 defer f.Close() 205 206 ifile, err := index.NewIndexFile(f) 207 if err != nil { 208 return nil 209 } 210 defer ifile.Close() 211 212 repos, _, err := index.ReadMetadata(ifile) 213 if err != nil { 214 return nil 215 } 216 217 // TODO support compound shards in zoekt-indexserver 218 if len(repos) != 1 { 219 return nil 220 } 221 repo := repos[0] 222 223 _, err = os.Stat(repo.Source) 224 if os.IsNotExist(err) { 225 log.Printf("deleting orphan shard %s; source %q not found", fn, repo.Source) 226 return os.Remove(fn) 227 } 228 229 return err 230} 231 232func deleteOrphanIndexes(indexDir, repoDir string, watchInterval time.Duration) { 233 t := time.NewTicker(watchInterval) 234 235 expr := indexDir + "/*" 236 for { 237 fs, err := filepath.Glob(expr) 238 if err != nil { 239 log.Printf("Glob(%q): %v", expr, err) 240 } 241 242 for _, f := range fs { 243 if err := deleteIfOrphan(repoDir, f); err != nil { 244 log.Printf("deleteIfOrphan(%q): %v", f, err) 245 } 246 } 247 <-t.C 248 } 249} 250 251func main() { 252 var opts Options 253 opts.defineFlags() 254 dataDir := flag.String("data_dir", 255 filepath.Join(os.Getenv("HOME"), "zoekt-serving"), "directory holding all data.") 256 indexDir := flag.String("index_dir", "", "directory holding index shards. Defaults to $data_dir/index/") 257 flag.Parse() 258 opts.validate() 259 260 if *dataDir == "" { 261 log.Fatal("must set --data_dir") 262 } 263 264 // Automatically prepend our own path at the front, to minimize 265 // required configuration. 266 if l, err := os.Readlink("/proc/self/exe"); err == nil { 267 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 268 } 269 270 logDir := filepath.Join(*dataDir, "logs") 271 if *indexDir == "" { 272 *indexDir = filepath.Join(*dataDir, "index") 273 } 274 repoDir := filepath.Join(*dataDir, "repos") 275 for _, s := range []string{logDir, *indexDir, repoDir} { 276 if _, err := os.Stat(s); err == nil { 277 continue 278 } 279 280 if err := os.MkdirAll(s, 0o755); err != nil { 281 log.Fatalf("MkdirAll %s: %v", s, err) 282 } 283 } 284 285 _, err := readConfigURL(opts.mirrorConfigFile) 286 if err != nil { 287 log.Fatalf("readConfigURL(%s): %v", opts.mirrorConfigFile, err) 288 } 289 290 pendingRepos := make(chan string, 10) 291 go periodicMirrorFile(repoDir, &opts, pendingRepos) 292 go deleteLogsLoop(logDir, opts.maxLogAge) 293 go deleteOrphanIndexes(*indexDir, repoDir, opts.fetchInterval) 294 go indexPendingRepos(*indexDir, repoDir, &opts, pendingRepos) 295 periodicFetch(repoDir, *indexDir, &opts, pendingRepos) 296}