fork of https://github.com/sourcegraph/zoekt
1package search
2
3import (
4 "context"
5 "log"
6 "os"
7 "strconv"
8 "strings"
9 "time"
10
11 "github.com/prometheus/client_golang/prometheus"
12 "github.com/prometheus/client_golang/prometheus/promauto"
13 "golang.org/x/sync/semaphore"
14)
15
// Note: This is a Sourcegraph-specific addition to allow long-running queries
// to run alongside normal interactive queries.
18
19// scheduler is for managing concurrent searches.
20type scheduler interface {
21 // Acquire blocks until a normal process is created (ie for a search
22 // request). See process documentation. It will only return an error if the
23 // context expires.
24 Acquire(ctx context.Context) (*process, error)
25}
26
27// The ZOEKTSCHED environment variable controls variables within the
28// scheduler. It is a comma-separated list of name=val pairs setting these
29// named variables:
30//
31// disable: setting disable=1 will use the old zoekt scheduler.
32//
33// batchdiv: settings batchDiv=X will make the batch queue size 1/X of the
34// interactive queue size. By default it is 4.
35//
36// interactiveseconds: settings interactiveseconds=X will allow search
37// queries to run in the larger interactive queue for Xs before moving them
38// to the batch queue.
39//
40// Note: these tuneables should be regarded as temporary while we experiment
41// with our scheduler in production. They should not be relied upon in
42// customers/sourcegraph.com in a permanent manor (only temporary).
43var zoektSched = parseTuneables(os.Getenv("ZOEKTSCHED"))
44
45// newScheduler returns a scheduler for use in searches. It will return a
46// multiScheduler unless that has been disabled with the environment variable
47// SCHED_DISABLE. If so it will an equivalent scheduler as upstream zoekt.
48func newScheduler(capacity int64) scheduler {
49 if zoektSched["disable"] == 1 {
50 log.Println("ZOEKTSCHED=disable=1 specified. Using old zoekt scheduler.")
51 return &semaphoreScheduler{
52 throttle: semaphore.NewWeighted(capacity),
53 capacity: capacity,
54 }
55 }
56 return newMultiScheduler(capacity)
57}
58
59// multiScheduler is for managing concurrent searches. Its goals are:
60//
61// 1. Limit the number of concurrent searches.
62// 2. Co-operatively limit long running searches.
63// 3. No tuneables.
64//
65// ### Limit the number of concurrent searches
66//
67// Searching is CPU bound, so we can't do better than #CPU queries
68// concurrently. If we do so, we just create more memory pressure.
69//
70// ### Co-operatively limit long running searches
71//
72// Some searches are slow. Either due to a hard to execute search query (can't
73// use trigram index) or a large number of results. We want to support this
74// use case while still allowing interactive queries to be fast.
75//
76// ### No tuneables
77//
78// We want to avoid the need to tune the scheduler depending on the workload /
79// instance. As such we use a simple design whose inputs are time and number
80// of CPUs.
81//
82// ## Design
83//
84// We use semaphores to limit the number of running processes. A process
85// represents something which has acquired from the semaphore. An exclusive
86// process acquires the full semaphore. Every process is either fast or slow. A
// process starts as fast, but is downgraded to slow after a period of time.
// Downgrading relies on a process co-operatively deciding to downgrade.
89//
90// We intentionally keep the algorithm simple, but have a general interface to
91// allow improvements as we learn more.
92type multiScheduler struct {
93 semInteractive *sema
94 semBatch *sema
95
96 // interactiveDuration is how long we run a search query at interactive
97 // priority before downgrading it to a batch/slow query.
98 interactiveDuration time.Duration
99}
100
101func newMultiScheduler(capacity int64) *multiScheduler {
102 batchdiv := zoektSched["batchdiv"]
103 if batchdiv == 0 {
104 // Burst up to 1/4 of interactive capacity for batch.
105 batchdiv = 4
106 } else {
107 log.Printf("ZOEKTSCHED=batchdiv=%d specified. Batch queue size 1/%d of %d.", batchdiv, batchdiv, capacity)
108 }
109
110 batchCap := capacity / int64(batchdiv)
111 if batchCap == 0 {
112 batchCap = 1
113 }
114
115 interactiveseconds := zoektSched["interactiveseconds"]
116 if interactiveseconds == 0 {
117 interactiveseconds = 5
118 } else {
119 log.Printf("ZOEKTSCHED=interactiveseconds=%d specified. Search requests will move to batch queue after %d seconds.", interactiveseconds, interactiveseconds)
120 }
121
122 return &multiScheduler{
123 semInteractive: newSema(capacity, "interactive"),
124 semBatch: newSema(batchCap, "batch"),
125
126 interactiveDuration: time.Duration(interactiveseconds) * time.Second,
127 }
128}
129
130// Acquire implements scheduler.Acquire.
131func (s *multiScheduler) Acquire(ctx context.Context) (*process, error) {
132 // There are two stages, interactive and batch. We first start by acquiring the interactive mode semaphore.
133 // At some point in the future (if this search request is expensive enough),
134 // yieldFunc will switch us to the batch mode semaphore.
135 //
136 // It's possible for "sem" to be nil if we fail while switching to batch. In this scenario,
137 // the nil value will prevent us from releasing twice.
138
139 sem := s.semInteractive
140
141 if err := sem.Acquire(ctx); err != nil {
142 return nil, err
143 }
144
145 return &process{
146 releaseFunc: func() {
147 if sem != nil {
148 sem.Release()
149 sem = nil
150 }
151 },
152 yieldTimer: newDeadlineTimer(time.Now().Add(s.interactiveDuration)),
153 yieldFunc: func(ctx context.Context) error {
154 if sem != nil {
155 sem.Release()
156 sem = nil
157 }
158
159 // Try to acquire batch. Only set sem if we succeed so we know we can
160 // clean it up. If this fails we assume the process will stop running
161 // (ctx has expired).
162 semNext := s.semBatch
163 if err := semNext.Acquire(ctx); err != nil {
164 return err
165 }
166
167 sem = semNext
168 return nil
169 },
170 }, nil
171}
172
173// semaphoreScheduler shares a single semaphore for all searches. An exclusive
174// process acquires the full semaphore. This is equivalent to how concurrency
175// is managed in upstream. It exists as a fallback while we test
176// multiScheduler.
177type semaphoreScheduler struct {
178 throttle *semaphore.Weighted
179 capacity int64
180}
181
182// Acquire implements scheduler.Acquire.
183func (s *semaphoreScheduler) Acquire(ctx context.Context) (*process, error) {
184 return s.acquire(ctx, 1)
185}
186
187// Exclusive implements scheduler.Exclusive.
188func (s *semaphoreScheduler) Exclusive() *process {
189 // Won't error since context.Background won't expire.
190 proc, _ := s.acquire(context.Background(), s.capacity)
191 return proc
192}
193
194func (s *semaphoreScheduler) acquire(ctx context.Context, weight int64) (*process, error) {
195 if err := s.throttle.Acquire(ctx, weight); err != nil {
196 return nil, err
197 }
198 return &process{
199 releaseFunc: func() {
200 s.throttle.Release(weight)
201 },
202 }, nil
203}
204
205// process represents a running search query or an exclusive process. When the
206// process is done a call to Release is required.
207type process struct {
208 // yieldTimer ensures we only call yieldFunc once after a deadline.
209 yieldTimer *deadlineTimer
210 // yieldFunc is called once by Yield.
211 yieldFunc func(context.Context) error
212
213 // releaseFunc is called once by Release
214 releaseFunc func()
215}
216
217// Release the resources/locks/semaphores associated with this process. Can
218// only be called once.
219func (p *process) Release() {
220 if p.yieldTimer != nil {
221 p.yieldTimer.Stop()
222 }
223
224 p.releaseFunc()
225}
226
227// Yield may block to allow another process to run. This should be called
228// relatively often by a search to allow other processes to run. This can not
229// be called concurrently.
230//
231// The only error it will return is a context error if ctx expires. In that
232// case the process should stop running and call Release.
233func (p *process) Yield(ctx context.Context) error {
234 // Return immediately if we have already yielded or if we haven't used up our full timeslice
235 // (represented via yieldTimer).
236 if p.yieldTimer == nil || !p.yieldTimer.Exceeded() {
237 return nil
238 }
239
240 // We've just exceeded our timeslice.
241
242 // First, try to yield. This can return an error if our context expired.
243 err := p.yieldFunc(ctx)
244 if err != nil {
245 return err
246 }
247
248 // We've successfully yielded. Second, stop our timer and mark it nil so we don't call
249 // yieldFunc again.
250 p.yieldTimer.Stop()
251 p.yieldTimer = nil
252
253 return nil
254}
255
// newDeadlineTimer returns a timer which fires after deadline. Once it has
// fired, Exceeded always returns true. Callers must call Stop when done to
// release resources.
func newDeadlineTimer(deadline time.Time) *deadlineTimer {
	remaining := time.Until(deadline)
	return &deadlineTimer{t: time.NewTimer(remaining)}
}

// deadlineTimer reports whether a deadline has passed.
type deadlineTimer struct {
	// t is the underlying timer; t.C fires after the deadline. It is set to
	// nil once the timer has been observed to fire or has been stopped.
	t *time.Timer
}

// Exceeded returns true if time is after the deadline. It also returns true
// once Stop has been called, because a nil timer is treated as fired.
func (t *deadlineTimer) Exceeded() bool {
	if t.t == nil {
		return true
	}

	// Non-blocking check of whether the timer has fired.
	select {
	case <-t.t.C:
		// Fired: clear the timer so later calls take the nil path above.
		t.Stop()
		return true
	default:
		return false
	}
}

// Stop stops the underlying timer. Can be called multiple times.
func (t *deadlineTimer) Stop() {
	if timer := t.t; timer != nil {
		timer.Stop()
		t.t = nil
	}
}
295
// parseTuneables parses a comma-separated string of key=value pairs into a
// map. Empty items are skipped. "=value" is optional and defaults to 1.
// value is expected to be an int; a value that fails to parse, including one
// that is out of range for int, is recorded as 0. If a key is repeated the
// last occurrence wins.
func parseTuneables(v string) map[string]int {
	m := map[string]int{}

	for _, kv := range strings.Split(v, ",") {
		if kv == "" {
			continue
		}

		p := strings.SplitN(kv, "=", 2)
		if len(p) == 1 {
			m[p[0]] = 1
			continue
		}

		// On a range error strconv.Atoi returns the largest or smallest int
		// rather than 0, so the error has to be checked to honour the
		// documented "errors yield 0" behaviour.
		n, err := strconv.Atoi(p[1])
		if err != nil {
			n = 0
		}
		m[p[0]] = n
	}

	return m
}
317
318// We use a gauge and counter to track the number of processes in each
319// state. They can be one of the following states:
320//
321// 1. global queued
322// 2. interactive queued
323// 3. interactive running
324// 4. batch queued
325// 5. batch running
326//
327// From each state you either transition to the next state or the process
328// ends.
329//
330// Additionally once a process transitions from "global queued" it will be
331// "global running" until termination. This is an additional state on top of
332// the ones listed above.
333//
334// Global refers to the global scheduler lock. A process can only be blocked
335// in global queued if an exclusive lock has been acquired.
336//
337// We have counters for each possible reason a process finished:
338//
339// - interactive timedout
340// - batch timedout
341// - released
342//
343// We have separate gauges and counters for exclusive processes which index
344// what we track for normal processes:
345//
346// - exclusive queued
347// - exclusive running
348var (
349 metricSched = promauto.NewGaugeVec(prometheus.GaugeOpts{
350 Name: "zoekt_shards_sched",
351 Help: "The current number of zoekt scheduler processes in a state.",
352 }, []string{"type", "state"})
353 metricSchedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
354 Name: "zoekt_shards_sched_total",
355 Help: "The total number of zoekt scheduler processes in a state.",
356 }, []string{"type", "state"})
357)
358
359// sema is a semaphore which tracks its state in prometheus.
360type sema struct {
361 sem *semaphore.Weighted
362
363 metricQueued *gaugeCounter
364 metricRunning *gaugeCounter
365 metricTimedoutTotal prometheus.Counter
366}
367
368func newSema(capacity int64, typ string) *sema {
369 return &sema{
370 sem: semaphore.NewWeighted(capacity),
371
372 metricQueued: &gaugeCounter{
373 gauge: metricSched.WithLabelValues(typ, "queued"),
374 counter: metricSchedTotal.WithLabelValues(typ, "queued"),
375 },
376 metricRunning: &gaugeCounter{
377 gauge: metricSched.WithLabelValues(typ, "running"),
378 counter: metricSchedTotal.WithLabelValues(typ, "running"),
379 },
380 metricTimedoutTotal: metricSchedTotal.WithLabelValues(typ, "timedout"),
381 }
382}
383
384func (s *sema) Acquire(ctx context.Context) error {
385 s.metricQueued.Inc()
386 defer s.metricQueued.Dec()
387
388 err := s.sem.Acquire(ctx, 1)
389 if err != nil {
390 s.metricTimedoutTotal.Inc()
391 return err
392 }
393
394 s.metricRunning.Inc()
395
396 return nil
397}
398
399func (s *sema) Release() {
400 s.sem.Release(1)
401 s.metricRunning.Dec()
402}
403
404// gaugeCounter is a wrapper around a gauge and a counter. Whenever the gauge
405// is incremented so is the counter. Decrement only affects the gauge.
406type gaugeCounter struct {
407 gauge prometheus.Gauge
408 counter prometheus.Counter
409}
410
411func (m *gaugeCounter) Inc() {
412 m.gauge.Inc()
413 m.counter.Inc()
414}
415
416func (m *gaugeCounter) Dec() {
417 m.gauge.Dec()
418}