fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

shards: respect scheduler and use smarter synchronization for List (#750)

Previously List would never call proc.Yield, which broke the
co-operative scheduler in the case of slow List calls. Additionally, we
used a naive concurrency (a large buffered channel) which shows up in
the profiler as 30% of CPU spent on chan_ related operations under List.

This commit follows how Search used to respect proc.Yield. See sched.go
in 90ed7bfd07e0a3137e3ee627cbf6824446df9c4d. We did not copy Search
since it uses a more complicated implementation than we need since it
supports streaming, while List is still batch only.

We needed to use errgroup to ensure we drained all channels in the case
of an error. Previously we did not need to do this since the channels
had a buffer size of len(shards), which gaurenteed nothing would ever
block. Now channels are never larger than the number of workers (<=
GOMAXPROCS).

Test Plan: go test covers the no error cases. In the case of errors we
manually tested by running zoekt-webserver and adding a random context
cancellation. We observed the error being reported and no List
goroutines running.

Co-authored-by: William Bezuidenhout <william.bezuidenhout@sourcegraph.com>

+63 -29
+63 -29
shards/shards.go
··· 27 27 "sync" 28 28 "time" 29 29 30 + "golang.org/x/sync/errgroup" 30 31 "golang.org/x/sync/semaphore" 31 32 32 33 "github.com/prometheus/client_golang/prometheus" ··· 872 873 err error 873 874 } 874 875 875 - func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions, sink chan shardListResult) { 876 + func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions) (result *zoekt.RepoList, _ error) { 876 877 metricListShardRunning.Inc() 877 878 defer func() { 878 879 metricListShardRunning.Dec() 880 + // If we panic, we log the panic and set Crashes (but do not return an 881 + // error). 879 882 if r := recover(); r != nil { 880 883 log.Printf("crashed shard: %s: %s, %s", s.String(), r, debug.Stack()) 881 - sink <- shardListResult{ 882 - &zoekt.RepoList{Crashes: 1}, nil, 883 - } 884 + result = &zoekt.RepoList{Crashes: 1} 884 885 } 885 886 }() 886 887 887 - ms, err := s.List(ctx, q, opts) 888 - sink <- shardListResult{ms, err} 888 + return s.List(ctx, q, opts) 889 889 } 890 890 891 891 func (ss *shardedSearcher) List(ctx context.Context, r query.Q, opts *zoekt.ListOptions) (rl *zoekt.RepoList, err error) { ··· 944 944 return &agg, nil 945 945 } 946 946 947 - shardCount := len(shards) 948 - all := make(chan shardListResult, shardCount) 949 - tr.LazyPrintf("shardCount: %d", len(shards)) 947 + // We use an errgroup so that when an error is encountered we can stop 948 + // feeder and report the first error seen. 949 + ctx, cancel := context.WithCancel(ctx) 950 + defer cancel() 951 + g, ctx := errgroup.WithContext(ctx) 950 952 951 - feeder := make(chan zoekt.Searcher, len(shards)) 952 - for _, s := range shards { 953 - feeder <- s 954 - } 955 - close(feeder) 953 + // Bound work by number of CPUs. 954 + workers := min(runtime.GOMAXPROCS(0), len(shards)) 955 + 956 + var ( 957 + feeder = make(chan zoekt.Searcher, workers) 958 + all = make(chan *zoekt.RepoList, workers) 959 + ) 960 + 961 + // Send shards to feeder until context is canceled. 962 + g.Go(func() error { 963 + defer close(feeder) 964 + for _, s := range shards { 965 + // If context is canceled we stop consuming from shards and cancel the 966 + // errgroup. 967 + if err := proc.Yield(ctx); err != nil { 968 + return err 969 + } 970 + feeder <- s 971 + } 972 + return nil 973 + }) 956 974 957 - for i := 0; i < runtime.GOMAXPROCS(0); i++ { 958 - go func() { 975 + // Start up workers goroutines to consume feeder, do listing of a shard and 976 + // send results down all. If an error is encountered we cancel the errgroup. 977 + for range workers { 978 + g.Go(func() error { 959 979 for s := range feeder { 960 - listOneShard(ctx, s, r, opts, all) 980 + result, err := listOneShard(ctx, s, r, opts) 981 + if err != nil { 982 + return err 983 + } 984 + all <- result 961 985 } 962 - }() 986 + return nil 987 + }) 963 988 } 964 989 990 + // Once all goroutines in errgroup is done, we know nothing more will be 991 + // sent to all so close it. We rely on this sync point such that workersErr 992 + // will be written to before we are finished reading from all. 993 + var workersErr error 994 + go func() { 995 + workersErr = g.Wait() 996 + close(all) 997 + }() 998 + 999 + // Aggregate results from all. 965 1000 uniq := map[string]*zoekt.RepoListEntry{} 966 - 967 - for range shards { 968 - r := <-all 969 - if r.err != nil { 970 - return nil, r.err 971 - } 972 - 973 - agg.Crashes += r.rl.Crashes 974 - agg.Stats.Add(&r.rl.Stats) 1001 + for rl := range all { 1002 + agg.Crashes += rl.Crashes 1003 + agg.Stats.Add(&rl.Stats) 975 1004 976 - for _, r := range r.rl.Repos { 1005 + for _, r := range rl.Repos { 977 1006 prev, ok := uniq[r.Repository.Name] 978 1007 if !ok { 979 1008 cp := *r // We need to copy because we mutate r.Stats when merging duplicates ··· 983 1012 } 984 1013 } 985 1014 986 - for id, r := range r.rl.ReposMap { 1015 + for id, r := range rl.ReposMap { 987 1016 _, ok := agg.ReposMap[id] 988 1017 if !ok { 989 1018 agg.ReposMap[id] = r 990 1019 } 991 1020 } 1021 + } 1022 + 1023 + // workersErr will now be set since all is closed. 1024 + if workersErr != nil { 1025 + return nil, workersErr 992 1026 } 993 1027 994 1028 agg.Repos = make([]*zoekt.RepoListEntry, 0, len(uniq))