fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "fmt"
5 "io"
6 "net/http"
7 "net/http/httptest"
8 "strconv"
9 "strings"
10 "testing"
11 "time"
12
13 "github.com/google/go-cmp/cmp"
14 "github.com/sourcegraph/log/logtest"
15
16 "github.com/sourcegraph/zoekt"
17)
18
19func TestQueue(t *testing.T) {
20 backoffDuration := 1 * time.Millisecond
21 queue := NewQueue(backoffDuration, backoffDuration, logtest.Scoped(t))
22
23 for i := range 100 {
24 queue.AddOrUpdate(mkHEADIndexOptions(i, strconv.Itoa(i)))
25 }
26
27 // Odd numbers are already at the same commit
28 for i := 1; i < 100; i += 2 {
29 queue.SetIndexed(mkHEADIndexOptions(i, strconv.Itoa(i)), indexStateSuccess)
30 }
31
32 // Ensure we process all the even commits first, then odd.
33 want := 0
34 for {
35 item, ok := queue.Pop()
36 if !ok {
37 break
38 }
39 got, _ := strconv.Atoi(item.Opts.Branches[0].Version)
40 if got != want {
41 t.Fatalf("got %v, want %v", got, want)
42 }
43 want += 2
44 if want == 100 {
45 // We now switch to processing the odd numbers
46 want = 1
47 }
48
49 // sanity check the date added
50 if item.DateAddedToQueue.Unix() <= 0 {
51 t.Fatalf("invalid DateAddedToQueue %v", item.DateAddedToQueue)
52 }
53
54 // update current, shouldn't put the job in the queue
55 queue.SetIndexed(item.Opts, indexStateSuccess)
56 }
57 if want != 101 {
58 t.Fatalf("only popped %d items", want)
59 }
60}
61
62func TestQueueFIFO(t *testing.T) {
63 // Tests that the queue fallbacks to FIFO if everything has the same
64 // priority
65 backoffDuration := 1 * time.Millisecond
66 queue := NewQueue(backoffDuration, backoffDuration, logtest.Scoped(t))
67
68 for i := range 100 {
69 queue.AddOrUpdate(mkHEADIndexOptions(i, strconv.Itoa(i)))
70 }
71
72 want := 0
73 for {
74 item, ok := queue.Pop()
75 if !ok {
76 break
77 }
78 got, _ := strconv.Atoi(item.Opts.Branches[0].Version)
79 if got != want {
80 t.Fatalf("got %v, want %v", item, want)
81 }
82 queue.SetIndexed(item.Opts, indexStateSuccess)
83 want++
84 }
85 if want != 100 {
86 t.Fatalf("only popped %d items", want)
87 }
88}
89
90func TestQueue_MaybeRemoveMissing(t *testing.T) {
91 backoffDuration := 1 * time.Millisecond
92 queue := NewQueue(backoffDuration, backoffDuration, logtest.Scoped(t))
93
94 queue.AddOrUpdate(IndexOptions{RepoID: 1, Name: "foo"})
95 queue.AddOrUpdate(IndexOptions{RepoID: 2, Name: "bar"})
96 queue.MaybeRemoveMissing([]uint32{2})
97
98 item, _ := queue.Pop()
99 if item.Opts.Name != "bar" {
100 t.Fatalf("queue should only contain bar, pop returned %v", item.Opts.Name)
101 }
102 _, ok := queue.Pop()
103 if ok {
104 t.Fatal("queue should be empty")
105 }
106}
107
108func TestQueue_Bump(t *testing.T) {
109 backoffDuration := 1 * time.Millisecond
110 queue := NewQueue(backoffDuration, backoffDuration, logtest.Scoped(t))
111
112 queue.AddOrUpdate(IndexOptions{RepoID: 1, Name: "foo"})
113 queue.AddOrUpdate(IndexOptions{RepoID: 2, Name: "bar"})
114
115 emptyQueue(queue)
116
117 // Bump 2 and 3. 3 doesn't exist, so only 2 should exist.
118 missing := queue.Bump([]uint32{2, 3})
119 if d := cmp.Diff([]uint32{3}, missing); d != "" {
120 t.Errorf("unexpected missing (-want, +got):\n%s", d)
121 }
122
123 want := []IndexOptions{{RepoID: 2, Name: "bar"}}
124 var got []IndexOptions
125 for {
126 item, ok := queue.Pop()
127 if !ok {
128 break
129 }
130 got = append(got, item.Opts)
131 }
132
133 if d := cmp.Diff(want, got); d != "" {
134 t.Errorf("unexpected items bumped into the queue (-want, +got):\n%s", d)
135 }
136}
137
138func TestQueue_Integration_DebugQueue(t *testing.T) {
139 // helper function to normalize the queue's debug output - this makes the test less brittle
140 // + makes it much less annoying to make edits to the expected output in a way that doesn't
141 // materially affect the caller
142 normalizeDebugOutput := func(output string) string {
143 output = strings.TrimSpace(output)
144
145 var outputLines []string
146 for i, line := range strings.Split(output, "\n") {
147 columns := []string{"Position", "Name", "ID", "IsOnQueue", "Age", "Branches"}
148 parts := strings.Fields(line) // Note: splitting on spaces like this would break for repositories that have more than one branch, but it's fine for just this test
149 if len(columns) != len(parts) {
150 t.Fatalf("normalizeDebugOutput: line %d: expected %d columns, got %d columns: %q", i, len(columns), len(parts), line)
151 }
152
153 if i > 0 { // skip past the first line which just contains the column headings
154
155 // The debug output contains time.Durations for tracking the amount of time an indexing job
156 // spent in the queue, but it's not reasonable to assert on this kind of timing minutia.
157 // So, for comparison purposes, we massage the contents of this field in the following manner:
158 //
159 // - "1m30s" -> "*" (for jobs that are still enqueued)
160 // - "-" -> "-" (for jobs that are tracked, but are not currently enqueued)
161
162 if parts[4] != "-" {
163 parts[4] = "*"
164 }
165 }
166
167 outputLines = append(outputLines, strings.Join(parts, " "))
168 }
169
170 return strings.Join(outputLines, "\n")
171 }
172
173 backoffDuration := 1 * time.Millisecond
174 queue := NewQueue(backoffDuration, backoffDuration, logtest.Scoped(t))
175
176 // setup: add two repositories to the queue and pop one of them
177 poppedRepository := mkHEADIndexOptions(0, "popped")
178 queuedRepository := mkHEADIndexOptions(1, "stillQueued")
179
180 queue.AddOrUpdate(poppedRepository)
181 queue.Pop()
182
183 queue.AddOrUpdate(queuedRepository)
184
185 // setup: start test http server that forwards requests to the
186 // queue instance
187 server := httptest.NewServer(http.HandlerFunc(queue.handleDebugQueue))
188 defer server.Close()
189
190 // test: send a request to the queue's debug endpoint
191 response, err := http.Get(server.URL)
192 if err != nil {
193 t.Fatalf(err.Error())
194 }
195
196 defer response.Body.Close()
197 raw, err := io.ReadAll(response.Body)
198 if err != nil {
199 t.Errorf("reading response body: %s", err)
200 }
201
202 actualOutput := normalizeDebugOutput(string(raw))
203
204 expectedOutput := `
205Position Name ID IsOnQueue Age Branches
2060 item-1 1 true * HEAD@stillQueued
2071 item-0 0 false - HEAD@popped
208`
209
210 expectedOutput = normalizeDebugOutput(expectedOutput)
211
212 // verify: ensure that the received output matches what we expect
213 if diff := cmp.Diff(expectedOutput, actualOutput); diff != "" {
214 t.Errorf("unexpected diff in output (-want +got):\n%s", diff)
215 }
216}
217
218func mkHEADIndexOptions(id int, version string) IndexOptions {
219 return IndexOptions{
220 RepoID: uint32(id),
221 Name: fmt.Sprintf("item-%d", id),
222 Branches: []zoekt.RepositoryBranch{{Name: "HEAD", Version: version}},
223 }
224}