fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Copyright 2017 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package shards 16 17import ( 18 "fmt" 19 "log" 20 "os" 21 "path/filepath" 22 "sort" 23 "strconv" 24 "strings" 25 "sync" 26 "time" 27 28 "github.com/fsnotify/fsnotify" 29 "github.com/sourcegraph/zoekt" 30) 31 32type shardLoader interface { 33 // Load a new file. 34 load(filenames ...string) 35 drop(filenames ...string) 36} 37 38type DirectoryWatcher struct { 39 dir string 40 timestamps map[string]time.Time 41 loader shardLoader 42 43 // closed once ready 44 ready chan struct{} 45 readyErr error 46 47 closeOnce sync.Once 48 // quit is closed by Close to signal the directory watcher to stop. 49 quit chan struct{} 50 // stopped is closed once the directory watcher has stopped. 51 stopped chan struct{} 52} 53 54func (sw *DirectoryWatcher) Stop() { 55 sw.closeOnce.Do(func() { 56 close(sw.quit) 57 <-sw.stopped 58 }) 59} 60 61func newDirectoryWatcher(dir string, loader shardLoader) (*DirectoryWatcher, error) { 62 sw := &DirectoryWatcher{ 63 dir: dir, 64 timestamps: map[string]time.Time{}, 65 loader: loader, 66 ready: make(chan struct{}), 67 quit: make(chan struct{}), 68 stopped: make(chan struct{}), 69 } 70 71 go func() { 72 defer close(sw.ready) 73 74 if err := sw.scan(); err != nil { 75 sw.readyErr = err 76 return 77 } 78 79 if err := sw.watch(); err != nil { 80 sw.readyErr = err 81 return 82 } 83 }() 84 85 return sw, nil 86} 87 88func (s *DirectoryWatcher) WaitUntilReady() error { 89 <-s.ready 90 return s.readyErr 91} 92 93func (s *DirectoryWatcher) String() string { 94 return fmt.Sprintf("shardWatcher(%s)", s.dir) 95} 96 97// versionFromPath extracts url encoded repository name and 98// index format version from a shard name from builder. 99func versionFromPath(path string) (string, int) { 100 und := strings.LastIndex(path, "_") 101 if und < 0 { 102 return path, 0 103 } 104 105 dot := strings.Index(path[und:], ".") 106 if dot < 0 { 107 return path, 0 108 } 109 dot += und 110 111 version, err := strconv.Atoi(path[und+2 : dot]) 112 if err != nil { 113 return path, 0 114 } 115 116 return path[:und], version 117} 118 119func (s *DirectoryWatcher) scan() error { 120 fs, err := filepath.Glob(filepath.Join(s.dir, "*.zoekt")) 121 if err != nil { 122 return err 123 } 124 125 latest := map[string]int{} 126 for _, fn := range fs { 127 name, version := versionFromPath(fn) 128 129 // In the case of downgrades, avoid reading 130 // newer index formats. 131 if version > zoekt.IndexFormatVersion && version > zoekt.NextIndexFormatVersion { 132 continue 133 } 134 135 if latest[name] < version { 136 latest[name] = version 137 } 138 } 139 140 ts := map[string]time.Time{} 141 for _, fn := range fs { 142 if name, version := versionFromPath(fn); latest[name] != version { 143 continue 144 } 145 146 fi, err := os.Lstat(fn) 147 if err != nil { 148 continue 149 } 150 151 ts[fn] = fi.ModTime() 152 153 fiMeta, err := os.Lstat(fn + ".meta") 154 if err != nil { 155 continue 156 } 157 if fiMeta.ModTime().After(fi.ModTime()) { 158 ts[fn] = fiMeta.ModTime() 159 } 160 } 161 162 var toLoad []string 163 for k, mtime := range ts { 164 if t, ok := s.timestamps[k]; !ok || t != mtime { 165 toLoad = append(toLoad, k) 166 s.timestamps[k] = mtime 167 } 168 } 169 170 var toDrop []string 171 // Unload deleted shards. 172 for k := range s.timestamps { 173 if _, ok := ts[k]; !ok { 174 toDrop = append(toDrop, k) 175 delete(s.timestamps, k) 176 } 177 } 178 179 if len(toDrop) > 0 { 180 log.Printf("unloading %d shard(s): %s", len(toDrop), humanTruncateList(toDrop, 5)) 181 } 182 183 s.loader.drop(toDrop...) 184 s.loader.load(toLoad...) 185 186 return nil 187} 188 189func humanTruncateList(paths []string, max int) string { 190 sort.Strings(paths) 191 var b strings.Builder 192 for i, p := range paths { 193 if i >= max { 194 fmt.Fprintf(&b, "... %d more", len(paths)-i) 195 break 196 } 197 if i > 0 { 198 b.WriteString(", ") 199 } 200 b.WriteString(filepath.Base(p)) 201 } 202 return b.String() 203} 204 205func (s *DirectoryWatcher) watch() error { 206 watcher, err := fsnotify.NewWatcher() 207 if err != nil { 208 return err 209 } 210 if err := watcher.Add(s.dir); err != nil { 211 return err 212 } 213 214 // intermediate signal channel so if there are multiple watcher.Events we 215 // only call scan once. 216 signal := make(chan struct{}, 1) 217 218 go func() { 219 for { 220 select { 221 case <-watcher.Events: 222 select { 223 case signal <- struct{}{}: 224 default: 225 } 226 case err := <-watcher.Errors: 227 // Ignore ErrEventOverflow since we rely on the presence of events so 228 // safe to ignore. 229 if err != nil && err != fsnotify.ErrEventOverflow { 230 log.Println("watcher error:", err) 231 } 232 case <-s.quit: 233 watcher.Close() 234 close(signal) 235 return 236 } 237 } 238 }() 239 240 go func() { 241 defer close(s.stopped) 242 for range signal { 243 if err := s.scan(); err != nil { 244 log.Println("watcher error:", err) 245 } 246 } 247 }() 248 249 return nil 250}