fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

indexserver: introduce indexMutex (#389)

This struct abstracts away how we do locking on the index directory in a
process. Previously we had a very coarse sync.Mutex whenever kicking off
an operation which mutated the directory. We plan on introducing some
concurrency to allow multiple repositories to be indexed at once.
However, this requires us to distinguish from global locks vs locking
just for a repository. This is what indexMutex achieves.

We don't yet use "indexMutex.With" concurrently, but will in an upcoming
commit.

Additionally a slight bug is fixed on how we report indexing times.
Previously we included the time taken to acquire the index lock for
elapsed time. This means indexing might be reported to take a very long
time, but in reality we just are blocked on waiting for merge or cleanup
to finish.

Because of this bug included are a few very coarse metrics. We could go
much deeper like we do in sched.go. However, I believe this handles some
interesting cases so will leave that as follow-up work. Now when we have
the queue being very slow to process, this might point towards the
global lock being the issue.

Test Plan: go test -race

plz-review-url: https://plz.review/review/6336

+131 -31
+13 -11
cmd/zoekt-sourcegraph-indexserver/cleanup.go
··· 437 437 if _, err := os.Stat(filepath.Join(s.IndexDir, "EXPLODE")); err == nil { 438 438 cmd := exec.Command("zoekt-merge-index", "explode", path) 439 439 440 - s.muIndexDir.Lock() 441 - b, err := cmd.CombinedOutput() 442 - s.muIndexDir.Unlock() 440 + var b []byte 441 + s.muIndexDir.Global(func() { 442 + b, err = cmd.CombinedOutput() 443 + }) 443 444 444 445 if err != nil { 445 446 debug.Printf("failed to explode compound shard %s: %s", path, string(b)) ··· 453 454 debug.Printf("failed getting all file paths for %s", path) 454 455 continue 455 456 } 456 - s.muIndexDir.Lock() 457 - for _, p := range paths { 458 - os.Remove(p) 459 - } 460 - s.muIndexDir.Unlock() 457 + s.muIndexDir.Global(func() { 458 + for _, p := range paths { 459 + os.Remove(p) 460 + } 461 + }) 461 462 shardsLog(s.IndexDir, "delete", []shard{{Path: path}}) 462 463 continue 463 464 } 464 465 } 465 466 466 - s.muIndexDir.Lock() 467 - removed, err := removeTombstones(path) 468 - s.muIndexDir.Unlock() 467 + var removed []*zoekt.Repository 468 + s.muIndexDir.Global(func() { 469 + removed, err = removeTombstones(path) 470 + }) 469 471 470 472 if err != nil { 471 473 debug.Printf("error while removing tombstones in %s: %s", fn, err)
+86
cmd/zoekt-sourcegraph-indexserver/index_mutex.go
··· 1 + package main 2 + 3 + import ( 4 + "sync" 5 + 6 + "github.com/prometheus/client_golang/prometheus" 7 + "github.com/prometheus/client_golang/prometheus/promauto" 8 + ) 9 + 10 + // indexMutex is the concurrency control we have for operations that operate 11 + // on the index directory. We have two broad operations: global and repository 12 + // specific. A global operation is like a write lock on the whole directory. A 13 + // repository operation ensure we don't have multiple operations happening for 14 + // the same repository. 15 + type indexMutex struct { 16 + // indexMu protects state in index directory. global takes write lock, repo 17 + // takes read lock. 18 + indexMu sync.RWMutex 19 + 20 + // runningMu protects running. You need to first be holding indexMu. 21 + runningMu sync.Mutex 22 + 23 + // running maps by name since that is what we key by on disk. Once we start 24 + // keying by repo ID on disk, we should switch to uint32. 25 + running map[string]struct{} 26 + } 27 + 28 + func (m *indexMutex) With(repoName string, f func()) bool { 29 + m.indexMu.RLock() 30 + defer m.indexMu.RUnlock() 31 + 32 + // init running; check and set running[repoName] 33 + m.runningMu.Lock() 34 + if m.running == nil { 35 + m.running = map[string]struct{}{} 36 + } 37 + _, alreadyRunning := m.running[repoName] 38 + m.running[repoName] = struct{}{} 39 + m.runningMu.Unlock() 40 + 41 + if alreadyRunning { 42 + metricIndexMutexAlreadyRunning.Inc() 43 + return false 44 + } 45 + 46 + // release running[repoName] 47 + defer func() { 48 + m.runningMu.Lock() 49 + delete(m.running, repoName) 50 + m.runningMu.Unlock() 51 + }() 52 + 53 + metricIndexMutexRepo.Inc() 54 + defer metricIndexMutexRepo.Dec() 55 + 56 + f() 57 + 58 + return true 59 + } 60 + 61 + func (m *indexMutex) Global(f func()) { 62 + metricIndexMutexGlobal.Inc() 63 + defer metricIndexMutexGlobal.Dec() 64 + 65 + m.indexMu.Lock() 66 + defer m.indexMu.Unlock() 67 + 68 + f() 69 + } 70 + 71 + var ( 72 + metricIndexMutexAlreadyRunning = promauto.NewCounter(prometheus.CounterOpts{ 73 + Name: "index_mutex_already_running_total", 74 + Help: "Total number of times we skipped processing a repository since an index was already running.", 75 + }) 76 + 77 + metricIndexMutexGlobal = promauto.NewGauge(prometheus.GaugeOpts{ 78 + Name: "index_mutex_global", 79 + Help: "The number of goroutines trying to or holding the global lock.", 80 + }) 81 + 82 + metricIndexMutexRepo = promauto.NewGauge(prometheus.GaugeOpts{ 83 + Name: "index_mutex_repository", 84 + Help: "The number of goroutines successfully holding a repo lock.", 85 + }) 86 + )
+32 -20
cmd/zoekt-sourcegraph-indexserver/main.go
··· 163 163 164 164 queue Queue 165 165 166 - // Protects the index directory from concurrent access. 167 - muIndexDir sync.Mutex 166 + // muIndexDir protects the index directory from concurrent access. 167 + muIndexDir indexMutex 168 168 169 169 // If true, shard merging is enabled. 170 170 shardMerging bool ··· 322 322 cleanupDone := make(chan struct{}) 323 323 go func() { 324 324 defer close(cleanupDone) 325 - s.muIndexDir.Lock() 326 - cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 327 - s.muIndexDir.Unlock() 325 + s.muIndexDir.Global(func() { 326 + cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 327 + }) 328 328 }() 329 329 330 330 repos.IterateIndexOptions(s.queue.AddOrUpdate) ··· 375 375 time.Sleep(time.Second) 376 376 continue 377 377 } 378 - start := time.Now() 378 + 379 379 args := s.indexArgs(opts) 380 380 381 - s.muIndexDir.Lock() 382 - state, err := s.Index(args) 383 - s.muIndexDir.Unlock() 381 + alreadyRunning := s.muIndexDir.With(opts.Name, func() { 382 + // only record time taken once we hold the lock. This avoids us 383 + // recording time taken while merging/cleanup runs. 384 + start := time.Now() 384 385 385 - elapsed := time.Since(start) 386 + state, err := s.Index(args) 386 387 387 - metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 388 + elapsed := time.Since(start) 388 389 389 - if err != nil { 390 - log.Printf("error indexing %s: %s", args.String(), err) 391 - } 390 + metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 392 391 393 - switch state { 394 - case indexStateSuccess: 395 - log.Printf("updated index %s in %v", args.String(), elapsed) 396 - case indexStateSuccessMeta: 397 - log.Printf("updated meta %s in %v", args.String(), elapsed) 392 + if err != nil { 393 + log.Printf("error indexing %s: %s", args.String(), err) 394 + } 395 + 396 + switch state { 397 + case indexStateSuccess: 398 + log.Printf("updated index %s in %v", args.String(), elapsed) 399 + case indexStateSuccessMeta: 400 + log.Printf("updated meta %s in %v", args.String(), elapsed) 401 + } 402 + s.queue.SetIndexed(opts, state) 403 + }) 404 + 405 + if alreadyRunning { 406 + // Someone else is processing the repository. We can just skip this job 407 + // since the repository will be added back to the queue and we will 408 + // converge to the correct behaviour. 409 + debug.Printf("index job for repository already running: %s", args) 410 + continue 398 411 } 399 - s.queue.SetIndexed(opts, state) 400 412 } 401 413 } 402 414