fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "sync"
5
6 "github.com/prometheus/client_golang/prometheus"
7 "github.com/prometheus/client_golang/prometheus/promauto"
8)
9
// indexMutex coordinates access to the index directory. It supports two kinds
// of operation:
//
//   - Global operations, which behave like a write lock over the whole
//     directory (see Global).
//   - Repository operations, which ensure that at most one operation runs for
//     a given repository name at a time (see With).
//
// The zero value is ready to use. An indexMutex must not be copied after first
// use since it contains sync primitives.
type indexMutex struct {
	// indexMu guards the state of the index directory. Global acquires it for
	// writing; With acquires it for reading.
	indexMu sync.RWMutex

	// runningMu guards running. It is only acquired while indexMu is already
	// held.
	runningMu sync.Mutex

	// running is the set of repository names that currently have an operation
	// in flight. It is keyed by name because that is how repositories are
	// keyed on disk; once the on-disk layout is keyed by repo ID this should
	// become a uint32 key.
	running map[string]struct{}
}
27
28// With runs f if no other f with the same repoName is running. If f runs true
29// is returned, otherwise false is returned.
30//
31// With blocks if f runs or the Global lock is held.
32func (m *indexMutex) With(repoName string, f func()) bool {
33 m.indexMu.RLock()
34 defer m.indexMu.RUnlock()
35
36 // init running; check and set running[repoName]
37 m.runningMu.Lock()
38 if m.running == nil {
39 m.running = map[string]struct{}{}
40 }
41 _, alreadyRunning := m.running[repoName]
42 m.running[repoName] = struct{}{}
43 m.runningMu.Unlock()
44
45 if alreadyRunning {
46 metricIndexMutexAlreadyRunning.Inc()
47 return false
48 }
49
50 // release running[repoName]
51 defer func() {
52 m.runningMu.Lock()
53 delete(m.running, repoName)
54 m.runningMu.Unlock()
55 }()
56
57 metricIndexMutexRepo.Inc()
58 defer metricIndexMutexRepo.Dec()
59
60 f()
61
62 return true
63}
64
65// Global runs f once the global lock is held. IE no other Global or With f's
66// will be running.
67func (m *indexMutex) Global(f func()) {
68 metricIndexMutexGlobal.Inc()
69 defer metricIndexMutexGlobal.Dec()
70
71 m.indexMu.Lock()
72 defer m.indexMu.Unlock()
73
74 f()
75}
76
77var (
78 metricIndexMutexAlreadyRunning = promauto.NewCounter(prometheus.CounterOpts{
79 Name: "index_mutex_already_running_total",
80 Help: "Total number of times we skipped processing a repository since an index was already running.",
81 })
82
83 metricIndexMutexGlobal = promauto.NewGauge(prometheus.GaugeOpts{
84 Name: "index_mutex_global",
85 Help: "The number of goroutines trying to or holding the global lock.",
86 })
87
88 metricIndexMutexRepo = promauto.NewGauge(prometheus.GaugeOpts{
89 Name: "index_mutex_repository",
90 Help: "The number of goroutines successfully holding a repo lock.",
91 })
92)