fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

Split off shard watching into separate source file.

Change-Id: Ic164eba52dcb64c472016868049a5d4c677e1138

+204 -182
-182
shards/shards.go
··· 15 15 package shards 16 16 17 17 import ( 18 - "fmt" 19 18 "log" 20 - "os" 21 - "path/filepath" 22 19 "runtime" 23 20 "runtime/debug" 24 21 "sort" ··· 26 23 27 24 "golang.org/x/net/context" 28 25 29 - "github.com/fsnotify/fsnotify" 30 26 "github.com/google/zoekt" 31 27 "github.com/google/zoekt/query" 32 28 ) 33 - 34 - type searchShard struct { 35 - zoekt.Searcher 36 - mtime time.Time 37 - } 38 - 39 - type shardWatcher struct { 40 - dir string 41 - 42 - // Limit the number of parallel queries. Since searching is 43 - // CPU bound, we can't do better than #CPU queries in 44 - // parallel. If we do so, we just create more memory 45 - // pressure. 46 - throttle chan struct{} 47 - 48 - shards map[string]*searchShard 49 - quit chan struct{} 50 - } 51 - 52 - func loadShard(fn string) (*searchShard, error) { 53 - f, err := os.Open(fn) 54 - if err != nil { 55 - return nil, err 56 - } 57 - fi, err := f.Stat() 58 - if err != nil { 59 - return nil, err 60 - } 61 - 62 - iFile, err := zoekt.NewIndexFile(f) 63 - if err != nil { 64 - return nil, err 65 - } 66 - s, err := zoekt.NewSearcher(iFile) 67 - if err != nil { 68 - iFile.Close() 69 - return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err) 70 - } 71 - 72 - return &searchShard{ 73 - mtime: fi.ModTime(), 74 - Searcher: s, 75 - }, nil 76 - } 77 - 78 - func (s *shardWatcher) String() string { 79 - return fmt.Sprintf("shardWatcher(%s)", s.dir) 80 - } 81 - 82 - func (s *shardWatcher) scan() error { 83 - fs, err := filepath.Glob(filepath.Join(s.dir, "*.zoekt")) 84 - if err != nil { 85 - return err 86 - } 87 - 88 - if len(fs) == 0 { 89 - return fmt.Errorf("directory %s is empty", s.dir) 90 - } 91 - 92 - ts := map[string]time.Time{} 93 - for _, fn := range fs { 94 - key := filepath.Base(fn) 95 - fi, err := os.Lstat(fn) 96 - if err != nil { 97 - continue 98 - } 99 - 100 - ts[key] = fi.ModTime() 101 - } 102 - 103 - s.lock() 104 - var toLoad []string 105 - for k, mtime := range ts { 106 - if s.shards[k] == nil || s.shards[k].mtime != mtime { 107 - toLoad = append(toLoad, k) 108 - } 109 - } 110 - 111 - var toDrop []string 112 - // Unload deleted shards. 113 - for k := range s.shards { 114 - if _, ok := ts[k]; !ok { 115 - toDrop = append(toDrop, k) 116 - } 117 - } 118 - s.unlock() 119 - 120 - for _, t := range toDrop { 121 - log.Printf("unloading: %s", t) 122 - s.replace(t, nil) 123 - } 124 - 125 - for _, t := range toLoad { 126 - shard, err := loadShard(filepath.Join(s.dir, t)) 127 - log.Printf("reloading: %s, err %v ", t, err) 128 - if err != nil { 129 - continue 130 - } 131 - s.replace(t, shard) 132 - } 133 - 134 - return nil 135 - } 136 - 137 - func (s *shardWatcher) rlock() { 138 - s.throttle <- struct{}{} 139 - } 140 - 141 - // getShards returns the currently loaded shards. The shards must be 142 - // accessed under a rlock call. 143 - func (s *shardWatcher) getShards() []zoekt.Searcher { 144 - var res []zoekt.Searcher 145 - for _, sh := range s.shards { 146 - res = append(res, sh) 147 - } 148 - return res 149 - } 150 - 151 - func (s *shardWatcher) runlock() { 152 - <-s.throttle 153 - } 154 - 155 - func (s *shardWatcher) lock() { 156 - n := cap(s.throttle) 157 - for n > 0 { 158 - s.throttle <- struct{}{} 159 - n-- 160 - } 161 - } 162 - 163 - func (s *shardWatcher) unlock() { 164 - n := cap(s.throttle) 165 - for n > 0 { 166 - <-s.throttle 167 - n-- 168 - } 169 - } 170 - 171 - func (s *shardWatcher) replace(key string, shard *searchShard) { 172 - s.lock() 173 - defer s.unlock() 174 - old := s.shards[key] 175 - if old != nil { 176 - old.Close() 177 - } 178 - if shard != nil { 179 - s.shards[key] = shard 180 - } else { 181 - delete(s.shards, key) 182 - } 183 - } 184 - 185 - func (s *shardWatcher) watch() error { 186 - watcher, err := fsnotify.NewWatcher() 187 - if err != nil { 188 - return err 189 - } 190 - if err := watcher.Add(s.dir); err != nil { 191 - return err 192 - } 193 - 194 - go func() { 195 - for { 196 - select { 197 - case <-watcher.Events: 198 - s.scan() 199 - case err := <-watcher.Errors: 200 - if err != nil { 201 - log.Println("watcher error:", err) 202 - } 203 - case <-s.quit: 204 - watcher.Close() 205 - return 206 - } 207 - } 208 - }() 209 - return nil 210 - } 211 29 212 30 // NewShardedSearcher returns a searcher instance that loads all 213 31 // shards corresponding to a glob into memory.
+204
shards/watcher.go
··· 1 + // Copyright 2017 Google Inc. All rights reserved. 2 + // 3 + // Licensed under the Apache License, Version 2.0 (the "License"); 4 + // you may not use this file except in compliance with the License. 5 + // You may obtain a copy of the License at 6 + // 7 + // http://www.apache.org/licenses/LICENSE-2.0 8 + // 9 + // Unless required by applicable law or agreed to in writing, software 10 + // distributed under the License is distributed on an "AS IS" BASIS, 11 + // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 + // See the License for the specific language governing permissions and 13 + // limitations under the License. 14 + 15 + package shards 16 + 17 + import ( 18 + "fmt" 19 + "log" 20 + "os" 21 + "path/filepath" 22 + "time" 23 + 24 + "github.com/fsnotify/fsnotify" 25 + "github.com/google/zoekt" 26 + ) 27 + 28 + type searchShard struct { 29 + zoekt.Searcher 30 + mtime time.Time 31 + } 32 + 33 + type shardWatcher struct { 34 + dir string 35 + 36 + // Limit the number of parallel queries. Since searching is 37 + // CPU bound, we can't do better than #CPU queries in 38 + // parallel. If we do so, we just create more memory 39 + // pressure. 40 + throttle chan struct{} 41 + 42 + shards map[string]*searchShard 43 + quit chan struct{} 44 + } 45 + 46 + func loadShard(fn string) (*searchShard, error) { 47 + f, err := os.Open(fn) 48 + if err != nil { 49 + return nil, err 50 + } 51 + fi, err := f.Stat() 52 + if err != nil { 53 + return nil, err 54 + } 55 + 56 + iFile, err := zoekt.NewIndexFile(f) 57 + if err != nil { 58 + return nil, err 59 + } 60 + s, err := zoekt.NewSearcher(iFile) 61 + if err != nil { 62 + iFile.Close() 63 + return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err) 64 + } 65 + 66 + return &searchShard{ 67 + mtime: fi.ModTime(), 68 + Searcher: s, 69 + }, nil 70 + } 71 + 72 + func (s *shardWatcher) String() string { 73 + return fmt.Sprintf("shardWatcher(%s)", s.dir) 74 + } 75 + 76 + func (s *shardWatcher) scan() error { 77 + fs, err := filepath.Glob(filepath.Join(s.dir, "*.zoekt")) 78 + if err != nil { 79 + return err 80 + } 81 + 82 + if len(fs) == 0 { 83 + return fmt.Errorf("directory %s is empty", s.dir) 84 + } 85 + 86 + ts := map[string]time.Time{} 87 + for _, fn := range fs { 88 + key := filepath.Base(fn) 89 + fi, err := os.Lstat(fn) 90 + if err != nil { 91 + continue 92 + } 93 + 94 + ts[key] = fi.ModTime() 95 + } 96 + 97 + s.lock() 98 + var toLoad []string 99 + for k, mtime := range ts { 100 + if s.shards[k] == nil || s.shards[k].mtime != mtime { 101 + toLoad = append(toLoad, k) 102 + } 103 + } 104 + 105 + var toDrop []string 106 + // Unload deleted shards. 107 + for k := range s.shards { 108 + if _, ok := ts[k]; !ok { 109 + toDrop = append(toDrop, k) 110 + } 111 + } 112 + s.unlock() 113 + 114 + for _, t := range toDrop { 115 + log.Printf("unloading: %s", t) 116 + s.replace(t, nil) 117 + } 118 + 119 + for _, t := range toLoad { 120 + shard, err := loadShard(filepath.Join(s.dir, t)) 121 + log.Printf("reloading: %s, err %v ", t, err) 122 + if err != nil { 123 + continue 124 + } 125 + s.replace(t, shard) 126 + } 127 + 128 + return nil 129 + } 130 + 131 + func (s *shardWatcher) rlock() { 132 + s.throttle <- struct{}{} 133 + } 134 + 135 + // getShards returns the currently loaded shards. The shards must be 136 + // accessed under a rlock call. 137 + func (s *shardWatcher) getShards() []zoekt.Searcher { 138 + var res []zoekt.Searcher 139 + for _, sh := range s.shards { 140 + res = append(res, sh) 141 + } 142 + return res 143 + } 144 + 145 + func (s *shardWatcher) runlock() { 146 + <-s.throttle 147 + } 148 + 149 + func (s *shardWatcher) lock() { 150 + n := cap(s.throttle) 151 + for n > 0 { 152 + s.throttle <- struct{}{} 153 + n-- 154 + } 155 + } 156 + 157 + func (s *shardWatcher) unlock() { 158 + n := cap(s.throttle) 159 + for n > 0 { 160 + <-s.throttle 161 + n-- 162 + } 163 + } 164 + 165 + func (s *shardWatcher) replace(key string, shard *searchShard) { 166 + s.lock() 167 + defer s.unlock() 168 + old := s.shards[key] 169 + if old != nil { 170 + old.Close() 171 + } 172 + if shard != nil { 173 + s.shards[key] = shard 174 + } else { 175 + delete(s.shards, key) 176 + } 177 + } 178 + 179 + func (s *shardWatcher) watch() error { 180 + watcher, err := fsnotify.NewWatcher() 181 + if err != nil { 182 + return err 183 + } 184 + if err := watcher.Add(s.dir); err != nil { 185 + return err 186 + } 187 + 188 + go func() { 189 + for { 190 + select { 191 + case <-watcher.Events: 192 + s.scan() 193 + case err := <-watcher.Errors: 194 + if err != nil { 195 + log.Println("watcher error:", err) 196 + } 197 + case <-s.quit: 198 + watcher.Close() 199 + return 200 + } 201 + } 202 + }() 203 + return nil 204 + }