fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

at tngl 8.0 kB View raw
1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Command zoekt-indexserver starts a service that periodically reindexes repositories. It follows 16// a "pull-based" design, where it reaches out to code hosts to fetch new data. 17package main 18 19import ( 20 "bytes" 21 "context" 22 "flag" 23 "fmt" 24 "log" 25 "math" 26 "os" 27 "os/exec" 28 "path/filepath" 29 "runtime" 30 "strings" 31 "time" 32 33 "github.com/sourcegraph/zoekt/gitindex" 34 "github.com/sourcegraph/zoekt/index" 35) 36 37const day = time.Hour * 24 38 39func loggedRun(cmd *exec.Cmd) (out, err []byte) { 40 outBuf := &bytes.Buffer{} 41 errBuf := &bytes.Buffer{} 42 cmd.Stdout = outBuf 43 cmd.Stderr = errBuf 44 45 log.Printf("run %v", cmd.Args) 46 if err := cmd.Run(); err != nil { 47 log.Printf("command %s failed: %v\nOUT: %s\nERR: %s", 48 cmd.Args, err, outBuf.String(), errBuf.String()) 49 } 50 51 return outBuf.Bytes(), errBuf.Bytes() 52} 53 54type Options struct { 55 cpuFraction float64 56 cpuCount int 57 fetchInterval time.Duration 58 mirrorInterval time.Duration 59 indexFlagsStr string 60 indexFlags []string 61 mirrorConfigFile string 62 maxLogAge time.Duration 63 indexTimeout time.Duration 64} 65 66func (o *Options) validate() { 67 if o.cpuFraction <= 0.0 || o.cpuFraction > 1.0 { 68 log.Fatal("cpu_fraction must be between 0.0 and 1.0") 69 } 70 71 o.cpuCount = max(int(math.Trunc(float64(runtime.GOMAXPROCS(0))*o.cpuFraction)), 1) 72 if o.indexFlagsStr != "" { 73 o.indexFlags = strings.Split(o.indexFlagsStr, " ") 74 } 75} 76 77func (o *Options) defineFlags() { 78 flag.DurationVar(&o.indexTimeout, "index_timeout", time.Hour, "kill index job after this much time") 79 flag.DurationVar(&o.maxLogAge, "max_log_age", 3*day, "recycle index logs after this much time") 80 flag.DurationVar(&o.fetchInterval, "fetch_interval", time.Hour, "run fetches this often") 81 flag.StringVar(&o.mirrorConfigFile, "mirror_config", 82 "", "JSON file holding mirror configuration.") 83 84 flag.DurationVar(&o.mirrorInterval, "mirror_duration", 24*time.Hour, "find and clone new repos at this frequency.") 85 flag.Float64Var(&o.cpuFraction, "cpu_fraction", 0.25, 86 "use this fraction of the cores for indexing.") 87 flag.StringVar(&o.indexFlagsStr, "git_index_flags", "", "space separated list of flags passed through to zoekt-git-index (e.g. -git_index_flags='-symbols=false -submodules=false'") 88} 89 90// periodicFetch runs git-fetch every once in a while. Results are 91// posted on pendingRepos. 92func periodicFetch(repoDir, indexDir string, opts *Options, pendingRepos chan<- string) { 93 t := time.NewTicker(opts.fetchInterval) 94 for { 95 repos, err := gitindex.FindGitRepos(repoDir) 96 if err != nil { 97 log.Println(err) 98 continue 99 } 100 if len(repos) == 0 { 101 log.Printf("no repos found under %s", repoDir) 102 } 103 104 // TODO: Randomize to make sure quota throttling hits everyone. 105 106 later := map[string]struct{}{} 107 for _, dir := range repos { 108 if ok := fetchGitRepo(dir); !ok { 109 later[dir] = struct{}{} 110 } else { 111 pendingRepos <- dir 112 } 113 } 114 115 for r := range later { 116 pendingRepos <- r 117 } 118 119 <-t.C 120 } 121} 122 123// fetchGitRepo runs git-fetch, and returns true if there was an 124// update. 125func fetchGitRepo(dir string) bool { 126 cmd := exec.Command("git", "--git-dir", dir, "fetch", "origin", "--prune") 127 128 output, err := cmd.CombinedOutput() 129 if err != nil { 130 log.Printf("command %s failed: %v\nCOMBINED_OUT: %s\n", 131 cmd.Args, err, string(output)) 132 return false 133 } 134 // When fetch found no updates, it prints nothing out 135 return len(output) != 0 136} 137 138// indexPendingRepos consumes the directories on the repos channel and 139// indexes them, sequentially. 140func indexPendingRepos(indexDir, repoDir string, opts *Options, repos <-chan string) { 141 for dir := range repos { 142 indexPendingRepo(dir, indexDir, repoDir, opts) 143 144 // Failures (eg. timeout) will leave temp files 145 // around. We have to clean them, or they will fill up the indexing volume. 146 if failures, err := filepath.Glob(filepath.Join(indexDir, "*.tmp")); err != nil { 147 log.Printf("Glob: %v", err) 148 } else { 149 for _, f := range failures { 150 os.Remove(f) 151 } 152 } 153 } 154} 155 156func indexPendingRepo(dir, indexDir, repoDir string, opts *Options) { 157 ctx, cancel := context.WithTimeout(context.Background(), opts.indexTimeout) 158 defer cancel() 159 args := []string{ 160 "-require_ctags", 161 fmt.Sprintf("-parallelism=%d", opts.cpuCount), 162 "-repo_cache", repoDir, 163 "-index", indexDir, 164 "-incremental", 165 } 166 args = append(args, opts.indexFlags...) 167 args = append(args, dir) 168 cmd := exec.CommandContext(ctx, "zoekt-git-index", args...) 169 loggedRun(cmd) 170} 171 172// deleteLogs deletes old logs. 173func deleteLogs(logDir string, maxAge time.Duration) { 174 fs, err := filepath.Glob(filepath.Join(logDir, "*")) 175 if err != nil { 176 log.Fatalf("filepath.Glob(%s): %v", logDir, err) 177 } 178 179 threshold := time.Now().Add(-maxAge) 180 for _, fn := range fs { 181 if fi, err := os.Lstat(fn); err == nil && fi.ModTime().Before(threshold) { 182 os.Remove(fn) 183 } 184 } 185} 186 187func deleteLogsLoop(logDir string, maxAge time.Duration) { 188 tick := time.NewTicker(maxAge / 100) 189 for { 190 deleteLogs(logDir, maxAge) 191 <-tick.C 192 } 193} 194 195// Delete the shard if its corresponding git repo can't be found. 196func deleteIfOrphan(repoDir string, fn string) error { 197 f, err := os.Open(fn) 198 if err != nil { 199 return nil 200 } 201 defer f.Close() 202 203 ifile, err := index.NewIndexFile(f) 204 if err != nil { 205 return nil 206 } 207 defer ifile.Close() 208 209 repos, _, err := index.ReadMetadata(ifile) 210 if err != nil { 211 return nil 212 } 213 214 // TODO support compound shards in zoekt-indexserver 215 if len(repos) != 1 { 216 return nil 217 } 218 repo := repos[0] 219 220 _, err = os.Stat(repo.Source) 221 if os.IsNotExist(err) { 222 log.Printf("deleting orphan shard %s; source %q not found", fn, repo.Source) 223 return os.Remove(fn) 224 } 225 226 return err 227} 228 229func deleteOrphanIndexes(indexDir, repoDir string, watchInterval time.Duration) { 230 t := time.NewTicker(watchInterval) 231 232 expr := indexDir + "/*" 233 for { 234 fs, err := filepath.Glob(expr) 235 if err != nil { 236 log.Printf("Glob(%q): %v", expr, err) 237 } 238 239 for _, f := range fs { 240 if err := deleteIfOrphan(repoDir, f); err != nil { 241 log.Printf("deleteIfOrphan(%q): %v", f, err) 242 } 243 } 244 <-t.C 245 } 246} 247 248func main() { 249 var opts Options 250 opts.defineFlags() 251 dataDir := flag.String("data_dir", 252 filepath.Join(os.Getenv("HOME"), "zoekt-serving"), "directory holding all data.") 253 indexDir := flag.String("index_dir", "", "directory holding index shards. Defaults to $data_dir/index/") 254 flag.Parse() 255 opts.validate() 256 257 if *dataDir == "" { 258 log.Fatal("must set --data_dir") 259 } 260 261 // Automatically prepend our own path at the front, to minimize 262 // required configuration. 263 if l, err := os.Readlink("/proc/self/exe"); err == nil { 264 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 265 } 266 267 logDir := filepath.Join(*dataDir, "logs") 268 if *indexDir == "" { 269 *indexDir = filepath.Join(*dataDir, "index") 270 } 271 repoDir := filepath.Join(*dataDir, "repos") 272 for _, s := range []string{logDir, *indexDir, repoDir} { 273 if _, err := os.Stat(s); err == nil { 274 continue 275 } 276 277 if err := os.MkdirAll(s, 0o755); err != nil { 278 log.Fatalf("MkdirAll %s: %v", s, err) 279 } 280 } 281 282 _, err := readConfigURL(opts.mirrorConfigFile) 283 if err != nil { 284 log.Fatalf("readConfigURL(%s): %v", opts.mirrorConfigFile, err) 285 } 286 287 pendingRepos := make(chan string, 10) 288 go periodicMirrorFile(repoDir, &opts, pendingRepos) 289 go deleteLogsLoop(logDir, opts.maxLogAge) 290 go deleteOrphanIndexes(*indexDir, repoDir, opts.fetchInterval) 291 go indexPendingRepos(*indexDir, repoDir, &opts, pendingRepos) 292 periodicFetch(repoDir, *indexDir, &opts, pendingRepos) 293}