fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

all: rely on priority in RawConfig only (#115)

This allows us to remove maintaining priority.json and loading it in. I
believe this simplifies the code quite a bit. We can do this now that we
have cheap metadata updates.

Change-Id: I78c3bfbdf8e06e9e47c05f96513e043e77a97815

+24 -153
-74
cmd/zoekt-sourcegraph-indexserver/main.go
··· 19 19 "net/url" 20 20 "os" 21 21 "os/exec" 22 - "path" 23 22 "path/filepath" 24 23 "runtime" 25 24 "sort" ··· 78 77 Name: "index_num_assigned", 79 78 Help: "Number of repos assigned to this indexer by code host", 80 79 }, []string{"codehost"}) 81 - 82 - metricNumPriorities = promauto.NewGauge(prometheus.GaugeOpts{ 83 - Name: "index_priorities_total", 84 - Help: "Number of indexed repos with non-zero priorities", 85 - }) 86 - 87 - metricNumPriorityUpdates = promauto.NewCounter(prometheus.CounterOpts{ 88 - Name: "index_priorities_update_total", 89 - Help: "Total number of times repo ranking has been written to priority.json", 90 - }) 91 80 92 81 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{ 93 82 Name: "index_failing_total", ··· 287 276 tr := trace.New("getIndexOptions", "") 288 277 tr.LazyPrintf("getting index options for %d repos", len(repos)) 289 278 290 - newPriorities := make(map[string]float64) 291 - 292 279 // We ask the frontend to get index options in batches. 293 280 for repos := range batched(repos, 1000) { 294 281 start := time.Now() ··· 308 295 tr.SetError() 309 296 continue 310 297 } 311 - if opt.Priority != 0 { 312 - newPriorities[name] = opt.Priority 313 - } 314 298 queue.AddOrUpdate(name, opt.IndexOptions) 315 299 } 316 300 } 317 - s.maybeUpdatePriorities(repos, newPriorities) 318 301 319 302 metricResolveRevisionsDuration.Observe(time.Since(start).Seconds()) 320 303 tr.Finish() ··· 347 330 } 348 331 queue.SetIndexed(name, opts, state) 349 332 } 350 - } 351 - 352 - // Update priority.json given new entries, and remove no longer tracked repos. 353 - // This doesn't simply write newPriorities because a transient getIndexOptions failure 354 - // would cause the associated repo to get deprioritized. 355 - func (s *Server) maybeUpdatePriorities(names []string, newPriorities map[string]float64) { 356 - priorityPath := path.Join(s.IndexDir, "priority.json") 357 - priorities := map[string]float64{} 358 - buf, err := ioutil.ReadFile(priorityPath) 359 - if err == nil { 360 - err = json.Unmarshal(buf, &priorities) 361 - if err != nil { 362 - log.Printf("warning: error loading old priority.json, treating as empty: %v", err) 363 - } 364 - } 365 - 366 - // maybe remove no-longer-tracked repos from the priorities list 367 - if len(names) != len(priorities) { 368 - set := make(map[string]struct{}, len(names)) 369 - for _, name := range names { 370 - set[name] = struct{}{} 371 - } 372 - for name := range priorities { 373 - if _, ok := set[name]; !ok { 374 - delete(priorities, name) 375 - } 376 - } 377 - } 378 - 379 - for name, priority := range newPriorities { 380 - priorities[name] = priority 381 - } 382 - 383 - metricNumPriorities.Set(float64(len(priorities))) 384 - 385 - newBuf, err := json.Marshal(priorities) 386 - if err != nil { 387 - log.Printf("error marshaling new priority.json: %v", err) 388 - } 389 - newBuf = append(newBuf, '\n') // prettier 390 - 391 - if bytes.Equal(buf, newBuf) { 392 - return // no need to rewrite priority.json 393 - } 394 - 395 - err = ioutil.WriteFile(priorityPath+".tmp", newBuf, 0644) 396 - if err != nil { 397 - log.Printf("error writing new priority.json: %v", err) 398 - return 399 - } 400 - 401 - err = os.Rename(priorityPath+".tmp", priorityPath) 402 - if err != nil { 403 - log.Printf("error renaming new priority.json into place: %v", err) 404 - } 405 - 406 - metricNumPriorityUpdates.Inc() 407 333 } 408 334 409 335 func batched(slice []string, size int) <-chan []string {
+24 -79
shards/shards.go
··· 16 16 17 17 import ( 18 18 "context" 19 - "encoding/json" 20 19 "fmt" 21 - "io/ioutil" 22 20 "log" 23 21 "math" 24 22 "os" 25 - "path" 26 23 "runtime" 27 24 "runtime/debug" 28 25 "sort" 26 + "strconv" 29 27 "sync" 30 28 "time" 31 29 ··· 148 146 149 147 rankedVersion uint64 150 148 ranked []rankedShard 151 - 152 - priority map[string]float64 153 149 } 154 150 155 151 func newShardedSearcher(n int64) *shardedSearcher { 156 152 ss := &shardedSearcher{ 157 - shards: make(map[string]rankedShard), 158 - sched: newScheduler(n), 159 - priority: make(map[string]float64), 153 + shards: make(map[string]rankedShard), 154 + sched: newScheduler(n), 160 155 } 161 156 return ss 162 157 } 163 158 164 - func (ss *shardedSearcher) watchPriorities(dir string, done chan struct{}) { 165 - priorityPath := path.Join(dir, "priority.json") 166 - var lastMtime time.Time 167 - 168 - loadPriority := func() { 169 - st, err := os.Stat(priorityPath) 170 - if err != nil { 171 - return // ignore missing file errors 172 - } 173 - if st.ModTime() == lastMtime { 174 - return // file is unchanged 175 - } 176 - lastMtime = st.ModTime() 177 - buf, err := ioutil.ReadFile(priorityPath) 178 - if err != nil { 179 - log.Printf("reloading priority.json, error %v", err) 180 - return 181 - } 182 - priority := make(map[string]float64) 183 - err = json.Unmarshal(buf, &priority) 184 - if err != nil { 185 - log.Printf("reloading priority.json, error %v", err) 186 - return 187 - } 188 - 189 - log.Printf("reloading priority.json: %d shards have priorities", len(priority)) 190 - 191 - // get an exclusive lock to update priority map and invalidate ranking 192 - proc := ss.sched.Exclusive() 193 - defer proc.Release() 194 - ss.priority = priority 195 - ss.rankedVersion++ 196 - ss.ranked = nil // will regenerate ranking on next request 197 - } 198 - 199 - loadPriority() 200 - 201 - // fsnotify is more efficient for watching a large number of files for changes, but a 202 - // single stat call once a minute to check one file's modified time is negligible. 203 - ticker := time.NewTicker(1 * time.Minute) 204 - for { 205 - select { 206 - case <-done: 207 - ticker.Stop() 208 - return 209 - case <-ticker.C: 210 - loadPriority() 211 - } 212 - } 213 - } 214 - 215 159 // NewDirectorySearcher returns a searcher instance that loads all 216 160 // shards corresponding to a glob into memory. 217 161 func NewDirectorySearcher(dir string) (zoekt.Streamer, error) { ··· 223 167 if err != nil { 224 168 return nil, err 225 169 } 226 - 227 - go ss.watchPriorities(dir, dw.quit) 228 170 229 171 ds := &directorySearcher{ 230 172 Streamer: ss, ··· 732 674 return s.ranked 733 675 } 734 676 735 - var res []rankedShard 677 + res := make([]rankedShard, 0, len(s.shards)) 736 678 for _, sh := range s.shards { 737 - // Add the current priority to the sorted list of ranked shards. 738 - // This will be used for downstream result reordering. 739 - res = append(res, rankedShard{ 740 - name: sh.name, 741 - priority: s.priority[sh.name], 742 - Searcher: sh.Searcher, 743 - }) 679 + res = append(res, sh) 744 680 } 745 681 sort.Slice(res, func(i, j int) bool { 746 682 priorityDiff := res[i].priority - res[j].priority ··· 764 700 return res 765 701 } 766 702 767 - func shardName(s zoekt.Searcher) string { 703 + func mkRankedShard(s zoekt.Searcher) rankedShard { 768 704 q := query.Repo{} 769 705 result, err := s.List(context.Background(), &q, nil) 770 706 if err != nil { 771 - return "" 707 + return rankedShard{Searcher: s} 772 708 } 773 709 if len(result.Repos) == 0 { 774 - return "" 710 + return rankedShard{Searcher: s} 711 + } 712 + 713 + repo := result.Repos[0].Repository 714 + 715 + var priority float64 716 + if repo.RawConfig != nil { 717 + priority, _ = strconv.ParseFloat(repo.RawConfig["priority"], 64) 775 718 } 776 - return result.Repos[0].Repository.Name 719 + 720 + return rankedShard{ 721 + Searcher: s, 722 + name: repo.Name, 723 + priority: priority, 724 + } 777 725 } 778 726 779 727 func (s *shardedSearcher) replace(key string, shard zoekt.Searcher) { 780 - var name string 728 + var ranked rankedShard 781 729 if shard != nil { 782 - name = shardName(shard) 730 + ranked = mkRankedShard(shard) 783 731 } 784 732 785 733 proc := s.sched.Exclusive() ··· 793 741 if shard == nil { 794 742 delete(s.shards, key) 795 743 } else { 796 - s.shards[key] = rankedShard{ 797 - name: name, 798 - Searcher: shard, 799 - } 744 + s.shards[key] = ranked 800 745 } 801 746 s.rankedVersion++ 802 747 s.ranked = nil