// Source: fork of https://github.com/sourcegraph/zoekt (at main).

1package search 2 3import ( 4 "context" 5 "log" 6 "os" 7 "strconv" 8 "strings" 9 "time" 10 11 "github.com/prometheus/client_golang/prometheus" 12 "github.com/prometheus/client_golang/prometheus/promauto" 13 "golang.org/x/sync/semaphore" 14) 15 16// Note: This is a Sourcegraph specific addition to allow long running queries 17// along normal interactive queries. 18 19// scheduler is for managing concurrent searches. 20type scheduler interface { 21 // Acquire blocks until a normal process is created (ie for a search 22 // request). See process documentation. It will only return an error if the 23 // context expires. 24 Acquire(ctx context.Context) (*process, error) 25} 26 27// The ZOEKTSCHED environment variable controls variables within the 28// scheduler. It is a comma-separated list of name=val pairs setting these 29// named variables: 30// 31// disable: setting disable=1 will use the old zoekt scheduler. 32// 33// batchdiv: settings batchDiv=X will make the batch queue size 1/X of the 34// interactive queue size. By default it is 4. 35// 36// interactiveseconds: settings interactiveseconds=X will allow search 37// queries to run in the larger interactive queue for Xs before moving them 38// to the batch queue. 39// 40// Note: these tuneables should be regarded as temporary while we experiment 41// with our scheduler in production. They should not be relied upon in 42// customers/sourcegraph.com in a permanent manor (only temporary). 43var zoektSched = parseTuneables(os.Getenv("ZOEKTSCHED")) 44 45// newScheduler returns a scheduler for use in searches. It will return a 46// multiScheduler unless that has been disabled with the environment variable 47// SCHED_DISABLE. If so it will an equivalent scheduler as upstream zoekt. 48func newScheduler(capacity int64) scheduler { 49 if zoektSched["disable"] == 1 { 50 log.Println("ZOEKTSCHED=disable=1 specified. 
Using old zoekt scheduler.") 51 return &semaphoreScheduler{ 52 throttle: semaphore.NewWeighted(capacity), 53 capacity: capacity, 54 } 55 } 56 return newMultiScheduler(capacity) 57} 58 59// multiScheduler is for managing concurrent searches. Its goals are: 60// 61// 1. Limit the number of concurrent searches. 62// 2. Co-operatively limit long running searches. 63// 3. No tuneables. 64// 65// ### Limit the number of concurrent searches 66// 67// Searching is CPU bound, so we can't do better than #CPU queries 68// concurrently. If we do so, we just create more memory pressure. 69// 70// ### Co-operatively limit long running searches 71// 72// Some searches are slow. Either due to a hard to execute search query (can't 73// use trigram index) or a large number of results. We want to support this 74// use case while still allowing interactive queries to be fast. 75// 76// ### No tuneables 77// 78// We want to avoid the need to tune the scheduler depending on the workload / 79// instance. As such we use a simple design whose inputs are time and number 80// of CPUs. 81// 82// ## Design 83// 84// We use semaphores to limit the number of running processes. A process 85// represents something which has acquired from the semaphore. An exclusive 86// process acquires the full semaphore. Every process is either fast or slow. A 87// process starts as fast, but is downgraded to slow after a period of time. 88// time. Downgrading relies on a process co-operatively deciding to downgrade. 89// 90// We intentionally keep the algorithm simple, but have a general interface to 91// allow improvements as we learn more. 92type multiScheduler struct { 93 semInteractive *sema 94 semBatch *sema 95 96 // interactiveDuration is how long we run a search query at interactive 97 // priority before downgrading it to a batch/slow query. 
98 interactiveDuration time.Duration 99} 100 101func newMultiScheduler(capacity int64) *multiScheduler { 102 batchdiv := zoektSched["batchdiv"] 103 if batchdiv == 0 { 104 // Burst up to 1/4 of interactive capacity for batch. 105 batchdiv = 4 106 } else { 107 log.Printf("ZOEKTSCHED=batchdiv=%d specified. Batch queue size 1/%d of %d.", batchdiv, batchdiv, capacity) 108 } 109 110 batchCap := capacity / int64(batchdiv) 111 if batchCap == 0 { 112 batchCap = 1 113 } 114 115 interactiveseconds := zoektSched["interactiveseconds"] 116 if interactiveseconds == 0 { 117 interactiveseconds = 5 118 } else { 119 log.Printf("ZOEKTSCHED=interactiveseconds=%d specified. Search requests will move to batch queue after %d seconds.", interactiveseconds, interactiveseconds) 120 } 121 122 return &multiScheduler{ 123 semInteractive: newSema(capacity, "interactive"), 124 semBatch: newSema(batchCap, "batch"), 125 126 interactiveDuration: time.Duration(interactiveseconds) * time.Second, 127 } 128} 129 130// Acquire implements scheduler.Acquire. 131func (s *multiScheduler) Acquire(ctx context.Context) (*process, error) { 132 // There are two stages, interactive and batch. We first start by acquiring the interactive mode semaphore. 133 // At some point in the future (if this search request is expensive enough), 134 // yieldFunc will switch us to the batch mode semaphore. 135 // 136 // It's possible for "sem" to be nil if we fail while switching to batch. In this scenario, 137 // the nil value will prevent us from releasing twice. 138 139 sem := s.semInteractive 140 141 if err := sem.Acquire(ctx); err != nil { 142 return nil, err 143 } 144 145 return &process{ 146 releaseFunc: func() { 147 if sem != nil { 148 sem.Release() 149 sem = nil 150 } 151 }, 152 yieldTimer: newDeadlineTimer(time.Now().Add(s.interactiveDuration)), 153 yieldFunc: func(ctx context.Context) error { 154 if sem != nil { 155 sem.Release() 156 sem = nil 157 } 158 159 // Try to acquire batch. 
Only set sem if we succeed so we know we can 160 // clean it up. If this fails we assume the process will stop running 161 // (ctx has expired). 162 semNext := s.semBatch 163 if err := semNext.Acquire(ctx); err != nil { 164 return err 165 } 166 167 sem = semNext 168 return nil 169 }, 170 }, nil 171} 172 173// semaphoreScheduler shares a single semaphore for all searches. An exclusive 174// process acquires the full semaphore. This is equivalent to how concurrency 175// is managed in upstream. It exists as a fallback while we test 176// multiScheduler. 177type semaphoreScheduler struct { 178 throttle *semaphore.Weighted 179 capacity int64 180} 181 182// Acquire implements scheduler.Acquire. 183func (s *semaphoreScheduler) Acquire(ctx context.Context) (*process, error) { 184 return s.acquire(ctx, 1) 185} 186 187// Exclusive implements scheduler.Exclusive. 188func (s *semaphoreScheduler) Exclusive() *process { 189 // Won't error since context.Background won't expire. 190 proc, _ := s.acquire(context.Background(), s.capacity) 191 return proc 192} 193 194func (s *semaphoreScheduler) acquire(ctx context.Context, weight int64) (*process, error) { 195 if err := s.throttle.Acquire(ctx, weight); err != nil { 196 return nil, err 197 } 198 return &process{ 199 releaseFunc: func() { 200 s.throttle.Release(weight) 201 }, 202 }, nil 203} 204 205// process represents a running search query or an exclusive process. When the 206// process is done a call to Release is required. 207type process struct { 208 // yieldTimer ensures we only call yieldFunc once after a deadline. 209 yieldTimer *deadlineTimer 210 // yieldFunc is called once by Yield. 211 yieldFunc func(context.Context) error 212 213 // releaseFunc is called once by Release 214 releaseFunc func() 215} 216 217// Release the resources/locks/semaphores associated with this process. Can 218// only be called once. 
219func (p *process) Release() { 220 if p.yieldTimer != nil { 221 p.yieldTimer.Stop() 222 } 223 224 p.releaseFunc() 225} 226 227// Yield may block to allow another process to run. This should be called 228// relatively often by a search to allow other processes to run. This can not 229// be called concurrently. 230// 231// The only error it will return is a context error if ctx expires. In that 232// case the process should stop running and call Release. 233func (p *process) Yield(ctx context.Context) error { 234 // Return immediately if we have already yielded or if we haven't used up our full timeslice 235 // (represented via yieldTimer). 236 if p.yieldTimer == nil || !p.yieldTimer.Exceeded() { 237 return nil 238 } 239 240 // We've just exceeded our timeslice. 241 242 // First, try to yield. This can return an error if our context expired. 243 err := p.yieldFunc(ctx) 244 if err != nil { 245 return err 246 } 247 248 // We've successfully yielded. Second, stop our timer and mark it nil so we don't call 249 // yieldFunc again. 250 p.yieldTimer.Stop() 251 p.yieldTimer = nil 252 253 return nil 254} 255 256// newDeadlineTimer returns a timer which fires after deadline. Once it fires 257// Exceeded will always return true. Callers must call Stop when done to 258// release resources. 259func newDeadlineTimer(deadline time.Time) *deadlineTimer { 260 return &deadlineTimer{ 261 t: time.NewTimer(time.Until(deadline)), 262 } 263} 264 265type deadlineTimer struct { 266 // t.C fires after deadline. Once it fires we set to nil to indicate it has 267 // fired. 268 t *time.Timer 269} 270 271// Exceeded returns true if time is after the deadline. 272func (t *deadlineTimer) Exceeded() bool { 273 if t.t == nil { 274 return true 275 } 276 select { 277 case <-t.t.C: 278 default: 279 return false 280 } 281 282 t.Stop() 283 284 return true 285} 286 287// Stop stops the underlying timer. Can be called multiple times. 
288func (t *deadlineTimer) Stop() { 289 if t.t == nil { 290 return 291 } 292 t.t.Stop() 293 t.t = nil 294} 295 296// parseTuneables parses a comma separated string of key=value pairs. "=value" 297// is optional, defaults to 1. value is expected to be an int. Errors are 298// ignored (value will be 0). 299func parseTuneables(v string) map[string]int { 300 m := map[string]int{} 301 302 for _, kv := range strings.Split(v, ",") { 303 if kv == "" { 304 continue 305 } 306 307 p := strings.SplitN(kv, "=", 2) 308 if len(p) == 1 { 309 m[p[0]] = 1 310 } else { 311 m[p[0]], _ = strconv.Atoi(p[1]) 312 } 313 } 314 315 return m 316} 317 318// We use a gauge and counter to track the number of processes in each 319// state. They can be one of the following states: 320// 321// 1. global queued 322// 2. interactive queued 323// 3. interactive running 324// 4. batch queued 325// 5. batch running 326// 327// From each state you either transition to the next state or the process 328// ends. 329// 330// Additionally once a process transitions from "global queued" it will be 331// "global running" until termination. This is an additional state on top of 332// the ones listed above. 333// 334// Global refers to the global scheduler lock. A process can only be blocked 335// in global queued if an exclusive lock has been acquired. 
336// 337// We have counters for each possible reason a process finished: 338// 339// - interactive timedout 340// - batch timedout 341// - released 342// 343// We have separate gauges and counters for exclusive processes which index 344// what we track for normal processes: 345// 346// - exclusive queued 347// - exclusive running 348var ( 349 metricSched = promauto.NewGaugeVec(prometheus.GaugeOpts{ 350 Name: "zoekt_shards_sched", 351 Help: "The current number of zoekt scheduler processes in a state.", 352 }, []string{"type", "state"}) 353 metricSchedTotal = promauto.NewCounterVec(prometheus.CounterOpts{ 354 Name: "zoekt_shards_sched_total", 355 Help: "The total number of zoekt scheduler processes in a state.", 356 }, []string{"type", "state"}) 357) 358 359// sema is a semaphore which tracks its state in prometheus. 360type sema struct { 361 sem *semaphore.Weighted 362 363 metricQueued *gaugeCounter 364 metricRunning *gaugeCounter 365 metricTimedoutTotal prometheus.Counter 366} 367 368func newSema(capacity int64, typ string) *sema { 369 return &sema{ 370 sem: semaphore.NewWeighted(capacity), 371 372 metricQueued: &gaugeCounter{ 373 gauge: metricSched.WithLabelValues(typ, "queued"), 374 counter: metricSchedTotal.WithLabelValues(typ, "queued"), 375 }, 376 metricRunning: &gaugeCounter{ 377 gauge: metricSched.WithLabelValues(typ, "running"), 378 counter: metricSchedTotal.WithLabelValues(typ, "running"), 379 }, 380 metricTimedoutTotal: metricSchedTotal.WithLabelValues(typ, "timedout"), 381 } 382} 383 384func (s *sema) Acquire(ctx context.Context) error { 385 s.metricQueued.Inc() 386 defer s.metricQueued.Dec() 387 388 err := s.sem.Acquire(ctx, 1) 389 if err != nil { 390 s.metricTimedoutTotal.Inc() 391 return err 392 } 393 394 s.metricRunning.Inc() 395 396 return nil 397} 398 399func (s *sema) Release() { 400 s.sem.Release(1) 401 s.metricRunning.Dec() 402} 403 404// gaugeCounter is a wrapper around a gauge and a counter. 
Whenever the gauge 405// is incremented so is the counter. Decrement only affects the gauge. 406type gaugeCounter struct { 407 gauge prometheus.Gauge 408 counter prometheus.Counter 409} 410 411func (m *gaugeCounter) Inc() { 412 m.gauge.Inc() 413 m.counter.Inc() 414} 415 416func (m *gaugeCounter) Dec() { 417 m.gauge.Dec() 418}