fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

shards: factor out collectSender (#440)

This factors the result-aggregation logic used by Search out into a
StreamSender, collectSender. The intention is to reuse it in streaming for
the initial ranking work.

Additionally, we introduce some logic to aggregate progress. This is the
same logic used in streaming. Right now the batch interface doesn't set
progress, since it doesn't make sense there, so clients will ignore it.
When collectSender is used by streaming, it is important that the field is
set.

Test Plan: covered by existing tests

+85 -23
+71
shards/aggregate.go
··· 1 + package shards 2 + 3 + import ( 4 + "github.com/sourcegraph/zoekt" 5 + ) 6 + 7 + // collectSender is a sender that will aggregate results. Once sending is 8 + // done, you call Done to returned the aggregated result which are ranked. 9 + // 10 + // Note: It aggregates Progress as well, and expects that the 11 + // MaxPendingPriority it receives are monotonically decreasing. 12 + type collectSender struct { 13 + aggregate *zoekt.SearchResult 14 + maxDocDisplayCount int 15 + } 16 + 17 + func newCollectSender(opts *zoekt.SearchOptions) *collectSender { 18 + return &collectSender{ 19 + maxDocDisplayCount: opts.MaxDocDisplayCount, 20 + } 21 + } 22 + 23 + func (c *collectSender) Send(r *zoekt.SearchResult) { 24 + if c.aggregate == nil { 25 + c.aggregate = &zoekt.SearchResult{ 26 + RepoURLs: map[string]string{}, 27 + LineFragments: map[string]string{}, 28 + } 29 + } 30 + 31 + c.aggregate.Stats.Add(r.Stats) 32 + 33 + if len(r.Files) > 0 { 34 + c.aggregate.Files = append(c.aggregate.Files, r.Files...) 35 + 36 + for k, v := range r.RepoURLs { 37 + c.aggregate.RepoURLs[k] = v 38 + } 39 + for k, v := range r.LineFragments { 40 + c.aggregate.LineFragments[k] = v 41 + } 42 + } 43 + 44 + // The priority of our aggregate is the largest priority we collect. 45 + if c.aggregate.Priority < r.Priority { 46 + c.aggregate.Priority = r.Priority 47 + } 48 + 49 + // We receive monotonically decreasing values, so we update on every call. 50 + c.aggregate.MaxPendingPriority = r.MaxPendingPriority 51 + } 52 + 53 + // Done returns the aggregated result. Before returning them the files are 54 + // ranked and truncated according to the input SearchOptions. 55 + // 56 + // If no results are aggregated, ok is false and the result is nil. 
57 + func (c *collectSender) Done() (_ *zoekt.SearchResult, ok bool) { 58 + if c.aggregate == nil { 59 + return nil, false 60 + } 61 + 62 + agg := c.aggregate 63 + c.aggregate = nil 64 + 65 + zoekt.SortFilesByScore(agg.Files) 66 + if max := c.maxDocDisplayCount; max > 0 && len(agg.Files) > max { 67 + agg.Files = agg.Files[:max] 68 + } 69 + 70 + return agg, true 71 + }
+14 -23
shards/shards.go
··· 457 457 ctx, cancel := context.WithCancel(ctx) 458 458 defer cancel() 459 459 460 - aggregate := &zoekt.SearchResult{ 461 - RepoURLs: map[string]string{}, 462 - LineFragments: map[string]string{}, 463 - } 460 + collectSender := newCollectSender(opts) 464 461 465 462 start := time.Now() 466 463 proc, err := ss.sched.Acquire(ctx) ··· 469 466 } 470 467 defer proc.Release() 471 468 tr.LazyPrintf("acquired process") 472 - aggregate.Wait = time.Since(start) 469 + 470 + wait := time.Since(start) 473 471 start = time.Now() 474 472 475 - done, err := streamSearch(ctx, proc, q, opts, ss.getShards(), stream.SenderFunc(func(r *zoekt.SearchResult) { 476 - aggregate.Stats.Add(r.Stats) 477 - 478 - if len(r.Files) > 0 { 479 - aggregate.Files = append(aggregate.Files, r.Files...) 480 - 481 - for k, v := range r.RepoURLs { 482 - aggregate.RepoURLs[k] = v 483 - } 484 - for k, v := range r.LineFragments { 485 - aggregate.LineFragments[k] = v 486 - } 487 - } 488 - })) 473 + done, err := streamSearch(ctx, proc, q, opts, ss.getShards(), collectSender) 489 474 defer done() 490 475 if err != nil { 491 476 return nil, err 492 477 } 493 478 494 - zoekt.SortFilesByScore(aggregate.Files) 495 - if max := opts.MaxDocDisplayCount; max > 0 && len(aggregate.Files) > max { 496 - aggregate.Files = aggregate.Files[:max] 479 + aggregate, ok := collectSender.Done() 480 + if !ok { 481 + aggregate = &zoekt.SearchResult{ 482 + RepoURLs: map[string]string{}, 483 + LineFragments: map[string]string{}, 484 + } 497 485 } 486 + 498 487 copyFiles(aggregate) 499 488 500 - aggregate.Duration = time.Since(start) 489 + aggregate.Stats.Wait = wait 490 + aggregate.Stats.Duration = time.Since(start) 491 + 501 492 return aggregate, nil 502 493 } 503 494