fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "bytes" 5 "container/heap" 6 "fmt" 7 "io" 8 "net/http" 9 "reflect" 10 "sort" 11 "strconv" 12 "strings" 13 "sync" 14 "text/tabwriter" 15 "time" 16 17 "github.com/prometheus/client_golang/prometheus" 18 "github.com/prometheus/client_golang/prometheus/promauto" 19 20 sglog "github.com/sourcegraph/log" 21) 22 23type queueItem struct { 24 // repoID is the ID of the repo 25 repoID uint32 26 // opts are the options to use when indexing repoID. 27 opts IndexOptions 28 // indexed is true if opts has been indexed. 29 indexed bool 30 // indexState is the indexState of the last attempt at indexing repoID. 31 indexState indexState 32 // heapIdx is the index of the item in the heap. If < 0 then the item is 33 // not on the heap. 34 heapIdx int 35 // seq is a sequence number used as a tiebreaker. This is to ensure we 36 // act like a FIFO queue. 37 seq int64 38 // dateAddedToQueue is the time when this indexing job was added to the queue. If this item is no longer 39 // in the heap (i.e. it has been processed already), this value is nonsensical. 40 dateAddedToQueue time.Time 41 // backoff will handle backing off of future indexing requests for a duration of time based on previous failures 42 backoff backoff 43} 44 45// Queue is a priority queue which returns the next repo to index. It is safe 46// to use concurrently. It is a min queue on: 47// 48// (!indexed, time added to the queue) 49// 50// We use the above since we'd rather index a repo sooner if we know the commit is stale. 51type Queue struct { 52 mu sync.Mutex 53 items map[uint32]*queueItem 54 pq pqueue 55 seq int64 56 logger sglog.Logger 57 newQueueItem func(uint32) *queueItem 58} 59 60func NewQueue(backoffDuration, maxBackoffDuration time.Duration, l sglog.Logger) *Queue { 61 if backoffDuration < 0 || maxBackoffDuration < 0 { 62 backoffDuration = 0 63 maxBackoffDuration = 0 64 } 65 newQueueItem := func(repoID uint32) *queueItem { 66 return &queueItem{ 67 repoID: repoID, 68 heapIdx: -1, 69 backoff: backoff{ 70 backoffDuration: backoffDuration, 71 maxBackoff: maxBackoffDuration, 72 }, 73 } 74 } 75 76 return &Queue{ 77 items: map[uint32]*queueItem{}, 78 pq: make(pqueue, 0), 79 seq: 0, 80 logger: l.Scoped("queue"), 81 newQueueItem: newQueueItem, 82 } 83} 84 85type QueueItem struct { 86 // Opts are the options to use when indexing the repo. 87 Opts IndexOptions 88 // DateAddedToQueue is the time when this indexing job was added to the queue, used for telemetry. 89 DateAddedToQueue time.Time 90} 91 92// Pop returns options and metadata for the next repo to index. If the queue is empty ok is false. 93func (q *Queue) Pop() (result QueueItem, ok bool) { 94 q.mu.Lock() 95 if len(q.pq) == 0 { 96 q.mu.Unlock() 97 return QueueItem{}, false 98 } 99 100 item := heap.Pop(&q.pq).(*queueItem) 101 102 metricQueueLen.Set(float64(len(q.pq))) 103 metricQueueCap.Set(float64(len(q.items))) 104 105 dateAdded := item.dateAddedToQueue 106 item.dateAddedToQueue = time.Unix(0, 0) 107 108 q.mu.Unlock() 109 110 return QueueItem{item.opts, dateAdded}, true 111} 112 113// Len returns the number of items in the queue. 114func (q *Queue) Len() int { 115 q.mu.Lock() 116 l := len(q.pq) 117 q.mu.Unlock() 118 return l 119} 120 121// AddOrUpdate sets which opts to index next. If opts.RepoID is already in the 122// queue, it is updated. 123func (q *Queue) AddOrUpdate(opts IndexOptions) { 124 q.mu.Lock() 125 item := q.getOrAdd(opts.RepoID) 126 if !reflect.DeepEqual(item.opts, opts) { 127 item.indexed = false 128 item.opts = opts 129 } 130 if item.heapIdx < 0 { 131 if item.backoff.Allow(time.Now()) { 132 q.seq++ 133 item.seq = q.seq 134 item.dateAddedToQueue = time.Now() 135 136 heap.Push(&q.pq, item) 137 metricQueueLen.Set(float64(len(q.pq))) 138 metricQueueCap.Set(float64(len(q.items))) 139 } 140 } else { 141 heap.Fix(&q.pq, item.heapIdx) 142 } 143 q.mu.Unlock() 144} 145 146// Bump will take any repository in ids which is not on the queue and 147// re-insert it with the last known IndexOptions. Bump returns ids that are 148// unknown to the queue. 149func (q *Queue) Bump(ids []uint32) []uint32 { 150 q.mu.Lock() 151 defer q.mu.Unlock() 152 153 var missing []uint32 154 for _, id := range ids { 155 item, ok := q.items[id] 156 if !ok { 157 missing = append(missing, id) 158 } else if item.heapIdx < 0 { 159 if item.backoff.Allow(time.Now()) { 160 q.seq++ 161 item.seq = q.seq 162 item.dateAddedToQueue = time.Now() 163 164 heap.Push(&q.pq, item) 165 metricQueueLen.Set(float64(len(q.pq))) 166 metricQueueCap.Set(float64(len(q.items))) 167 } 168 } 169 } 170 171 return missing 172} 173 174// Iterate will call f on each item known to the queue, including items that 175// have been popped from the queue. Note: this is done in a random order and 176// the queue mutex is held during all calls to f. Do not mutate the data. 177func (q *Queue) Iterate(f func(*IndexOptions)) { 178 q.mu.Lock() 179 defer q.mu.Unlock() 180 for _, item := range q.items { 181 f(&item.opts) 182 } 183} 184 185// debugIteratedOrdered will call f on each queueItem (sorted by indexing priority) 186// known to the queue, including queueItems that have been popped from the queue). 187// 188// Note: The mutex is held during all calls to f. Callers must not modify the queueItems. 189func (q *Queue) debugIteratedOrdered(f func(*queueItem)) { 190 q.mu.Lock() 191 defer q.mu.Unlock() 192 193 queueItems := make([]*queueItem, 0, len(q.items)) 194 for _, item := range q.items { 195 queueItems = append(queueItems, item) 196 } 197 198 sort.Slice(queueItems, func(i, j int) bool { 199 x, y := queueItems[i], queueItems[j] 200 201 xOnQueue, yOnQueue := x.heapIdx >= 0, y.heapIdx >= 0 202 if xOnQueue != yOnQueue { 203 return xOnQueue 204 } 205 206 return lessQueueItemPriority(x, y) 207 }) 208 209 for _, item := range queueItems { 210 f(item) 211 } 212} 213 214func (q *Queue) handleDebugQueue(w http.ResponseWriter, r *http.Request) { 215 if r.Method != http.MethodGet { 216 http.Error(w, "method must be GET", http.StatusBadRequest) 217 return 218 } 219 220 w.Header().Set("Content-Type", "text/plain") 221 222 var bufferedWriter bytes.Buffer 223 224 writer := tabwriter.NewWriter(&bufferedWriter, 16, 8, 4, ' ', 0) 225 226 _, err := fmt.Fprintf(writer, "Position\tName\tID\tIsOnQueue\tAge\tBranches\t\n") 227 if err != nil { 228 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 229 return 230 } 231 232 now := time.Now() 233 234 position := -1 235 q.debugIteratedOrdered(func(item *queueItem) { 236 position++ 237 238 if err != nil { 239 return 240 } 241 242 var branches []string 243 for _, b := range item.opts.Branches { 244 branches = append(branches, b.String()) 245 } 246 247 isOnQueue := item.heapIdx >= 0 248 age := "-" 249 if isOnQueue { 250 age = now.Sub(item.dateAddedToQueue).Round(time.Second).String() 251 } 252 253 _, err = fmt.Fprintf(writer, "%d\t%s\t%d\t%t\t%s\t%s\n", position, item.opts.Name, item.repoID, isOnQueue, age, strings.Join(branches, ", ")) 254 }) 255 256 if err != nil { 257 http.Error(w, fmt.Sprintf("writing queueItem: %s", err), http.StatusInternalServerError) 258 return 259 } 260 261 err = writer.Flush() 262 if err != nil { 263 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 264 return 265 } 266 267 w.Header().Set("Content-Length", strconv.Itoa(bufferedWriter.Len())) 268 269 _, err = io.Copy(w, &bufferedWriter) 270 if err != nil { 271 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 272 return 273 } 274} 275 276// SetIndexed sets what the currently indexed options are for opts.RepoID. 277func (q *Queue) SetIndexed(opts IndexOptions, state indexState) { 278 q.mu.Lock() 279 item := q.getOrAdd(opts.RepoID) 280 281 item.indexState = state 282 if state != indexStateFail { 283 item.indexed = reflect.DeepEqual(opts, item.opts) 284 item.backoff.Reset() 285 286 if item.heapIdx >= 0 { 287 // We only update the position in the queue, never add it. 288 heap.Fix(&q.pq, item.heapIdx) 289 } 290 } else { 291 item.backoff.Fail(time.Now(), q.logger, item.opts) 292 293 if item.heapIdx >= 0 { 294 // Remove from queue 295 heap.Remove(&q.pq, item.heapIdx) 296 item.heapIdx = -1 297 } 298 } 299 300 q.mu.Unlock() 301} 302 303// MaybeRemoveMissing will remove all queue items not in ids and return the 304// ids of items removed from the queue. It will heuristically not run to 305// conserve resources. 306// 307// In the server's steady state we expect that the list of names is equal to 308// the items in queue. As such in the steady state this function should do no 309// removals. Removal requires memory allocation and coarse locking. To avoid 310// that we use a heuristic which can falsely decide it doesn't need to 311// remove. However, we will converge onto removing items. 312func (q *Queue) MaybeRemoveMissing(ids []uint32) []uint32 { 313 q.mu.Lock() 314 sameSize := len(q.items) == len(ids) 315 q.mu.Unlock() 316 317 // heuristically skip expensive work 318 if sameSize { 319 debugLog.Printf("skipping MaybeRemoveMissing due to same size: %d", len(ids)) 320 return nil 321 } 322 323 set := make(map[uint32]struct{}, len(ids)) 324 for _, id := range ids { 325 set[id] = struct{}{} 326 } 327 328 q.mu.Lock() 329 defer q.mu.Unlock() 330 331 var removed []uint32 332 for _, item := range q.items { 333 if _, ok := set[item.opts.RepoID]; ok { 334 continue 335 } 336 337 if item.heapIdx >= 0 { 338 heap.Remove(&q.pq, item.heapIdx) 339 } 340 341 item.indexState = "" 342 343 delete(q.items, item.opts.RepoID) 344 345 removed = append(removed, item.opts.RepoID) 346 } 347 348 metricQueueLen.Set(float64(len(q.pq))) 349 metricQueueCap.Set(float64(len(q.items))) 350 351 return removed 352} 353 354// getOrAdd returns the item for repoID. If the repoID hasn't been seen before, it 355// is added to q.items. 356// 357// Note: getOrAdd requires that q.mu is held. 358func (q *Queue) getOrAdd(repoID uint32) *queueItem { 359 item, ok := q.items[repoID] 360 if !ok { 361 item = q.newQueueItem(repoID) 362 q.items[repoID] = item 363 } 364 365 return item 366} 367 368// get returns the item for repoID. 369// 370// Note: get requires that q.mu is held. 371func (q *Queue) get(repoID uint32) *queueItem { 372 return q.items[repoID] 373} 374 375// pqueue implements a priority queue via the interface for container/heap 376type pqueue []*queueItem 377 378func (pq pqueue) Len() int { return len(pq) } 379 380func (pq pqueue) Less(i, j int) bool { 381 x := pq[i] 382 y := pq[j] 383 return lessQueueItemPriority(x, y) 384} 385 386func (pq pqueue) Swap(i, j int) { 387 pq[i], pq[j] = pq[j], pq[i] 388 pq[i].heapIdx = i 389 pq[j].heapIdx = j 390} 391 392func (pq *pqueue) Push(x interface{}) { 393 n := len(*pq) 394 item := x.(*queueItem) 395 item.heapIdx = n 396 *pq = append(*pq, item) 397} 398 399func (pq *pqueue) Pop() interface{} { 400 old := *pq 401 n := len(old) 402 item := old[n-1] 403 item.heapIdx = -1 404 *pq = old[0 : n-1] 405 return item 406} 407 408// lessQueueItemPriority returns true if indexing x should be prioritized over indexing y 409func lessQueueItemPriority(x, y *queueItem) bool { 410 // If we know x needs an update and y doesn't, then return true. Otherwise 411 // they are either equal priority or y is more urgent. 412 if x.indexed != y.indexed { 413 return !x.indexed 414 } 415 416 if xFail, yFail := x.indexState == indexStateFail, y.indexState == indexStateFail; xFail != yFail { 417 // if you failed to index, you are likely to fail again. So prefer 418 // non-failed. 419 return !xFail 420 } 421 422 // tiebreaker is to prefer the item added to the queue first 423 return x.seq < y.seq 424} 425 426var ( 427 metricQueueLen = promauto.NewGauge(prometheus.GaugeOpts{ 428 Name: "index_queue_len", 429 Help: "The number of repositories in the index queue.", 430 }) 431 metricQueueCap = promauto.NewGauge(prometheus.GaugeOpts{ 432 Name: "index_queue_cap", 433 Help: "The number of repositories tracked by the index queue, including popped items. Should be the same as index_num_assigned.", 434 }) 435)