fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "bytes"
5 "container/heap"
6 "fmt"
7 "io"
8 "net/http"
9 "reflect"
10 "sort"
11 "strconv"
12 "strings"
13 "sync"
14 "text/tabwriter"
15 "time"
16
17 "github.com/prometheus/client_golang/prometheus"
18 "github.com/prometheus/client_golang/prometheus/promauto"
19
20 sglog "github.com/sourcegraph/log"
21)
22
// queueItem is the queue's bookkeeping record for a single repository. An
// item stays in Queue.items after it has been popped from the heap, so the
// last known options and indexing outcome for the repo are retained.
type queueItem struct {
	// repoID is the ID of the repo. It is also the key under which the item
	// is created in Queue.items.
	repoID uint32
	// opts are the options to use when indexing repoID.
	opts IndexOptions
	// indexed is true if opts has been indexed. It is cleared when
	// AddOrUpdate stores different opts.
	indexed bool
	// indexState is the indexState of the last attempt at indexing repoID.
	indexState indexState
	// heapIdx is the index of the item in the heap. If < 0 then the item is
	// not on the heap. It is maintained by pqueue's Swap, Push and Pop.
	heapIdx int
	// seq is a sequence number used as a tiebreaker. This is to ensure we
	// act like a FIFO queue. It is taken from Queue.seq each time the item is
	// pushed onto the heap.
	seq int64
	// dateAddedToQueue is the time when this indexing job was added to the queue. If this item is no longer
	// in the heap (i.e. it has been processed already), this value is nonsensical (Pop resets it to the
	// Unix epoch).
	dateAddedToQueue time.Time
	// backoff will handle backing off of future indexing requests for a duration of time based on previous failures
	backoff backoff
}
44
// Queue is a priority queue which returns the next repo to index. It is safe
// to use concurrently. It is a min queue on:
//
//	(indexed, last index attempt failed, seq)
//
// In other words: repos whose current options have not been indexed yet come
// first, then repos whose last attempt did not fail, and remaining ties are
// broken in FIFO order by the sequence number assigned when the repo was
// pushed (see lessQueueItemPriority).
//
// We use the above since we'd rather index a repo sooner if we know the commit is stale.
type Queue struct {
	// mu is held by every method that reads or writes items, pq or seq.
	mu sync.Mutex
	// items holds every repo known to the queue keyed by repo ID, including
	// repos that are currently not on the heap.
	items map[uint32]*queueItem
	// pq is the heap of items that are waiting to be popped.
	pq pqueue
	// seq is the last sequence number handed out to a pushed item.
	seq int64
	// logger is the logger passed to NewQueue, scoped to "queue".
	logger sglog.Logger
	// newQueueItem builds the item for a repo ID that has not been seen
	// before. It is set once by NewQueue.
	newQueueItem func(uint32) *queueItem
}
59
60func NewQueue(backoffDuration, maxBackoffDuration time.Duration, l sglog.Logger) *Queue {
61 if backoffDuration < 0 || maxBackoffDuration < 0 {
62 backoffDuration = 0
63 maxBackoffDuration = 0
64 }
65 newQueueItem := func(repoID uint32) *queueItem {
66 return &queueItem{
67 repoID: repoID,
68 heapIdx: -1,
69 backoff: backoff{
70 backoffDuration: backoffDuration,
71 maxBackoff: maxBackoffDuration,
72 },
73 }
74 }
75
76 return &Queue{
77 items: map[uint32]*queueItem{},
78 pq: make(pqueue, 0),
79 seq: 0,
80 logger: l.Scoped("queue"),
81 newQueueItem: newQueueItem,
82 }
83}
84
// QueueItem is the value returned by Queue.Pop: the options of the repo to
// index next plus metadata about its time on the queue.
//
// Note: Pop constructs this struct with a positional literal, so the field
// order matters.
type QueueItem struct {
	// Opts are the options to use when indexing the repo.
	Opts IndexOptions
	// DateAddedToQueue is the time when this indexing job was added to the queue, used for telemetry.
	DateAddedToQueue time.Time
}
91
92// Pop returns options and metadata for the next repo to index. If the queue is empty ok is false.
93func (q *Queue) Pop() (result QueueItem, ok bool) {
94 q.mu.Lock()
95 if len(q.pq) == 0 {
96 q.mu.Unlock()
97 return QueueItem{}, false
98 }
99
100 item := heap.Pop(&q.pq).(*queueItem)
101
102 metricQueueLen.Set(float64(len(q.pq)))
103 metricQueueCap.Set(float64(len(q.items)))
104
105 dateAdded := item.dateAddedToQueue
106 item.dateAddedToQueue = time.Unix(0, 0)
107
108 q.mu.Unlock()
109
110 return QueueItem{item.opts, dateAdded}, true
111}
112
113// Len returns the number of items in the queue.
114func (q *Queue) Len() int {
115 q.mu.Lock()
116 l := len(q.pq)
117 q.mu.Unlock()
118 return l
119}
120
121// AddOrUpdate sets which opts to index next. If opts.RepoID is already in the
122// queue, it is updated.
123func (q *Queue) AddOrUpdate(opts IndexOptions) {
124 q.mu.Lock()
125 item := q.getOrAdd(opts.RepoID)
126 if !reflect.DeepEqual(item.opts, opts) {
127 item.indexed = false
128 item.opts = opts
129 }
130 if item.heapIdx < 0 {
131 if item.backoff.Allow(time.Now()) {
132 q.seq++
133 item.seq = q.seq
134 item.dateAddedToQueue = time.Now()
135
136 heap.Push(&q.pq, item)
137 metricQueueLen.Set(float64(len(q.pq)))
138 metricQueueCap.Set(float64(len(q.items)))
139 }
140 } else {
141 heap.Fix(&q.pq, item.heapIdx)
142 }
143 q.mu.Unlock()
144}
145
146// Bump will take any repository in ids which is not on the queue and
147// re-insert it with the last known IndexOptions. Bump returns ids that are
148// unknown to the queue.
149func (q *Queue) Bump(ids []uint32) []uint32 {
150 q.mu.Lock()
151 defer q.mu.Unlock()
152
153 var missing []uint32
154 for _, id := range ids {
155 item, ok := q.items[id]
156 if !ok {
157 missing = append(missing, id)
158 } else if item.heapIdx < 0 {
159 if item.backoff.Allow(time.Now()) {
160 q.seq++
161 item.seq = q.seq
162 item.dateAddedToQueue = time.Now()
163
164 heap.Push(&q.pq, item)
165 metricQueueLen.Set(float64(len(q.pq)))
166 metricQueueCap.Set(float64(len(q.items)))
167 }
168 }
169 }
170
171 return missing
172}
173
174// Iterate will call f on each item known to the queue, including items that
175// have been popped from the queue. Note: this is done in a random order and
176// the queue mutex is held during all calls to f. Do not mutate the data.
177func (q *Queue) Iterate(f func(*IndexOptions)) {
178 q.mu.Lock()
179 defer q.mu.Unlock()
180 for _, item := range q.items {
181 f(&item.opts)
182 }
183}
184
185// debugIteratedOrdered will call f on each queueItem (sorted by indexing priority)
186// known to the queue, including queueItems that have been popped from the queue).
187//
188// Note: The mutex is held during all calls to f. Callers must not modify the queueItems.
189func (q *Queue) debugIteratedOrdered(f func(*queueItem)) {
190 q.mu.Lock()
191 defer q.mu.Unlock()
192
193 queueItems := make([]*queueItem, 0, len(q.items))
194 for _, item := range q.items {
195 queueItems = append(queueItems, item)
196 }
197
198 sort.Slice(queueItems, func(i, j int) bool {
199 x, y := queueItems[i], queueItems[j]
200
201 xOnQueue, yOnQueue := x.heapIdx >= 0, y.heapIdx >= 0
202 if xOnQueue != yOnQueue {
203 return xOnQueue
204 }
205
206 return lessQueueItemPriority(x, y)
207 })
208
209 for _, item := range queueItems {
210 f(item)
211 }
212}
213
214func (q *Queue) handleDebugQueue(w http.ResponseWriter, r *http.Request) {
215 if r.Method != http.MethodGet {
216 http.Error(w, "method must be GET", http.StatusBadRequest)
217 return
218 }
219
220 w.Header().Set("Content-Type", "text/plain")
221
222 var bufferedWriter bytes.Buffer
223
224 writer := tabwriter.NewWriter(&bufferedWriter, 16, 8, 4, ' ', 0)
225
226 _, err := fmt.Fprintf(writer, "Position\tName\tID\tIsOnQueue\tAge\tBranches\t\n")
227 if err != nil {
228 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
229 return
230 }
231
232 now := time.Now()
233
234 position := -1
235 q.debugIteratedOrdered(func(item *queueItem) {
236 position++
237
238 if err != nil {
239 return
240 }
241
242 var branches []string
243 for _, b := range item.opts.Branches {
244 branches = append(branches, b.String())
245 }
246
247 isOnQueue := item.heapIdx >= 0
248 age := "-"
249 if isOnQueue {
250 age = now.Sub(item.dateAddedToQueue).Round(time.Second).String()
251 }
252
253 _, err = fmt.Fprintf(writer, "%d\t%s\t%d\t%t\t%s\t%s\n", position, item.opts.Name, item.repoID, isOnQueue, age, strings.Join(branches, ", "))
254 })
255
256 if err != nil {
257 http.Error(w, fmt.Sprintf("writing queueItem: %s", err), http.StatusInternalServerError)
258 return
259 }
260
261 err = writer.Flush()
262 if err != nil {
263 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
264 return
265 }
266
267 w.Header().Set("Content-Length", strconv.Itoa(bufferedWriter.Len()))
268
269 _, err = io.Copy(w, &bufferedWriter)
270 if err != nil {
271 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
272 return
273 }
274}
275
276// SetIndexed sets what the currently indexed options are for opts.RepoID.
277func (q *Queue) SetIndexed(opts IndexOptions, state indexState) {
278 q.mu.Lock()
279 item := q.getOrAdd(opts.RepoID)
280
281 item.indexState = state
282 if state != indexStateFail {
283 item.indexed = reflect.DeepEqual(opts, item.opts)
284 item.backoff.Reset()
285
286 if item.heapIdx >= 0 {
287 // We only update the position in the queue, never add it.
288 heap.Fix(&q.pq, item.heapIdx)
289 }
290 } else {
291 item.backoff.Fail(time.Now(), q.logger, item.opts)
292
293 if item.heapIdx >= 0 {
294 // Remove from queue
295 heap.Remove(&q.pq, item.heapIdx)
296 item.heapIdx = -1
297 }
298 }
299
300 q.mu.Unlock()
301}
302
303// MaybeRemoveMissing will remove all queue items not in ids and return the
304// ids of items removed from the queue. It will heuristically not run to
305// conserve resources.
306//
307// In the server's steady state we expect that the list of names is equal to
308// the items in queue. As such in the steady state this function should do no
309// removals. Removal requires memory allocation and coarse locking. To avoid
310// that we use a heuristic which can falsely decide it doesn't need to
311// remove. However, we will converge onto removing items.
312func (q *Queue) MaybeRemoveMissing(ids []uint32) []uint32 {
313 q.mu.Lock()
314 sameSize := len(q.items) == len(ids)
315 q.mu.Unlock()
316
317 // heuristically skip expensive work
318 if sameSize {
319 debugLog.Printf("skipping MaybeRemoveMissing due to same size: %d", len(ids))
320 return nil
321 }
322
323 set := make(map[uint32]struct{}, len(ids))
324 for _, id := range ids {
325 set[id] = struct{}{}
326 }
327
328 q.mu.Lock()
329 defer q.mu.Unlock()
330
331 var removed []uint32
332 for _, item := range q.items {
333 if _, ok := set[item.opts.RepoID]; ok {
334 continue
335 }
336
337 if item.heapIdx >= 0 {
338 heap.Remove(&q.pq, item.heapIdx)
339 }
340
341 item.indexState = ""
342
343 delete(q.items, item.opts.RepoID)
344
345 removed = append(removed, item.opts.RepoID)
346 }
347
348 metricQueueLen.Set(float64(len(q.pq)))
349 metricQueueCap.Set(float64(len(q.items)))
350
351 return removed
352}
353
354// getOrAdd returns the item for repoID. If the repoID hasn't been seen before, it
355// is added to q.items.
356//
357// Note: getOrAdd requires that q.mu is held.
358func (q *Queue) getOrAdd(repoID uint32) *queueItem {
359 item, ok := q.items[repoID]
360 if !ok {
361 item = q.newQueueItem(repoID)
362 q.items[repoID] = item
363 }
364
365 return item
366}
367
368// get returns the item for repoID.
369//
370// Note: get requires that q.mu is held.
371func (q *Queue) get(repoID uint32) *queueItem {
372 return q.items[repoID]
373}
374
// pqueue implements a priority queue via the interface for container/heap.
// Ordering is delegated to lessQueueItemPriority, and each item's heapIdx is
// kept in sync with its position in the slice by Swap, Push and Pop.
type pqueue []*queueItem
377
378func (pq pqueue) Len() int { return len(pq) }
379
380func (pq pqueue) Less(i, j int) bool {
381 x := pq[i]
382 y := pq[j]
383 return lessQueueItemPriority(x, y)
384}
385
386func (pq pqueue) Swap(i, j int) {
387 pq[i], pq[j] = pq[j], pq[i]
388 pq[i].heapIdx = i
389 pq[j].heapIdx = j
390}
391
392func (pq *pqueue) Push(x interface{}) {
393 n := len(*pq)
394 item := x.(*queueItem)
395 item.heapIdx = n
396 *pq = append(*pq, item)
397}
398
399func (pq *pqueue) Pop() interface{} {
400 old := *pq
401 n := len(old)
402 item := old[n-1]
403 item.heapIdx = -1
404 *pq = old[0 : n-1]
405 return item
406}
407
408// lessQueueItemPriority returns true if indexing x should be prioritized over indexing y
409func lessQueueItemPriority(x, y *queueItem) bool {
410 // If we know x needs an update and y doesn't, then return true. Otherwise
411 // they are either equal priority or y is more urgent.
412 if x.indexed != y.indexed {
413 return !x.indexed
414 }
415
416 if xFail, yFail := x.indexState == indexStateFail, y.indexState == indexStateFail; xFail != yFail {
417 // if you failed to index, you are likely to fail again. So prefer
418 // non-failed.
419 return !xFail
420 }
421
422 // tiebreaker is to prefer the item added to the queue first
423 return x.seq < y.seq
424}
425
// Gauges describing the queue. They are set by the Queue methods that change
// the heap or the set of known items.
var (
	// metricQueueLen is set to len(q.pq): the number of items waiting on the
	// heap.
	metricQueueLen = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_queue_len",
		Help: "The number of repositories in the index queue.",
	})
	// metricQueueCap is set to len(q.items): every item known to the queue,
	// whether or not it is on the heap.
	metricQueueCap = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_queue_cap",
		Help: "The number of repositories tracked by the index queue, including popped items. Should be the same as index_num_assigned.",
	})
)
435)