fork of https://github.com/sourcegraph/zoekt
1// Copyright 2017 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package shards
16
17import (
18 "fmt"
19 "log"
20 "os"
21 "path/filepath"
22 "sort"
23 "strconv"
24 "strings"
25 "sync"
26 "time"
27
28 "github.com/fsnotify/fsnotify"
29 "github.com/sourcegraph/zoekt"
30)
31
// shardLoader receives the shard file changes detected by a
// DirectoryWatcher scan.
type shardLoader interface {
	// load is called with the paths of shard files that are new, or whose
	// recorded modification time changed, since the previous scan. It may be
	// called with no filenames.
	load(filenames ...string)
	// drop is called with the paths of shard files that were selected by an
	// earlier scan but are not selected by the current one. It may be called
	// with no filenames.
	drop(filenames ...string)
}
37
// DirectoryWatcher keeps a shardLoader in sync with the *.zoekt shard files
// found in a single directory.
type DirectoryWatcher struct {
	// dir is the directory that is globbed for shards and watched for changes.
	dir string
	// timestamps maps each shard path selected by the most recent scan to the
	// modification time recorded for it (the later of the shard file and its
	// ".meta" companion, when that exists).
	timestamps map[string]time.Time
	// loader is told which shard files to load and which to drop.
	loader shardLoader

	// ready is closed once the startup goroutine (initial scan followed by
	// watcher setup) has finished, whether or not it succeeded.
	ready chan struct{}
	// readyErr holds the startup error, if any. It is written before ready is
	// closed and should only be read after receiving from ready.
	readyErr error

	// closeOnce ensures the shutdown sequence in Stop runs at most once.
	closeOnce sync.Once
	// quit is closed by Stop to signal the directory watcher to stop.
	quit chan struct{}
	// stopped is closed once the directory watcher has stopped.
	stopped chan struct{}
}
53
54func (sw *DirectoryWatcher) Stop() {
55 sw.closeOnce.Do(func() {
56 close(sw.quit)
57 <-sw.stopped
58 })
59}
60
61func newDirectoryWatcher(dir string, loader shardLoader) (*DirectoryWatcher, error) {
62 sw := &DirectoryWatcher{
63 dir: dir,
64 timestamps: map[string]time.Time{},
65 loader: loader,
66 ready: make(chan struct{}),
67 quit: make(chan struct{}),
68 stopped: make(chan struct{}),
69 }
70
71 go func() {
72 defer close(sw.ready)
73
74 if err := sw.scan(); err != nil {
75 sw.readyErr = err
76 return
77 }
78
79 if err := sw.watch(); err != nil {
80 sw.readyErr = err
81 return
82 }
83 }()
84
85 return sw, nil
86}
87
88func (s *DirectoryWatcher) WaitUntilReady() error {
89 <-s.ready
90 return s.readyErr
91}
92
93func (s *DirectoryWatcher) String() string {
94 return fmt.Sprintf("shardWatcher(%s)", s.dir)
95}
96
// versionFromPath extracts url encoded repository name and
// index format version from a shard name from builder.
//
// The version is read from the text between the character following the last
// "_" in path and the first "." after that underscore, e.g.
// "repo_v16.00000.zoekt" yields ("repo", 16). If path does not have that
// shape, or the version is not an integer, it returns (path, 0).
func versionFromPath(path string) (string, int) {
	und := strings.LastIndex(path, "_")
	if und < 0 {
		return path, 0
	}

	dot := strings.Index(path[und:], ".")
	if dot < 0 {
		return path, 0
	}
	dot += und

	// The digits start two bytes after the underscore (skipping the "v").
	// When the "." immediately follows the "_" (e.g. "repo_.zoekt") that
	// start lies past dot and slicing would panic, so treat the name as
	// unversioned instead.
	start := und + 2
	if start > dot {
		return path, 0
	}

	version, err := strconv.Atoi(path[start:dot])
	if err != nil {
		return path, 0
	}

	return path[:und], version
}
118
119func (s *DirectoryWatcher) scan() error {
120 fs, err := filepath.Glob(filepath.Join(s.dir, "*.zoekt"))
121 if err != nil {
122 return err
123 }
124
125 latest := map[string]int{}
126 for _, fn := range fs {
127 name, version := versionFromPath(fn)
128
129 // In the case of downgrades, avoid reading
130 // newer index formats.
131 if version > zoekt.IndexFormatVersion && version > zoekt.NextIndexFormatVersion {
132 continue
133 }
134
135 if latest[name] < version {
136 latest[name] = version
137 }
138 }
139
140 ts := map[string]time.Time{}
141 for _, fn := range fs {
142 if name, version := versionFromPath(fn); latest[name] != version {
143 continue
144 }
145
146 fi, err := os.Lstat(fn)
147 if err != nil {
148 continue
149 }
150
151 ts[fn] = fi.ModTime()
152
153 fiMeta, err := os.Lstat(fn + ".meta")
154 if err != nil {
155 continue
156 }
157 if fiMeta.ModTime().After(fi.ModTime()) {
158 ts[fn] = fiMeta.ModTime()
159 }
160 }
161
162 var toLoad []string
163 for k, mtime := range ts {
164 if t, ok := s.timestamps[k]; !ok || t != mtime {
165 toLoad = append(toLoad, k)
166 s.timestamps[k] = mtime
167 }
168 }
169
170 var toDrop []string
171 // Unload deleted shards.
172 for k := range s.timestamps {
173 if _, ok := ts[k]; !ok {
174 toDrop = append(toDrop, k)
175 delete(s.timestamps, k)
176 }
177 }
178
179 if len(toDrop) > 0 {
180 log.Printf("unloading %d shard(s): %s", len(toDrop), humanTruncateList(toDrop, 5))
181 }
182
183 s.loader.drop(toDrop...)
184 s.loader.load(toLoad...)
185
186 return nil
187}
188
// humanTruncateList formats the base names of paths as a sorted,
// comma-separated list for log output. At most max names are written; if
// there are more, the list ends with "... N more" where N is the number of
// names left out.
//
// paths itself is not modified: sorting is done on a private copy so that
// building a log message does not reorder the caller's slice.
func humanTruncateList(paths []string, max int) string {
	sorted := append([]string(nil), paths...)
	sort.Strings(sorted)

	var b strings.Builder
	for i, p := range sorted {
		if i >= max {
			fmt.Fprintf(&b, "... %d more", len(sorted)-i)
			break
		}
		if i > 0 {
			b.WriteString(", ")
		}
		b.WriteString(filepath.Base(p))
	}
	return b.String()
}
204
205func (s *DirectoryWatcher) watch() error {
206 watcher, err := fsnotify.NewWatcher()
207 if err != nil {
208 return err
209 }
210 if err := watcher.Add(s.dir); err != nil {
211 return err
212 }
213
214 // intermediate signal channel so if there are multiple watcher.Events we
215 // only call scan once.
216 signal := make(chan struct{}, 1)
217
218 go func() {
219 for {
220 select {
221 case <-watcher.Events:
222 select {
223 case signal <- struct{}{}:
224 default:
225 }
226 case err := <-watcher.Errors:
227 // Ignore ErrEventOverflow since we rely on the presence of events so
228 // safe to ignore.
229 if err != nil && err != fsnotify.ErrEventOverflow {
230 log.Println("watcher error:", err)
231 }
232 case <-s.quit:
233 watcher.Close()
234 close(signal)
235 return
236 }
237 }
238 }()
239
240 go func() {
241 defer close(s.stopped)
242 for range signal {
243 if err := s.scan(); err != nil {
244 log.Println("watcher error:", err)
245 }
246 }
247 }()
248
249 return nil
250}