// Fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "bytes"
5 "container/heap"
6 "fmt"
7 "io"
8 "net/http"
9 "reflect"
10 "sort"
11 "strconv"
12 "strings"
13 "sync"
14 "text/tabwriter"
15 "time"
16
17 "github.com/prometheus/client_golang/prometheus"
18 "github.com/prometheus/client_golang/prometheus/promauto"
19
20 sglog "github.com/sourcegraph/log"
21)
22
23type queueItem struct {
24 // repoID is the ID of the repo
25 repoID uint32
26 // opts are the options to use when indexing repoID.
27 opts IndexOptions
28 // indexed is true if opts has been indexed.
29 indexed bool
30 // indexState is the indexState of the last attempt at indexing repoID.
31 indexState indexState
32 // heapIdx is the index of the item in the heap. If < 0 then the item is
33 // not on the heap.
34 heapIdx int
35 // seq is a sequence number used as a tiebreaker. This is to ensure we
36 // act like a FIFO queue.
37 seq int64
38 // dateAddedToQueue is the time when this indexing job was added to the queue. If this item is no longer
39 // in the heap (i.e. it has been processed already), this value is nonsensical.
40 dateAddedToQueue time.Time
41 // backoff will handle backing off of future indexing requests for a duration of time based on previous failures
42 backoff backoff
43}
44
45// Queue is a priority queue which returns the next repo to index. It is safe
46// to use concurrently. It is a min queue on:
47//
48// (!indexed, time added to the queue)
49//
50// We use the above since we'd rather index a repo sooner if we know the commit is stale.
51type Queue struct {
52 mu sync.Mutex
53 items map[uint32]*queueItem
54 pq pqueue
55 seq int64
56 logger sglog.Logger
57 newQueueItem func(uint32) *queueItem
58}
59
60func NewQueue(backoffDuration, maxBackoffDuration time.Duration, l sglog.Logger) *Queue {
61 if backoffDuration < 0 || maxBackoffDuration < 0 {
62 backoffDuration = 0
63 maxBackoffDuration = 0
64 }
65 newQueueItem := func(repoID uint32) *queueItem {
66 return &queueItem{
67 repoID: repoID,
68 heapIdx: -1,
69 backoff: backoff{
70 backoffDuration: backoffDuration,
71 maxBackoff: maxBackoffDuration,
72 },
73 }
74 }
75
76 q := &Queue{
77 newQueueItem: newQueueItem,
78 logger: l.Scoped("queue"),
79 }
80
81 q.init()
82 return q
83}
84
85type QueueItem struct {
86 // Opts are the options to use when indexing the repo.
87 Opts IndexOptions
88 // DateAddedToQueue is the time when this indexing job was added to the queue, used for telemetry.
89 DateAddedToQueue time.Time
90}
91
92// Pop returns options and metadata for the next repo to index. If the queue is empty ok is false.
93func (q *Queue) Pop() (result QueueItem, ok bool) {
94 q.mu.Lock()
95 if len(q.pq) == 0 {
96 q.mu.Unlock()
97 return QueueItem{}, false
98 }
99
100 item := heap.Pop(&q.pq).(*queueItem)
101
102 metricQueueLen.Set(float64(len(q.pq)))
103 metricQueueCap.Set(float64(len(q.items)))
104
105 dateAdded := item.dateAddedToQueue
106 item.dateAddedToQueue = time.Unix(0, 0)
107
108 q.mu.Unlock()
109
110 name := repoNameForMetric(item.opts.Name)
111 age := time.Since(dateAdded)
112 metricQueueAge.WithLabelValues(name).Observe(age.Seconds())
113
114 return QueueItem{item.opts, dateAdded}, true
115}
116
117// Len returns the number of items in the queue.
118func (q *Queue) Len() int {
119 q.mu.Lock()
120 l := len(q.pq)
121 q.mu.Unlock()
122 return l
123}
124
125// AddOrUpdate sets which opts to index next. If opts.RepoID is already in the
126// queue, it is updated.
127func (q *Queue) AddOrUpdate(opts IndexOptions) {
128 q.mu.Lock()
129 item := q.getOrAdd(opts.RepoID)
130 if !reflect.DeepEqual(item.opts, opts) {
131 item.indexed = false
132 item.opts = opts
133 }
134 if item.heapIdx < 0 {
135 if item.backoff.Allow(time.Now()) {
136 q.seq++
137 item.seq = q.seq
138 item.dateAddedToQueue = time.Now()
139
140 heap.Push(&q.pq, item)
141 metricQueueLen.Set(float64(len(q.pq)))
142 metricQueueCap.Set(float64(len(q.items)))
143 }
144 } else {
145 heap.Fix(&q.pq, item.heapIdx)
146 }
147 q.mu.Unlock()
148}
149
150// Bump will take any repository in ids which is not on the queue and
151// re-insert it with the last known IndexOptions. Bump returns ids that are
152// unknown to the queue.
153func (q *Queue) Bump(ids []uint32) []uint32 {
154 q.mu.Lock()
155 defer q.mu.Unlock()
156
157 if q.items == nil {
158 q.init()
159 }
160
161 var missing []uint32
162 for _, id := range ids {
163 item, ok := q.items[id]
164 if !ok {
165 missing = append(missing, id)
166 } else if item.heapIdx < 0 {
167 if item.backoff.Allow(time.Now()) {
168 q.seq++
169 item.seq = q.seq
170 item.dateAddedToQueue = time.Now()
171
172 heap.Push(&q.pq, item)
173 metricQueueLen.Set(float64(len(q.pq)))
174 metricQueueCap.Set(float64(len(q.items)))
175 }
176 }
177 }
178
179 return missing
180}
181
182// Iterate will call f on each item known to the queue, including items that
183// have been popped from the queue. Note: this is done in a random order and
184// the queue mutex is held during all calls to f. Do not mutate the data.
185func (q *Queue) Iterate(f func(*IndexOptions)) {
186 q.mu.Lock()
187 defer q.mu.Unlock()
188 for _, item := range q.items {
189 f(&item.opts)
190 }
191}
192
193// debugIteratedOrdered will call f on each queueItem (sorted by indexing priority)
194// known to the queue, including queueItems that have been popped from the queue).
195//
196// Note: The mutex is held during all calls to f. Callers must not modify the queueItems.
197func (q *Queue) debugIteratedOrdered(f func(*queueItem)) {
198 q.mu.Lock()
199 defer q.mu.Unlock()
200
201 queueItems := make([]*queueItem, 0, len(q.items))
202 for _, item := range q.items {
203 queueItems = append(queueItems, item)
204 }
205
206 sort.Slice(queueItems, func(i, j int) bool {
207 x, y := queueItems[i], queueItems[j]
208
209 xOnQueue, yOnQueue := x.heapIdx >= 0, y.heapIdx >= 0
210 if xOnQueue != yOnQueue {
211 return xOnQueue
212 }
213
214 return lessQueueItemPriority(x, y)
215 })
216
217 for _, item := range queueItems {
218 f(item)
219 }
220}
221
222func (q *Queue) handleDebugQueue(w http.ResponseWriter, r *http.Request) {
223 if r.Method != http.MethodGet {
224 http.Error(w, "method must be GET", http.StatusBadRequest)
225 return
226 }
227
228 w.Header().Set("Content-Type", "text/plain")
229
230 var bufferedWriter bytes.Buffer
231
232 writer := tabwriter.NewWriter(&bufferedWriter, 16, 8, 4, ' ', 0)
233
234 _, err := fmt.Fprintf(writer, "Position\tName\tID\tIsOnQueue\tAge\tBranches\t\n")
235 if err != nil {
236 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
237 return
238 }
239
240 now := time.Now()
241
242 position := -1
243 q.debugIteratedOrdered(func(item *queueItem) {
244 position++
245
246 if err != nil {
247 return
248 }
249
250 var branches []string
251 for _, b := range item.opts.Branches {
252 branches = append(branches, b.String())
253 }
254
255 isOnQueue := item.heapIdx >= 0
256 age := "-"
257 if isOnQueue {
258 age = now.Sub(item.dateAddedToQueue).Round(time.Second).String()
259 }
260
261 _, err = fmt.Fprintf(writer, "%d\t%s\t%d\t%t\t%s\t%s\n", position, item.opts.Name, item.repoID, isOnQueue, age, strings.Join(branches, ", "))
262 })
263
264 if err != nil {
265 http.Error(w, fmt.Sprintf("writing queueItem: %s", err), http.StatusInternalServerError)
266 return
267 }
268
269 err = writer.Flush()
270 if err != nil {
271 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
272 return
273 }
274
275 w.Header().Set("Content-Length", strconv.Itoa(bufferedWriter.Len()))
276
277 _, err = io.Copy(w, &bufferedWriter)
278 if err != nil {
279 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
280 return
281 }
282}
283
284// SetIndexed sets what the currently indexed options are for opts.RepoID.
285func (q *Queue) SetIndexed(opts IndexOptions, state indexState) {
286 q.mu.Lock()
287 item := q.getOrAdd(opts.RepoID)
288
289 item.indexState = state
290 if state != indexStateFail {
291 item.indexed = reflect.DeepEqual(opts, item.opts)
292 item.backoff.Reset()
293
294 if item.heapIdx >= 0 {
295 // We only update the position in the queue, never add it.
296 heap.Fix(&q.pq, item.heapIdx)
297 }
298 } else {
299 item.backoff.Fail(time.Now(), q.logger, item.opts)
300
301 if item.heapIdx >= 0 {
302 // Remove from queue
303 heap.Remove(&q.pq, item.heapIdx)
304 item.heapIdx = -1
305 }
306 }
307
308 q.mu.Unlock()
309}
310
311// MaybeRemoveMissing will remove all queue items not in ids and return the
312// ids of items removed from the queue. It will heuristically not run to
313// conserve resources.
314//
315// In the server's steady state we expect that the list of names is equal to
316// the items in queue. As such in the steady state this function should do no
317// removals. Removal requires memory allocation and coarse locking. To avoid
318// that we use a heuristic which can falsely decide it doesn't need to
319// remove. However, we will converge onto removing items.
320func (q *Queue) MaybeRemoveMissing(ids []uint32) []uint32 {
321 q.mu.Lock()
322 sameSize := len(q.items) == len(ids)
323 q.mu.Unlock()
324
325 // heuristically skip expensive work
326 if sameSize {
327 debug.Printf("skipping MaybeRemoveMissing due to same size: %d", len(ids))
328 return nil
329 }
330
331 set := make(map[uint32]struct{}, len(ids))
332 for _, id := range ids {
333 set[id] = struct{}{}
334 }
335
336 q.mu.Lock()
337 defer q.mu.Unlock()
338
339 var removed []uint32
340 for _, item := range q.items {
341 if _, ok := set[item.opts.RepoID]; ok {
342 continue
343 }
344
345 if item.heapIdx >= 0 {
346 heap.Remove(&q.pq, item.heapIdx)
347 }
348
349 item.indexState = ""
350
351 delete(q.items, item.opts.RepoID)
352
353 removed = append(removed, item.opts.RepoID)
354 }
355
356 metricQueueLen.Set(float64(len(q.pq)))
357 metricQueueCap.Set(float64(len(q.items)))
358
359 return removed
360}
361
362// getOrAdd returns the item for repoID. If the repoID hasn't been seen before, it
363// is added to q.items.
364//
365// Note: getOrAdd requires that q.mu is held.
366func (q *Queue) getOrAdd(repoID uint32) *queueItem {
367 if q.items == nil {
368 q.init()
369 }
370
371 item, ok := q.items[repoID]
372 if !ok {
373 item = q.newQueueItem(repoID)
374 q.items[repoID] = item
375 }
376
377 return item
378}
379
380// get returns the item for repoID.
381//
382// Note: get requires that q.mu is held.
383func (q *Queue) get(repoID uint32) *queueItem {
384 return q.items[repoID]
385}
386
387func (q *Queue) init() {
388 q.items = map[uint32]*queueItem{}
389 q.pq = make(pqueue, 0)
390}
391
392// pqueue implements a priority queue via the interface for container/heap
393type pqueue []*queueItem
394
395func (pq pqueue) Len() int { return len(pq) }
396
397func (pq pqueue) Less(i, j int) bool {
398 x := pq[i]
399 y := pq[j]
400 return lessQueueItemPriority(x, y)
401}
402
403func (pq pqueue) Swap(i, j int) {
404 pq[i], pq[j] = pq[j], pq[i]
405 pq[i].heapIdx = i
406 pq[j].heapIdx = j
407}
408
409func (pq *pqueue) Push(x interface{}) {
410 n := len(*pq)
411 item := x.(*queueItem)
412 item.heapIdx = n
413 *pq = append(*pq, item)
414}
415
416func (pq *pqueue) Pop() interface{} {
417 old := *pq
418 n := len(old)
419 item := old[n-1]
420 item.heapIdx = -1
421 *pq = old[0 : n-1]
422 return item
423}
424
425// lessQueueItemPriority returns true if indexing x should be prioritized over indexing y
426func lessQueueItemPriority(x, y *queueItem) bool {
427 // If we know x needs an update and y doesn't, then return true. Otherwise
428 // they are either equal priority or y is more urgent.
429 if x.indexed != y.indexed {
430 return !x.indexed
431 }
432
433 if xFail, yFail := x.indexState == indexStateFail, y.indexState == indexStateFail; xFail != yFail {
434 // if you failed to index, you are likely to fail again. So prefer
435 // non-failed.
436 return !xFail
437 }
438
439 // tiebreaker is to prefer the item added to the queue first
440 return x.seq < y.seq
441}
442
443var (
444 metricQueueLen = promauto.NewGauge(prometheus.GaugeOpts{
445 Name: "index_queue_len",
446 Help: "The number of repositories in the index queue.",
447 })
448 metricQueueCap = promauto.NewGauge(prometheus.GaugeOpts{
449 Name: "index_queue_cap",
450 Help: "The number of repositories tracked by the index queue, including popped items. Should be the same as index_num_assigned.",
451 })
452 metricQueueAge = promauto.NewHistogramVec(prometheus.HistogramOpts{
453 Name: "index_queue_age_seconds",
454 Help: "A histogram of the amount of time a popped repository spent sitting in the queue beforehand.",
455 Buckets: []float64{
456 60, // 1m
457 300, // 5m
458 1200, // 20m
459 2400, // 40m
460 3600, // 1h
461 10800, // 3h
462 18000, // 5h
463 36000, // 10h
464 43200, // 12h
465 54000, // 15h
466 72000, // 20h
467 86400, // 24h
468 108000, // 30h
469 126000, // 35h
470 172800, // 48h
471 },
472 }, []string{"name"}) // name=name of the repository that was just popped from the queue
473)