fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

rpc: cache encoding of queries (#159)

We are paying a large cost on horizontal clients since we encode the
query per replica. This is a potential solution where we add a new query
type which will cache the marshalled value. We plan to only use this for
places we speak gob encoding. As such we unwrap GobCache in the two
places we do gob decoding: Streaming and RPC.

+82 -2
+64
query/query.go
··· 15 15 package query 16 16 17 17 import ( 18 + "bytes" 19 + "encoding/gob" 18 20 "encoding/json" 19 21 "fmt" 20 22 "log" ··· 22 24 "regexp/syntax" 23 25 "sort" 24 26 "strings" 27 + "sync" 25 28 ) 26 29 27 30 var _ = log.Println ··· 29 32 // Q is a representation for a possibly hierarchical search query. 30 33 type Q interface { 31 34 String() string 35 + } 36 + 37 + // RPCUnwrap processes q to remove RPC specific elements from q. This is 38 + // needed because gob isn't flexible enough for us. This should be called by 39 + // RPC servers at the client/server boundary so that q works with the rest of 40 + // zoekt. 41 + func RPCUnwrap(q Q) Q { 42 + if cache, ok := q.(*GobCache); ok { 43 + return cache.Q 44 + } 45 + return q 32 46 } 33 47 34 48 // RawConfig filters repositories based on their encoded RawConfig map. ··· 326 340 case "auto": 327 341 q.CaseSensitive = (q.Regexp.String() != LowerRegexp(q.Regexp).String()) 328 342 } 343 + } 344 + 345 + // GobCache exists so we only pay the cost of marshalling a query once when we 346 + // aggregate it out over all the replicas. 347 + // 348 + // Our query and eval layer do not support GobCache. Instead, at the gob 349 + // boundaries (RPC and Streaming) we check if the Q is a GobCache and unwrap 350 + // it. 351 + // 352 + // "I wish we could get rid of this code soon enough" - tomas 353 + type GobCache struct { 354 + Q 355 + 356 + once sync.Once 357 + data []byte 358 + err error 359 + } 360 + 361 + // GobEncode implements gob.Encoder. 362 + func (q *GobCache) GobEncode() ([]byte, error) { 363 + q.once.Do(func() { 364 + var buf bytes.Buffer 365 + enc := gob.NewEncoder(&buf) 366 + q.err = enc.Encode(&gobWrapper{ 367 + WrappedQ: q.Q, 368 + }) 369 + q.data = buf.Bytes() 370 + }) 371 + return q.data, q.err 372 + } 373 + 374 + // GobDecode implements gob.Decoder. 375 + func (q *GobCache) GobDecode(data []byte) error { 376 + dec := gob.NewDecoder(bytes.NewBuffer(data)) 377 + var w gobWrapper 378 + err := dec.Decode(&w) 379 + if err != nil { 380 + return err 381 + } 382 + q.Q = w.WrappedQ 383 + return nil 384 + } 385 + 386 + // gobWrapper is needed so the gob decoder works. 387 + type gobWrapper struct { 388 + WrappedQ Q 389 + } 390 + 391 + func (q *GobCache) String() string { 392 + return fmt.Sprintf("GobCache(%s)", q.Q) 329 393 } 330 394 331 395 // Or is matched when any of its children is matched.
+9
rpc/internal/srv/srv.go
··· 42 42 defer cancel() 43 43 } 44 44 45 + if args.Q != nil { 46 + args.Q = query.RPCUnwrap(args.Q) 47 + } 48 + 45 49 r, err := s.Searcher.Search(ctx, args.Q, args.Opts) 46 50 if err != nil { 47 51 return err ··· 53 57 func (s *Searcher) List(ctx context.Context, args *ListArgs, reply *ListReply) error { 54 58 ctx, cancel := context.WithTimeout(ctx, defaultTimeout) 55 59 defer cancel() 60 + 61 + if args.Q != nil { 62 + args.Q = query.RPCUnwrap(args.Q) 63 + } 64 + 56 65 r, err := s.Searcher.List(ctx, args.Q, args.Opts) 57 66 if err != nil { 58 67 return err
+2 -1
rpc/rpc.go
··· 128 128 gob.Register(&query.And{}) 129 129 gob.Register(&query.Branch{}) 130 130 gob.Register(&query.Const{}) 131 + gob.Register(&query.GobCache{}) 131 132 gob.Register(&query.Language{}) 132 133 gob.Register(&query.Not{}) 133 134 gob.Register(&query.Or{}) 134 135 gob.Register(&query.Regexp{}) 135 - gob.Register(&query.RepoSet{}) 136 136 gob.Register(&query.RepoBranches{}) 137 + gob.Register(&query.RepoSet{}) 137 138 gob.Register(&query.Repo{}) 138 139 gob.Register(&query.Substring{}) 139 140 gob.Register(&query.Symbol{})
+5 -1
rpc/rpc_test.go
··· 44 44 client := rpc.Client(u.Host) 45 45 defer client.Close() 46 46 47 - r, err := client.Search(context.Background(), mock.WantSearch, &zoekt.SearchOptions{}) 47 + var cached query.Q = &query.GobCache{ 48 + Q: mock.WantSearch, 49 + } 50 + 51 + r, err := client.Search(context.Background(), cached, &zoekt.SearchOptions{}) 48 52 if err != nil { 49 53 t.Fatal(err) 50 54 }
+2
stream/stream.go
··· 59 59 return 60 60 } 61 61 62 + args.Q = query.RPCUnwrap(args.Q) 63 + 62 64 eventWriter, err := newEventStreamWriter(w) 63 65 if err != nil { 64 66 http.Error(w, err.Error(), http.StatusInternalServerError)