// Fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "bytes"
5 "container/heap"
6 "fmt"
7 "io"
8 "net/http"
9 "reflect"
10 "sort"
11 "strconv"
12 "strings"
13 "sync"
14 "text/tabwriter"
15 "time"
16
17 "github.com/prometheus/client_golang/prometheus"
18 "github.com/prometheus/client_golang/prometheus/promauto"
19
20 sglog "github.com/sourcegraph/log"
21)
22
23type queueItem struct {
24 // repoID is the ID of the repo
25 repoID uint32
26 // opts are the options to use when indexing repoID.
27 opts IndexOptions
28 // indexed is true if opts has been indexed.
29 indexed bool
30 // indexState is the indexState of the last attempt at indexing repoID.
31 indexState indexState
32 // heapIdx is the index of the item in the heap. If < 0 then the item is
33 // not on the heap.
34 heapIdx int
35 // seq is a sequence number used as a tiebreaker. This is to ensure we
36 // act like a FIFO queue.
37 seq int64
38 // dateAddedToQueue is the time when this indexing job was added to the queue. If this item is no longer
39 // in the heap (i.e. it has been processed already), this value is nonsensical.
40 dateAddedToQueue time.Time
41 // backoff will handle backing off of future indexing requests for a duration of time based on previous failures
42 backoff backoff
43}
44
45// Queue is a priority queue which returns the next repo to index. It is safe
46// to use concurrently. It is a min queue on:
47//
48// (!indexed, time added to the queue)
49//
50// We use the above since we'd rather index a repo sooner if we know the commit is stale.
51type Queue struct {
52 mu sync.Mutex
53 items map[uint32]*queueItem
54 pq pqueue
55 seq int64
56 logger sglog.Logger
57 newQueueItem func(uint32) *queueItem
58}
59
60func NewQueue(backoffDuration, maxBackoffDuration time.Duration, l sglog.Logger) *Queue {
61 if backoffDuration < 0 || maxBackoffDuration < 0 {
62 backoffDuration = 0
63 maxBackoffDuration = 0
64 }
65 newQueueItem := func(repoID uint32) *queueItem {
66 return &queueItem{
67 repoID: repoID,
68 heapIdx: -1,
69 backoff: backoff{
70 backoffDuration: backoffDuration,
71 maxBackoff: maxBackoffDuration,
72 },
73 }
74 }
75
76 q := &Queue{
77 newQueueItem: newQueueItem,
78 logger: l.Scoped("queue"),
79 }
80
81 q.init()
82 return q
83}
84
85// Pop returns the opts of the next repo to index. If the queue is empty ok is
86// false.
87func (q *Queue) Pop() (opts IndexOptions, ok bool) {
88 q.mu.Lock()
89 if len(q.pq) == 0 {
90 q.mu.Unlock()
91 return IndexOptions{}, false
92 }
93
94 item := heap.Pop(&q.pq).(*queueItem)
95 opts = item.opts
96
97 metricQueueLen.Set(float64(len(q.pq)))
98 metricQueueCap.Set(float64(len(q.items)))
99
100 dateAdded := item.dateAddedToQueue
101 item.dateAddedToQueue = time.Unix(0, 0)
102
103 q.mu.Unlock()
104
105 name := repoNameForMetric(opts.Name)
106 age := time.Since(dateAdded)
107 metricQueueAge.WithLabelValues(name).Observe(age.Seconds())
108
109 return opts, true
110}
111
112// Len returns the number of items in the queue.
113func (q *Queue) Len() int {
114 q.mu.Lock()
115 l := len(q.pq)
116 q.mu.Unlock()
117 return l
118}
119
120// AddOrUpdate sets which opts to index next. If opts.RepoID is already in the
121// queue, it is updated.
122func (q *Queue) AddOrUpdate(opts IndexOptions) {
123 q.mu.Lock()
124 item := q.getOrAdd(opts.RepoID)
125 if !reflect.DeepEqual(item.opts, opts) {
126 item.indexed = false
127 item.opts = opts
128 }
129 if item.heapIdx < 0 {
130 if item.backoff.Allow(time.Now()) {
131 q.seq++
132 item.seq = q.seq
133 item.dateAddedToQueue = time.Now()
134
135 heap.Push(&q.pq, item)
136 metricQueueLen.Set(float64(len(q.pq)))
137 metricQueueCap.Set(float64(len(q.items)))
138 }
139 } else {
140 heap.Fix(&q.pq, item.heapIdx)
141 }
142 q.mu.Unlock()
143}
144
145// Bump will take any repository in ids which is not on the queue and
146// re-insert it with the last known IndexOptions. Bump returns ids that are
147// unknown to the queue.
148func (q *Queue) Bump(ids []uint32) []uint32 {
149 q.mu.Lock()
150 defer q.mu.Unlock()
151
152 if q.items == nil {
153 q.init()
154 }
155
156 var missing []uint32
157 for _, id := range ids {
158 item, ok := q.items[id]
159 if !ok {
160 missing = append(missing, id)
161 } else if item.heapIdx < 0 {
162 if item.backoff.Allow(time.Now()) {
163 q.seq++
164 item.seq = q.seq
165 item.dateAddedToQueue = time.Now()
166
167 heap.Push(&q.pq, item)
168 metricQueueLen.Set(float64(len(q.pq)))
169 metricQueueCap.Set(float64(len(q.items)))
170 }
171 }
172 }
173
174 return missing
175}
176
177// Iterate will call f on each item known to the queue, including items that
178// have been popped from the queue. Note: this is done in a random order and
179// the queue mutex is held during all calls to f. Do not mutate the data.
180func (q *Queue) Iterate(f func(*IndexOptions)) {
181 q.mu.Lock()
182 defer q.mu.Unlock()
183 for _, item := range q.items {
184 f(&item.opts)
185 }
186}
187
188// debugIteratedOrdered will call f on each queueItem (sorted by indexing priority)
189// known to the queue, including queueItems that have been popped from the queue).
190//
191// Note: The mutex is held during all calls to f. Callers must not modify the queueItems.
192func (q *Queue) debugIteratedOrdered(f func(*queueItem)) {
193 q.mu.Lock()
194 defer q.mu.Unlock()
195
196 queueItems := make([]*queueItem, 0, len(q.items))
197 for _, item := range q.items {
198 queueItems = append(queueItems, item)
199 }
200
201 sort.Slice(queueItems, func(i, j int) bool {
202 x, y := queueItems[i], queueItems[j]
203
204 xOnQueue, yOnQueue := x.heapIdx >= 0, y.heapIdx >= 0
205 if xOnQueue != yOnQueue {
206 return xOnQueue
207 }
208
209 return lessQueueItemPriority(x, y)
210 })
211
212 for _, item := range queueItems {
213 f(item)
214 }
215}
216
217func (q *Queue) handleDebugQueue(w http.ResponseWriter, r *http.Request) {
218 if r.Method != http.MethodGet {
219 http.Error(w, "method must be GET", http.StatusBadRequest)
220 return
221 }
222
223 w.Header().Set("Content-Type", "text/plain")
224
225 var bufferedWriter bytes.Buffer
226
227 writer := tabwriter.NewWriter(&bufferedWriter, 16, 8, 4, ' ', 0)
228
229 _, err := fmt.Fprintf(writer, "Position\tName\tID\tIsOnQueue\tAge\tBranches\t\n")
230 if err != nil {
231 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
232 return
233 }
234
235 now := time.Now()
236
237 position := -1
238 q.debugIteratedOrdered(func(item *queueItem) {
239 position++
240
241 if err != nil {
242 return
243 }
244
245 var branches []string
246 for _, b := range item.opts.Branches {
247 branches = append(branches, b.String())
248 }
249
250 isOnQueue := item.heapIdx >= 0
251 age := "-"
252 if isOnQueue {
253 age = now.Sub(item.dateAddedToQueue).Round(time.Second).String()
254 }
255
256 _, err = fmt.Fprintf(writer, "%d\t%s\t%d\t%t\t%s\t%s\n", position, item.opts.Name, item.repoID, isOnQueue, age, strings.Join(branches, ", "))
257 })
258
259 if err != nil {
260 http.Error(w, fmt.Sprintf("writing queueItem: %s", err), http.StatusInternalServerError)
261 return
262 }
263
264 err = writer.Flush()
265 if err != nil {
266 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
267 return
268 }
269
270 w.Header().Set("Content-Length", strconv.Itoa(bufferedWriter.Len()))
271
272 _, err = io.Copy(w, &bufferedWriter)
273 if err != nil {
274 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
275 return
276 }
277}
278
279// SetIndexed sets what the currently indexed options are for opts.RepoID.
280func (q *Queue) SetIndexed(opts IndexOptions, state indexState) {
281 q.mu.Lock()
282 item := q.getOrAdd(opts.RepoID)
283
284 item.indexState = state
285 if state != indexStateFail {
286 item.indexed = reflect.DeepEqual(opts, item.opts)
287 item.backoff.Reset()
288
289 if item.heapIdx >= 0 {
290 // We only update the position in the queue, never add it.
291 heap.Fix(&q.pq, item.heapIdx)
292 }
293 } else {
294 item.backoff.Fail(time.Now(), q.logger, item.opts)
295
296 if item.heapIdx >= 0 {
297 // Remove from queue
298 heap.Remove(&q.pq, item.heapIdx)
299 item.heapIdx = -1
300 }
301 }
302
303 q.mu.Unlock()
304}
305
306// MaybeRemoveMissing will remove all queue items not in ids and return the
307// ids of items removed from the queue. It will heuristically not run to
308// conserve resources.
309//
310// In the server's steady state we expect that the list of names is equal to
311// the items in queue. As such in the steady state this function should do no
312// removals. Removal requires memory allocation and coarse locking. To avoid
313// that we use a heuristic which can falsely decide it doesn't need to
314// remove. However, we will converge onto removing items.
315func (q *Queue) MaybeRemoveMissing(ids []uint32) []uint32 {
316 q.mu.Lock()
317 sameSize := len(q.items) == len(ids)
318 q.mu.Unlock()
319
320 // heuristically skip expensive work
321 if sameSize {
322 debug.Printf("skipping MaybeRemoveMissing due to same size: %d", len(ids))
323 return nil
324 }
325
326 set := make(map[uint32]struct{}, len(ids))
327 for _, id := range ids {
328 set[id] = struct{}{}
329 }
330
331 q.mu.Lock()
332 defer q.mu.Unlock()
333
334 var removed []uint32
335 for _, item := range q.items {
336 if _, ok := set[item.opts.RepoID]; ok {
337 continue
338 }
339
340 if item.heapIdx >= 0 {
341 heap.Remove(&q.pq, item.heapIdx)
342 }
343
344 item.indexState = ""
345
346 delete(q.items, item.opts.RepoID)
347
348 removed = append(removed, item.opts.RepoID)
349 }
350
351 metricQueueLen.Set(float64(len(q.pq)))
352 metricQueueCap.Set(float64(len(q.items)))
353
354 return removed
355}
356
357// getOrAdd returns the item for repoID. If the repoID hasn't been seen before, it
358// is added to q.items.
359//
360// Note: getOrAdd requires that q.mu is held.
361func (q *Queue) getOrAdd(repoID uint32) *queueItem {
362 if q.items == nil {
363 q.init()
364 }
365
366 item, ok := q.items[repoID]
367 if !ok {
368 item = q.newQueueItem(repoID)
369 q.items[repoID] = item
370 }
371
372 return item
373}
374
375// get returns the item for repoID.
376//
377// Note: get requires that q.mu is held.
378func (q *Queue) get(repoID uint32) *queueItem {
379 return q.items[repoID]
380}
381
382func (q *Queue) init() {
383 q.items = map[uint32]*queueItem{}
384 q.pq = make(pqueue, 0)
385}
386
387// pqueue implements a priority queue via the interface for container/heap
388type pqueue []*queueItem
389
390func (pq pqueue) Len() int { return len(pq) }
391
392func (pq pqueue) Less(i, j int) bool {
393 x := pq[i]
394 y := pq[j]
395 return lessQueueItemPriority(x, y)
396}
397
398func (pq pqueue) Swap(i, j int) {
399 pq[i], pq[j] = pq[j], pq[i]
400 pq[i].heapIdx = i
401 pq[j].heapIdx = j
402}
403
404func (pq *pqueue) Push(x interface{}) {
405 n := len(*pq)
406 item := x.(*queueItem)
407 item.heapIdx = n
408 *pq = append(*pq, item)
409}
410
411func (pq *pqueue) Pop() interface{} {
412 old := *pq
413 n := len(old)
414 item := old[n-1]
415 item.heapIdx = -1
416 *pq = old[0 : n-1]
417 return item
418}
419
420// lessQueueItemPriority returns true if indexing x should be prioritized over indexing y
421func lessQueueItemPriority(x, y *queueItem) bool {
422 // If we know x needs an update and y doesn't, then return true. Otherwise
423 // they are either equal priority or y is more urgent.
424 if x.indexed != y.indexed {
425 return !x.indexed
426 }
427
428 if xFail, yFail := x.indexState == indexStateFail, y.indexState == indexStateFail; xFail != yFail {
429 // if you failed to index, you are likely to fail again. So prefer
430 // non-failed.
431 return !xFail
432 }
433
434 // tiebreaker is to prefer the item added to the queue first
435 return x.seq < y.seq
436}
437
438var (
439 metricQueueLen = promauto.NewGauge(prometheus.GaugeOpts{
440 Name: "index_queue_len",
441 Help: "The number of repositories in the index queue.",
442 })
443 metricQueueCap = promauto.NewGauge(prometheus.GaugeOpts{
444 Name: "index_queue_cap",
445 Help: "The number of repositories tracked by the index queue, including popped items. Should be the same as index_num_assigned.",
446 })
447 metricQueueAge = promauto.NewHistogramVec(prometheus.HistogramOpts{
448 Name: "index_queue_age_seconds",
449 Help: "A histogram of the amount of time a popped repository spent sitting in the queue beforehand.",
450 Buckets: []float64{
451 60, // 1m
452 300, // 5m
453 1200, // 20m
454 2400, // 40m
455 3600, // 1h
456 10800, // 3h
457 18000, // 5h
458 36000, // 10h
459 43200, // 12h
460 54000, // 15h
461 72000, // 20h
462 86400, // 24h
463 108000, // 30h
464 126000, // 35h
465 172800, // 48h
466 },
467 }, []string{"name"}) // name=name of the repository that was just popped from the queue
468)