fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
// Command zoekt-dynamic-indexserver starts a server to manage dynamic indexing. In contrast to
// zoekt-indexserver, it is designed for a "push-based" indexing model. The server:
//   - listens for indexing commands over HTTP
//   - reindexes the specified repositories
19package main
20
21import (
22 "bytes"
23 "context"
24 "encoding/json"
25 "flag"
26 "fmt"
27 "log"
28 "net/http"
29 "os"
30 "os/exec"
31 "path/filepath"
32 "strconv"
33 "time"
34
35 "github.com/prometheus/client_golang/prometheus"
36 "github.com/prometheus/client_golang/prometheus/collectors"
37 "github.com/prometheus/client_golang/prometheus/promauto"
38 "github.com/prometheus/client_golang/prometheus/promhttp"
39)
40
// loggedRun runs cmd to completion while capturing its stdout and stderr in
// memory. Any Stdout/Stderr previously set on cmd is overwritten.
//
// The command line is logged before the run. If the command fails, the
// captured output is logged and the returned error wraps the underlying
// failure, so callers can inspect it with errors.Is / errors.As (for example
// to detect *exec.ExitError). It returns nil on success.
func loggedRun(cmd *exec.Cmd) error {
	var stdout, stderr bytes.Buffer
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr

	log.Printf("run %v", cmd.Args)
	if err := cmd.Run(); err != nil {
		// The captured output only goes to the log; the returned error stays
		// short because it may be echoed back to API clients.
		log.Printf("command %s failed: %v\nOUT: %s\nERR: %s",
			cmd.Args, err, stdout.String(), stderr.String())
		// %w (rather than %v) keeps the original error in the chain.
		return fmt.Errorf("command %s failed: %w", cmd.Args, err)
	}

	return nil
}
56
// Options carries the command-line configuration of the server.
type Options struct {
	// indexTimeout bounds the total time spent on one indexing request.
	indexTimeout time.Duration
	// repoDir is the directory holding cloned repos.
	repoDir string
	// indexDir is the directory holding index shards.
	indexDir string
	// listen is the address the HTTP server binds to.
	listen string
}

// createMissingDirectories makes sure repoDir and indexDir exist, creating
// them (and any missing parents) with mode 0o755. The process is terminated
// via log.Fatalf if a directory cannot be created.
func (o *Options) createMissingDirectories() {
	required := [...]string{o.repoDir, o.indexDir}
	for i := range required {
		dir := required[i]
		err := os.MkdirAll(dir, 0o755)
		if err == nil {
			continue
		}
		log.Fatalf("MkdirAll %s: %v", dir, err)
	}
}
71
// indexRequest is the JSON body decoded by the /index handler. It carries no
// struct tags, so JSON keys are matched against the Go field names.
type indexRequest struct {
	// CloneURL is handed to zoekt-git-clone as the repository to clone.
	// TODO: Decide if tokens can be in the URL or if we should pass separately
	CloneURL string

	// RepoID is passed to zoekt-git-clone as both -name and -repoid, and
	// determines the "<RepoID>.git" directory name under repoDir.
	RepoID uint32
}
76
77// This function is declared as var so that we can stub it in test
78var executeCmd = func(ctx context.Context, name string, arg ...string) error {
79 cmd := exec.CommandContext(ctx, name, arg...)
80 cmd.Stdin = &bytes.Buffer{}
81 err := loggedRun(cmd)
82
83 return err
84}
85
86func indexRepository(opts Options, req indexRequest) (map[string]any, error) {
87 ctx, cancel := context.WithTimeout(context.Background(), opts.indexTimeout)
88 defer cancel()
89
90 args := []string{}
91 args = append(args, "-dest", opts.repoDir)
92 args = append(args, "-name", strconv.FormatUint(uint64(req.RepoID), 10))
93 args = append(args, "-repoid", strconv.FormatUint(uint64(req.RepoID), 10))
94 args = append(args, req.CloneURL)
95 err := executeCmd(ctx, "zoekt-git-clone", args...)
96 if err != nil {
97 return nil, err
98 }
99
100 gitRepoPath, err := filepath.Abs(filepath.Join(opts.repoDir, fmt.Sprintf("%d.git", req.RepoID)))
101 if err != nil {
102 return nil, err
103 }
104
105 args = []string{
106 "-C",
107 gitRepoPath,
108 "fetch",
109 }
110 err = executeCmd(ctx, "git", args...)
111 if err != nil {
112 return nil, err
113 }
114
115 args = []string{
116 "-index", opts.indexDir,
117 gitRepoPath,
118 }
119 err = executeCmd(ctx, "zoekt-git-index", args...)
120 if err != nil {
121 return nil, err
122 }
123
124 response := map[string]any{
125 "Success": true,
126 }
127
128 return response, nil
129}
130
131type indexServer struct {
132 opts Options
133 promRegistry *prometheus.Registry
134 metricsRequestsTotal *prometheus.CounterVec
135}
136
137func (s *indexServer) serveHealthCheck(w http.ResponseWriter, r *http.Request) {
138 // Nothing to do. Just return 200
139}
140
141func (s *indexServer) serveMetrics(w http.ResponseWriter, r *http.Request) {
142 promhttp.HandlerFor(s.promRegistry, promhttp.HandlerOpts{Registry: s.promRegistry}).ServeHTTP(w, r)
143}
144
145func (s *indexServer) serveIndex(w http.ResponseWriter, r *http.Request) {
146 route := "index"
147 dec := json.NewDecoder(r.Body)
148 dec.DisallowUnknownFields()
149 var req indexRequest
150 err := dec.Decode(&req)
151 if err != nil {
152 log.Printf("Error decoding index request: %v", err)
153 http.Error(w, "JSON parser error", http.StatusBadRequest)
154 return
155 }
156
157 response, err := indexRepository(s.opts, req)
158 if err != nil {
159 s.respondWithError(w, r.Method, route, err)
160 return
161 }
162
163 w.Header().Set("Content-Type", "application/json")
164 _ = json.NewEncoder(w).Encode(response)
165
166 s.incrementRequestsTotal(r.Method, route, http.StatusOK)
167}
168
169func (s *indexServer) serveTruncate(w http.ResponseWriter, r *http.Request) {
170 route := "truncate"
171 err := emptyDirectory(s.opts.repoDir)
172 if err != nil {
173 err = fmt.Errorf("Failed to empty repoDir repoDir: %v with error: %v", s.opts.repoDir, err)
174
175 s.respondWithError(w, r.Method, route, err)
176 return
177 }
178
179 err = emptyDirectory(s.opts.indexDir)
180 if err != nil {
181 err = fmt.Errorf("Failed to empty repoDir indexDir: %v with error: %v", s.opts.repoDir, err)
182
183 s.respondWithError(w, r.Method, route, err)
184 return
185 }
186
187 response := map[string]any{
188 "Success": true,
189 }
190 w.Header().Set("Content-Type", "application/json")
191 _ = json.NewEncoder(w).Encode(response)
192
193 s.incrementRequestsTotal(r.Method, route, http.StatusOK)
194}
195
196func (s *indexServer) respondWithError(w http.ResponseWriter, method, route string, err error) {
197 responseCode := http.StatusInternalServerError
198
199 log.Print(err)
200 s.incrementRequestsTotal(method, route, responseCode)
201
202 w.Header().Set("Content-Type", "application/json")
203 w.WriteHeader(responseCode)
204 response := map[string]any{
205 "Success": false,
206 "Error": err.Error(),
207 }
208
209 _ = json.NewEncoder(w).Encode(response)
210}
211
212func (s *indexServer) incrementRequestsTotal(method, route string, responseCode int) {
213 s.metricsRequestsTotal.With(prometheus.Labels{"code": strconv.Itoa(responseCode), "method": method, "route": route}).Inc()
214}
215
216func (s *indexServer) initMetrics() {
217 s.promRegistry = prometheus.NewRegistry()
218
219 // Add go runtime metrics and process collectors.
220 s.promRegistry.MustRegister(
221 collectors.NewGoCollector(),
222 collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
223 )
224
225 s.metricsRequestsTotal = promauto.With(s.promRegistry).NewCounterVec(
226 prometheus.CounterOpts{
227 Name: "zoekt_dynamic_indexserver_requests_total",
228 Help: "Total number of HTTP requests by status code, method, and route.",
229 },
230 []string{"method", "route", "code"},
231 )
232}
233
234func (s *indexServer) startIndexingApi() {
235 http.HandleFunc("/", s.serveHealthCheck)
236 http.HandleFunc("/metrics", s.serveMetrics)
237 http.HandleFunc("/index", s.serveIndex)
238 http.HandleFunc("/truncate", s.serveTruncate)
239
240 if err := http.ListenAndServe(s.opts.listen, nil); err != nil {
241 log.Fatal(err)
242 }
243}
244
// emptyDirectory deletes every entry (files and whole subtrees) directly
// inside dir while keeping dir itself. It returns the error from reading dir,
// or the first removal error, and stops at that point; nil means the
// directory is now empty.
func emptyDirectory(dir string) error {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return err
	}

	for i := range entries {
		target := filepath.Join(dir, entries[i].Name())
		if rmErr := os.RemoveAll(target); rmErr != nil {
			return rmErr
		}
	}

	return nil
}
261
262func parseOptions() Options {
263 repoDir := flag.String("repo_dir", "", "directory holding cloned repos.")
264 indexDir := flag.String("index_dir", "", "directory holding index shards.")
265 timeout := flag.Duration("index_timeout", time.Hour, "kill index job after this much time.")
266 listen := flag.String("listen", ":6060", "listen on this address.")
267 flag.Parse()
268
269 if *repoDir == "" {
270 log.Fatal("must set -repo_dir")
271 }
272
273 if *indexDir == "" {
274 log.Fatal("must set -index_dir")
275 *indexDir = filepath.Join(*repoDir, "index")
276 }
277
278 return Options{
279 repoDir: *repoDir,
280 indexDir: *indexDir,
281 indexTimeout: *timeout,
282 listen: *listen,
283 }
284}
285
286func main() {
287 opts := parseOptions()
288 opts.createMissingDirectories()
289
290 server := indexServer{
291 opts: opts,
292 }
293
294 server.initMetrics()
295 server.startIndexingApi()
296}