// Fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "fmt"
5 "io"
6 "net/http"
7 "net/http/httptest"
8 "strconv"
9 "strings"
10 "testing"
11 "time"
12
13 "github.com/google/go-cmp/cmp"
14 "github.com/sourcegraph/log/logtest"
15 "github.com/sourcegraph/zoekt"
16)
17
18func TestQueue(t *testing.T) {
19 backoffDuration := 1 * time.Millisecond
20 queue := NewQueue(backoffDuration, backoffDuration, logtest.Scoped(t))
21
22 for i := 0; i < 100; i++ {
23 queue.AddOrUpdate(mkHEADIndexOptions(i, strconv.Itoa(i)))
24 }
25
26 // Odd numbers are already at the same commit
27 for i := 1; i < 100; i += 2 {
28 queue.SetIndexed(mkHEADIndexOptions(i, strconv.Itoa(i)), indexStateSuccess)
29 }
30
31 // Ensure we process all the even commits first, then odd.
32 want := 0
33 for {
34 item, ok := queue.Pop()
35 if !ok {
36 break
37 }
38 got, _ := strconv.Atoi(item.Opts.Branches[0].Version)
39 if got != want {
40 t.Fatalf("got %v, want %v", got, want)
41 }
42 want += 2
43 if want == 100 {
44 // We now switch to processing the odd numbers
45 want = 1
46 }
47
48 // sanity check the date added
49 if item.DateAddedToQueue.Unix() <= 0 {
50 t.Fatalf("invalid DateAddedToQueue %v", item.DateAddedToQueue)
51 }
52
53 // update current, shouldn't put the job in the queue
54 queue.SetIndexed(item.Opts, indexStateSuccess)
55 }
56 if want != 101 {
57 t.Fatalf("only popped %d items", want)
58 }
59}
60
61func TestQueueFIFO(t *testing.T) {
62 // Tests that the queue fallbacks to FIFO if everything has the same
63 // priority
64 backoffDuration := 1 * time.Millisecond
65 queue := NewQueue(backoffDuration, backoffDuration, logtest.Scoped(t))
66
67 for i := 0; i < 100; i++ {
68 queue.AddOrUpdate(mkHEADIndexOptions(i, strconv.Itoa(i)))
69 }
70
71 want := 0
72 for {
73 item, ok := queue.Pop()
74 if !ok {
75 break
76 }
77 got, _ := strconv.Atoi(item.Opts.Branches[0].Version)
78 if got != want {
79 t.Fatalf("got %v, want %v", item, want)
80 }
81 queue.SetIndexed(item.Opts, indexStateSuccess)
82 want++
83 }
84 if want != 100 {
85 t.Fatalf("only popped %d items", want)
86 }
87}
88
89func TestQueue_MaybeRemoveMissing(t *testing.T) {
90 backoffDuration := 1 * time.Millisecond
91 queue := NewQueue(backoffDuration, backoffDuration, logtest.Scoped(t))
92
93 queue.AddOrUpdate(IndexOptions{RepoID: 1, Name: "foo"})
94 queue.AddOrUpdate(IndexOptions{RepoID: 2, Name: "bar"})
95 queue.MaybeRemoveMissing([]uint32{2})
96
97 item, _ := queue.Pop()
98 if item.Opts.Name != "bar" {
99 t.Fatalf("queue should only contain bar, pop returned %v", item.Opts.Name)
100 }
101 _, ok := queue.Pop()
102 if ok {
103 t.Fatal("queue should be empty")
104 }
105}
106
107func TestQueue_Bump(t *testing.T) {
108 backoffDuration := 1 * time.Millisecond
109 queue := NewQueue(backoffDuration, backoffDuration, logtest.Scoped(t))
110
111 queue.AddOrUpdate(IndexOptions{RepoID: 1, Name: "foo"})
112 queue.AddOrUpdate(IndexOptions{RepoID: 2, Name: "bar"})
113
114 emptyQueue(queue)
115
116 // Bump 2 and 3. 3 doesn't exist, so only 2 should exist.
117 missing := queue.Bump([]uint32{2, 3})
118 if d := cmp.Diff([]uint32{3}, missing); d != "" {
119 t.Errorf("unexpected missing (-want, +got):\n%s", d)
120 }
121
122 want := []IndexOptions{{RepoID: 2, Name: "bar"}}
123 var got []IndexOptions
124 for {
125 item, ok := queue.Pop()
126 if !ok {
127 break
128 }
129 got = append(got, item.Opts)
130 }
131
132 if d := cmp.Diff(want, got); d != "" {
133 t.Errorf("unexpected items bumped into the queue (-want, +got):\n%s", d)
134 }
135}
136
137func TestQueue_Integration_DebugQueue(t *testing.T) {
138 // helper function to normalize the queue's debug output - this makes the test less brittle
139 // + makes it much less annoying to make edits to the expected output in a way that doesn't
140 // materially affect the caller
141 normalizeDebugOutput := func(output string) string {
142 output = strings.TrimSpace(output)
143
144 var outputLines []string
145 for i, line := range strings.Split(output, "\n") {
146 columns := []string{"Position", "Name", "ID", "IsOnQueue", "Age", "Branches"}
147 parts := strings.Fields(line) // Note: splitting on spaces like this would break for repositories that have more than one branch, but it's fine for just this test
148 if len(columns) != len(parts) {
149 t.Fatalf("normalizeDebugOutput: line %d: expected %d columns, got %d columns: %q", i, len(columns), len(parts), line)
150 }
151
152 if i > 0 { // skip past the first line which just contains the column headings
153
154 // The debug output contains time.Durations for tracking the amount of time an indexing job
155 // spent in the queue, but it's not reasonable to assert on this kind of timing minutia.
156 // So, for comparison purposes, we massage the contents of this field in the following manner:
157 //
158 // - "1m30s" -> "*" (for jobs that are still enqueued)
159 // - "-" -> "-" (for jobs that are tracked, but are not currently enqueued)
160
161 if parts[4] != "-" {
162 parts[4] = "*"
163 }
164 }
165
166 outputLines = append(outputLines, strings.Join(parts, " "))
167 }
168
169 return strings.Join(outputLines, "\n")
170 }
171
172 backoffDuration := 1 * time.Millisecond
173 queue := NewQueue(backoffDuration, backoffDuration, logtest.Scoped(t))
174
175 // setup: add two repositories to the queue and pop one of them
176 poppedRepository := mkHEADIndexOptions(0, "popped")
177 queuedRepository := mkHEADIndexOptions(1, "stillQueued")
178
179 queue.AddOrUpdate(poppedRepository)
180 queue.Pop()
181
182 queue.AddOrUpdate(queuedRepository)
183
184 // setup: start test http server that forwards requests to the
185 // queue instance
186 server := httptest.NewServer(http.HandlerFunc(queue.handleDebugQueue))
187 defer server.Close()
188
189 // test: send a request to the queue's debug endpoint
190 response, err := http.Get(server.URL)
191 if err != nil {
192 t.Fatalf(err.Error())
193 }
194
195 defer response.Body.Close()
196 raw, err := io.ReadAll(response.Body)
197 if err != nil {
198 t.Errorf("reading response body: %s", err)
199 }
200
201 actualOutput := normalizeDebugOutput(string(raw))
202
203 expectedOutput := `
204Position Name ID IsOnQueue Age Branches
2050 item-1 1 true * HEAD@stillQueued
2061 item-0 0 false - HEAD@popped
207`
208
209 expectedOutput = normalizeDebugOutput(expectedOutput)
210
211 // verify: ensure that the received output matches what we expect
212 if diff := cmp.Diff(expectedOutput, actualOutput); diff != "" {
213 t.Errorf("unexpected diff in output (-want +got):\n%s", diff)
214 }
215}
216
217func mkHEADIndexOptions(id int, version string) IndexOptions {
218 return IndexOptions{
219 RepoID: uint32(id),
220 Name: fmt.Sprintf("item-%d", id),
221 Branches: []zoekt.RepositoryBranch{{Name: "HEAD", Version: version}},
222 }
223}