fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

package main

import (
	"bytes"
	"container/heap"
	"fmt"
	"io"
	"net/http"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"

	sglog "github.com/sourcegraph/log"
)

// queueItem is the record Queue keeps for a single repository. An item stays
// in Queue.items after it has been popped from the heap, so it describes a
// repository both while it is waiting to be indexed and afterwards.
type queueItem struct {
	// repoID is the ID of the repo. It is also the key of the item in
	// Queue.items.
	repoID uint32
	// opts are the options to use when indexing repoID.
	opts IndexOptions
	// indexed is true if opts has been indexed.
	indexed bool
	// indexState is the indexState of the last attempt at indexing repoID.
	indexState indexState
	// heapIdx is the index of the item in the heap. If < 0 then the item is
	// not on the heap.
	heapIdx int
	// seq is a sequence number used as a tiebreaker. This is to ensure we
	// act like a FIFO queue.
	seq int64
	// dateAddedToQueue is the time when this indexing job was added to the
	// queue. If this item is no longer in the heap (i.e. it has been processed
	// already), this value is nonsensical: Pop resets it to the Unix epoch.
	dateAddedToQueue time.Time
	// backoff will handle backing off of future indexing requests for a
	// duration of time based on previous failures.
	backoff backoff
}

// Queue is a priority queue which returns the next repo to index. It is safe
// to use concurrently. The ordering is implemented by lessQueueItemPriority
// and prefers, in this order:
//
//  1. items whose current options have not been indexed yet,
//  2. items whose last indexing attempt did not fail,
//  3. items that were added to the queue first.
//
// We use the above since we'd rather index a repo sooner if we know the commit is stale.
51type Queue struct { 52 mu sync.Mutex 53 items map[uint32]*queueItem 54 pq pqueue 55 seq int64 56 logger sglog.Logger 57 newQueueItem func(uint32) *queueItem 58} 59 60func NewQueue(backoffDuration, maxBackoffDuration time.Duration, l sglog.Logger) *Queue { 61 if backoffDuration < 0 || maxBackoffDuration < 0 { 62 backoffDuration = 0 63 maxBackoffDuration = 0 64 } 65 newQueueItem := func(repoID uint32) *queueItem { 66 return &queueItem{ 67 repoID: repoID, 68 heapIdx: -1, 69 backoff: backoff{ 70 backoffDuration: backoffDuration, 71 maxBackoff: maxBackoffDuration, 72 }, 73 } 74 } 75 76 q := &Queue{ 77 newQueueItem: newQueueItem, 78 logger: l.Scoped("queue"), 79 } 80 81 q.init() 82 return q 83} 84 85// Pop returns the opts of the next repo to index. If the queue is empty ok is 86// false. 87func (q *Queue) Pop() (opts IndexOptions, ok bool) { 88 q.mu.Lock() 89 if len(q.pq) == 0 { 90 q.mu.Unlock() 91 return IndexOptions{}, false 92 } 93 94 item := heap.Pop(&q.pq).(*queueItem) 95 opts = item.opts 96 97 metricQueueLen.Set(float64(len(q.pq))) 98 metricQueueCap.Set(float64(len(q.items))) 99 100 dateAdded := item.dateAddedToQueue 101 item.dateAddedToQueue = time.Unix(0, 0) 102 103 q.mu.Unlock() 104 105 name := repoNameForMetric(opts.Name) 106 age := time.Since(dateAdded) 107 metricQueueAge.WithLabelValues(name).Observe(age.Seconds()) 108 109 return opts, true 110} 111 112// Len returns the number of items in the queue. 113func (q *Queue) Len() int { 114 q.mu.Lock() 115 l := len(q.pq) 116 q.mu.Unlock() 117 return l 118} 119 120// AddOrUpdate sets which opts to index next. If opts.RepoID is already in the 121// queue, it is updated. 
122func (q *Queue) AddOrUpdate(opts IndexOptions) { 123 q.mu.Lock() 124 item := q.getOrAdd(opts.RepoID) 125 if !reflect.DeepEqual(item.opts, opts) { 126 item.indexed = false 127 item.opts = opts 128 } 129 if item.heapIdx < 0 { 130 if item.backoff.Allow(time.Now()) { 131 q.seq++ 132 item.seq = q.seq 133 item.dateAddedToQueue = time.Now() 134 135 heap.Push(&q.pq, item) 136 metricQueueLen.Set(float64(len(q.pq))) 137 metricQueueCap.Set(float64(len(q.items))) 138 } 139 } else { 140 heap.Fix(&q.pq, item.heapIdx) 141 } 142 q.mu.Unlock() 143} 144 145// Bump will take any repository in ids which is not on the queue and 146// re-insert it with the last known IndexOptions. Bump returns ids that are 147// unknown to the queue. 148func (q *Queue) Bump(ids []uint32) []uint32 { 149 q.mu.Lock() 150 defer q.mu.Unlock() 151 152 if q.items == nil { 153 q.init() 154 } 155 156 var missing []uint32 157 for _, id := range ids { 158 item, ok := q.items[id] 159 if !ok { 160 missing = append(missing, id) 161 } else if item.heapIdx < 0 { 162 if item.backoff.Allow(time.Now()) { 163 q.seq++ 164 item.seq = q.seq 165 item.dateAddedToQueue = time.Now() 166 167 heap.Push(&q.pq, item) 168 metricQueueLen.Set(float64(len(q.pq))) 169 metricQueueCap.Set(float64(len(q.items))) 170 } 171 } 172 } 173 174 return missing 175} 176 177// Iterate will call f on each item known to the queue, including items that 178// have been popped from the queue. Note: this is done in a random order and 179// the queue mutex is held during all calls to f. Do not mutate the data. 180func (q *Queue) Iterate(f func(*IndexOptions)) { 181 q.mu.Lock() 182 defer q.mu.Unlock() 183 for _, item := range q.items { 184 f(&item.opts) 185 } 186} 187 188// debugIteratedOrdered will call f on each queueItem (sorted by indexing priority) 189// known to the queue, including queueItems that have been popped from the queue). 190// 191// Note: The mutex is held during all calls to f. Callers must not modify the queueItems. 
192func (q *Queue) debugIteratedOrdered(f func(*queueItem)) { 193 q.mu.Lock() 194 defer q.mu.Unlock() 195 196 queueItems := make([]*queueItem, 0, len(q.items)) 197 for _, item := range q.items { 198 queueItems = append(queueItems, item) 199 } 200 201 sort.Slice(queueItems, func(i, j int) bool { 202 x, y := queueItems[i], queueItems[j] 203 204 xOnQueue, yOnQueue := x.heapIdx >= 0, y.heapIdx >= 0 205 if xOnQueue != yOnQueue { 206 return xOnQueue 207 } 208 209 return lessQueueItemPriority(x, y) 210 }) 211 212 for _, item := range queueItems { 213 f(item) 214 } 215} 216 217func (q *Queue) handleDebugQueue(w http.ResponseWriter, r *http.Request) { 218 if r.Method != http.MethodGet { 219 http.Error(w, "method must be GET", http.StatusBadRequest) 220 return 221 } 222 223 w.Header().Set("Content-Type", "text/plain") 224 225 var bufferedWriter bytes.Buffer 226 227 writer := tabwriter.NewWriter(&bufferedWriter, 16, 8, 4, ' ', 0) 228 229 _, err := fmt.Fprintf(writer, "Position\tName\tID\tIsOnQueue\tAge\tBranches\t\n") 230 if err != nil { 231 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 232 return 233 } 234 235 now := time.Now() 236 237 position := -1 238 q.debugIteratedOrdered(func(item *queueItem) { 239 position++ 240 241 if err != nil { 242 return 243 } 244 245 var branches []string 246 for _, b := range item.opts.Branches { 247 branches = append(branches, b.String()) 248 } 249 250 isOnQueue := item.heapIdx >= 0 251 age := "-" 252 if isOnQueue { 253 age = now.Sub(item.dateAddedToQueue).Round(time.Second).String() 254 } 255 256 _, err = fmt.Fprintf(writer, "%d\t%s\t%d\t%t\t%s\t%s\n", position, item.opts.Name, item.repoID, isOnQueue, age, strings.Join(branches, ", ")) 257 }) 258 259 if err != nil { 260 http.Error(w, fmt.Sprintf("writing queueItem: %s", err), http.StatusInternalServerError) 261 return 262 } 263 264 err = writer.Flush() 265 if err != nil { 266 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), 
http.StatusInternalServerError) 267 return 268 } 269 270 w.Header().Set("Content-Length", strconv.Itoa(bufferedWriter.Len())) 271 272 _, err = io.Copy(w, &bufferedWriter) 273 if err != nil { 274 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 275 return 276 } 277} 278 279// SetIndexed sets what the currently indexed options are for opts.RepoID. 280func (q *Queue) SetIndexed(opts IndexOptions, state indexState) { 281 q.mu.Lock() 282 item := q.getOrAdd(opts.RepoID) 283 284 item.indexState = state 285 if state != indexStateFail { 286 item.indexed = reflect.DeepEqual(opts, item.opts) 287 item.backoff.Reset() 288 289 if item.heapIdx >= 0 { 290 // We only update the position in the queue, never add it. 291 heap.Fix(&q.pq, item.heapIdx) 292 } 293 } else { 294 item.backoff.Fail(time.Now(), q.logger, item.opts) 295 296 if item.heapIdx >= 0 { 297 // Remove from queue 298 heap.Remove(&q.pq, item.heapIdx) 299 item.heapIdx = -1 300 } 301 } 302 303 q.mu.Unlock() 304} 305 306// MaybeRemoveMissing will remove all queue items not in ids and return the 307// ids of items removed from the queue. It will heuristically not run to 308// conserve resources. 309// 310// In the server's steady state we expect that the list of names is equal to 311// the items in queue. As such in the steady state this function should do no 312// removals. Removal requires memory allocation and coarse locking. To avoid 313// that we use a heuristic which can falsely decide it doesn't need to 314// remove. However, we will converge onto removing items. 
315func (q *Queue) MaybeRemoveMissing(ids []uint32) []uint32 { 316 q.mu.Lock() 317 sameSize := len(q.items) == len(ids) 318 q.mu.Unlock() 319 320 // heuristically skip expensive work 321 if sameSize { 322 debug.Printf("skipping MaybeRemoveMissing due to same size: %d", len(ids)) 323 return nil 324 } 325 326 set := make(map[uint32]struct{}, len(ids)) 327 for _, id := range ids { 328 set[id] = struct{}{} 329 } 330 331 q.mu.Lock() 332 defer q.mu.Unlock() 333 334 var removed []uint32 335 for _, item := range q.items { 336 if _, ok := set[item.opts.RepoID]; ok { 337 continue 338 } 339 340 if item.heapIdx >= 0 { 341 heap.Remove(&q.pq, item.heapIdx) 342 } 343 344 item.indexState = "" 345 346 delete(q.items, item.opts.RepoID) 347 348 removed = append(removed, item.opts.RepoID) 349 } 350 351 metricQueueLen.Set(float64(len(q.pq))) 352 metricQueueCap.Set(float64(len(q.items))) 353 354 return removed 355} 356 357// getOrAdd returns the item for repoID. If the repoID hasn't been seen before, it 358// is added to q.items. 359// 360// Note: getOrAdd requires that q.mu is held. 361func (q *Queue) getOrAdd(repoID uint32) *queueItem { 362 if q.items == nil { 363 q.init() 364 } 365 366 item, ok := q.items[repoID] 367 if !ok { 368 item = q.newQueueItem(repoID) 369 q.items[repoID] = item 370 } 371 372 return item 373} 374 375// get returns the item for repoID. 376// 377// Note: get requires that q.mu is held. 
378func (q *Queue) get(repoID uint32) *queueItem { 379 return q.items[repoID] 380} 381 382func (q *Queue) init() { 383 q.items = map[uint32]*queueItem{} 384 q.pq = make(pqueue, 0) 385} 386 387// pqueue implements a priority queue via the interface for container/heap 388type pqueue []*queueItem 389 390func (pq pqueue) Len() int { return len(pq) } 391 392func (pq pqueue) Less(i, j int) bool { 393 x := pq[i] 394 y := pq[j] 395 return lessQueueItemPriority(x, y) 396} 397 398func (pq pqueue) Swap(i, j int) { 399 pq[i], pq[j] = pq[j], pq[i] 400 pq[i].heapIdx = i 401 pq[j].heapIdx = j 402} 403 404func (pq *pqueue) Push(x interface{}) { 405 n := len(*pq) 406 item := x.(*queueItem) 407 item.heapIdx = n 408 *pq = append(*pq, item) 409} 410 411func (pq *pqueue) Pop() interface{} { 412 old := *pq 413 n := len(old) 414 item := old[n-1] 415 item.heapIdx = -1 416 *pq = old[0 : n-1] 417 return item 418} 419 420// lessQueueItemPriority returns true if indexing x should be prioritized over indexing y 421func lessQueueItemPriority(x, y *queueItem) bool { 422 // If we know x needs an update and y doesn't, then return true. Otherwise 423 // they are either equal priority or y is more urgent. 424 if x.indexed != y.indexed { 425 return !x.indexed 426 } 427 428 if xFail, yFail := x.indexState == indexStateFail, y.indexState == indexStateFail; xFail != yFail { 429 // if you failed to index, you are likely to fail again. So prefer 430 // non-failed. 431 return !xFail 432 } 433 434 // tiebreaker is to prefer the item added to the queue first 435 return x.seq < y.seq 436} 437 438var ( 439 metricQueueLen = promauto.NewGauge(prometheus.GaugeOpts{ 440 Name: "index_queue_len", 441 Help: "The number of repositories in the index queue.", 442 }) 443 metricQueueCap = promauto.NewGauge(prometheus.GaugeOpts{ 444 Name: "index_queue_cap", 445 Help: "The number of repositories tracked by the index queue, including popped items. 
Should be the same as index_num_assigned.", 446 }) 447 metricQueueAge = promauto.NewHistogramVec(prometheus.HistogramOpts{ 448 Name: "index_queue_age_seconds", 449 Help: "A histogram of the amount of time a popped repository spent sitting in the queue beforehand.", 450 Buckets: []float64{ 451 60, // 1m 452 300, // 5m 453 1200, // 20m 454 2400, // 40m 455 3600, // 1h 456 10800, // 3h 457 18000, // 5h 458 36000, // 10h 459 43200, // 12h 460 54000, // 15h 461 72000, // 20h 462 86400, // 24h 463 108000, // 30h 464 126000, // 35h 465 172800, // 48h 466 }, 467 }, []string{"name"}) // name=name of the repository that was just popped from the queue 468)