fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

shards: reduce blocking in locking strategy (#186)

This commit is an attempt to reduce the blocking that occurs when
multiScheduler.Acquire calls pile up after a multiScheduler.Exclusive
call happens, waiting for already running searches to finish AND the
work protected by the Exclusive process.

To that end, we:

1. Remove the "Exclusive" lock functionality from "multiScheduler" which
now concerns itself only with controlling concurrent searches and
their fairness (interactive + batch).

2. Make "shardedSearcher.replace" work with batches to amortize the
initial loading sorting cost instead of sorting it on the search
path.

3. Leverage "runtime.SetFinalizer" to call "Close" on a
"*rankedSearcher" that was dropped when it is no longer referenced
anywhere by any on-going searches.

Co-authored-by: Keegan Carruthers-Smith <keegan.csmith@gmail.com>

+236 -296
+1 -1
shards/eval_test.go
··· 15 15 addShard := func(docs ...zoekt.Document) { 16 16 b := testIndexBuilder(t, &zoekt.Repository{ID: 1, Name: "reponame"}, docs...) 17 17 shard := searcherForTest(t, b) 18 - ss.replace(fmt.Sprintf("key-%d", nextShardNum), shard) 18 + ss.replace(map[string]zoekt.Searcher{fmt.Sprintf("key-%d", nextShardNum): shard}) 19 19 nextShardNum++ 20 20 } 21 21 addShard(
+2 -133
shards/sched.go
··· 6 6 "os" 7 7 "strconv" 8 8 "strings" 9 - "sync" 10 9 "time" 11 10 12 11 "github.com/prometheus/client_golang/prometheus" ··· 23 22 // request). See process documentation. It will only return an error if the 24 23 // context expires. 25 24 Acquire(ctx context.Context) (*process, error) 26 - 27 - // Exclusive blocks until an exclusive process is created. An exclusive 28 - // process is the only running process. See process documentation. 29 - Exclusive() *process 30 25 } 31 26 32 27 // The ZOEKTSCHED environment variable controls variables within the ··· 64 59 // multiScheduler is for managing concurrent searches. Its goals are: 65 60 // 66 61 // 1. Limit the number of concurrent searches. 67 - // 2. Allow exclusive access. 68 - // 3. Co-operatively limit long running searches. 69 - // 4. No tuneables. 62 + // 2. Co-operatively limit long running searches. 63 + // 3. No tuneables. 70 64 // 71 65 // ### Limit the number of concurrent searches 72 66 // 73 67 // Searching is CPU bound, so we can't do better than #CPU queries 74 68 // concurrently. If we do so, we just create more memory pressure. 75 69 // 76 - // ### Allow exclusive access 77 - // 78 - // During the time the shard list is accessed and a search is actually done on 79 - // a shard it can't be closed. As such while a search is running we do not 80 - // allow any closing of shards. However, we do need to close and add shards as 81 - // the indexer proceeds. To do this we have an exclusive process which will be 82 - // the only one running. This is like a Lock on a RWMutex, while a normal 83 - // search is a RLock. 84 70 // 85 71 // ### Co-operatively limit long running searches 86 72 // ··· 105 91 // We intentionally keep the algorithm simple, but have a general interface to 106 92 // allow improvements as we learn more. 107 93 type multiScheduler struct { 108 - // Exclusive process holds a write lock, search processes hold read locks. 109 - mu *rwmutex 110 94 semInteractive *sema 111 95 semBatch *sema 112 96 ··· 137 121 } 138 122 139 123 return &multiScheduler{ 140 - mu: newRWMutex(), 141 124 semInteractive: newSema(capacity, "interactive"), 142 125 semBatch: newSema(batchCap, "batch"), 143 126 ··· 147 130 148 131 // Acquire implements scheduler.Acquire. 149 132 func (s *multiScheduler) Acquire(ctx context.Context) (*process, error) { 150 - if err := s.mu.RLock(ctx); err != nil { 151 - return nil, err 152 - } 153 - 154 133 // Start in interactive. yieldFunc will switch us to batch. sem can be nil 155 134 // if we fail while switching to batch. nil value prevents us releasing 156 135 // twice. 157 136 sem := s.semInteractive 158 137 159 138 if err := sem.Acquire(ctx); err != nil { 160 - s.mu.RUnlock() 161 139 return nil, err 162 140 } 163 141 ··· 167 145 sem.Release() 168 146 sem = nil 169 147 } 170 - s.mu.RUnlock() 171 148 }, 172 149 yieldTimer: newDeadlineTimer(time.Now().Add(s.interactiveDuration)), 173 150 yieldFunc: func(ctx context.Context) error { ··· 188 165 return nil 189 166 }, 190 167 }, nil 191 - } 192 - 193 - // Exclusive implements scheduler.Exclusive. 194 - func (s *multiScheduler) Exclusive() *process { 195 - // Exclusive process holds a write lock on mu, which ensures we have no 196 - // processes running (search semaphores are empty). 197 - // 198 - // exclusive processes will never yield, so we leave yieldTimer and 199 - // yieldFunc nil. 200 - s.mu.Lock() 201 - return &process{ 202 - releaseFunc: func() { 203 - s.mu.Unlock() 204 - }, 205 - } 206 168 } 207 169 208 170 // semaphoreScheduler shares a single semaphore for all searches. An exclusive ··· 430 392 func (s *sema) Release() { 431 393 s.sem.Release(1) 432 394 s.metricRunning.Dec() 433 - } 434 - 435 - // rwmutex is a wrapper around sync.RWMutex. It additionally respects context 436 - // cancellation and will track the state of the mutex in prometheus. 437 - type rwmutex struct { 438 - mu sync.RWMutex 439 - 440 - metricQueued *gaugeCounter 441 - metricRunning *gaugeCounter 442 - metricTimedoutTotal prometheus.Counter 443 - 444 - metricExclusiveQueued *gaugeCounter 445 - metricExclusiveRunning *gaugeCounter 446 - } 447 - 448 - func newRWMutex() *rwmutex { 449 - return &rwmutex{ 450 - metricQueued: &gaugeCounter{ 451 - gauge: metricSched.WithLabelValues("global", "queued"), 452 - counter: metricSchedTotal.WithLabelValues("global", "queued"), 453 - }, 454 - metricRunning: &gaugeCounter{ 455 - gauge: metricSched.WithLabelValues("global", "running"), 456 - counter: metricSchedTotal.WithLabelValues("global", "running"), 457 - }, 458 - metricTimedoutTotal: metricSchedTotal.WithLabelValues("global", "timedout"), 459 - 460 - metricExclusiveQueued: &gaugeCounter{ 461 - gauge: metricSched.WithLabelValues("exclusive", "queued"), 462 - counter: metricSchedTotal.WithLabelValues("exclusive", "queued"), 463 - }, 464 - metricExclusiveRunning: &gaugeCounter{ 465 - gauge: metricSched.WithLabelValues("exclusive", "running"), 466 - counter: metricSchedTotal.WithLabelValues("exclusive", "running"), 467 - }, 468 - } 469 - } 470 - 471 - func (s *rwmutex) RLock(ctx context.Context) error { 472 - s.metricQueued.Inc() 473 - defer s.metricQueued.Dec() 474 - 475 - err := rlockAcquire(ctx, &s.mu) 476 - if err != nil { 477 - s.metricTimedoutTotal.Inc() 478 - return err 479 - } 480 - 481 - s.metricRunning.Inc() 482 - 483 - return nil 484 - } 485 - 486 - func (s *rwmutex) RUnlock() { 487 - s.mu.RUnlock() 488 - s.metricRunning.Dec() 489 - } 490 - 491 - func (s *rwmutex) Lock() { 492 - s.metricExclusiveQueued.Inc() 493 - defer s.metricExclusiveQueued.Dec() 494 - 495 - s.mu.Lock() 496 - s.metricExclusiveRunning.Inc() 497 - } 498 - 499 - func (s *rwmutex) Unlock() { 500 - s.mu.Unlock() 501 - s.metricExclusiveRunning.Dec() 502 - } 503 - 504 - func rlockAcquire(ctx context.Context, mu *sync.RWMutex) error { 505 - // Lock in goroutine to respect ctx 506 - done := make(chan struct{}) 507 - go func() { 508 - mu.RLock() 509 - close(done) 510 - }() 511 - 512 - select { 513 - case <-done: 514 - return nil 515 - 516 - case <-ctx.Done(): 517 - // We can't cancel RLock. So we wait for it to lock in the background and 518 - // immediately unlock. 519 - go func() { 520 - <-done 521 - mu.RUnlock() 522 - }() 523 - 524 - return ctx.Err() 525 - } 526 395 } 527 396 528 397 // gaugeCounter is a wrapper around a gauge and a counter. Whenever the gauge
-17
shards/sched_test.go
··· 168 168 t.Fatal("expected second acquire after cap to fail") 169 169 } 170 170 171 - // We check that exclusive works by trying to acquire one and ensuring it 172 - // only works once we have released all other existing procs 173 - exclusiveC := make(chan *process) 174 - go func() { 175 - exclusiveC <- sched.Exclusive() 176 - }() 177 - 178 - select { 179 - case <-exclusiveC: 180 - t.Fatal("should not acquire exclusive since other procs are running") 181 - case <-time.After(10 * time.Millisecond): 182 - } 183 - 184 171 for _, p := range procs { 185 172 p.Release() 186 173 } 187 174 procs = nil 188 - 189 - // Now we should get exclusive 190 - proc := <-exclusiveC 191 - proc.Release() 192 175 } 193 176 194 177 func quickCtx(t *testing.T) context.Context {
+154 -95
shards/shards.go
··· 25 25 "sort" 26 26 "strconv" 27 27 "sync" 28 + "sync/atomic" 28 29 "time" 29 30 30 31 "golang.org/x/sync/errgroup" 32 + "golang.org/x/sync/semaphore" 31 33 32 34 "github.com/google/zoekt" 33 35 "github.com/google/zoekt/query" ··· 129 131 Name: "zoekt_list_shard_running", 130 132 Help: "The number of concurrent list requests in a shard running", 131 133 }) 132 - metricShardCloseDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{ 133 - Name: "zoekt_shard_close_duration_seconds", 134 - Help: "The time it takes to close a Searcher.", 135 - Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30}, 136 - }) 137 - metricRankCacheUpdateDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{ 138 - Name: "zoekt_rank_cache_update_duration_seconds", 139 - Help: "The time it takes to update the shard cache with new ranked shards.", 134 + metricShardsBatchReplaceDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{ 135 + Name: "zoekt_shards_batch_replace_duration_seconds", 136 + Help: "The time it takes to replace a batch of Searchers.", 140 137 Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30}, 141 138 }) 142 - 143 139 metricListAllRepos = promauto.NewGauge(prometheus.GaugeOpts{ 144 140 Name: "zoekt_list_all_stats_repos", 145 141 Help: "The last List(true) value for RepoStats.Repos. Repos is used for aggregrating the number of repositories.", ··· 177 173 type rankedShard struct { 178 174 zoekt.Searcher 179 175 180 - priority float64 176 + priority float64 // maximum priority across all repos in the shard 181 177 182 178 // We have out of band ranking on compound shards which can change even if 183 179 // the shard file does not. So we compute a rank in getShards. We store 184 - // names here to avoid the cost of List in the search request path. 180 + // repos here to avoid the cost of List in the search request path. 185 181 repos []*zoekt.Repository 186 182 } 187 183 ··· 192 188 // pressure. 193 189 sched scheduler 194 190 195 - shards map[string]rankedShard 191 + mu sync.Mutex // protects writes to shards 192 + shards map[string]*rankedShard 196 193 197 - rankedLock sync.Mutex // guards ranked 198 - ranked []rankedShard 194 + ranked atomic.Value 199 195 } 200 196 201 197 func newShardedSearcher(n int64) *shardedSearcher { 202 198 ss := &shardedSearcher{ 203 - shards: make(map[string]rankedShard), 199 + shards: make(map[string]*rankedShard), 204 200 sched: newScheduler(n), 205 201 } 206 202 return ss ··· 243 239 ss *shardedSearcher 244 240 } 245 241 246 - func (tl *loader) load(key string) { 247 - shard, err := loadShard(key) 248 - if err != nil { 249 - metricShardsLoadFailedTotal.Inc() 250 - log.Printf("reloading: %s, err %v ", key, err) 251 - return 242 + func (tl *loader) load(keys ...string) { 243 + var ( 244 + mu sync.Mutex // synchronizes writes to the shards map 245 + wg sync.WaitGroup // used to wait for all shards to load 246 + sem = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(0))) 247 + shards = make(map[string]zoekt.Searcher, len(keys)) 248 + ) 249 + 250 + log.Printf("loading %d shard(s): %s", len(keys), humanTruncateList(keys, 5)) 251 + 252 + lastProgress := time.Now() 253 + for i, key := range keys { 254 + // If taking a while to start-up occasionally give a progress message 255 + if time.Since(lastProgress) > 10*time.Second { 256 + log.Printf("still need to load %d shards...", len(keys)-i) 257 + lastProgress = time.Now() 258 + } 259 + 260 + _ = sem.Acquire(context.Background(), 1) 261 + wg.Add(1) 262 + 263 + go func(key string) { 264 + defer sem.Release(1) 265 + defer wg.Done() 266 + 267 + shard, err := loadShard(key) 268 + if err != nil { 269 + metricShardsLoadFailedTotal.Inc() 270 + log.Printf("reloading: %s, err %v ", key, err) 271 + return 272 + } 273 + metricShardsLoadedTotal.Inc() 274 + 275 + mu.Lock() 276 + shards[key] = shard 277 + mu.Unlock() 278 + }(key) 252 279 } 253 280 254 - metricShardsLoadedTotal.Inc() 255 - tl.ss.replace(key, shard) 281 + wg.Wait() 282 + 283 + tl.ss.replace(shards) 256 284 } 257 285 258 - func (tl *loader) drop(key string) { 259 - tl.ss.replace(key, nil) 286 + func (tl *loader) drop(keys ...string) { 287 + shards := make(map[string]zoekt.Searcher, len(keys)) 288 + for _, key := range keys { 289 + shards[key] = nil 290 + } 291 + tl.ss.replace(shards) 260 292 } 261 293 262 294 func (ss *shardedSearcher) String() string { ··· 265 297 266 298 // Close closes references to open files. It may be called only once. 267 299 func (ss *shardedSearcher) Close() { 268 - proc := ss.sched.Exclusive() 269 - defer proc.Release() 270 - for _, s := range ss.shards { 271 - s.Close() 300 + ss.mu.Lock() 301 + shards := make(map[string]zoekt.Searcher, len(ss.shards)) 302 + for k := range ss.shards { 303 + shards[k] = nil 272 304 } 273 - ss.shards = make(map[string]rankedShard) 305 + ss.mu.Unlock() 306 + 307 + ss.replace(shards) 274 308 } 275 309 276 - func selectRepoSet(shards []rankedShard, q query.Q) ([]rankedShard, query.Q) { 310 + func selectRepoSet(shards []*rankedShard, q query.Q) ([]*rankedShard, query.Q) { 277 311 and, ok := q.(*query.And) 278 312 if !ok { 279 313 return shards, q ··· 336 370 setSize = len(shards) 337 371 } 338 372 339 - filtered := make([]rankedShard, 0, setSize) 373 + filtered := make([]*rankedShard, 0, setSize) 340 374 filteredAll := true 341 375 342 376 for _, s := range shards { ··· 440 474 aggregate.Wait = time.Since(start) 441 475 start = time.Now() 442 476 443 - err = ss.streamSearch(ctx, proc, q, opts, stream.SenderFunc(func(r *zoekt.SearchResult) { 477 + done, err := ss.streamSearch(ctx, proc, q, opts, stream.SenderFunc(func(r *zoekt.SearchResult) { 444 478 began := time.Now() 445 479 446 480 aggregate.Lock() ··· 470 504 471 505 metricSearchAggregateDuration.WithLabelValues("total").Observe(time.Since(began).Seconds()) 472 506 })) 507 + defer done() 473 508 if err != nil { 474 509 return nil, err 475 510 } ··· 507 542 }, 508 543 }) 509 544 510 - return ss.streamSearch(ctx, proc, q, opts, stream.SenderFunc(func(event *zoekt.SearchResult) { 545 + done, err := ss.streamSearch(ctx, proc, q, opts, stream.SenderFunc(func(event *zoekt.SearchResult) { 511 546 copyFiles(event) 512 547 sender.Send(event) 513 548 })) 549 + done() 550 + return err 514 551 } 515 552 516 - func (ss *shardedSearcher) streamSearch(ctx context.Context, proc *process, q query.Q, opts *zoekt.SearchOptions, sender zoekt.Sender) (err error) { 553 + // streamSearch is an internal helper since both Search and StreamSearch are largely similiar. 554 + // 555 + // done must always be called, even if err is non-nil. The SearchResults sent 556 + // via sender contain references to the underlying mmap data that the garbage 557 + // collector can't see. Calling done informs the garbage collector it is free 558 + // to collect those shards. The caller must call copyFiles on any 559 + // SearchResults it returns/streams out before calling done. 560 + func (ss *shardedSearcher) streamSearch(ctx context.Context, proc *process, q query.Q, opts *zoekt.SearchOptions, sender zoekt.Sender) (done func(), err error) { 517 561 tr, ctx := trace.New(ctx, "shardedSearcher.streamSearch", "") 518 562 tr.LazyLog(q, true) 519 563 tr.LazyPrintf("opts: %+v", opts) ··· 561 605 // cap(feeder) searches run while yield blocks. However, doing it this way 562 606 // avoids needing to have synchronization in yield, so is done for 563 607 // simplicity. 564 - feeder := make(chan rankedShard, runtime.GOMAXPROCS(0)) 608 + feeder := make(chan *rankedShard, runtime.GOMAXPROCS(0)) 565 609 g.Go(func() error { 566 610 defer close(feeder) 567 611 // Note: shards is sorted in order of descending priority. ··· 625 669 return nil 626 670 }) 627 671 } 628 - return g.Wait() 672 + return func() { 673 + runtime.KeepAlive(shards) 674 + }, g.Wait() 629 675 } 630 676 631 677 func copySlice(src *[]byte) { ··· 634 680 *src = dst 635 681 } 636 682 637 - // copyFiles must be protected by shardedSearcher.sched. 638 683 func copyFiles(sr *zoekt.SearchResult) { 639 684 for i := range sr.Files { 640 685 copySlice(&sr.Files[i].Content) ··· 801 846 802 847 // getShards returns the currently loaded shards. The shards are sorted by decreasing 803 848 // rank and should not be mutated. 804 - func (s *shardedSearcher) getShards() []rankedShard { 805 - start := time.Now() 806 - s.rankedLock.Lock() 807 - defer s.rankedLock.Unlock() 808 - if len(s.ranked) > 0 { 809 - metricRankCacheUpdateDurationSeconds.Observe(time.Since(start).Seconds()) 810 - return s.ranked 811 - } 812 - 813 - // Holding rankedLock during the search ensures that we only perform 814 - // the sort once-- any blocked goroutines would take just as long to 815 - // perform the sort themselves. 816 - res := make([]rankedShard, 0, len(s.shards)) 817 - for _, sh := range s.shards { 818 - res = append(res, sh) 819 - } 820 - sort.Slice(res, func(i, j int) bool { 821 - priorityDiff := res[i].priority - res[j].priority 822 - if priorityDiff != 0 { 823 - return priorityDiff > 0 824 - } 825 - if len(res[i].repos) == 0 || len(res[j].repos) == 0 { 826 - // Protect against empty names which can happen if we fail to List or 827 - // the shard is full of tombstones. Prefer the shard which has names. 828 - return len(res[i].repos) >= len(res[j].repos) 829 - } 830 - return res[i].repos[0].Name < res[j].repos[0].Name 831 - }) 832 - 833 - s.ranked = res 834 - 835 - return res 849 + func (s *shardedSearcher) getShards() []*rankedShard { 850 + ranked, _ := s.ranked.Load().([]*rankedShard) 851 + return ranked 836 852 } 837 853 838 - func mkRankedShard(s zoekt.Searcher) rankedShard { 854 + func mkRankedShard(s zoekt.Searcher) *rankedShard { 839 855 q := query.Const{Value: true} 840 856 result, err := s.List(context.Background(), &q, nil) 841 857 if err != nil { 842 - return rankedShard{Searcher: s} 858 + return &rankedShard{Searcher: s} 843 859 } 844 860 if len(result.Repos) == 0 { 845 - return rankedShard{Searcher: s} 861 + return &rankedShard{Searcher: s} 846 862 } 847 863 848 864 var ( ··· 860 876 } 861 877 } 862 878 863 - return rankedShard{ 879 + return &rankedShard{ 864 880 Searcher: s, 865 881 repos: repos, 866 882 priority: maxPriority, 867 883 } 868 884 } 869 885 870 - func (s *shardedSearcher) replace(key string, shard zoekt.Searcher) { 871 - var ranked rankedShard 872 - if shard != nil { 873 - ranked = mkRankedShard(shard) 874 - } 886 + func (s *shardedSearcher) replace(shards map[string]zoekt.Searcher) { 887 + defer func(began time.Time) { 888 + metricShardsBatchReplaceDurationSeconds.Observe(time.Since(began).Seconds()) 889 + }(time.Now()) 875 890 876 - proc := s.sched.Exclusive() 891 + s.mu.Lock() 892 + defer s.mu.Unlock() 877 893 878 - old := s.shards[key] 879 - if shard == nil { 880 - delete(s.shards, key) 881 - } else { 882 - s.shards[key] = ranked 883 - } 884 - s.rankedLock.Lock() 885 - s.ranked = nil 886 - s.rankedLock.Unlock() 894 + for key, shard := range shards { 895 + var r *rankedShard 896 + if shard != nil { 897 + r = mkRankedShard(shard) 898 + } 887 899 888 - proc.Release() 900 + old := s.shards[key] 901 + if shard == nil { 902 + delete(s.shards, key) 903 + } else { 904 + s.shards[key] = r 905 + } 889 906 890 - if old.Searcher != nil { 891 - start := time.Now() 892 - old.Close() 893 - metricShardCloseDurationSeconds.Observe(time.Since(start).Seconds()) 907 + if old != nil && old.Searcher != nil { 908 + // _ ___ /^^\ /^\ /^^\_ 909 + // _ _@)@) \ ,,/ '` ~ `'~~ ', `\. 910 + // _/o\_ _ _ _/~`.`...'~\ ./~~..,'`','',.,' ' ~: 911 + // / `,'.~,~.~ . , . , ~|, ,/ .,' , ,. .. ,,. `, ~\_ 912 + // ( ' _' _ '_` _ ' . , `\_/ .' ..' ' ` ` `.. `, \_ 913 + // ~V~ V~ V~ V~ ~\ ` ' . ' , ' .,.,''`.,.''`.,.``. ', \_ 914 + // _/\ /\ /\ /\_/, . ' , `_/~\_ .' .,. ,, , _/~\_ `. `. '., \_ 915 + // < ~ ~ '~`'~'`, ., . `_: ::: \_ ' `_/ ::: \_ `.,' . ', \_ 916 + // \ ' `_ '`_ _ ',/ _::_::_ \ _ _/ _::_::_ \ `.,'.,`., \-,-,-,_,_, 917 + // `'~~ `'~~ `'~~ `'~~ \(_)(_)(_)/ `~~' \(_)(_)(_)/ ~'`\_.._,._,'_;_;_;_;_; 918 + // 919 + // We can't just call Close now, because there may be ongoing searches 920 + // which have old in the shards list. Previously we used an exclusive 921 + // lock to gaurentee there were no concurrent searches. However, that 922 + // led to blocking on the read path. 923 + // 924 + // We could introduce granular locking per rankedShard to know when 925 + // there are no more references. However, this becomes tricky in 926 + // practice. Instead we rely on the garbage collector noticing old is no 927 + // longer used. We take care in our searchers to runtime.KeepAlive until 928 + // we have stopped referencing the underling mmap data. 929 + runtime.SetFinalizer(old, func(r *rankedShard) { 930 + r.Close() 931 + }) 932 + } 894 933 } 895 934 896 - metricShardsLoaded.Set(float64(len(s.shards))) 935 + ranked := make([]*rankedShard, 0, len(s.shards)) 936 + for _, r := range s.shards { 937 + ranked = append(ranked, r) 938 + } 939 + 940 + sort.Slice(ranked, func(i, j int) bool { 941 + priorityDiff := ranked[i].priority - ranked[j].priority 942 + if priorityDiff != 0 { 943 + return priorityDiff > 0 944 + } 945 + if len(ranked[i].repos) == 0 || len(ranked[j].repos) == 0 { 946 + // Protect against empty names which can happen if we fail to List or 947 + // the shard is full of tombstones. Prefer the shard which has names. 948 + return len(ranked[i].repos) >= len(ranked[j].repos) 949 + } 950 + return ranked[i].repos[0].Name < ranked[j].repos[0].Name 951 + }) 952 + 953 + s.ranked.Store(ranked) 954 + 955 + metricShardsLoaded.Set(float64(len(ranked))) 897 956 } 898 957 899 958 func loadShard(fn string) (zoekt.Searcher, error) {
+65 -18
shards/shards_test.go
··· 22 22 "log" 23 23 "math" 24 24 "os" 25 + "reflect" 25 26 "runtime" 26 27 "sort" 28 + "strconv" 27 29 "testing" 28 30 "time" 29 31 ··· 57 59 log.SetOutput(out) 58 60 defer log.SetOutput(os.Stderr) 59 61 ss := newShardedSearcher(2) 60 - ss.shards = map[string]rankedShard{ 61 - "x": {Searcher: &crashSearcher{}}, 62 - } 62 + ss.ranked.Store([]*rankedShard{{Searcher: &crashSearcher{}}}) 63 63 64 64 q := &query.Substring{Pattern: "hoi"} 65 65 opts := &zoekt.SearchOptions{} ··· 131 131 132 132 n := 10 * runtime.GOMAXPROCS(0) 133 133 for i := 0; i < n; i++ { 134 - ss.replace(fmt.Sprintf("shard%d", i), 135 - &rankSearcher{ 136 - rank: uint16(i), 137 - }) 134 + ss.replace(map[string]zoekt.Searcher{ 135 + fmt.Sprintf("shard%d", i): &rankSearcher{rank: uint16(i)}, 136 + }) 138 137 } 139 138 140 139 if res, err := ss.Search(context.Background(), &query.Substring{Pattern: "bla"}, &zoekt.SearchOptions{}); err != nil { ··· 168 167 } 169 168 } 170 169 170 + func TestShardedSearcher_Ranking(t *testing.T) { 171 + ss := newShardedSearcher(1) 172 + 173 + var nextShardNum int 174 + addShard := func(repo string, priority float64, docs ...zoekt.Document) { 175 + r := &zoekt.Repository{ID: hash(repo), Name: repo} 176 + r.RawConfig = map[string]string{ 177 + "public": "1", 178 + "priority": strconv.FormatFloat(priority, 'f', 2, 64), 179 + } 180 + b := testIndexBuilder(t, r, docs...) 181 + shard := searcherForTest(t, b) 182 + ss.replace(map[string]zoekt.Searcher{ 183 + fmt.Sprintf("key-%d", nextShardNum): shard, 184 + }) 185 + nextShardNum++ 186 + } 187 + 188 + addShard("weekend-project", 0.25, zoekt.Document{Name: "f2", Content: []byte("foo bas")}) 189 + addShard("moderately-popular", 0.5, zoekt.Document{Name: "f3", Content: []byte("foo bar")}) 190 + addShard("weekend-project-2", 0.25, zoekt.Document{Name: "f2", Content: []byte("foo bas")}) 191 + addShard("super-star", 0.9, zoekt.Document{Name: "f1", Content: []byte("foo bar bas")}) 192 + 193 + want := []string{ 194 + "super-star", 195 + "moderately-popular", 196 + "weekend-project", 197 + "weekend-project-2", 198 + } 199 + 200 + var have []string 201 + for _, s := range ss.getShards() { 202 + for _, r := range s.repos { 203 + have = append(have, r.Name) 204 + } 205 + } 206 + 207 + if !reflect.DeepEqual(want, have) { 208 + t.Fatalf("\nwant: %s\nhave: %s", want, have) 209 + } 210 + } 211 + 171 212 func TestFilteringShardsByRepoSet(t *testing.T) { 172 213 ss := newShardedSearcher(1) 173 214 ··· 181 222 repoSetNames = append(repoSetNames, repoName) 182 223 } 183 224 184 - ss.replace(shardName, &rankSearcher{ 185 - repo: &zoekt.Repository{ID: hash(repoName), Name: repoName}, 186 - rank: uint16(n - i), 225 + ss.replace(map[string]zoekt.Searcher{ 226 + shardName: &rankSearcher{ 227 + repo: &zoekt.Repository{ID: hash(repoName), Name: repoName}, 228 + rank: uint16(n - i), 229 + }, 187 230 }) 188 231 } 189 232 ··· 274 317 } 275 318 276 319 ss := newShardedSearcher(2) 277 - ss.replace("key", searcher) 320 + ss.replace(map[string]zoekt.Searcher{"key": searcher}) 278 321 279 322 var opts zoekt.SearchOptions 280 323 q := &query.Substring{Pattern: "needle"} ··· 321 364 322 365 // Test duplicate removal when ListOptions.Minimal is true and false 323 366 ss := newShardedSearcher(4) 324 - ss.replace("1", searcherForTest(t, testIndexBuilder(t, repos[0]))) 325 - ss.replace("2", searcherForTest(t, testIndexBuilder(t, repos[0]))) 326 - ss.replace("3", searcherForTest(t, testIndexBuilder(t, repos[1]))) 327 - ss.replace("4", searcherForTest(t, testIndexBuilder(t, repos[1]))) 367 + ss.replace(map[string]zoekt.Searcher{ 368 + "1": searcherForTest(t, testIndexBuilder(t, repos[0])), 369 + "2": searcherForTest(t, testIndexBuilder(t, repos[0])), 370 + "3": searcherForTest(t, testIndexBuilder(t, repos[1])), 371 + "4": searcherForTest(t, testIndexBuilder(t, repos[1])), 372 + }) 328 373 329 374 for _, tc := range []struct { 330 375 name string ··· 472 517 repos := reposForTest(3000) 473 518 var repoSetIDs []uint32 474 519 520 + shards := make(map[string]zoekt.Searcher, len(repos)) 475 521 for i, r := range repos { 476 - searcher := testSearcherForRepo(b, r, filesPerRepo) 477 - ss.replace(r.Name, searcher) 522 + shards[r.Name] = testSearcherForRepo(b, r, filesPerRepo) 478 523 if i%2 == 0 { 479 524 repoSetIDs = append(repoSetIDs, r.ID) 480 525 } 481 526 } 527 + 528 + ss.replace(shards) 482 529 483 530 ctx := context.Background() 484 531 opts := &zoekt.SearchOptions{} ··· 541 588 r.RawConfig = rawConfig 542 589 b := testIndexBuilder(t, r, docs...) 543 590 shard := searcherForTest(t, b) 544 - ss.replace(fmt.Sprintf("key-%d", nextShardNum), shard) 591 + ss.replace(map[string]zoekt.Searcher{fmt.Sprintf("key-%d", nextShardNum): shard}) 545 592 nextShardNum++ 546 593 } 547 594 addShard("public", map[string]string{"public": "1"}, zoekt.Document{Name: "f1", Content: []byte("foo bar bas")})
+6 -28
shards/watcher.go
··· 19 19 "log" 20 20 "os" 21 21 "path/filepath" 22 - "runtime" 23 22 "sort" 24 23 "strconv" 25 24 "strings" ··· 31 30 ) 32 31 33 32 type shardLoader interface { 34 - // Load a new file. Should be safe for concurrent calls. 35 - load(filename string) 36 - drop(filename string) 33 + // Load a new file. 34 + load(filenames ...string) 35 + drop(filenames ...string) 37 36 } 38 37 39 38 type DirectoryWatcher struct { ··· 163 162 if len(toDrop) > 0 { 164 163 log.Printf("unloading %d shard(s): %s", len(toDrop), humanTruncateList(toDrop, 5)) 165 164 } 166 - for _, t := range toDrop { 167 - s.loader.drop(t) 168 - } 165 + 166 + s.loader.drop(toDrop...) 169 167 170 168 if len(toLoad) == 0 { 171 169 return nil 172 170 } 173 171 174 - log.Printf("loading %d shard(s): %s", len(toLoad), humanTruncateList(toLoad, 5)) 175 - 176 - // Limit amount of concurrent shard loads. 177 - throttle := make(chan struct{}, runtime.GOMAXPROCS(0)) 178 - lastProgress := time.Now() 179 - for i, t := range toLoad { 180 - // If taking a while to start-up occasionally give a progress message 181 - if time.Since(lastProgress) > 10*time.Second { 182 - log.Printf("still need to load %d shards...", len(toLoad)-i) 183 - lastProgress = time.Now() 184 - } 185 - 186 - throttle <- struct{}{} 187 - go func(k string) { 188 - s.loader.load(k) 189 - <-throttle 190 - }(t) 191 - } 192 - for i := 0; i < cap(throttle); i++ { 193 - throttle <- struct{}{} 194 - } 172 + s.loader.load(toLoad...) 195 173 196 174 return nil 197 175 }
+8 -4
shards/watcher_test.go
··· 30 30 drops chan string 31 31 } 32 32 33 - func (l *loggingLoader) load(k string) { 34 - l.loads <- k 33 + func (l *loggingLoader) load(keys ...string) { 34 + for _, key := range keys { 35 + l.loads <- key 36 + } 35 37 } 36 38 37 - func (l *loggingLoader) drop(k string) { 38 - l.drops <- k 39 + func (l *loggingLoader) drop(keys ...string) { 40 + for _, key := range keys { 41 + l.drops <- key 42 + } 39 43 } 40 44 41 45 func advanceFS() {