fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "bytes" 5 "container/heap" 6 "fmt" 7 "io" 8 "net/http" 9 "reflect" 10 "sort" 11 "strconv" 12 "strings" 13 "sync" 14 "text/tabwriter" 15 "time" 16 17 "github.com/prometheus/client_golang/prometheus" 18 "github.com/prometheus/client_golang/prometheus/promauto" 19 20 sglog "github.com/sourcegraph/log" 21) 22 23type queueItem struct { 24 // repoID is the ID of the repo 25 repoID uint32 26 // opts are the options to use when indexing repoID. 27 opts IndexOptions 28 // indexed is true if opts has been indexed. 29 indexed bool 30 // indexState is the indexState of the last attempt at indexing repoID. 31 indexState indexState 32 // heapIdx is the index of the item in the heap. If < 0 then the item is 33 // not on the heap. 34 heapIdx int 35 // seq is a sequence number used as a tiebreaker. This is to ensure we 36 // act like a FIFO queue. 37 seq int64 38 // dateAddedToQueue is the time when this indexing job was added to the queue. If this item is no longer 39 // in the heap (i.e. it has been processed already), this value is nonsensical. 40 dateAddedToQueue time.Time 41 // backoff will handle backing off of future indexing requests for a duration of time based on previous failures 42 backoff backoff 43} 44 45// Queue is a priority queue which returns the next repo to index. It is safe 46// to use concurrently. It is a min queue on: 47// 48// (!indexed, time added to the queue) 49// 50// We use the above since we'd rather index a repo sooner if we know the commit is stale. 51type Queue struct { 52 mu sync.Mutex 53 items map[uint32]*queueItem 54 pq pqueue 55 seq int64 56 logger sglog.Logger 57 newQueueItem func(uint32) *queueItem 58} 59 60func NewQueue(backoffDuration, maxBackoffDuration time.Duration, l sglog.Logger) *Queue { 61 if backoffDuration < 0 || maxBackoffDuration < 0 { 62 backoffDuration = 0 63 maxBackoffDuration = 0 64 } 65 newQueueItem := func(repoID uint32) *queueItem { 66 return &queueItem{ 67 repoID: repoID, 68 heapIdx: -1, 69 backoff: backoff{ 70 backoffDuration: backoffDuration, 71 maxBackoff: maxBackoffDuration, 72 }, 73 } 74 } 75 76 q := &Queue{ 77 newQueueItem: newQueueItem, 78 logger: l.Scoped("queue"), 79 } 80 81 q.init() 82 return q 83} 84 85type QueueItem struct { 86 // Opts are the options to use when indexing the repo. 87 Opts IndexOptions 88 // DateAddedToQueue is the time when this indexing job was added to the queue, used for telemetry. 89 DateAddedToQueue time.Time 90} 91 92// Pop returns options and metadata for the next repo to index. If the queue is empty ok is false. 93func (q *Queue) Pop() (result QueueItem, ok bool) { 94 q.mu.Lock() 95 if len(q.pq) == 0 { 96 q.mu.Unlock() 97 return QueueItem{}, false 98 } 99 100 item := heap.Pop(&q.pq).(*queueItem) 101 102 metricQueueLen.Set(float64(len(q.pq))) 103 metricQueueCap.Set(float64(len(q.items))) 104 105 dateAdded := item.dateAddedToQueue 106 item.dateAddedToQueue = time.Unix(0, 0) 107 108 q.mu.Unlock() 109 110 name := repoNameForMetric(item.opts.Name) 111 age := time.Since(dateAdded) 112 metricQueueAge.WithLabelValues(name).Observe(age.Seconds()) 113 114 return QueueItem{item.opts, dateAdded}, true 115} 116 117// Len returns the number of items in the queue. 118func (q *Queue) Len() int { 119 q.mu.Lock() 120 l := len(q.pq) 121 q.mu.Unlock() 122 return l 123} 124 125// AddOrUpdate sets which opts to index next. If opts.RepoID is already in the 126// queue, it is updated. 127func (q *Queue) AddOrUpdate(opts IndexOptions) { 128 q.mu.Lock() 129 item := q.getOrAdd(opts.RepoID) 130 if !reflect.DeepEqual(item.opts, opts) { 131 item.indexed = false 132 item.opts = opts 133 } 134 if item.heapIdx < 0 { 135 if item.backoff.Allow(time.Now()) { 136 q.seq++ 137 item.seq = q.seq 138 item.dateAddedToQueue = time.Now() 139 140 heap.Push(&q.pq, item) 141 metricQueueLen.Set(float64(len(q.pq))) 142 metricQueueCap.Set(float64(len(q.items))) 143 } 144 } else { 145 heap.Fix(&q.pq, item.heapIdx) 146 } 147 q.mu.Unlock() 148} 149 150// Bump will take any repository in ids which is not on the queue and 151// re-insert it with the last known IndexOptions. Bump returns ids that are 152// unknown to the queue. 153func (q *Queue) Bump(ids []uint32) []uint32 { 154 q.mu.Lock() 155 defer q.mu.Unlock() 156 157 if q.items == nil { 158 q.init() 159 } 160 161 var missing []uint32 162 for _, id := range ids { 163 item, ok := q.items[id] 164 if !ok { 165 missing = append(missing, id) 166 } else if item.heapIdx < 0 { 167 if item.backoff.Allow(time.Now()) { 168 q.seq++ 169 item.seq = q.seq 170 item.dateAddedToQueue = time.Now() 171 172 heap.Push(&q.pq, item) 173 metricQueueLen.Set(float64(len(q.pq))) 174 metricQueueCap.Set(float64(len(q.items))) 175 } 176 } 177 } 178 179 return missing 180} 181 182// Iterate will call f on each item known to the queue, including items that 183// have been popped from the queue. Note: this is done in a random order and 184// the queue mutex is held during all calls to f. Do not mutate the data. 185func (q *Queue) Iterate(f func(*IndexOptions)) { 186 q.mu.Lock() 187 defer q.mu.Unlock() 188 for _, item := range q.items { 189 f(&item.opts) 190 } 191} 192 193// debugIteratedOrdered will call f on each queueItem (sorted by indexing priority) 194// known to the queue, including queueItems that have been popped from the queue). 195// 196// Note: The mutex is held during all calls to f. Callers must not modify the queueItems. 197func (q *Queue) debugIteratedOrdered(f func(*queueItem)) { 198 q.mu.Lock() 199 defer q.mu.Unlock() 200 201 queueItems := make([]*queueItem, 0, len(q.items)) 202 for _, item := range q.items { 203 queueItems = append(queueItems, item) 204 } 205 206 sort.Slice(queueItems, func(i, j int) bool { 207 x, y := queueItems[i], queueItems[j] 208 209 xOnQueue, yOnQueue := x.heapIdx >= 0, y.heapIdx >= 0 210 if xOnQueue != yOnQueue { 211 return xOnQueue 212 } 213 214 return lessQueueItemPriority(x, y) 215 }) 216 217 for _, item := range queueItems { 218 f(item) 219 } 220} 221 222func (q *Queue) handleDebugQueue(w http.ResponseWriter, r *http.Request) { 223 if r.Method != http.MethodGet { 224 http.Error(w, "method must be GET", http.StatusBadRequest) 225 return 226 } 227 228 w.Header().Set("Content-Type", "text/plain") 229 230 var bufferedWriter bytes.Buffer 231 232 writer := tabwriter.NewWriter(&bufferedWriter, 16, 8, 4, ' ', 0) 233 234 _, err := fmt.Fprintf(writer, "Position\tName\tID\tIsOnQueue\tAge\tBranches\t\n") 235 if err != nil { 236 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 237 return 238 } 239 240 now := time.Now() 241 242 position := -1 243 q.debugIteratedOrdered(func(item *queueItem) { 244 position++ 245 246 if err != nil { 247 return 248 } 249 250 var branches []string 251 for _, b := range item.opts.Branches { 252 branches = append(branches, b.String()) 253 } 254 255 isOnQueue := item.heapIdx >= 0 256 age := "-" 257 if isOnQueue { 258 age = now.Sub(item.dateAddedToQueue).Round(time.Second).String() 259 } 260 261 _, err = fmt.Fprintf(writer, "%d\t%s\t%d\t%t\t%s\t%s\n", position, item.opts.Name, item.repoID, isOnQueue, age, strings.Join(branches, ", ")) 262 }) 263 264 if err != nil { 265 http.Error(w, fmt.Sprintf("writing queueItem: %s", err), http.StatusInternalServerError) 266 return 267 } 268 269 err = writer.Flush() 270 if err != nil { 271 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 272 return 273 } 274 275 w.Header().Set("Content-Length", strconv.Itoa(bufferedWriter.Len())) 276 277 _, err = io.Copy(w, &bufferedWriter) 278 if err != nil { 279 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 280 return 281 } 282} 283 284// SetIndexed sets what the currently indexed options are for opts.RepoID. 285func (q *Queue) SetIndexed(opts IndexOptions, state indexState) { 286 q.mu.Lock() 287 item := q.getOrAdd(opts.RepoID) 288 289 item.indexState = state 290 if state != indexStateFail { 291 item.indexed = reflect.DeepEqual(opts, item.opts) 292 item.backoff.Reset() 293 294 if item.heapIdx >= 0 { 295 // We only update the position in the queue, never add it. 296 heap.Fix(&q.pq, item.heapIdx) 297 } 298 } else { 299 item.backoff.Fail(time.Now(), q.logger, item.opts) 300 301 if item.heapIdx >= 0 { 302 // Remove from queue 303 heap.Remove(&q.pq, item.heapIdx) 304 item.heapIdx = -1 305 } 306 } 307 308 q.mu.Unlock() 309} 310 311// MaybeRemoveMissing will remove all queue items not in ids and return the 312// ids of items removed from the queue. It will heuristically not run to 313// conserve resources. 314// 315// In the server's steady state we expect that the list of names is equal to 316// the items in queue. As such in the steady state this function should do no 317// removals. Removal requires memory allocation and coarse locking. To avoid 318// that we use a heuristic which can falsely decide it doesn't need to 319// remove. However, we will converge onto removing items. 320func (q *Queue) MaybeRemoveMissing(ids []uint32) []uint32 { 321 q.mu.Lock() 322 sameSize := len(q.items) == len(ids) 323 q.mu.Unlock() 324 325 // heuristically skip expensive work 326 if sameSize { 327 debug.Printf("skipping MaybeRemoveMissing due to same size: %d", len(ids)) 328 return nil 329 } 330 331 set := make(map[uint32]struct{}, len(ids)) 332 for _, id := range ids { 333 set[id] = struct{}{} 334 } 335 336 q.mu.Lock() 337 defer q.mu.Unlock() 338 339 var removed []uint32 340 for _, item := range q.items { 341 if _, ok := set[item.opts.RepoID]; ok { 342 continue 343 } 344 345 if item.heapIdx >= 0 { 346 heap.Remove(&q.pq, item.heapIdx) 347 } 348 349 item.indexState = "" 350 351 delete(q.items, item.opts.RepoID) 352 353 removed = append(removed, item.opts.RepoID) 354 } 355 356 metricQueueLen.Set(float64(len(q.pq))) 357 metricQueueCap.Set(float64(len(q.items))) 358 359 return removed 360} 361 362// getOrAdd returns the item for repoID. If the repoID hasn't been seen before, it 363// is added to q.items. 364// 365// Note: getOrAdd requires that q.mu is held. 366func (q *Queue) getOrAdd(repoID uint32) *queueItem { 367 if q.items == nil { 368 q.init() 369 } 370 371 item, ok := q.items[repoID] 372 if !ok { 373 item = q.newQueueItem(repoID) 374 q.items[repoID] = item 375 } 376 377 return item 378} 379 380// get returns the item for repoID. 381// 382// Note: get requires that q.mu is held. 383func (q *Queue) get(repoID uint32) *queueItem { 384 return q.items[repoID] 385} 386 387func (q *Queue) init() { 388 q.items = map[uint32]*queueItem{} 389 q.pq = make(pqueue, 0) 390} 391 392// pqueue implements a priority queue via the interface for container/heap 393type pqueue []*queueItem 394 395func (pq pqueue) Len() int { return len(pq) } 396 397func (pq pqueue) Less(i, j int) bool { 398 x := pq[i] 399 y := pq[j] 400 return lessQueueItemPriority(x, y) 401} 402 403func (pq pqueue) Swap(i, j int) { 404 pq[i], pq[j] = pq[j], pq[i] 405 pq[i].heapIdx = i 406 pq[j].heapIdx = j 407} 408 409func (pq *pqueue) Push(x interface{}) { 410 n := len(*pq) 411 item := x.(*queueItem) 412 item.heapIdx = n 413 *pq = append(*pq, item) 414} 415 416func (pq *pqueue) Pop() interface{} { 417 old := *pq 418 n := len(old) 419 item := old[n-1] 420 item.heapIdx = -1 421 *pq = old[0 : n-1] 422 return item 423} 424 425// lessQueueItemPriority returns true if indexing x should be prioritized over indexing y 426func lessQueueItemPriority(x, y *queueItem) bool { 427 // If we know x needs an update and y doesn't, then return true. Otherwise 428 // they are either equal priority or y is more urgent. 429 if x.indexed != y.indexed { 430 return !x.indexed 431 } 432 433 if xFail, yFail := x.indexState == indexStateFail, y.indexState == indexStateFail; xFail != yFail { 434 // if you failed to index, you are likely to fail again. So prefer 435 // non-failed. 436 return !xFail 437 } 438 439 // tiebreaker is to prefer the item added to the queue first 440 return x.seq < y.seq 441} 442 443var ( 444 metricQueueLen = promauto.NewGauge(prometheus.GaugeOpts{ 445 Name: "index_queue_len", 446 Help: "The number of repositories in the index queue.", 447 }) 448 metricQueueCap = promauto.NewGauge(prometheus.GaugeOpts{ 449 Name: "index_queue_cap", 450 Help: "The number of repositories tracked by the index queue, including popped items. Should be the same as index_num_assigned.", 451 }) 452 metricQueueAge = promauto.NewHistogramVec(prometheus.HistogramOpts{ 453 Name: "index_queue_age_seconds", 454 Help: "A histogram of the amount of time a popped repository spent sitting in the queue beforehand.", 455 Buckets: []float64{ 456 60, // 1m 457 300, // 5m 458 1200, // 20m 459 2400, // 40m 460 3600, // 1h 461 10800, // 3h 462 18000, // 5h 463 36000, // 10h 464 43200, // 12h 465 54000, // 15h 466 72000, // 20h 467 86400, // 24h 468 108000, // 30h 469 126000, // 35h 470 172800, // 48h 471 }, 472 }, []string{"name"}) // name=name of the repository that was just popped from the queue 473)