fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

at tngl 7.7 kB View raw
1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Command zoekt-dynamic-indexserver starts a server to manage dynamic indexing. In contrast to 16// zoekt-indexserver, it's designed for a "push-based" indexing model. The server 17// * listens to indexing commands 18// * reindexes specified repositories 19package main 20 21import ( 22 "bytes" 23 "context" 24 "encoding/json" 25 "flag" 26 "fmt" 27 "log" 28 "net/http" 29 "os" 30 "os/exec" 31 "path/filepath" 32 "strconv" 33 "time" 34 35 "github.com/prometheus/client_golang/prometheus" 36 "github.com/prometheus/client_golang/prometheus/collectors" 37 "github.com/prometheus/client_golang/prometheus/promauto" 38 "github.com/prometheus/client_golang/prometheus/promhttp" 39) 40 41func loggedRun(cmd *exec.Cmd) error { 42 outBuf := &bytes.Buffer{} 43 errBuf := &bytes.Buffer{} 44 cmd.Stdout = outBuf 45 cmd.Stderr = errBuf 46 47 log.Printf("run %v", cmd.Args) 48 if err := cmd.Run(); err != nil { 49 log.Printf("command %s failed: %v\nOUT: %s\nERR: %s", 50 cmd.Args, err, outBuf.String(), errBuf.String()) 51 return fmt.Errorf("command %s failed: %v", cmd.Args, err) 52 } 53 54 return nil 55} 56 57type Options struct { 58 indexTimeout time.Duration 59 repoDir string 60 indexDir string 61 listen string 62} 63 64func (o *Options) createMissingDirectories() { 65 for _, s := range []string{o.repoDir, o.indexDir} { 66 if err := os.MkdirAll(s, 0o755); err != nil { 67 log.Fatalf("MkdirAll %s: %v", s, err) 68 } 69 } 70} 71 72type indexRequest struct { 73 CloneURL string // TODO: Decide if tokens can be in the URL or if we should pass separately 74 RepoID uint32 75} 76 77// This function is declared as var so that we can stub it in test 78var executeCmd = func(ctx context.Context, name string, arg ...string) error { 79 cmd := exec.CommandContext(ctx, name, arg...) 80 cmd.Stdin = &bytes.Buffer{} 81 err := loggedRun(cmd) 82 83 return err 84} 85 86func indexRepository(opts Options, req indexRequest) (map[string]any, error) { 87 ctx, cancel := context.WithTimeout(context.Background(), opts.indexTimeout) 88 defer cancel() 89 90 args := []string{} 91 args = append(args, "-dest", opts.repoDir) 92 args = append(args, "-name", strconv.FormatUint(uint64(req.RepoID), 10)) 93 args = append(args, "-repoid", strconv.FormatUint(uint64(req.RepoID), 10)) 94 args = append(args, req.CloneURL) 95 err := executeCmd(ctx, "zoekt-git-clone", args...) 96 if err != nil { 97 return nil, err 98 } 99 100 gitRepoPath, err := filepath.Abs(filepath.Join(opts.repoDir, fmt.Sprintf("%d.git", req.RepoID))) 101 if err != nil { 102 return nil, err 103 } 104 105 args = []string{ 106 "-C", 107 gitRepoPath, 108 "fetch", 109 } 110 err = executeCmd(ctx, "git", args...) 111 if err != nil { 112 return nil, err 113 } 114 115 args = []string{ 116 "-index", opts.indexDir, 117 gitRepoPath, 118 } 119 err = executeCmd(ctx, "zoekt-git-index", args...) 120 if err != nil { 121 return nil, err 122 } 123 124 response := map[string]any{ 125 "Success": true, 126 } 127 128 return response, nil 129} 130 131type indexServer struct { 132 opts Options 133 promRegistry *prometheus.Registry 134 metricsRequestsTotal *prometheus.CounterVec 135} 136 137func (s *indexServer) serveHealthCheck(w http.ResponseWriter, r *http.Request) { 138 // Nothing to do. Just return 200 139} 140 141func (s *indexServer) serveMetrics(w http.ResponseWriter, r *http.Request) { 142 promhttp.HandlerFor(s.promRegistry, promhttp.HandlerOpts{Registry: s.promRegistry}).ServeHTTP(w, r) 143} 144 145func (s *indexServer) serveIndex(w http.ResponseWriter, r *http.Request) { 146 route := "index" 147 dec := json.NewDecoder(r.Body) 148 dec.DisallowUnknownFields() 149 var req indexRequest 150 err := dec.Decode(&req) 151 if err != nil { 152 log.Printf("Error decoding index request: %v", err) 153 http.Error(w, "JSON parser error", http.StatusBadRequest) 154 return 155 } 156 157 response, err := indexRepository(s.opts, req) 158 if err != nil { 159 s.respondWithError(w, r.Method, route, err) 160 return 161 } 162 163 w.Header().Set("Content-Type", "application/json") 164 _ = json.NewEncoder(w).Encode(response) 165 166 s.incrementRequestsTotal(r.Method, route, http.StatusOK) 167} 168 169func (s *indexServer) serveTruncate(w http.ResponseWriter, r *http.Request) { 170 route := "truncate" 171 err := emptyDirectory(s.opts.repoDir) 172 if err != nil { 173 err = fmt.Errorf("Failed to empty repoDir repoDir: %v with error: %v", s.opts.repoDir, err) 174 175 s.respondWithError(w, r.Method, route, err) 176 return 177 } 178 179 err = emptyDirectory(s.opts.indexDir) 180 if err != nil { 181 err = fmt.Errorf("Failed to empty repoDir indexDir: %v with error: %v", s.opts.repoDir, err) 182 183 s.respondWithError(w, r.Method, route, err) 184 return 185 } 186 187 response := map[string]any{ 188 "Success": true, 189 } 190 w.Header().Set("Content-Type", "application/json") 191 _ = json.NewEncoder(w).Encode(response) 192 193 s.incrementRequestsTotal(r.Method, route, http.StatusOK) 194} 195 196func (s *indexServer) respondWithError(w http.ResponseWriter, method, route string, err error) { 197 responseCode := http.StatusInternalServerError 198 199 log.Print(err) 200 s.incrementRequestsTotal(method, route, responseCode) 201 202 w.Header().Set("Content-Type", "application/json") 203 w.WriteHeader(responseCode) 204 response := map[string]any{ 205 "Success": false, 206 "Error": err.Error(), 207 } 208 209 _ = json.NewEncoder(w).Encode(response) 210} 211 212func (s *indexServer) incrementRequestsTotal(method, route string, responseCode int) { 213 s.metricsRequestsTotal.With(prometheus.Labels{"code": strconv.Itoa(responseCode), "method": method, "route": route}).Inc() 214} 215 216func (s *indexServer) initMetrics() { 217 s.promRegistry = prometheus.NewRegistry() 218 219 // Add go runtime metrics and process collectors. 220 s.promRegistry.MustRegister( 221 collectors.NewGoCollector(), 222 collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), 223 ) 224 225 s.metricsRequestsTotal = promauto.With(s.promRegistry).NewCounterVec( 226 prometheus.CounterOpts{ 227 Name: "zoekt_dynamic_indexserver_requests_total", 228 Help: "Total number of HTTP requests by status code, method, and route.", 229 }, 230 []string{"method", "route", "code"}, 231 ) 232} 233 234func (s *indexServer) startIndexingApi() { 235 http.HandleFunc("/", s.serveHealthCheck) 236 http.HandleFunc("/metrics", s.serveMetrics) 237 http.HandleFunc("/index", s.serveIndex) 238 http.HandleFunc("/truncate", s.serveTruncate) 239 240 if err := http.ListenAndServe(s.opts.listen, nil); err != nil { 241 log.Fatal(err) 242 } 243} 244 245func emptyDirectory(dir string) error { 246 files, err := os.ReadDir(dir) 247 if err != nil { 248 return err 249 } 250 251 for _, file := range files { 252 filePath := filepath.Join(dir, file.Name()) 253 err := os.RemoveAll(filePath) 254 if err != nil { 255 return err 256 } 257 } 258 259 return nil 260} 261 262func parseOptions() Options { 263 repoDir := flag.String("repo_dir", "", "directory holding cloned repos.") 264 indexDir := flag.String("index_dir", "", "directory holding index shards.") 265 timeout := flag.Duration("index_timeout", time.Hour, "kill index job after this much time.") 266 listen := flag.String("listen", ":6060", "listen on this address.") 267 flag.Parse() 268 269 if *repoDir == "" { 270 log.Fatal("must set -repo_dir") 271 } 272 273 if *indexDir == "" { 274 log.Fatal("must set -index_dir") 275 *indexDir = filepath.Join(*repoDir, "index") 276 } 277 278 return Options{ 279 repoDir: *repoDir, 280 indexDir: *indexDir, 281 indexTimeout: *timeout, 282 listen: *listen, 283 } 284} 285 286func main() { 287 opts := parseOptions() 288 opts.createMissingDirectories() 289 290 server := indexServer{ 291 opts: opts, 292 } 293 294 server.initMetrics() 295 server.startIndexingApi() 296}