fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 5.6 kB View raw
1// Copyright 2017 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package search 16 17import ( 18 "fmt" 19 "log" 20 "os" 21 "path/filepath" 22 "sort" 23 "strconv" 24 "strings" 25 "sync" 26 "time" 27 28 "github.com/fsnotify/fsnotify" 29 30 "github.com/sourcegraph/zoekt/index" 31) 32 33type shardLoader interface { 34 // Load a new file. 35 load(filenames ...string) 36 drop(filenames ...string) 37} 38 39type DirectoryWatcher struct { 40 dir string 41 timestamps map[string]time.Time 42 loader shardLoader 43 44 // closed once ready 45 ready chan struct{} 46 readyErr error 47 48 closeOnce sync.Once 49 // quit is closed by Close to signal the directory watcher to stop. 50 quit chan struct{} 51 // stopped is closed once the directory watcher has stopped. 52 stopped chan struct{} 53} 54 55func (sw *DirectoryWatcher) Stop() { 56 sw.closeOnce.Do(func() { 57 close(sw.quit) 58 <-sw.stopped 59 }) 60} 61 62func newDirectoryWatcher(dir string, loader shardLoader) (*DirectoryWatcher, error) { 63 sw := &DirectoryWatcher{ 64 dir: dir, 65 timestamps: map[string]time.Time{}, 66 loader: loader, 67 ready: make(chan struct{}), 68 quit: make(chan struct{}), 69 stopped: make(chan struct{}), 70 } 71 72 go func() { 73 defer close(sw.ready) 74 75 if err := sw.scan(); err != nil { 76 sw.readyErr = err 77 return 78 } 79 80 if err := sw.watch(); err != nil { 81 sw.readyErr = err 82 return 83 } 84 }() 85 86 return sw, nil 87} 88 89func (s *DirectoryWatcher) WaitUntilReady() error { 90 <-s.ready 91 return s.readyErr 92} 93 94func (s *DirectoryWatcher) String() string { 95 return fmt.Sprintf("shardWatcher(%s)", s.dir) 96} 97 98// versionFromPath extracts url encoded repository name and 99// index format version from a shard name from builder. 100func versionFromPath(path string) (string, int) { 101 und := strings.LastIndex(path, "_") 102 if und < 0 { 103 return path, 0 104 } 105 106 dot := strings.Index(path[und:], ".") 107 if dot < 0 { 108 return path, 0 109 } 110 dot += und 111 112 version, err := strconv.Atoi(path[und+2 : dot]) 113 if err != nil { 114 return path, 0 115 } 116 117 return path[:und], version 118} 119 120func (s *DirectoryWatcher) scan() error { 121 // NOTE: if you change which file extensions are read, please update the 122 // watch implementation. 123 fs, err := filepath.Glob(filepath.Join(s.dir, "*.zoekt")) 124 if err != nil { 125 return err 126 } 127 128 latest := map[string]int{} 129 for _, fn := range fs { 130 name, version := versionFromPath(fn) 131 132 // In the case of downgrades, avoid reading 133 // newer index formats. 134 if version > index.IndexFormatVersion && version > index.NextIndexFormatVersion { 135 continue 136 } 137 138 if latest[name] < version { 139 latest[name] = version 140 } 141 } 142 143 ts := map[string]time.Time{} 144 for _, fn := range fs { 145 if name, version := versionFromPath(fn); latest[name] != version { 146 continue 147 } 148 149 fi, err := os.Lstat(fn) 150 if err != nil { 151 continue 152 } 153 154 ts[fn] = fi.ModTime() 155 156 fiMeta, err := os.Lstat(fn + ".meta") 157 if err != nil { 158 continue 159 } 160 if fiMeta.ModTime().After(fi.ModTime()) { 161 ts[fn] = fiMeta.ModTime() 162 } 163 } 164 165 var toLoad []string 166 for k, mtime := range ts { 167 if t, ok := s.timestamps[k]; !ok || t != mtime { 168 toLoad = append(toLoad, k) 169 s.timestamps[k] = mtime 170 } 171 } 172 173 var toDrop []string 174 // Unload deleted shards. 175 for k := range s.timestamps { 176 if _, ok := ts[k]; !ok { 177 toDrop = append(toDrop, k) 178 delete(s.timestamps, k) 179 } 180 } 181 182 if len(toDrop) > 0 { 183 log.Printf("[INFO] unloading %d shard(s): %s", len(toDrop), humanTruncateList(toDrop, 5)) 184 } 185 186 s.loader.drop(toDrop...) 187 s.loader.load(toLoad...) 188 189 return nil 190} 191 192func humanTruncateList(paths []string, max int) string { 193 sort.Strings(paths) 194 var b strings.Builder 195 for i, p := range paths { 196 if i >= max { 197 fmt.Fprintf(&b, "... %d more", len(paths)-i) 198 break 199 } 200 if i > 0 { 201 b.WriteString(", ") 202 } 203 b.WriteString(filepath.Base(p)) 204 } 205 return b.String() 206} 207 208func (s *DirectoryWatcher) watch() error { 209 watcher, err := fsnotify.NewWatcher() 210 if err != nil { 211 return err 212 } 213 if err := watcher.Add(s.dir); err != nil { 214 return err 215 } 216 217 // intermediate signal channel so if there are multiple watcher.Events we 218 // only call scan once. 219 signal := make(chan struct{}, 1) 220 221 go func() { 222 notify := func() { 223 select { 224 case signal <- struct{}{}: 225 default: 226 } 227 } 228 229 ticker := time.NewTicker(time.Minute) 230 231 for { 232 select { 233 case event := <-watcher.Events: 234 // Only notify if a file we read in has changed. This is important to 235 // avoid all the events writing to temporary files. 236 if strings.HasSuffix(event.Name, ".zoekt") || strings.HasSuffix(event.Name, ".meta") { 237 notify() 238 } 239 240 case <-ticker.C: 241 // Periodically just double check the disk 242 notify() 243 244 case err := <-watcher.Errors: 245 // Ignore ErrEventOverflow since we rely on the presence of events so 246 // safe to ignore. 247 if err != nil && err != fsnotify.ErrEventOverflow { 248 log.Println("[ERROR] watcher error:", err) 249 } 250 251 case <-s.quit: 252 watcher.Close() 253 ticker.Stop() 254 close(signal) 255 return 256 } 257 } 258 }() 259 260 go func() { 261 defer close(s.stopped) 262 for range signal { 263 if err := s.scan(); err != nil { 264 log.Println("[ERROR] watcher error:", err) 265 } 266 } 267 }() 268 269 return nil 270}