fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// This program manages a zoekt dynamic indexing deployment:
16// * listens to indexing commands
17// * reindexes specified repositories
18
19package main
20
21import (
22 "bytes"
23 "context"
24 "encoding/json"
25 "flag"
26 "fmt"
27 "log"
28 "net/http"
29 "os"
30 "os/exec"
31 "path/filepath"
32 "strconv"
33 "time"
34
35 "github.com/prometheus/client_golang/prometheus"
36 "github.com/prometheus/client_golang/prometheus/collectors"
37 "github.com/prometheus/client_golang/prometheus/promauto"
38 "github.com/prometheus/client_golang/prometheus/promhttp"
39)
40
// loggedRun executes cmd with its stdout and stderr captured in memory.
//
// The command line is logged before the command starts. When the command
// fails, the captured stdout and stderr are logged together with the error,
// and the returned error wraps the underlying failure so that callers can
// inspect it with errors.Is or errors.As. It returns nil on success.
func loggedRun(cmd *exec.Cmd) error {
	var outBuf, errBuf bytes.Buffer
	cmd.Stdout = &outBuf
	cmd.Stderr = &errBuf

	log.Printf("run %v", cmd.Args)
	if err := cmd.Run(); err != nil {
		log.Printf("command %s failed: %v\nOUT: %s\nERR: %s",
			cmd.Args, err, outBuf.String(), errBuf.String())
		// %w (rather than %v) keeps the original error in the chain; the
		// rendered message is unchanged.
		return fmt.Errorf("command %s failed: %w", cmd.Args, err)
	}

	return nil
}
56
// Options carries the settings of the dynamic index server as parsed from
// the command line.
type Options struct {
	// indexTimeout is the deadline applied to a single indexing request.
	indexTimeout time.Duration
	// repoDir is the directory holding cloned repos.
	repoDir string
	// indexDir is the directory holding index shards.
	indexDir string
	// listen is the address the HTTP API listens on.
	listen string
}
63
64func (o *Options) createMissingDirectories() {
65 for _, s := range []string{o.repoDir, o.indexDir} {
66 if err := os.MkdirAll(s, 0o755); err != nil {
67 log.Fatalf("MkdirAll %s: %v", s, err)
68 }
69 }
70}
71
// indexRequest is the JSON payload decoded by the /index handler.
type indexRequest struct {
	// CloneURL is passed to zoekt-git-clone as the repository to clone.
	// TODO: Decide if tokens can be in the URL or if we should pass separately
	CloneURL string
	// RepoID is used as the clone's name (<RepoID>.git under the repo
	// directory) and as the -repoid argument of zoekt-git-clone.
	RepoID uint32
}
76
77// This function is declared as var so that we can stub it in test
78var executeCmd = func(ctx context.Context, name string, arg ...string) error {
79 cmd := exec.CommandContext(ctx, name, arg...)
80 cmd.Stdin = &bytes.Buffer{}
81 err := loggedRun(cmd)
82
83 return err
84}
85
86func indexRepository(opts Options, req indexRequest) (map[string]any, error) {
87 ctx, cancel := context.WithTimeout(context.Background(), opts.indexTimeout)
88 defer cancel()
89
90 args := []string{}
91 args = append(args, "-dest", opts.repoDir)
92 args = append(args, "-name", strconv.FormatUint(uint64(req.RepoID), 10))
93 args = append(args, "-repoid", strconv.FormatUint(uint64(req.RepoID), 10))
94 args = append(args, req.CloneURL)
95 err := executeCmd(ctx, "zoekt-git-clone", args...)
96 if err != nil {
97 return nil, err
98 }
99
100 gitRepoPath, err := filepath.Abs(filepath.Join(opts.repoDir, fmt.Sprintf("%d.git", req.RepoID)))
101 if err != nil {
102 return nil, err
103 }
104
105 args = []string{
106 "-C",
107 gitRepoPath,
108 "fetch",
109 }
110 err = executeCmd(ctx, "git", args...)
111 if err != nil {
112 return nil, err
113 }
114
115 args = []string{
116 "-index", opts.indexDir,
117 gitRepoPath,
118 }
119 err = executeCmd(ctx, "zoekt-git-index", args...)
120 if err != nil {
121 return nil, err
122 }
123
124 response := map[string]any{
125 "Success": true,
126 }
127
128 return response, nil
129}
130
131type indexServer struct {
132 opts Options
133 promRegistry *prometheus.Registry
134 metricsRequestsTotal *prometheus.CounterVec
135}
136
137func (s *indexServer) serveHealthCheck(w http.ResponseWriter, r *http.Request) {
138 // Nothing to do. Just return 200
139}
140
141func (s *indexServer) serveMetrics(w http.ResponseWriter, r *http.Request) {
142 promhttp.HandlerFor(s.promRegistry, promhttp.HandlerOpts{Registry: s.promRegistry}).ServeHTTP(w, r)
143}
144
145func (s *indexServer) serveIndex(w http.ResponseWriter, r *http.Request) {
146 route := "index"
147 dec := json.NewDecoder(r.Body)
148 dec.DisallowUnknownFields()
149 var req indexRequest
150 err := dec.Decode(&req)
151
152 if err != nil {
153 log.Printf("Error decoding index request: %v", err)
154 http.Error(w, "JSON parser error", http.StatusBadRequest)
155 return
156 }
157
158 response, err := indexRepository(s.opts, req)
159 if err != nil {
160 s.respondWithError(w, r.Method, route, err)
161 return
162 }
163
164 w.Header().Set("Content-Type", "application/json")
165 _ = json.NewEncoder(w).Encode(response)
166
167 s.incrementRequestsTotal(r.Method, route, http.StatusOK)
168}
169
170func (s *indexServer) serveTruncate(w http.ResponseWriter, r *http.Request) {
171 route := "truncate"
172 err := emptyDirectory(s.opts.repoDir)
173
174 if err != nil {
175 err = fmt.Errorf("Failed to empty repoDir repoDir: %v with error: %v", s.opts.repoDir, err)
176
177 s.respondWithError(w, r.Method, route, err)
178 return
179 }
180
181 err = emptyDirectory(s.opts.indexDir)
182
183 if err != nil {
184 err = fmt.Errorf("Failed to empty repoDir indexDir: %v with error: %v", s.opts.repoDir, err)
185
186 s.respondWithError(w, r.Method, route, err)
187 return
188 }
189
190 response := map[string]any{
191 "Success": true,
192 }
193 w.Header().Set("Content-Type", "application/json")
194 _ = json.NewEncoder(w).Encode(response)
195
196 s.incrementRequestsTotal(r.Method, route, http.StatusOK)
197}
198
199func (s *indexServer) respondWithError(w http.ResponseWriter, method, route string, err error) {
200 responseCode := http.StatusInternalServerError
201
202 log.Print(err)
203 s.incrementRequestsTotal(method, route, responseCode)
204
205 w.Header().Set("Content-Type", "application/json")
206 w.WriteHeader(responseCode)
207 response := map[string]any{
208 "Success": false,
209 "Error": err.Error(),
210 }
211
212 _ = json.NewEncoder(w).Encode(response)
213}
214
215func (s *indexServer) incrementRequestsTotal(method, route string, responseCode int) {
216 s.metricsRequestsTotal.With(prometheus.Labels{"code": strconv.Itoa(responseCode), "method": method, "route": route}).Inc()
217}
218
219func (s *indexServer) initMetrics() {
220 s.promRegistry = prometheus.NewRegistry()
221
222 // Add go runtime metrics and process collectors.
223 s.promRegistry.MustRegister(
224 collectors.NewGoCollector(),
225 collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
226 )
227
228 s.metricsRequestsTotal = promauto.With(s.promRegistry).NewCounterVec(
229 prometheus.CounterOpts{
230 Name: "zoekt_dynamic_indexserver_requests_total",
231 Help: "Total number of HTTP requests by status code, method, and route.",
232 },
233 []string{"method", "route", "code"},
234 )
235}
236
237func (s *indexServer) startIndexingApi() {
238 http.HandleFunc("/", s.serveHealthCheck)
239 http.HandleFunc("/metrics", s.serveMetrics)
240 http.HandleFunc("/index", s.serveIndex)
241 http.HandleFunc("/truncate", s.serveTruncate)
242
243 if err := http.ListenAndServe(s.opts.listen, nil); err != nil {
244 log.Fatal(err)
245 }
246}
247
// emptyDirectory deletes every entry found directly inside dir, recursing
// into subdirectories, while leaving dir itself in place. It returns the
// error from reading dir, or the first removal error, and nil otherwise.
func emptyDirectory(dir string) error {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return err
	}

	for i := range entries {
		target := filepath.Join(dir, entries[i].Name())
		if rmErr := os.RemoveAll(target); rmErr != nil {
			return rmErr
		}
	}

	return nil
}
265
266func parseOptions() Options {
267 repoDir := flag.String("repo_dir", "", "directory holding cloned repos.")
268 indexDir := flag.String("index_dir", "", "directory holding index shards.")
269 timeout := flag.Duration("index_timeout", time.Hour, "kill index job after this much time.")
270 listen := flag.String("listen", ":6060", "listen on this address.")
271 flag.Parse()
272
273 if *repoDir == "" {
274 log.Fatal("must set -repo_dir")
275 }
276
277 if *indexDir == "" {
278 log.Fatal("must set -index_dir")
279 *indexDir = filepath.Join(*repoDir, "index")
280 }
281
282 return Options{
283 repoDir: *repoDir,
284 indexDir: *indexDir,
285 indexTimeout: *timeout,
286 listen: *listen,
287 }
288}
289
290func main() {
291 opts := parseOptions()
292 opts.createMissingDirectories()
293
294 server := indexServer{
295 opts: opts,
296 }
297
298 server.initMetrics()
299 server.startIndexingApi()
300}