fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

Revert "Load shards as fast as we can"

The current strategy has an unexplained deadlock, which apparently
triggers when read requests happen concurrently with reloads.

Since the GCP load balancer issues a continuing stream of HTTP
requests for /, each of which triggers a read across shards, the
deadlock triggers reliably in production.

This reverts commit e9c19a019ce90ce9ce7021c81c8d0bb10aedeaed.

Change-Id: I5b5234d204a9e9303ab84b098443dd9e9709318f

+190 -231
+190 -2
shards/shards.go
··· 15 15 package shards 16 16 17 17 import ( 18 + "fmt" 18 19 "log" 20 + "os" 21 + "path/filepath" 19 22 "runtime" 20 23 "runtime/debug" 21 24 "sort" ··· 23 26 24 27 "golang.org/x/net/context" 25 28 29 + "github.com/fsnotify/fsnotify" 26 30 "github.com/google/zoekt" 27 31 "github.com/google/zoekt/query" 28 32 ) 29 33 34 + type searchShard struct { 35 + zoekt.Searcher 36 + mtime time.Time 37 + } 38 + 39 + type shardWatcher struct { 40 + dir string 41 + 42 + // Limit the number of parallel queries. Since searching is 43 + // CPU bound, we can't do better than #CPU queries in 44 + // parallel. If we do so, we just create more memory 45 + // pressure. 46 + throttle chan struct{} 47 + 48 + shards map[string]*searchShard 49 + quit chan struct{} 50 + } 51 + 52 + func loadShard(fn string) (*searchShard, error) { 53 + f, err := os.Open(fn) 54 + if err != nil { 55 + return nil, err 56 + } 57 + fi, err := f.Stat() 58 + if err != nil { 59 + return nil, err 60 + } 61 + 62 + iFile, err := zoekt.NewIndexFile(f) 63 + if err != nil { 64 + return nil, err 65 + } 66 + s, err := zoekt.NewSearcher(iFile) 67 + if err != nil { 68 + iFile.Close() 69 + return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err) 70 + } 71 + 72 + return &searchShard{ 73 + mtime: fi.ModTime(), 74 + Searcher: s, 75 + }, nil 76 + } 77 + 78 + func (s *shardWatcher) String() string { 79 + return fmt.Sprintf("shardWatcher(%s)", s.dir) 80 + } 81 + 82 + func (s *shardWatcher) scan() error { 83 + fs, err := filepath.Glob(filepath.Join(s.dir, "*.zoekt")) 84 + if err != nil { 85 + return err 86 + } 87 + 88 + if len(fs) == 0 { 89 + return fmt.Errorf("directory %s is empty", s.dir) 90 + } 91 + 92 + ts := map[string]time.Time{} 93 + for _, fn := range fs { 94 + key := filepath.Base(fn) 95 + fi, err := os.Lstat(fn) 96 + if err != nil { 97 + continue 98 + } 99 + 100 + ts[key] = fi.ModTime() 101 + } 102 + 103 + s.lock() 104 + var toLoad []string 105 + for k, mtime := range ts { 106 + if s.shards[k] == nil || s.shards[k].mtime != 
mtime { 107 + toLoad = append(toLoad, k) 108 + } 109 + } 110 + 111 + var toDrop []string 112 + // Unload deleted shards. 113 + for k := range s.shards { 114 + if _, ok := ts[k]; !ok { 115 + toDrop = append(toDrop, k) 116 + } 117 + } 118 + s.unlock() 119 + 120 + for _, t := range toDrop { 121 + log.Printf("unloading: %s", t) 122 + s.replace(t, nil) 123 + } 124 + 125 + for _, t := range toLoad { 126 + shard, err := loadShard(filepath.Join(s.dir, t)) 127 + log.Printf("reloading: %s, err %v ", t, err) 128 + if err != nil { 129 + continue 130 + } 131 + s.replace(t, shard) 132 + } 133 + 134 + return nil 135 + } 136 + 137 + func (s *shardWatcher) rlock() { 138 + s.throttle <- struct{}{} 139 + } 140 + 141 + // getShards returns the currently loaded shards. The shards must be 142 + // accessed under a rlock call. 143 + func (s *shardWatcher) getShards() []zoekt.Searcher { 144 + var res []zoekt.Searcher 145 + for _, sh := range s.shards { 146 + res = append(res, sh) 147 + } 148 + return res 149 + } 150 + 151 + func (s *shardWatcher) runlock() { 152 + <-s.throttle 153 + } 154 + 155 + func (s *shardWatcher) lock() { 156 + n := cap(s.throttle) 157 + for n > 0 { 158 + s.throttle <- struct{}{} 159 + n-- 160 + } 161 + } 162 + 163 + func (s *shardWatcher) unlock() { 164 + n := cap(s.throttle) 165 + for n > 0 { 166 + <-s.throttle 167 + n-- 168 + } 169 + } 170 + 171 + func (s *shardWatcher) replace(key string, shard *searchShard) { 172 + s.lock() 173 + defer s.unlock() 174 + old := s.shards[key] 175 + if old != nil { 176 + old.Close() 177 + } 178 + if shard != nil { 179 + s.shards[key] = shard 180 + } else { 181 + delete(s.shards, key) 182 + } 183 + } 184 + 185 + func (s *shardWatcher) watch() error { 186 + watcher, err := fsnotify.NewWatcher() 187 + if err != nil { 188 + return err 189 + } 190 + if err := watcher.Add(s.dir); err != nil { 191 + return err 192 + } 193 + 194 + go func() { 195 + for { 196 + select { 197 + case <-watcher.Events: 198 + s.scan() 199 + case err := 
<-watcher.Errors: 200 + if err != nil { 201 + log.Println("watcher error:", err) 202 + } 203 + case <-s.quit: 204 + watcher.Close() 205 + return 206 + } 207 + } 208 + }() 209 + return nil 210 + } 211 + 30 212 // NewShardedSearcher returns a searcher instance that loads all 31 213 // shards corresponding to a glob into memory. 32 214 func NewShardedSearcher(dir string) (zoekt.Searcher, error) { 33 - ss := newShardWatcher(dir) 215 + ss := shardWatcher{ 216 + dir: dir, 217 + shards: make(map[string]*searchShard), 218 + quit: make(chan struct{}, 1), 219 + throttle: make(chan struct{}, runtime.NumCPU()), 220 + } 221 + 34 222 if err := ss.scan(); err != nil { 35 223 return nil, err 36 224 } ··· 39 227 return nil, err 40 228 } 41 229 42 - return &shardedSearcher{ss}, nil 230 + return &shardedSearcher{&ss}, nil 43 231 } 44 232 45 233 // Close closes references to open files. It may be called only once.
-229
shards/watcher.go
··· 1 - // Copyright 2017 Google Inc. All rights reserved. 2 - // 3 - // Licensed under the Apache License, Version 2.0 (the "License"); 4 - // you may not use this file except in compliance with the License. 5 - // You may obtain a copy of the License at 6 - // 7 - // http://www.apache.org/licenses/LICENSE-2.0 8 - // 9 - // Unless required by applicable law or agreed to in writing, software 10 - // distributed under the License is distributed on an "AS IS" BASIS, 11 - // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 - // See the License for the specific language governing permissions and 13 - // limitations under the License. 14 - 15 - package shards 16 - 17 - import ( 18 - "fmt" 19 - "log" 20 - "os" 21 - "path/filepath" 22 - "runtime" 23 - "sync" 24 - "time" 25 - 26 - "github.com/fsnotify/fsnotify" 27 - "github.com/google/zoekt" 28 - ) 29 - 30 - type searchShard struct { 31 - zoekt.Searcher 32 - mtime time.Time 33 - } 34 - 35 - type shardWatcher struct { 36 - dir string 37 - 38 - // Limit the number of parallel queries. Since searching is 39 - // CPU bound, we can't do better than #CPU queries in 40 - // parallel. If we do so, we just create more memory 41 - // pressure. 
42 - throttle chan struct{} 43 - 44 - shards map[string]*searchShard 45 - quit chan struct{} 46 - } 47 - 48 - func newShardWatcher(dir string) *shardWatcher { 49 - return &shardWatcher{ 50 - dir: dir, 51 - shards: make(map[string]*searchShard), 52 - quit: make(chan struct{}, 1), 53 - throttle: make(chan struct{}, runtime.NumCPU()), 54 - } 55 - } 56 - 57 - func loadShard(fn string) (*searchShard, error) { 58 - f, err := os.Open(fn) 59 - if err != nil { 60 - return nil, err 61 - } 62 - fi, err := f.Stat() 63 - if err != nil { 64 - f.Close() 65 - return nil, err 66 - } 67 - 68 - iFile, err := zoekt.NewIndexFile(f) 69 - if err != nil { 70 - return nil, err 71 - } 72 - s, err := zoekt.NewSearcher(iFile) 73 - if err != nil { 74 - iFile.Close() 75 - return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err) 76 - } 77 - 78 - return &searchShard{ 79 - mtime: fi.ModTime(), 80 - Searcher: s, 81 - }, nil 82 - } 83 - 84 - func (s *shardWatcher) String() string { 85 - return fmt.Sprintf("shardWatcher(%s)", s.dir) 86 - } 87 - 88 - func (s *shardWatcher) scan() error { 89 - fs, err := filepath.Glob(filepath.Join(s.dir, "*.zoekt")) 90 - if err != nil { 91 - return err 92 - } 93 - 94 - if len(fs) == 0 { 95 - return fmt.Errorf("directory %s is empty", s.dir) 96 - } 97 - 98 - ts := map[string]time.Time{} 99 - for _, fn := range fs { 100 - key := filepath.Base(fn) 101 - fi, err := os.Lstat(fn) 102 - if err != nil { 103 - continue 104 - } 105 - 106 - ts[key] = fi.ModTime() 107 - } 108 - 109 - s.lock() 110 - var toLoad []string 111 - for k, mtime := range ts { 112 - if s.shards[k] == nil || s.shards[k].mtime != mtime { 113 - toLoad = append(toLoad, k) 114 - } 115 - } 116 - 117 - var toDrop []string 118 - // Unload deleted shards. 
119 - for k := range s.shards { 120 - if _, ok := ts[k]; !ok { 121 - toDrop = append(toDrop, k) 122 - } 123 - } 124 - s.unlock() 125 - 126 - for _, t := range toDrop { 127 - log.Printf("unloading: %s", t) 128 - s.replace(t, nil) 129 - } 130 - 131 - loadThrottle := make(chan struct{}, runtime.NumCPU()) 132 - var wg sync.WaitGroup 133 - for _, t := range toLoad { 134 - wg.Add(1) 135 - go func(name string) { 136 - loadThrottle <- struct{}{} 137 - s.loadShard(name) 138 - <-loadThrottle 139 - wg.Done() 140 - }(t) 141 - } 142 - wg.Wait() 143 - 144 - return nil 145 - } 146 - 147 - func (s *shardWatcher) loadShard(name string) { 148 - shard, err := loadShard(filepath.Join(s.dir, name)) 149 - log.Printf("reloading: %s, err %v ", name, err) 150 - if err != nil { 151 - return 152 - } 153 - s.replace(name, shard) 154 - } 155 - 156 - func (s *shardWatcher) rlock() { 157 - s.throttle <- struct{}{} 158 - } 159 - 160 - // getShards returns the currently loaded shards. The shards must be 161 - // accessed under a rlock call. 
162 - func (s *shardWatcher) getShards() []zoekt.Searcher { 163 - var res []zoekt.Searcher 164 - for _, sh := range s.shards { 165 - res = append(res, sh) 166 - } 167 - return res 168 - } 169 - 170 - func (s *shardWatcher) runlock() { 171 - <-s.throttle 172 - } 173 - 174 - func (s *shardWatcher) lock() { 175 - n := cap(s.throttle) 176 - for n > 0 { 177 - s.throttle <- struct{}{} 178 - n-- 179 - } 180 - } 181 - 182 - func (s *shardWatcher) unlock() { 183 - n := cap(s.throttle) 184 - for n > 0 { 185 - <-s.throttle 186 - n-- 187 - } 188 - } 189 - 190 - func (s *shardWatcher) replace(key string, shard *searchShard) { 191 - s.lock() 192 - defer s.unlock() 193 - old := s.shards[key] 194 - if old != nil { 195 - old.Close() 196 - } 197 - if shard != nil { 198 - s.shards[key] = shard 199 - } else { 200 - delete(s.shards, key) 201 - } 202 - } 203 - 204 - func (s *shardWatcher) watch() error { 205 - watcher, err := fsnotify.NewWatcher() 206 - if err != nil { 207 - return err 208 - } 209 - if err := watcher.Add(s.dir); err != nil { 210 - return err 211 - } 212 - 213 - go func() { 214 - for { 215 - select { 216 - case <-watcher.Events: 217 - s.scan() 218 - case err := <-watcher.Errors: 219 - if err != nil { 220 - log.Println("watcher error:", err) 221 - } 222 - case <-s.quit: 223 - watcher.Close() 224 - return 225 - } 226 - } 227 - }() 228 - return nil 229 - }