fork of https://github.com/sourcegraph/zoekt
1// Copyright 2017 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package shards
16
17import (
18 "fmt"
19 "log"
20 "os"
21 "path/filepath"
22 "sort"
23 "strconv"
24 "strings"
25 "sync"
26 "time"
27
28 "github.com/fsnotify/fsnotify"
29 "github.com/sourcegraph/zoekt"
30)
31
// shardLoader receives the results of DirectoryWatcher's scans: which shard
// files should now be served and which should no longer be.
type shardLoader interface {
	// load is called with shard paths that are new since the previous scan,
	// or whose effective modification time (shard or ".meta" file) changed.
	// It may be called with no paths.
	load(filenames ...string)
	// drop is called with shard paths that were handed out previously but
	// are no longer selected by the latest scan (removed, unreadable, or
	// superseded by a newer index format version). It may be called with no
	// paths.
	drop(filenames ...string)
}
37
38type DirectoryWatcher struct {
39 dir string
40 timestamps map[string]time.Time
41 loader shardLoader
42
43 // closed once ready
44 ready chan struct{}
45 readyErr error
46
47 closeOnce sync.Once
48 // quit is closed by Close to signal the directory watcher to stop.
49 quit chan struct{}
50 // stopped is closed once the directory watcher has stopped.
51 stopped chan struct{}
52}
53
54func (sw *DirectoryWatcher) Stop() {
55 sw.closeOnce.Do(func() {
56 close(sw.quit)
57 <-sw.stopped
58 })
59}
60
61func newDirectoryWatcher(dir string, loader shardLoader) (*DirectoryWatcher, error) {
62 sw := &DirectoryWatcher{
63 dir: dir,
64 timestamps: map[string]time.Time{},
65 loader: loader,
66 ready: make(chan struct{}),
67 quit: make(chan struct{}),
68 stopped: make(chan struct{}),
69 }
70
71 go func() {
72 defer close(sw.ready)
73
74 if err := sw.scan(); err != nil {
75 sw.readyErr = err
76 return
77 }
78
79 if err := sw.watch(); err != nil {
80 sw.readyErr = err
81 return
82 }
83 }()
84
85 return sw, nil
86}
87
88func (s *DirectoryWatcher) WaitUntilReady() error {
89 <-s.ready
90 return s.readyErr
91}
92
93func (s *DirectoryWatcher) String() string {
94 return fmt.Sprintf("shardWatcher(%s)", s.dir)
95}
96
// versionFromPath splits a shard path of the form "<prefix>_v<N>.<rest>"
// (the builder's naming scheme) into "<prefix>" and the index format
// version N.
//
// The split uses the last "_" in path and the first "." after it. The
// character right after the "_" is skipped without being checked (it is
// expected to be 'v'). If the path does not have this shape, or the version
// is not an integer, it returns path unchanged and 0.
func versionFromPath(path string) (string, int) {
	und := strings.LastIndex(path, "_")
	if und < 0 {
		return path, 0
	}

	dot := strings.Index(path[und:], ".")
	// The version lies between the two characters "_v" and the dot, so the
	// dot must be at least two characters past the underscore. Without this
	// check, a name such as "repo_.zoekt" (dot == 1) makes the slice below
	// panic with low > high. This also covers dot < 0 (no dot at all).
	if dot < 2 {
		return path, 0
	}
	dot += und

	version, err := strconv.Atoi(path[und+2 : dot])
	if err != nil {
		return path, 0
	}

	return path[:und], version
}
118
119func (s *DirectoryWatcher) scan() error {
120 // NOTE: if you change which file extensions are read, please update the
121 // watch implementation.
122 fs, err := filepath.Glob(filepath.Join(s.dir, "*.zoekt"))
123 if err != nil {
124 return err
125 }
126
127 latest := map[string]int{}
128 for _, fn := range fs {
129 name, version := versionFromPath(fn)
130
131 // In the case of downgrades, avoid reading
132 // newer index formats.
133 if version > zoekt.IndexFormatVersion && version > zoekt.NextIndexFormatVersion {
134 continue
135 }
136
137 if latest[name] < version {
138 latest[name] = version
139 }
140 }
141
142 ts := map[string]time.Time{}
143 for _, fn := range fs {
144 if name, version := versionFromPath(fn); latest[name] != version {
145 continue
146 }
147
148 fi, err := os.Lstat(fn)
149 if err != nil {
150 continue
151 }
152
153 ts[fn] = fi.ModTime()
154
155 fiMeta, err := os.Lstat(fn + ".meta")
156 if err != nil {
157 continue
158 }
159 if fiMeta.ModTime().After(fi.ModTime()) {
160 ts[fn] = fiMeta.ModTime()
161 }
162 }
163
164 var toLoad []string
165 for k, mtime := range ts {
166 if t, ok := s.timestamps[k]; !ok || t != mtime {
167 toLoad = append(toLoad, k)
168 s.timestamps[k] = mtime
169 }
170 }
171
172 var toDrop []string
173 // Unload deleted shards.
174 for k := range s.timestamps {
175 if _, ok := ts[k]; !ok {
176 toDrop = append(toDrop, k)
177 delete(s.timestamps, k)
178 }
179 }
180
181 if len(toDrop) > 0 {
182 log.Printf("unloading %d shard(s): %s", len(toDrop), humanTruncateList(toDrop, 5))
183 }
184
185 s.loader.drop(toDrop...)
186 s.loader.load(toLoad...)
187
188 return nil
189}
190
// humanTruncateList formats paths for log messages. It returns the base
// names of paths in sorted order, separated by ", ". At most max names are
// shown; if any remain, the output ends with "... N more". The caller's
// slice is left unmodified.
func humanTruncateList(paths []string, max int) string {
	// Sort a copy. Callers (e.g. scan) keep using the slice they pass in,
	// and a formatting helper must not reorder it as a side effect.
	sorted := make([]string, len(paths))
	copy(sorted, paths)
	sort.Strings(sorted)

	var b strings.Builder
	for i, p := range sorted {
		if i >= max {
			fmt.Fprintf(&b, "... %d more", len(sorted)-i)
			break
		}
		if i > 0 {
			b.WriteString(", ")
		}
		b.WriteString(filepath.Base(p))
	}
	return b.String()
}
206
207func (s *DirectoryWatcher) watch() error {
208 watcher, err := fsnotify.NewWatcher()
209 if err != nil {
210 return err
211 }
212 if err := watcher.Add(s.dir); err != nil {
213 return err
214 }
215
216 // intermediate signal channel so if there are multiple watcher.Events we
217 // only call scan once.
218 signal := make(chan struct{}, 1)
219
220 go func() {
221 notify := func() {
222 select {
223 case signal <- struct{}{}:
224 default:
225 }
226 }
227
228 ticker := time.NewTicker(time.Minute)
229
230 for {
231 select {
232 case event := <-watcher.Events:
233 // Only notify if a file we read in has changed. This is important to
234 // avoid all the events writing to temporary files.
235 if strings.HasSuffix(event.Name, ".zoekt") || strings.HasSuffix(event.Name, ".meta") {
236 notify()
237 }
238
239 case <-ticker.C:
240 // Periodically just double check the disk
241 notify()
242
243 case err := <-watcher.Errors:
244 // Ignore ErrEventOverflow since we rely on the presence of events so
245 // safe to ignore.
246 if err != nil && err != fsnotify.ErrEventOverflow {
247 log.Println("watcher error:", err)
248 }
249
250 case <-s.quit:
251 watcher.Close()
252 ticker.Stop()
253 close(signal)
254 return
255 }
256 }
257 }()
258
259 go func() {
260 defer close(s.stopped)
261 for range signal {
262 if err := s.scan(); err != nil {
263 log.Println("watcher error:", err)
264 }
265 }
266 }()
267
268 return nil
269}