fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

Extract samplingSender and use it for gRPC (#637)

author
Camden Cheek
committer
GitHub
date (Aug 11, 2023, 12:13 PM -0600) commit 956d775e parent d5723536
+133 -40
+6 -1
grpc/server.go
··· 46 46 onMatch := stream.SenderFunc(func(res *zoekt.SearchResult) { 47 47 ss.Send(res.ToProto()) 48 48 }) 49 + sampler := stream.NewSamplingSender(onMatch) 49 50 50 - return s.streamer.StreamSearch(ss.Context(), q, zoekt.SearchOptionsFromProto(req.GetOpts()), onMatch) 51 + err = s.streamer.StreamSearch(ss.Context(), q, zoekt.SearchOptionsFromProto(req.GetOpts()), sampler) 52 + if err == nil { 53 + sampler.Flush() 54 + } 55 + return err 51 56 } 52 57 53 58 func (s *Server) List(ctx context.Context, req *v1.ListRequest) (*v1.ListResponse, error) {
+62 -39
stream/stream.go
··· 76 76 } 77 77 }() 78 78 79 - agg := zoekt.SearchResult{} 80 - aggCount := 0 81 - 82 79 send := func(zsr *zoekt.SearchResult) { 83 - 84 80 err := eventWriter.event(eventMatches, zsr) 85 81 if err != nil { 86 82 _ = eventWriter.event(eventError, err) ··· 88 84 } 89 85 } 90 86 91 - err = h.Searcher.StreamSearch(ctx, args.Q, args.Opts, SenderFunc(func(event *zoekt.SearchResult) { 92 - // We don't want to send events over the wire if they don't contain file 93 - // matches. Hence, in case we didn't find any results, we aggregate the stats 94 - // and send them out in regular intervals. 95 - if len(event.Files) == 0 { 96 - aggCount++ 97 - 98 - agg.Stats.Add(event.Stats) 99 - agg.Progress = event.Progress 87 + sampler := NewSamplingSender(SenderFunc(send)) 100 88 101 - if aggCount%100 == 0 && !agg.Stats.Zero() { 102 - send(&agg) 103 - agg = zoekt.SearchResult{} 104 - } 89 + err = h.Searcher.StreamSearch(ctx, args.Q, args.Opts, sampler) 105 90 106 - return 107 - } 108 - 109 - // If we have aggregate stats, we merge them with the new event before sending 110 - // it. We drop agg.Progress, because we assume that event.Progress reflects the 111 - // latest status. 112 - if !agg.Stats.Zero() { 113 - event.Stats.Add(agg.Stats) 114 - agg = zoekt.SearchResult{} 115 - } 116 - 117 - send(event) 118 - })) 119 - 120 - if err == nil && !agg.Stats.Zero() { 121 - send(&zoekt.SearchResult{ 122 - Stats: agg.Stats, 123 - Progress: zoekt.Progress{ 124 - Priority: math.Inf(-1), 125 - MaxPendingPriority: math.Inf(-1), 126 - }, 127 - }) 91 + if err == nil { 92 + sampler.Flush() 128 93 } 129 94 130 95 if err != nil { ··· 184 149 }) 185 150 rpc.RegisterGob() 186 151 } 152 + 153 + // NewSamplingSender is a zoekt.Sender that samples stats events 154 + // to avoid sending many empty stats events over the wire. 155 + func NewSamplingSender(next zoekt.Sender) *samplingSender { 156 + return &samplingSender{ 157 + next: next, 158 + agg: zoekt.SearchResult{}, 159 + aggCount: 0, 160 + } 161 + } 162 + 163 + type samplingSender struct { 164 + next zoekt.Sender 165 + agg zoekt.SearchResult 166 + aggCount int 167 + } 168 + 169 + func (s *samplingSender) Send(event *zoekt.SearchResult) { 170 + // We don't want to send events over the wire if they don't contain file 171 + // matches. Hence, in case we didn't find any results, we aggregate the stats 172 + // and send them out in regular intervals. 173 + if len(event.Files) == 0 { 174 + s.aggCount++ 175 + 176 + s.agg.Stats.Add(event.Stats) 177 + s.agg.Progress = event.Progress 178 + 179 + if s.aggCount%100 == 0 && !s.agg.Stats.Zero() { 180 + s.next.Send(&s.agg) 181 + s.agg = zoekt.SearchResult{} 182 + } 183 + 184 + return 185 + } 186 + 187 + // If we have aggregate stats, we merge them with the new event before sending 188 + // it. We drop agg.Progress, because we assume that event.Progress reflects the 189 + // latest status. 190 + if !s.agg.Stats.Zero() { 191 + event.Stats.Add(s.agg.Stats) 192 + s.agg = zoekt.SearchResult{} 193 + } 194 + 195 + s.next.Send(event) 196 + } 197 + 198 + // Flush sends any aggregated stats that we haven't sent yet 199 + func (s *samplingSender) Flush() { 200 + if !s.agg.Stats.Zero() { 201 + s.next.Send(&zoekt.SearchResult{ 202 + Stats: s.agg.Stats, 203 + Progress: zoekt.Progress{ 204 + Priority: math.Inf(-1), 205 + MaxPendingPriority: math.Inf(-1), 206 + }, 207 + }) 208 + } 209 + }
+65
stream/stream_test.go
··· 195 195 sender.Send(sr) 196 196 return nil 197 197 } 198 + 199 + func TestSamplingStream(t *testing.T) { 200 + nonZeroStats := zoekt.Stats{ 201 + ContentBytesLoaded: 10, 202 + } 203 + filesEvent := &zoekt.SearchResult{ 204 + Files: make([]zoekt.FileMatch, 10), 205 + Stats: nonZeroStats, 206 + } 207 + fileEvents := func(n int) []*zoekt.SearchResult { 208 + res := make([]*zoekt.SearchResult, n) 209 + for i := 0; i < n; i++ { 210 + res[i] = filesEvent 211 + } 212 + return res 213 + } 214 + statsEvent := &zoekt.SearchResult{ 215 + Stats: nonZeroStats, 216 + } 217 + statsEvents := func(n int) []*zoekt.SearchResult { 218 + res := make([]*zoekt.SearchResult, n) 219 + for i := 0; i < n; i++ { 220 + res[i] = statsEvent 221 + } 222 + return res 223 + } 224 + cases := []struct { 225 + events []*zoekt.SearchResult 226 + beforeFlushCount int 227 + afterFlushCount int 228 + }{ 229 + // These test cases assume that the sampler only forwards 230 + // every 100 stats-only event. In case the sampling logic 231 + // changes, these tests are not valuable. 232 + {nil, 0, 0}, 233 + {fileEvents(1), 1, 1}, 234 + {fileEvents(2), 2, 2}, 235 + {fileEvents(200), 200, 200}, 236 + {append(fileEvents(1), statsEvents(1)...), 1, 2}, 237 + {append(fileEvents(1), statsEvents(2)...), 1, 2}, 238 + {append(fileEvents(1), statsEvents(99)...), 1, 2}, 239 + {append(fileEvents(1), statsEvents(100)...), 2, 2}, 240 + {statsEvents(500), 5, 5}, 241 + {statsEvents(501), 5, 6}, 242 + } 243 + 244 + for _, tc := range cases { 245 + count := 0 246 + ss := NewSamplingSender(SenderFunc(func(*zoekt.SearchResult) { 247 + count += 1 248 + })) 249 + 250 + for _, event := range tc.events { 251 + ss.Send(event) 252 + } 253 + if count != tc.beforeFlushCount { 254 + t.Fatalf("expected %d events, got %d", tc.beforeFlushCount, count) 255 + } 256 + ss.Flush() 257 + 258 + if count != tc.afterFlushCount { 259 + t.Fatalf("expected %d events, got %d", tc.afterFlushCount, count) 260 + } 261 + } 262 + }