fork of https://github.com/sourcegraph/zoekt
1// Copyright 2017 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package search
16
17import (
18 "fmt"
19 "log"
20 "os"
21 "path/filepath"
22 "sort"
23 "strconv"
24 "strings"
25 "sync"
26 "time"
27
28 "github.com/fsnotify/fsnotify"
29
30 "github.com/sourcegraph/zoekt/index"
31)
32
// shardLoader receives the results of a directory scan: the shard files that
// should be (re)loaded and the ones that should be unloaded.
type shardLoader interface {
	// load is called with the paths of shard files that are new or whose
	// modification time changed since the previous scan.
	load(filenames ...string)
	// drop is called with the paths of previously seen shard files that are
	// no longer present in the latest scan.
	drop(filenames ...string)
}
38
// DirectoryWatcher keeps a shardLoader in sync with the "*.zoekt" shard files
// found in a directory, rescanning when the directory changes.
type DirectoryWatcher struct {
	// dir is the directory that is scanned and watched for shard files.
	dir string
	// timestamps maps the path of each currently loaded shard to the
	// modification time recorded for it by the last scan (the later of the
	// shard file's and its ".meta" file's mtime).
	timestamps map[string]time.Time
	// loader is told which shards to load and drop after every scan.
	loader shardLoader

	// closed once ready
	ready chan struct{}
	// readyErr holds the error, if any, from the initial scan or from
	// setting up the watch. It is written before ready is closed.
	readyErr error

	// closeOnce ensures the shutdown sequence in Stop runs only once.
	closeOnce sync.Once
	// quit is closed by Close to signal the directory watcher to stop.
	quit chan struct{}
	// stopped is closed once the directory watcher has stopped.
	stopped chan struct{}
}
54
55func (sw *DirectoryWatcher) Stop() {
56 sw.closeOnce.Do(func() {
57 close(sw.quit)
58 <-sw.stopped
59 })
60}
61
62func newDirectoryWatcher(dir string, loader shardLoader) (*DirectoryWatcher, error) {
63 sw := &DirectoryWatcher{
64 dir: dir,
65 timestamps: map[string]time.Time{},
66 loader: loader,
67 ready: make(chan struct{}),
68 quit: make(chan struct{}),
69 stopped: make(chan struct{}),
70 }
71
72 go func() {
73 defer close(sw.ready)
74
75 if err := sw.scan(); err != nil {
76 sw.readyErr = err
77 return
78 }
79
80 if err := sw.watch(); err != nil {
81 sw.readyErr = err
82 return
83 }
84 }()
85
86 return sw, nil
87}
88
// WaitUntilReady blocks until the ready channel has been closed and then
// returns readyErr: nil on success, otherwise the error recorded during
// initialization.
func (s *DirectoryWatcher) WaitUntilReady() error {
	// ready is closed, never sent on, so this receive unblocks once
	// initialization is done and stays unblocked for later callers.
	<-s.ready
	return s.readyErr
}
93
94func (s *DirectoryWatcher) String() string {
95 return fmt.Sprintf("shardWatcher(%s)", s.dir)
96}
97
// versionFromPath extracts url encoded repository name and
// index format version from a shard name from builder.
//
// The version is read from the text between the last underscore (skipping the
// single character that follows it, presumably a "v" marker as in
// "repo_v16.00000.zoekt") and the first dot after that underscore. It returns
// the part of path before the underscore together with the parsed version.
//
// If path does not have that shape (no underscore, no dot after it, or a
// non-numeric version), the whole path is returned with version 0.
func versionFromPath(path string) (string, int) {
	und := strings.LastIndex(path, "_")
	if und < 0 {
		return path, 0
	}

	dot := strings.Index(path[und:], ".")
	if dot < 0 {
		return path, 0
	}
	dot += und

	// When the dot directly follows the underscore (e.g. "repo_.zoekt") there
	// is no room for the marker character, and path[und+2:dot] would have its
	// low bound above its high bound and panic.
	if dot < und+2 {
		return path, 0
	}

	version, err := strconv.Atoi(path[und+2 : dot])
	if err != nil {
		return path, 0
	}

	return path[:und], version
}
119
120func (s *DirectoryWatcher) scan() error {
121 // NOTE: if you change which file extensions are read, please update the
122 // watch implementation.
123 fs, err := filepath.Glob(filepath.Join(s.dir, "*.zoekt"))
124 if err != nil {
125 return err
126 }
127
128 latest := map[string]int{}
129 for _, fn := range fs {
130 name, version := versionFromPath(fn)
131
132 // In the case of downgrades, avoid reading
133 // newer index formats.
134 if version > index.IndexFormatVersion && version > index.NextIndexFormatVersion {
135 continue
136 }
137
138 if latest[name] < version {
139 latest[name] = version
140 }
141 }
142
143 ts := map[string]time.Time{}
144 for _, fn := range fs {
145 if name, version := versionFromPath(fn); latest[name] != version {
146 continue
147 }
148
149 fi, err := os.Lstat(fn)
150 if err != nil {
151 continue
152 }
153
154 ts[fn] = fi.ModTime()
155
156 fiMeta, err := os.Lstat(fn + ".meta")
157 if err != nil {
158 continue
159 }
160 if fiMeta.ModTime().After(fi.ModTime()) {
161 ts[fn] = fiMeta.ModTime()
162 }
163 }
164
165 var toLoad []string
166 for k, mtime := range ts {
167 if t, ok := s.timestamps[k]; !ok || t != mtime {
168 toLoad = append(toLoad, k)
169 s.timestamps[k] = mtime
170 }
171 }
172
173 var toDrop []string
174 // Unload deleted shards.
175 for k := range s.timestamps {
176 if _, ok := ts[k]; !ok {
177 toDrop = append(toDrop, k)
178 delete(s.timestamps, k)
179 }
180 }
181
182 if len(toDrop) > 0 {
183 log.Printf("[INFO] unloading %d shard(s): %s", len(toDrop), humanTruncateList(toDrop, 5))
184 }
185
186 s.loader.drop(toDrop...)
187 s.loader.load(toLoad...)
188
189 return nil
190}
191
// humanTruncateList renders paths as a comma separated list of base names in
// sorted order (sorted by full path). At most max entries are written; if
// there are more, the list ends with "... N more" where N is the number of
// omitted entries.
//
// The input slice is left untouched.
func humanTruncateList(paths []string, max int) string {
	// Sort a copy: callers keep using their slice after logging it, and a
	// formatting helper should not reorder its argument.
	sorted := make([]string, len(paths))
	copy(sorted, paths)
	sort.Strings(sorted)

	var b strings.Builder
	for i, p := range sorted {
		if i >= max {
			fmt.Fprintf(&b, "... %d more", len(sorted)-i)
			break
		}
		if i > 0 {
			b.WriteString(", ")
		}
		b.WriteString(filepath.Base(p))
	}
	return b.String()
}
207
208func (s *DirectoryWatcher) watch() error {
209 watcher, err := fsnotify.NewWatcher()
210 if err != nil {
211 return err
212 }
213 if err := watcher.Add(s.dir); err != nil {
214 return err
215 }
216
217 // intermediate signal channel so if there are multiple watcher.Events we
218 // only call scan once.
219 signal := make(chan struct{}, 1)
220
221 go func() {
222 notify := func() {
223 select {
224 case signal <- struct{}{}:
225 default:
226 }
227 }
228
229 ticker := time.NewTicker(time.Minute)
230
231 for {
232 select {
233 case event := <-watcher.Events:
234 // Only notify if a file we read in has changed. This is important to
235 // avoid all the events writing to temporary files.
236 if strings.HasSuffix(event.Name, ".zoekt") || strings.HasSuffix(event.Name, ".meta") {
237 notify()
238 }
239
240 case <-ticker.C:
241 // Periodically just double check the disk
242 notify()
243
244 case err := <-watcher.Errors:
245 // Ignore ErrEventOverflow since we rely on the presence of events so
246 // safe to ignore.
247 if err != nil && err != fsnotify.ErrEventOverflow {
248 log.Println("[ERROR] watcher error:", err)
249 }
250
251 case <-s.quit:
252 watcher.Close()
253 ticker.Stop()
254 close(signal)
255 return
256 }
257 }
258 }()
259
260 go func() {
261 defer close(s.stopped)
262 for range signal {
263 if err := s.scan(); err != nil {
264 log.Println("[ERROR] watcher error:", err)
265 }
266 }
267 }()
268
269 return nil
270}