fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Copyright 2017 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package search 16 17import ( 18 "fmt" 19 "log" 20 "os" 21 "path/filepath" 22 "sort" 23 "strconv" 24 "strings" 25 "sync" 26 "time" 27 28 "github.com/fsnotify/fsnotify" 29 "github.com/sourcegraph/zoekt/index" 30) 31 32type shardLoader interface { 33 // Load a new file. 34 load(filenames ...string) 35 drop(filenames ...string) 36} 37 38type DirectoryWatcher struct { 39 dir string 40 timestamps map[string]time.Time 41 loader shardLoader 42 43 // closed once ready 44 ready chan struct{} 45 readyErr error 46 47 closeOnce sync.Once 48 // quit is closed by Close to signal the directory watcher to stop. 49 quit chan struct{} 50 // stopped is closed once the directory watcher has stopped. 51 stopped chan struct{} 52} 53 54func (sw *DirectoryWatcher) Stop() { 55 sw.closeOnce.Do(func() { 56 close(sw.quit) 57 <-sw.stopped 58 }) 59} 60 61func newDirectoryWatcher(dir string, loader shardLoader) (*DirectoryWatcher, error) { 62 sw := &DirectoryWatcher{ 63 dir: dir, 64 timestamps: map[string]time.Time{}, 65 loader: loader, 66 ready: make(chan struct{}), 67 quit: make(chan struct{}), 68 stopped: make(chan struct{}), 69 } 70 71 go func() { 72 defer close(sw.ready) 73 74 if err := sw.scan(); err != nil { 75 sw.readyErr = err 76 return 77 } 78 79 if err := sw.watch(); err != nil { 80 sw.readyErr = err 81 return 82 } 83 }() 84 85 return sw, nil 86} 87 88func (s *DirectoryWatcher) WaitUntilReady() error { 89 <-s.ready 90 return s.readyErr 91} 92 93func (s *DirectoryWatcher) String() string { 94 return fmt.Sprintf("shardWatcher(%s)", s.dir) 95} 96 97// versionFromPath extracts url encoded repository name and 98// index format version from a shard name from builder. 99func versionFromPath(path string) (string, int) { 100 und := strings.LastIndex(path, "_") 101 if und < 0 { 102 return path, 0 103 } 104 105 dot := strings.Index(path[und:], ".") 106 if dot < 0 { 107 return path, 0 108 } 109 dot += und 110 111 version, err := strconv.Atoi(path[und+2 : dot]) 112 if err != nil { 113 return path, 0 114 } 115 116 return path[:und], version 117} 118 119func (s *DirectoryWatcher) scan() error { 120 // NOTE: if you change which file extensions are read, please update the 121 // watch implementation. 122 fs, err := filepath.Glob(filepath.Join(s.dir, "*.zoekt")) 123 if err != nil { 124 return err 125 } 126 127 latest := map[string]int{} 128 for _, fn := range fs { 129 name, version := versionFromPath(fn) 130 131 // In the case of downgrades, avoid reading 132 // newer index formats. 133 if version > index.IndexFormatVersion && version > index.NextIndexFormatVersion { 134 continue 135 } 136 137 if latest[name] < version { 138 latest[name] = version 139 } 140 } 141 142 ts := map[string]time.Time{} 143 for _, fn := range fs { 144 if name, version := versionFromPath(fn); latest[name] != version { 145 continue 146 } 147 148 fi, err := os.Lstat(fn) 149 if err != nil { 150 continue 151 } 152 153 ts[fn] = fi.ModTime() 154 155 fiMeta, err := os.Lstat(fn + ".meta") 156 if err != nil { 157 continue 158 } 159 if fiMeta.ModTime().After(fi.ModTime()) { 160 ts[fn] = fiMeta.ModTime() 161 } 162 } 163 164 var toLoad []string 165 for k, mtime := range ts { 166 if t, ok := s.timestamps[k]; !ok || t != mtime { 167 toLoad = append(toLoad, k) 168 s.timestamps[k] = mtime 169 } 170 } 171 172 var toDrop []string 173 // Unload deleted shards. 174 for k := range s.timestamps { 175 if _, ok := ts[k]; !ok { 176 toDrop = append(toDrop, k) 177 delete(s.timestamps, k) 178 } 179 } 180 181 if len(toDrop) > 0 { 182 log.Printf("[INFO] unloading %d shard(s): %s", len(toDrop), humanTruncateList(toDrop, 5)) 183 } 184 185 s.loader.drop(toDrop...) 186 s.loader.load(toLoad...) 187 188 return nil 189} 190 191func humanTruncateList(paths []string, max int) string { 192 sort.Strings(paths) 193 var b strings.Builder 194 for i, p := range paths { 195 if i >= max { 196 fmt.Fprintf(&b, "... %d more", len(paths)-i) 197 break 198 } 199 if i > 0 { 200 b.WriteString(", ") 201 } 202 b.WriteString(filepath.Base(p)) 203 } 204 return b.String() 205} 206 207func (s *DirectoryWatcher) watch() error { 208 watcher, err := fsnotify.NewWatcher() 209 if err != nil { 210 return err 211 } 212 if err := watcher.Add(s.dir); err != nil { 213 return err 214 } 215 216 // intermediate signal channel so if there are multiple watcher.Events we 217 // only call scan once. 218 signal := make(chan struct{}, 1) 219 220 go func() { 221 notify := func() { 222 select { 223 case signal <- struct{}{}: 224 default: 225 } 226 } 227 228 ticker := time.NewTicker(time.Minute) 229 230 for { 231 select { 232 case event := <-watcher.Events: 233 // Only notify if a file we read in has changed. This is important to 234 // avoid all the events writing to temporary files. 235 if strings.HasSuffix(event.Name, ".zoekt") || strings.HasSuffix(event.Name, ".meta") { 236 notify() 237 } 238 239 case <-ticker.C: 240 // Periodically just double check the disk 241 notify() 242 243 case err := <-watcher.Errors: 244 // Ignore ErrEventOverflow since we rely on the presence of events so 245 // safe to ignore. 246 if err != nil && err != fsnotify.ErrEventOverflow { 247 log.Println("[ERROR] watcher error:", err) 248 } 249 250 case <-s.quit: 251 watcher.Close() 252 ticker.Stop() 253 close(signal) 254 return 255 } 256 } 257 }() 258 259 go func() { 260 defer close(s.stopped) 261 for range signal { 262 if err := s.scan(); err != nil { 263 log.Println("[ERROR] watcher error:", err) 264 } 265 } 266 }() 267 268 return nil 269}