fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

Revert "shards: respect scheduler and use smarter synchronization for List (#750)" (#752)

This reverts commit 931974dd953e07ce8fbe28e3b615323c2d156e40.

After monitoring production we are seeing increased latencies in List
calls. Additionally we are seeing many more scheduler transitions into
interactive queued. The only realistic cause of this was this commit, so
we are reverting for now until further investigation.

Test Plan: go test

+28 -64
+28 -64
shards/shards.go
··· 28 28 "sync" 29 29 "time" 30 30 31 - "golang.org/x/sync/errgroup" 32 31 "golang.org/x/sync/semaphore" 33 32 34 33 "github.com/prometheus/client_golang/prometheus" ··· 895 894 err error 896 895 } 897 896 898 - func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions) (result *zoekt.RepoList, _ error) { 897 + func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions, sink chan shardListResult) { 899 898 metricListShardRunning.Inc() 900 899 defer func() { 901 900 metricListShardRunning.Dec() 902 - // If we panic, we log the panic and set Crashes (but do not return an 903 - // error). 904 901 if r := recover(); r != nil { 905 902 log.Printf("crashed shard: %s: %s, %s", s.String(), r, debug.Stack()) 906 - result = &zoekt.RepoList{Crashes: 1} 903 + sink <- shardListResult{ 904 + &zoekt.RepoList{Crashes: 1}, nil, 905 + } 907 906 } 908 907 }() 909 908 910 - return s.List(ctx, q, opts) 909 + ms, err := s.List(ctx, q, opts) 910 + sink <- shardListResult{ms, err} 911 911 } 912 912 913 913 func (ss *shardedSearcher) List(ctx context.Context, q query.Q, opts *zoekt.ListOptions) (rl *zoekt.RepoList, err error) { ··· 968 968 return &agg, nil 969 969 } 970 970 971 - // We use an errgroup so that when an error is encountered we can stop 972 - // feeder and report the first error seen. 973 - ctx, cancel := context.WithCancel(ctx) 974 - defer cancel() 975 - g, ctx := errgroup.WithContext(ctx) 976 - 977 - // Bound work by number of CPUs. 978 - workers := min(runtime.GOMAXPROCS(0), len(shards)) 971 + shardCount := len(shards) 972 + all := make(chan shardListResult, shardCount) 973 + feeder := make(chan zoekt.Searcher, len(shards)) 974 + for _, s := range shards { 975 + feeder <- s 976 + } 977 + close(feeder) 979 978 980 - var ( 981 - feeder = make(chan zoekt.Searcher, workers) 982 - all = make(chan *zoekt.RepoList, workers) 983 - ) 984 - 985 - // Send shards to feeder until context is canceled. 986 - g.Go(func() error { 987 - defer close(feeder) 988 - for _, s := range shards { 989 - // If context is canceled we stop consuming from shards and cancel the 990 - // errgroup. 991 - if err := proc.Yield(ctx); err != nil { 992 - return err 993 - } 994 - feeder <- s 995 - } 996 - return nil 997 - }) 998 - 999 - // Start up workers goroutines to consume feeder, do listing of a shard and 1000 - // send results down all. If an error is encountered we cancel the errgroup. 1001 - for range workers { 1002 - g.Go(func() error { 979 + for i := 0; i < runtime.GOMAXPROCS(0); i++ { 980 + go func() { 1003 981 for s := range feeder { 1004 - result, err := listOneShard(ctx, s, q, opts) 1005 - if err != nil { 1006 - return err 1007 - } 1008 - all <- result 982 + listOneShard(ctx, s, q, opts, all) 1009 983 } 1010 - return nil 1011 - }) 984 + }() 1012 985 } 1013 986 1014 - // Once all goroutines in errgroup is done, we know nothing more will be 1015 - // sent to all so close it. We rely on this sync point such that workersErr 1016 - // will be written to before we are finished reading from all. 1017 - var workersErr error 1018 - go func() { 1019 - workersErr = g.Wait() 1020 - close(all) 1021 - }() 987 + uniq := map[string]*zoekt.RepoListEntry{} 988 + 989 + for range shards { 990 + r := <-all 991 + if r.err != nil { 992 + return nil, r.err 993 + } 1022 994 1023 - // Aggregate results from all. 1024 - uniq := map[string]*zoekt.RepoListEntry{} 1025 - for rl := range all { 1026 - agg.Crashes += rl.Crashes 1027 - agg.Stats.Add(&rl.Stats) 995 + agg.Crashes += r.rl.Crashes 996 + agg.Stats.Add(&r.rl.Stats) 1028 997 1029 - for _, r := range rl.Repos { 998 + for _, r := range r.rl.Repos { 1030 999 prev, ok := uniq[r.Repository.Name] 1031 1000 if !ok { 1032 1001 cp := *r // We need to copy because we mutate r.Stats when merging duplicates ··· 1036 1005 } 1037 1006 } 1038 1007 1039 - for id, r := range rl.ReposMap { 1008 + for id, r := range r.rl.ReposMap { 1040 1009 _, ok := agg.ReposMap[id] 1041 1010 if !ok { 1042 1011 agg.ReposMap[id] = r 1043 1012 } 1044 1013 } 1045 - } 1046 - 1047 - // workersErr will now be set since all is closed. 1048 - if workersErr != nil { 1049 - return nil, workersErr 1050 1014 } 1051 1015 1052 1016 agg.Repos = make([]*zoekt.RepoListEntry, 0, len(uniq))