fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

indexserver: backoff repo if failing repeatedly (#442)

Introduce backoff strategy for failed indexing attempts. The strategy's goal is to backoff subsequent indexing attempts for a given repository for a period of time because we've identified that the preceding indexing attempt has failed. The backoff period of time is finite and increases linearly with consecutive indexing failures.

Test Plan: Add queue tests and backoff duration tests

author
Gary Lee
committer
GitHub
date (Nov 1, 2022, 9:36 AM -0700) commit fb9c22b1 parent d8b79cc1
+531 -37
+46
cmd/zoekt-sourcegraph-indexserver/backoff.go
··· 1 + package main 2 + 3 + import ( 4 + "github.com/sourcegraph/log" 5 + "time" 6 + ) 7 + 8 + type backoff struct { 9 + // maxBackoff is the longest duration we will backoff indexing operations of the given repo. 10 + maxBackoff time.Duration 11 + // backoffDuration is used to determine the duration of backoff. consecutiveFailures * backoffDuration calculates the 12 + // duration set on failed indexing attempt. 13 + backoffDuration time.Duration 14 + // consecutiveFailures is the count of preceding consecutive failures. 15 + consecutiveFailures int 16 + // backOffUntil is the earliest time when we allow the item to be pushed to the heap. Until then the item will not be enqueued 17 + // and indexing will not be attempted. 18 + backoffUntil time.Time 19 + } 20 + 21 + func (b *backoff) Allow(now time.Time) bool { 22 + return b.backoffUntil.Before(now) 23 + } 24 + 25 + func (b *backoff) Reset() { 26 + b.consecutiveFailures = 0 27 + b.backoffUntil = time.Unix(0, 0) 28 + } 29 + 30 + func (b *backoff) Fail(now time.Time, logger log.Logger, opts IndexOptions) { 31 + backoffDuration := time.Duration(b.consecutiveFailures+1) * b.backoffDuration 32 + 33 + if backoffDuration > b.maxBackoff { 34 + backoffDuration = b.maxBackoff 35 + } else { 36 + b.consecutiveFailures++ 37 + } 38 + b.backoffUntil = now.Add(backoffDuration) 39 + 40 + logger.Debug("Backoff subsequent attempts to index repository", 41 + log.String("repo", opts.Name), 42 + log.Uint32("id", opts.RepoID), 43 + log.Duration("backoff_duration", b.backoffDuration), 44 + log.Time("backoff_until", b.backoffUntil), 45 + ) 46 + }
+396
cmd/zoekt-sourcegraph-indexserver/backoff_test.go
··· 1 + package main 2 + 3 + import ( 4 + "github.com/sourcegraph/log/logtest" 5 + "testing" 6 + "time" 7 + ) 8 + 9 + func TestQueue_BackoffOnFail(t *testing.T) { 10 + backoffDuration := 1 * time.Millisecond 11 + maxBackoffDuration := backoffDuration * 2 12 + 13 + queue := NewQueue(backoffDuration, maxBackoffDuration, logtest.Scoped(t)) 14 + opts := IndexOptions{RepoID: 1, Name: "foo"} 15 + 16 + queue.AddOrUpdate(opts) 17 + emptyQueue(queue) 18 + 19 + queue.SetIndexed(opts, indexStateFail) 20 + 21 + bumpTime := time.Now() 22 + queue.Bump([]uint32{opts.RepoID}) 23 + 24 + // item is disallowed from being pushed to heap during backoff period 25 + if item, ok := queue.Pop(); ok { 26 + qi := queue.items[item.RepoID] 27 + if qi.backoff.backoffUntil.Before(bumpTime) { 28 + t.Errorf("backoffDuration already passed before first attempt to push item to heap in Bump(). Increase backoffDuration for the Queue. backoffDuration: %s. maxBackoffDuration: %s.", 29 + backoffDuration, maxBackoffDuration) 30 + } else { 31 + t.Fatal("queue should be empty") 32 + } 33 + } 34 + } 35 + 36 + func TestQueue_BackoffAllowAfterDuration(t *testing.T) { 37 + backoffDuration := 1 * time.Millisecond 38 + maxBackoffDuration := backoffDuration * 2 39 + 40 + queue := NewQueue(backoffDuration, maxBackoffDuration, logtest.Scoped(t)) 41 + opts := IndexOptions{RepoID: 1, Name: "foo"} 42 + 43 + queue.AddOrUpdate(opts) 44 + emptyQueue(queue) 45 + 46 + queue.SetIndexed(opts, indexStateFail) 47 + 48 + if _, ok := queue.Pop(); ok { 49 + t.Fatal("queue should be empty after SetIndexed") 50 + } 51 + 52 + time.Sleep(backoffDuration * 20) 53 + 54 + queue.Bump([]uint32{opts.RepoID}) 55 + 56 + if _, ok := queue.Pop(); !ok { 57 + t.Fatal("queue should no longer be empty after waiting for longer than the backoff duration and then bumping index options") 58 + } 59 + } 60 + 61 + func TestQueue_ResetBackoffUntil(t *testing.T) { 62 + backoffDuration := 1 * time.Hour 63 + maxBackoffDuration := backoffDuration * 2 64 + 65 + queue := NewQueue(backoffDuration, maxBackoffDuration, logtest.Scoped(t)) 66 + opts := IndexOptions{RepoID: 1, Name: "foo"} 67 + 68 + queue.AddOrUpdate(opts) 69 + emptyQueue(queue) 70 + 71 + queue.SetIndexed(opts, indexStateFail) 72 + 73 + if _, ok := queue.Pop(); ok { 74 + t.Fatal("queue should be empty after SetIndexed") 75 + } 76 + 77 + queue.SetIndexed(opts, indexStateSuccess) 78 + 79 + queue.Bump([]uint32{opts.RepoID}) 80 + 81 + if _, ok := queue.Pop(); !ok { 82 + t.Fatal("queue should no longer be empty after resetting backoff until time to zero value") 83 + } 84 + } 85 + 86 + func TestQueue_ResetFailuresCount(t *testing.T) { 87 + backoffDuration := 1 * time.Millisecond 88 + maxBackoffDuration := 1000 * time.Millisecond 89 + 90 + queue := NewQueue(backoffDuration, maxBackoffDuration, logtest.Scoped(t)) 91 + opts := IndexOptions{RepoID: 1, Name: "foo"} 92 + 93 + queue.AddOrUpdate(opts) 94 + emptyQueue(queue) 95 + 96 + // consecutive failures will push backoff until to a further out time 97 + for i := 0; i < 1000; i++ { 98 + queue.SetIndexed(opts, indexStateFail) 99 + } 100 + 101 + if _, ok := queue.Pop(); ok { 102 + t.Fatal("queue should be empty after SetIndexed") 103 + } 104 + 105 + queue.SetIndexed(opts, indexStateSuccess) 106 + 107 + queue.Bump([]uint32{opts.RepoID}) 108 + 109 + // backoff until is only one duration in the future after resetting consecutive failures count 110 + queue.SetIndexed(opts, indexStateFail) 111 + 112 + if _, ok := queue.Pop(); ok { 113 + t.Fatal("queue should be empty after SetIndexed") 114 + } 115 + 116 + time.Sleep(backoffDuration) 117 + 118 + queue.Bump([]uint32{opts.RepoID}) 119 + 120 + if _, ok := queue.Pop(); !ok { 121 + t.Fatal("queue should no longer be empty after waiting a backoff duration for the first failure") 122 + } 123 + } 124 + 125 + func TestQueue_MaxBackoffDuration(t *testing.T) { 126 + backoffDuration := 1 * time.Hour 127 + maxBackoffDuration := 1 * time.Millisecond 128 + 129 + queue := NewQueue(backoffDuration, maxBackoffDuration, logtest.Scoped(t)) 130 + opts := IndexOptions{RepoID: 1, Name: "foo"} 131 + 132 + queue.AddOrUpdate(opts) 133 + emptyQueue(queue) 134 + 135 + // consecutive failures increase duration up to a maximum 136 + for i := 0; i < 100; i++ { 137 + queue.SetIndexed(opts, indexStateFail) 138 + } 139 + 140 + if _, ok := queue.Pop(); ok { 141 + t.Fatal("queue should be empty after SetIndexed") 142 + } 143 + 144 + // sleep past maxBackoffDuration but long before backoffDuration would pass 145 + time.Sleep(maxBackoffDuration * 200) 146 + 147 + queue.Bump([]uint32{opts.RepoID}) 148 + 149 + if _, ok := queue.Pop(); !ok { 150 + t.Fatal("queue should no longer be empty after max backoff duration has passed") 151 + } 152 + } 153 + 154 + func TestQueue_BackoffDisabled(t *testing.T) { 155 + cases := []struct { 156 + name string 157 + backoffDuration time.Duration 158 + maxBackoffDuration time.Duration 159 + }{{ 160 + name: "negative backoff", 161 + backoffDuration: -1 * time.Minute, 162 + maxBackoffDuration: 1 * time.Minute, 163 + }, { 164 + name: "negative maximum backoff", 165 + backoffDuration: 1 * time.Minute, 166 + maxBackoffDuration: -1 * time.Minute, 167 + }, { 168 + name: "negative backoff and negative maximum backoff", 169 + backoffDuration: -1 * time.Minute, 170 + maxBackoffDuration: -1 * time.Minute, 171 + }} 172 + 173 + for _, tc := range cases { 174 + t.Run(tc.name, func(t *testing.T) { 175 + queue := NewQueue(tc.backoffDuration, tc.maxBackoffDuration, logtest.Scoped(t)) 176 + opts := IndexOptions{RepoID: 1, Name: "foo"} 177 + 178 + queue.AddOrUpdate(opts) 179 + emptyQueue(queue) 180 + queue.SetIndexed(opts, indexStateFail) 181 + 182 + queue.Bump([]uint32{opts.RepoID}) 183 + 184 + if _, ok := queue.Pop(); !ok { 185 + t.Fatal("queue should not be empty after bump when backoff is disabled") 186 + } 187 + }) 188 + } 189 + } 190 + 191 + func TestBackoff_AllowByDefault(t *testing.T) { 192 + backoffDuration := 1 * time.Minute 193 + maxBackoffDuration := 2 * backoffDuration 194 + 195 + backoff := backoff{ 196 + backoffDuration: backoffDuration, 197 + maxBackoff: maxBackoffDuration, 198 + } 199 + 200 + now := time.Now() 201 + assertAllow(t, now, backoff) 202 + } 203 + 204 + func TestBackoff_Disallow(t *testing.T) { 205 + backoffDuration := 10 * time.Minute 206 + maxBackoffDuration := 2 * backoffDuration 207 + opts := IndexOptions{RepoID: 1, Name: "foo"} 208 + 209 + backoff := backoff{ 210 + backoffDuration: backoffDuration, 211 + maxBackoff: maxBackoffDuration, 212 + } 213 + 214 + now := time.Now() 215 + backoff.Fail(now, logtest.Scoped(t), opts) 216 + assertDisallow(t, now, backoff) 217 + } 218 + 219 + func TestBackoff_BackoffExpiration(t *testing.T) { 220 + backoffDuration := 10 * time.Minute 221 + maxBackoffDuration := 2 * backoffDuration 222 + opts := IndexOptions{RepoID: 1, Name: "foo"} 223 + 224 + backoff := backoff{ 225 + backoffDuration: backoffDuration, 226 + maxBackoff: maxBackoffDuration, 227 + } 228 + 229 + now := time.Now() 230 + backoff.Fail(now, logtest.Scoped(t), opts) 231 + assertDisallow(t, now, backoff) 232 + 233 + backoffUntil := now.Add(backoffDuration) 234 + assertDisallow(t, backoffUntil, backoff) 235 + 236 + // backoff not applied for any timestamp after backoff until 237 + expiredBackoff := now.Add(backoffDuration + (1 * time.Nanosecond)) 238 + assertAllow(t, expiredBackoff, backoff) 239 + } 240 + 241 + func TestBackoff_ResetBackoffUntil(t *testing.T) { 242 + backoffDuration := 10 * time.Minute 243 + maxBackoffDuration := 2 * backoffDuration 244 + opts := IndexOptions{RepoID: 1, Name: "foo"} 245 + 246 + backoff := backoff{ 247 + backoffDuration: backoffDuration, 248 + maxBackoff: maxBackoffDuration, 249 + } 250 + 251 + now := time.Now() 252 + backoff.Fail(now, logtest.Scoped(t), opts) 253 + assertDisallow(t, now, backoff) 254 + 255 + backoff.Reset() 256 + assertAllow(t, now, backoff) 257 + } 258 + 259 + func TestBackoff_MaximumBackoffUntil(t *testing.T) { 260 + backoffDuration := 10 * time.Minute 261 + maxBackoffDuration := 25 * time.Minute 262 + opts := IndexOptions{RepoID: 1, Name: "foo"} 263 + 264 + backoff := backoff{ 265 + backoffDuration: backoffDuration, 266 + maxBackoff: maxBackoffDuration, 267 + } 268 + 269 + firstIndex := time.Now() 270 + backoff.Fail(firstIndex, logtest.Scoped(t), opts) 271 + currentBackoffUntil := backoffDuration 272 + 273 + // disallowed before we pass backoff until timestamp 274 + assertDisallow(t, firstIndex.Add(currentBackoffUntil-1*time.Minute), backoff) 275 + 276 + secondIndex := firstIndex.Add(currentBackoffUntil + 1*time.Minute) 277 + backoff.Fail(secondIndex, logtest.Scoped(t), opts) 278 + 279 + // failures applies increased backoff duration due to consecutive failures 280 + currentBackoffUntil += backoffDuration 281 + 282 + // disallowed before we pass backoff until timestamp 283 + assertDisallow(t, secondIndex.Add(currentBackoffUntil-1*time.Minute), backoff) 284 + 285 + thirdIndex := secondIndex.Add(currentBackoffUntil + 1*time.Minute) 286 + backoff.Fail(thirdIndex, logtest.Scoped(t), opts) 287 + 288 + // This would be the new backoff until timestamp if we were not bounded by maxBackoffDuration 289 + currentBackoffUntil += backoffDuration 290 + // currentBackoffUntil is not applied since it exceeds maximum 291 + assertAllow(t, thirdIndex.Add(currentBackoffUntil-1*time.Minute), backoff) 292 + 293 + // Maximum backoff duration was applied 294 + assertDisallow(t, thirdIndex.Add(maxBackoffDuration-1*time.Minute), backoff) 295 + } 296 + 297 + func TestBackoff_IncrementConsecutiveFailures(t *testing.T) { 298 + failedCount := 5 299 + backoffDuration := 1 * time.Minute 300 + maxBackoffDuration := time.Duration(failedCount) * backoffDuration 301 + opts := IndexOptions{RepoID: 1, Name: "foo"} 302 + 303 + backoff := backoff{ 304 + backoffDuration: backoffDuration, 305 + maxBackoff: maxBackoffDuration, 306 + } 307 + 308 + now := time.Now() 309 + expectedFailuresCount := 0 310 + 311 + for i := 0; i < failedCount; i++ { 312 + backoff.Fail(now.Add(time.Duration(i)*backoffDuration), logtest.Scoped(t), opts) 313 + expectedFailuresCount++ 314 + assertFailuresCount(t, expectedFailuresCount, backoff) 315 + } 316 + } 317 + 318 + func TestBackoff_MaximumConsecutiveFailures(t *testing.T) { 319 + maximumCount := 3 320 + failedCount := 2 * maximumCount 321 + backoffDuration := 1 * time.Minute 322 + maxBackoffDuration := time.Duration(maximumCount) * backoffDuration 323 + opts := IndexOptions{RepoID: 1, Name: "foo"} 324 + 325 + backoff := backoff{ 326 + backoffDuration: backoffDuration, 327 + maxBackoff: maxBackoffDuration, 328 + } 329 + 330 + now := time.Now() 331 + expectedFailuresCount := 0 332 + 333 + // consecutive failures count increments per failure 334 + for i := 0; i < maximumCount; i++ { 335 + backoff.Fail(now.Add(time.Duration(i)*backoffDuration), logtest.Scoped(t), opts) 336 + expectedFailuresCount++ 337 + assertFailuresCount(t, expectedFailuresCount, backoff) 338 + } 339 + 340 + // consecutive failures count does not change 341 + for i := maximumCount - 1; i < failedCount; i++ { 342 + backoff.Fail(now.Add(time.Duration(i)*backoffDuration), logtest.Scoped(t), opts) 343 + assertFailuresCount(t, expectedFailuresCount, backoff) 344 + } 345 + } 346 + 347 + func TestBackoff_ResetConsecutiveFailures(t *testing.T) { 348 + failedCount := 3 349 + backoffDuration := 10 * time.Minute 350 + maxBackoffDuration := time.Duration(failedCount) * backoffDuration 351 + opts := IndexOptions{RepoID: 1, Name: "foo"} 352 + 353 + backoff := backoff{ 354 + backoffDuration: backoffDuration, 355 + maxBackoff: maxBackoffDuration, 356 + } 357 + 358 + for i := 0; i < failedCount; i++ { 359 + now := time.Now() 360 + 361 + // fail j consecutive times 362 + for j := i; j <= i; j++ { 363 + backoff.Fail(now.Add(time.Duration(j)*backoffDuration), logtest.Scoped(t), opts) 364 + } 365 + 366 + // reset behavior is independent of current consecutiveFailures count 367 + backoff.Reset() 368 + assertFailuresCount(t, 0, backoff) 369 + } 370 + } 371 + 372 + func assertAllow(t *testing.T, now time.Time, b backoff) { 373 + if indexingAllowed := b.Allow(now); !indexingAllowed { 374 + t.Errorf("Indexing is not allowed to proceed by default at %s due to backing off until %s", 375 + now, b.backoffUntil) 376 + } 377 + } 378 + 379 + func assertDisallow(t *testing.T, now time.Time, b backoff) { 380 + if indexingAllowed := b.Allow(now); indexingAllowed { 381 + t.Errorf("Indexing is allowed to proceed at %s after failure despite being set to backoff until %s", 382 + now, b.backoffUntil) 383 + } 384 + } 385 + 386 + func assertFailuresCount(t *testing.T, expected int, b backoff) { 387 + if failuresCount := b.consecutiveFailures; failuresCount != expected { 388 + t.Errorf("Item currently tracks %d consecutive failures when expected consecutive failures count is %d", 389 + failuresCount, expected) 390 + } 391 + } 392 + 393 + func emptyQueue(q *Queue) { 394 + for ok := true; ok; _, ok = q.Pop() { 395 + } 396 + }
+9
cmd/zoekt-sourcegraph-indexserver/main.go
··· 1019 1019 mergeInterval time.Duration 1020 1020 targetSize int64 1021 1021 minSize int64 1022 + 1023 + // config values related to backoff indexing repos with one or more consecutive failures 1024 + backoffDuration time.Duration 1025 + maxBackoffDuration time.Duration 1022 1026 } 1023 1027 1024 1028 func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { ··· 1034 1038 fs.StringVar(&rc.hostname, "hostname", hostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1035 1039 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1036 1040 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate") 1041 + fs.DurationVar(&rc.backoffDuration, "backoff_duration", 10*time.Minute, "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1042 + fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", 120*time.Minute, "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.") 1037 1043 } 1038 1044 1039 1045 func startServer(conf rootConfig) error { ··· 1169 1175 1170 1176 logger := sglog.Scoped("server", "periodically reindexes enabled repositories on sourcegraph") 1171 1177 1178 + q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1179 + 1172 1180 return &Server{ 1173 1181 logger: logger, 1174 1182 Sourcegraph: sg, ··· 1179 1187 MergeInterval: conf.mergeInterval, 1180 1188 CPUCount: cpuCount, 1181 1189 TargetSizeBytes: conf.targetSize * 1024 * 1024, 1190 + queue: *q, 1182 1191 minSizeBytes: conf.minSize * 1024 * 1024, 1183 1192 shardMerging: zoekt.ShardMergingEnabled(), 1184 1193 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
+67 -29
cmd/zoekt-sourcegraph-indexserver/queue.go
··· 16 16 17 17 "github.com/prometheus/client_golang/prometheus" 18 18 "github.com/prometheus/client_golang/prometheus/promauto" 19 + 20 + sglog "github.com/sourcegraph/log" 19 21 ) 20 22 21 23 type queueItem struct { ··· 36 38 // dateAddedToQueue is the time when this indexing job was added to the queue. If this item is no longer 37 39 // in the heap (i.e. it has been processed already), this value is nonsensical. 38 40 dateAddedToQueue time.Time 41 + // backoff will handle backing off of future indexing requests for a duration of time based on previous failures 42 + backoff backoff 39 43 } 40 44 41 45 // Queue is a priority queue which returns the next repo to index. It is safe ··· 43 47 // 44 48 // (!indexed, time added to the queue) 45 49 // 46 - // We use the above since: 47 - // 48 - // * We rather index a repo sooner if we know the commit is stale. 49 - // * The order of repos returned by Sourcegraph API are ordered by importance. 50 + // We use the above since we'd rather index a repo sooner if we know the commit is stale. 50 51 type Queue struct { 51 - mu sync.Mutex 52 - items map[uint32]*queueItem 53 - pq pqueue 54 - seq int64 52 + mu sync.Mutex 53 + items map[uint32]*queueItem 54 + pq pqueue 55 + seq int64 56 + logger sglog.Logger 57 + newQueueItem func(uint32) *queueItem 58 + } 59 + 60 + func NewQueue(backoffDuration, maxBackoffDuration time.Duration, l sglog.Logger) *Queue { 61 + if backoffDuration < 0 || maxBackoffDuration < 0 { 62 + backoffDuration = 0 63 + maxBackoffDuration = 0 64 + } 65 + newQueueItem := func(repoID uint32) *queueItem { 66 + return &queueItem{ 67 + repoID: repoID, 68 + heapIdx: -1, 69 + backoff: backoff{ 70 + backoffDuration: backoffDuration, 71 + maxBackoff: maxBackoffDuration, 72 + }, 73 + } 74 + } 75 + 76 + q := &Queue{ 77 + newQueueItem: newQueueItem, 78 + logger: l.Scoped("queue", "zoekt-indexserver queue operations"), 79 + } 80 + 81 + q.init() 82 + return q 55 83 } 56 84 57 85 // Pop returns the opts of the next repo to index. If the queue is empty ok is ··· 99 127 item.opts = opts 100 128 } 101 129 if item.heapIdx < 0 { 102 - q.seq++ 103 - item.seq = q.seq 104 - item.dateAddedToQueue = time.Now() 130 + if item.backoff.Allow(time.Now()) { 131 + q.seq++ 132 + item.seq = q.seq 133 + item.dateAddedToQueue = time.Now() 105 134 106 - heap.Push(&q.pq, item) 107 - metricQueueLen.Set(float64(len(q.pq))) 108 - metricQueueCap.Set(float64(len(q.items))) 135 + heap.Push(&q.pq, item) 136 + metricQueueLen.Set(float64(len(q.pq))) 137 + metricQueueCap.Set(float64(len(q.items))) 138 + } 109 139 } else { 110 140 heap.Fix(&q.pq, item.heapIdx) 111 141 } ··· 129 159 if !ok { 130 160 missing = append(missing, id) 131 161 } else if item.heapIdx < 0 { 132 - q.seq++ 133 - item.seq = q.seq 134 - item.dateAddedToQueue = time.Now() 162 + if item.backoff.Allow(time.Now()) { 163 + q.seq++ 164 + item.seq = q.seq 165 + item.dateAddedToQueue = time.Now() 135 166 136 - heap.Push(&q.pq, item) 137 - metricQueueLen.Set(float64(len(q.pq))) 138 - metricQueueCap.Set(float64(len(q.items))) 167 + heap.Push(&q.pq, item) 168 + metricQueueLen.Set(float64(len(q.pq))) 169 + metricQueueCap.Set(float64(len(q.items))) 170 + } 139 171 } 140 172 } 141 173 ··· 252 284 item.indexState = state 253 285 if state != indexStateFail { 254 286 item.indexed = reflect.DeepEqual(opts, item.opts) 255 - } 287 + item.backoff.Reset() 288 + 289 + if item.heapIdx >= 0 { 290 + // We only update the position in the queue, never add it. 291 + heap.Fix(&q.pq, item.heapIdx) 292 + } 293 + } else { 294 + item.backoff.Fail(time.Now(), q.logger, item.opts) 256 295 257 - if item.heapIdx >= 0 { 258 - // We only update the position in the queue, never add it. 259 - heap.Fix(&q.pq, item.heapIdx) 296 + if item.heapIdx >= 0 { 297 + // Remove from queue 298 + heap.Remove(&q.pq, item.heapIdx) 299 + item.heapIdx = -1 300 + } 260 301 } 261 302 262 303 q.mu.Unlock() ··· 324 365 325 366 item, ok := q.items[repoID] 326 367 if !ok { 327 - item = &queueItem{ 328 - repoID: repoID, 329 - heapIdx: -1, 330 - } 368 + item = q.newQueueItem(repoID) 331 369 q.items[repoID] = item 332 370 } 333 371 ··· 379 417 return item 380 418 } 381 419 382 - // lessQueueItemPriority returns true if the indexing priority x is less than that of y. 420 + // lessQueueItemPriority returns true if indexing x should be prioritized over indexing y 383 421 func lessQueueItemPriority(x, y *queueItem) bool { 384 422 // If we know x needs an update and y doesn't, then return true. Otherwise 385 423 // they are either equal priority or y is more urgent.
+13 -8
cmd/zoekt-sourcegraph-indexserver/queue_test.go
··· 8 8 "strconv" 9 9 "strings" 10 10 "testing" 11 + "time" 11 12 12 13 "github.com/google/go-cmp/cmp" 14 + "github.com/sourcegraph/log/logtest" 13 15 "github.com/sourcegraph/zoekt" 14 16 ) 15 17 16 18 func TestQueue(t *testing.T) { 17 - queue := &Queue{} 19 + backoffDuration := 1 * time.Millisecond 20 + queue := NewQueue(backoffDuration, backoffDuration, logtest.Scoped(t)) 18 21 19 22 for i := 0; i < 100; i++ { 20 23 queue.AddOrUpdate(mkHEADIndexOptions(i, strconv.Itoa(i))) ··· 52 55 func TestQueueFIFO(t *testing.T) { 53 56 // Tests that the queue fallbacks to FIFO if everything has the same 54 57 // priority 55 - queue := &Queue{} 58 + backoffDuration := 1 * time.Millisecond 59 + queue := NewQueue(backoffDuration, backoffDuration, logtest.Scoped(t)) 56 60 57 61 for i := 0; i < 100; i++ { 58 62 queue.AddOrUpdate(mkHEADIndexOptions(i, strconv.Itoa(i))) ··· 77 81 } 78 82 79 83 func TestQueue_MaybeRemoveMissing(t *testing.T) { 80 - queue := &Queue{} 84 + backoffDuration := 1 * time.Millisecond 85 + queue := NewQueue(backoffDuration, backoffDuration, logtest.Scoped(t)) 81 86 82 87 queue.AddOrUpdate(IndexOptions{RepoID: 1, Name: "foo"}) 83 88 queue.AddOrUpdate(IndexOptions{RepoID: 2, Name: "bar"}) ··· 94 99 } 95 100 96 101 func TestQueue_Bump(t *testing.T) { 97 - queue := &Queue{} 102 + backoffDuration := 1 * time.Millisecond 103 + queue := NewQueue(backoffDuration, backoffDuration, logtest.Scoped(t)) 98 104 99 105 queue.AddOrUpdate(IndexOptions{RepoID: 1, Name: "foo"}) 100 106 queue.AddOrUpdate(IndexOptions{RepoID: 2, Name: "bar"}) 101 107 102 - // Empty queue 103 - for ok := true; ok; _, ok = queue.Pop() { 104 - } 108 + emptyQueue(queue) 105 109 106 110 // Bump 2 and 3. 3 doesn't exist, so only 2 should exist. 107 111 missing := queue.Bump([]uint32{2, 3}) ··· 160 164 return strings.Join(outputLines, "\n") 161 165 } 162 166 163 - queue := &Queue{} 167 + backoffDuration := 1 * time.Millisecond 168 + queue := NewQueue(backoffDuration, backoffDuration, logtest.Scoped(t)) 164 169 165 170 // setup: add two repositories to the queue and pop one of them 166 171 poppedRepository := mkHEADIndexOptions(0, "popped")