fork of https://github.com/sourcegraph/zoekt

gitindex: replace go-git blob reading with pipelined git cat-file --batch (#1021)

* gitindex: replace go-git blob reading with pipelined git cat-file --batch

Replace the serial go-git BlobObject calls in indexGitRepo with a single
pipelined "git cat-file --batch --buffer" subprocess. A writer goroutine
feeds all blob SHAs to stdin while the main goroutine reads responses
from stdout, forming a concurrent pipeline that eliminates per-object
packfile seek overhead and leverages git's internal delta base cache.

Submodule blobs fall back to the existing go-git createDocument path.

Benchmarked on kubernetes (29,188 files, 261 MB), Apple M1 Max, 5 runs:

go-git BlobObject (before):
Time: 2.94s Allocs: 685K Memory: 691 MB

cat-file pipelined (after):
Time: 0.60s Allocs: 58K Memory: 276 MB

Speedup: 4.9x time, 12x fewer allocs, 2.5x less memory

* gitindex: streaming catfileReader API, skip large blobs without reading

Replace the bulk readBlobsPipelined (which read all blobs into a
[]blobResult slice) with a streaming catfileReader modeled after
archive/tar.Reader:

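    cr, err := newCatfileReader(repoDir, ids)
    if err != nil { return err }
    defer cr.Close()
    for {
        size, missing, err := cr.Next()
        if err == io.EOF { break }
        if err != nil { return err }
        if missing || size > maxSize { continue } // auto-skipped, never read
        content := make([]byte, size)
        if _, err := io.ReadFull(cr, content); err != nil { return err }
    }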

Next() reads the cat-file header and returns the blob's size. The
caller decides whether to Read the content or skip it — calling Next()
again automatically discards unread bytes via bufio.Reader.Discard.
Large blobs over SizeMax are never allocated or read into Go memory.

Also split the single interleaved loop into two: one for main-repo
blobs streamed via cat-file, one for submodule blobs via go-git's
createDocument. The builder sorts documents internally so ordering
between the loops does not matter.

Peak memory is now bounded by ShardMax (one shard's worth of content)
rather than total repository size.

* gitindex: harden catfileReader Close, add kill switch and SkipReasonMissing

Address review feedback on PR #1021:

- Make Close() idempotent via sync.Once; kill the git process first
(matching Gitaly's pattern) instead of draining all remaining stdout,
so early termination is fast. Suppress the expected SIGKILL exit error.
Add defer close(writeErr) in the writer goroutine to prevent deadlock
on double-close.

- Change Next() return and pending field from int64 to int, use
strconv.Atoi. Removes casts at all call sites; SizeMax is already int.

- Add SkipReasonMissing for blobs that git cat-file reports as missing,
instead of reusing SkipReasonTooLarge. Missing is unexpected for local
repos (corruption, shallow clone, gc race) so log a warning.

- Extract indexCatfileBlobs() with defer cr.Close(), eliminating four
manual Close() calls on error paths.

- Add ZOEKT_DISABLE_CATFILE_BATCH env var kill switch following the
existing ZOEKT_DISABLE_GOGIT_OPTIMIZATION pattern. When set, all blobs
fall back to the go-git createDocument path.

- Deduplicate skippedLargeDoc/skippedMissingDoc into skippedDoc(reason).

- Add 19 hardening tests covering Close lifecycle (double close,
concurrent close, early termination), Read edge cases (partial reads,
1-byte buffer, empty blobs, read-without-next), missing object
sequences, large blob byte precision, and duplicate SHAs.

Benchmarked on kubernetes (29,188 files): no performance regression
(geomean -0.89%, within noise).

+1549 -20
+229
gitindex/catfile.go
// Copyright 2016 Google Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gitindex

import (
	"bufio"
	"bytes"
	"encoding/hex"
	"fmt"
	"io"
	"os/exec"
	"strconv"
	"sync"
	"syscall"

	"github.com/go-git/go-git/v5/plumbing"
)

// catfileReader provides streaming access to git blob objects via a pipelined
// "git cat-file --batch --buffer" process. A writer goroutine feeds all blob
// SHAs to stdin while the caller reads responses one at a time, similar to
// archive/tar.Reader.
//
// The --buffer flag switches git's output from per-object flush (write_or_die)
// to libc stdio buffering (fwrite), reducing syscalls. After stdin EOF, git
// calls fflush(stdout) to deliver any remaining output.
//
// Usage:
//
//	cr, err := newCatfileReader(repoDir, ids)
//	if err != nil { ... }
//	defer cr.Close()
//
//	for {
//	    size, missing, err := cr.Next()
//	    if err == io.EOF { break }
//	    if missing { continue }
//	    if size > maxSize { continue } // unread bytes auto-skipped
//	    content := make([]byte, size)
//	    io.ReadFull(cr, content)
//	}
type catfileReader struct {
	cmd      *exec.Cmd
	reader   *bufio.Reader
	writeErr <-chan error

	// pending tracks unread content bytes + trailing LF for the current
	// entry. Next() discards any pending bytes before reading the next header.
	pending int

	closeOnce sync.Once
	closeErr  error
}

// newCatfileReader starts a "git cat-file --batch --buffer" process and feeds
// all ids to its stdin via a background goroutine. The caller must call Close
// when done.
func newCatfileReader(repoDir string, ids []plumbing.Hash) (*catfileReader, error) {
	cmd := exec.Command("git", "cat-file", "--batch", "--buffer")
	cmd.Dir = repoDir

	stdin, err := cmd.StdinPipe()
	if err != nil {
		return nil, fmt.Errorf("stdin pipe: %w", err)
	}

	stdout, err := cmd.StdoutPipe()
	if err != nil {
		stdin.Close()
		return nil, fmt.Errorf("stdout pipe: %w", err)
	}

	if err := cmd.Start(); err != nil {
		stdin.Close()
		stdout.Close()
		return nil, fmt.Errorf("start git cat-file: %w", err)
	}

	// Writer goroutine: feed all SHAs then close stdin to trigger flush.
	writeErr := make(chan error, 1)
	go func() {
		defer close(writeErr)
		defer stdin.Close()
		bw := bufio.NewWriterSize(stdin, 64*1024)
		var hexBuf [41]byte
		hexBuf[40] = '\n'
		for _, id := range ids {
			hex.Encode(hexBuf[:40], id[:])
			if _, err := bw.Write(hexBuf[:]); err != nil {
				writeErr <- err
				return
			}
		}
		writeErr <- bw.Flush()
	}()

	return &catfileReader{
		cmd:      cmd,
		reader:   bufio.NewReaderSize(stdout, 512*1024),
		writeErr: writeErr,
	}, nil
}

// Next advances to the next blob entry. It returns the blob's size and whether
// it is missing. Any unread content from the previous entry is automatically
// discarded. Returns io.EOF when all entries have been consumed.
//
// After Next returns successfully with missing=false, call Read to consume the
// blob content, or call Next again to skip it.
func (cr *catfileReader) Next() (size int, missing bool, err error) {
	// Discard unread content from the previous entry.
	if cr.pending > 0 {
		if _, err := cr.reader.Discard(cr.pending); err != nil {
			return 0, false, fmt.Errorf("discard pending bytes: %w", err)
		}
		cr.pending = 0
	}

	headerBytes, err := cr.reader.ReadBytes('\n')
	if err != nil {
		if err == io.EOF {
			return 0, false, io.EOF
		}
		return 0, false, fmt.Errorf("read header: %w", err)
	}
	header := headerBytes[:len(headerBytes)-1] // trim \n

	if bytes.HasSuffix(header, []byte(" missing")) {
		return 0, true, nil
	}

	// Parse size from "<oid> <type> <size>".
	lastSpace := bytes.LastIndexByte(header, ' ')
	if lastSpace == -1 {
		return 0, false, fmt.Errorf("unexpected header: %q", header)
	}
	size, err = strconv.Atoi(string(header[lastSpace+1:]))
	if err != nil {
		return 0, false, fmt.Errorf("parse size from %q: %w", header, err)
	}

	// Track pending bytes: content + trailing LF.
	cr.pending = size + 1
	return size, false, nil
}

// Read reads from the current blob's content. Implements io.Reader. Returns
// io.EOF when the blob's content has been fully read (the trailing LF
// delimiter is consumed automatically).
func (cr *catfileReader) Read(p []byte) (int, error) {
	if cr.pending <= 0 {
		return 0, io.EOF
	}

	// Don't read into the trailing LF byte — reserve it.
	contentRemaining := cr.pending - 1
	if contentRemaining <= 0 {
		// Only the trailing LF remains; consume it and signal EOF.
		if _, err := cr.reader.ReadByte(); err != nil {
			return 0, fmt.Errorf("read trailing LF: %w", err)
		}
		cr.pending = 0
		return 0, io.EOF
	}

	// Limit the read to the remaining content bytes.
	if len(p) > contentRemaining {
		p = p[:contentRemaining]
	}
	n, err := cr.reader.Read(p)
	cr.pending -= n
	if err != nil {
		return n, err
	}

	// If we've consumed all content bytes, also consume the trailing LF.
	if cr.pending == 1 {
		if _, err := cr.reader.ReadByte(); err != nil {
			return n, fmt.Errorf("read trailing LF: %w", err)
		}
		cr.pending = 0
	}

	return n, nil
}

// Close shuts down the cat-file process and waits for it to exit.
// It is safe to call Close multiple times or concurrently.
func (cr *catfileReader) Close() error {
	cr.closeOnce.Do(func() {
		// Kill first to avoid blocking on drain when there are many
		// unconsumed entries. Gitaly uses the same kill-first pattern.
		_ = cr.cmd.Process.Kill()
		// Drain any buffered stdout so the pipe closes cleanly.
		// Must complete before cmd.Wait(), which closes the pipe.
		_, _ = io.Copy(io.Discard, cr.reader)
		// Wait for writer goroutine (unblocks via broken pipe from Kill).
		<-cr.writeErr
		err := cr.cmd.Wait()
		// Suppress the expected "signal: killed" error from our own Kill().
		if isKilledErr(err) {
			err = nil
		}
		cr.closeErr = err
	})
	return cr.closeErr
}

// isKilledErr reports whether err is an exec.ExitError caused by SIGKILL.
func isKilledErr(err error) bool {
	exitErr, ok := err.(*exec.ExitError)
	if !ok {
		return false
	}
	ws, ok := exitErr.Sys().(syscall.WaitStatus)
	return ok && ws.Signal() == syscall.SIGKILL
}
+159
gitindex/catfile_bench_test.go
package gitindex

import (
	"fmt"
	"io"
	"os"
	"testing"

	"github.com/go-git/go-git/v5/plumbing"
)

// Set ZOEKT_BENCH_REPO to a git checkout to enable these benchmarks.
//
//	git clone --depth=1 https://github.com/kubernetes/kubernetes /tmp/k8s
//	ZOEKT_BENCH_REPO=/tmp/k8s go test ./gitindex/ -bench=BenchmarkBlobRead -benchmem -count=5 -timeout=600s

func requireBenchGitRepo(b *testing.B) string {
	b.Helper()
	dir := os.Getenv("ZOEKT_BENCH_REPO")
	if dir == "" {
		b.Skip("ZOEKT_BENCH_REPO not set")
	}
	return dir
}

// collectBlobKeys opens the repo, walks HEAD, and returns all fileKeys with
// their BlobLocations plus the repo directory path.
func collectBlobKeys(b *testing.B, repoDir string) (map[fileKey]BlobLocation, string) {
	b.Helper()

	repo, closer, err := openRepo(repoDir)
	if err != nil {
		b.Fatalf("openRepo: %v", err)
	}
	b.Cleanup(func() { closer.Close() })

	head, err := repo.Head()
	if err != nil {
		b.Fatalf("Head: %v", err)
	}

	commit, err := repo.CommitObject(head.Hash())
	if err != nil {
		b.Fatalf("CommitObject: %v", err)
	}

	tree, err := commit.Tree()
	if err != nil {
		b.Fatalf("Tree: %v", err)
	}

	rw := NewRepoWalker(repo, "https://example.com/repo", nil)
	if _, err := rw.CollectFiles(tree, "HEAD", nil); err != nil {
		b.Fatalf("CollectFiles: %v", err)
	}

	return rw.Files, repoDir
}

// sortedBlobKeys returns fileKeys for deterministic iteration.
func sortedBlobKeys(files map[fileKey]BlobLocation) []fileKey {
	keys := make([]fileKey, 0, len(files))
	for k := range files {
		keys = append(keys, k)
	}
	return keys
}

// BenchmarkBlobRead_GoGit measures the current go-git BlobObject approach:
// sequential calls to repo.GitRepo.BlobObject(hash) for each file.
func BenchmarkBlobRead_GoGit(b *testing.B) {
	repoDir := requireBenchGitRepo(b)
	files, _ := collectBlobKeys(b, repoDir)
	keys := sortedBlobKeys(files)
	b.Logf("collected %d blob keys", len(keys))

	for _, n := range []int{1_000, 5_000, len(keys)} {
		n = min(n, len(keys))
		subset := keys[:n]

		b.Run(fmt.Sprintf("files=%d", n), func(b *testing.B) {
			b.ReportAllocs()
			var totalBytes int64
			for b.Loop() {
				totalBytes = 0
				for _, key := range subset {
					loc := files[key]
					blob, err := loc.GitRepo.BlobObject(key.ID)
					if err != nil {
						b.Fatalf("BlobObject(%s): %v", key.ID, err)
					}
					r, err := blob.Reader()
					if err != nil {
						b.Fatalf("Reader: %v", err)
					}
					n, err := io.Copy(io.Discard, r)
					r.Close()
					if err != nil {
						b.Fatalf("Read: %v", err)
					}
					totalBytes += n
				}
			}
			b.ReportMetric(float64(totalBytes), "content-bytes/op")
			b.ReportMetric(float64(len(subset)), "files/op")
		})
	}
}

// BenchmarkBlobRead_CatfileReader measures the streaming catfileReader approach:
// all SHAs written to stdin at once via --buffer, responses read one at a time.
// This is the production path used by indexGitRepo.
func BenchmarkBlobRead_CatfileReader(b *testing.B) {
	repoDir := requireBenchGitRepo(b)
	files, gitDir := collectBlobKeys(b, repoDir)
	keys := sortedBlobKeys(files)
	b.Logf("collected %d blob keys", len(keys))

	ids := make([]plumbing.Hash, len(keys))
	for i, k := range keys {
		ids[i] = k.ID
	}

	for _, n := range []int{1_000, 5_000, len(keys)} {
		n = min(n, len(keys))
		subset := ids[:n]

		b.Run(fmt.Sprintf("files=%d", n), func(b *testing.B) {
			b.ReportAllocs()
			var totalBytes int64
			for b.Loop() {
				totalBytes = 0
				cr, err := newCatfileReader(gitDir, subset)
				if err != nil {
					b.Fatalf("newCatfileReader: %v", err)
				}
				for range subset {
					size, missing, err := cr.Next()
					if err != nil {
						cr.Close()
						b.Fatalf("Next: %v", err)
					}
					if missing {
						continue
					}
					content := make([]byte, size)
					if _, err := io.ReadFull(cr, content); err != nil {
						cr.Close()
						b.Fatalf("ReadFull: %v", err)
					}
					totalBytes += int64(len(content))
				}
				cr.Close()
			}
			b.ReportMetric(float64(totalBytes), "content-bytes/op")
			b.ReportMetric(float64(len(subset)), "files/op")
		})
	}
}
+813
gitindex/catfile_hardening_test.go
package gitindex

import (
	"bytes"
	"fmt"
	"io"
	"os"
	"os/exec"
	"path/filepath"
	"sync"
	"testing"
	"time"

	"github.com/go-git/go-git/v5/plumbing"
)

// --- Close lifecycle tests ---

// TestCatfileReader_DoubleClose verifies that Close is idempotent.
// Calling Close twice must not deadlock or panic.
func TestCatfileReader_DoubleClose(t *testing.T) {
	repoDir, blobs := createTestRepo(t)
	ids := []plumbing.Hash{blobs["hello.txt"]}

	cr, err := newCatfileReader(repoDir, ids)
	if err != nil {
		t.Fatal(err)
	}

	// Consume the entry so the process can exit cleanly.
	if _, _, err := cr.Next(); err != nil {
		t.Fatal(err)
	}

	if err := cr.Close(); err != nil {
		t.Fatalf("first Close: %v", err)
	}

	// Second Close must not deadlock or panic.
	done := make(chan error, 1)
	go func() {
		done <- cr.Close()
	}()

	select {
	case <-done:
		// Success — whether err is nil or not, it didn't block.
	case <-time.After(5 * time.Second):
		t.Fatal("second Close() deadlocked — writeErr channel was never closed")
	}
}

// TestCatfileReader_ConcurrentClose verifies that calling Close from
// multiple goroutines simultaneously does not panic, deadlock, or
// corrupt state.
func TestCatfileReader_ConcurrentClose(t *testing.T) {
	repoDir, blobs := createTestRepo(t)
	ids := []plumbing.Hash{
		blobs["hello.txt"],
		blobs["large.bin"],
		blobs["binary.bin"],
	}

	cr, err := newCatfileReader(repoDir, ids)
	if err != nil {
		t.Fatal(err)
	}

	// Read one entry, leave two unconsumed.
	if _, _, err := cr.Next(); err != nil {
		t.Fatal(err)
	}

	const goroutines = 5
	var wg sync.WaitGroup
	wg.Add(goroutines)
	barrier := make(chan struct{})

	for i := 0; i < goroutines; i++ {
		go func() {
			defer wg.Done()
			<-barrier // all start at once
			cr.Close()
		}()
	}

	done := make(chan struct{})
	go func() {
		close(barrier)
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		// All goroutines returned.
	case <-time.After(10 * time.Second):
		t.Fatal("concurrent Close() deadlocked")
	}
}

// TestCatfileReader_CloseWithoutReading verifies that closing
// immediately after creation (without reading any entries) completes
// without hanging.
func TestCatfileReader_CloseWithoutReading(t *testing.T) {
	repoDir, blobs := createTestRepo(t)
	ids := []plumbing.Hash{
		blobs["hello.txt"],
		blobs["large.bin"],
		blobs["binary.bin"],
		blobs["empty.txt"],
	}

	cr, err := newCatfileReader(repoDir, ids)
	if err != nil {
		t.Fatal(err)
	}

	done := make(chan error, 1)
	go func() {
		done <- cr.Close()
	}()

	select {
	case err := <-done:
		if err != nil {
			t.Fatalf("Close: %v", err)
		}
	case <-time.After(10 * time.Second):
		t.Fatal("Close() without reading any entries hung")
	}
}

// TestCatfileReader_CloseBeforeExhausted_ManyBlobs simulates early
// termination (e.g., builder.Add error) with many unconsumed blobs.
// Close should complete promptly — not drain the entire git output.
func TestCatfileReader_CloseBeforeExhausted_ManyBlobs(t *testing.T) {
	// Create a repo with many non-trivial files.
	dir := t.TempDir()
	repoDir := filepath.Join(dir, "repo")

	script := `
set -e
git init -b main repo
cd repo
git config user.email "test@test.com"
git config user.name "Test"
for i in $(seq 1 200); do
    dd if=/dev/urandom bs=1024 count=10 of="file_$i.bin" 2>/dev/null
done
git add -A
git commit -m "many files"
`
	cmd := exec.Command("/bin/sh", "-c", script)
	cmd.Dir = dir
	cmd.Stderr = os.Stderr
	if err := cmd.Run(); err != nil {
		t.Fatalf("create test repo: %v", err)
	}

	var ids []plumbing.Hash
	for i := 1; i <= 200; i++ {
		name := fmt.Sprintf("file_%d.bin", i)
		out, err := exec.Command("git", "-C", repoDir, "rev-parse", "HEAD:"+name).Output()
		if err != nil {
			t.Fatalf("rev-parse %s: %v", name, err)
		}
		ids = append(ids, plumbing.NewHash(string(out[:len(out)-1])))
	}

	cr, err := newCatfileReader(repoDir, ids)
	if err != nil {
		t.Fatal(err)
	}

	// Read only 1 of 200 entries.
	if _, _, err := cr.Next(); err != nil {
		t.Fatal(err)
	}

	// Close should be fast (kill, not drain). With drain it still works but
	// is slow — we enforce a generous bound.
	start := time.Now()
	done := make(chan error, 1)
	go func() {
		done <- cr.Close()
	}()

	select {
	case <-done:
		elapsed := time.Since(start)
		// With Kill: sub-millisecond. Draining 200×10KB is fast too, so we
		// use a generous 3s bound that still catches pathological stalls.
		if elapsed > 3*time.Second {
			t.Errorf("Close took %v after reading 1 of 200 entries — consider killing instead of draining", elapsed)
		}
	case <-time.After(30 * time.Second):
		t.Fatal("Close() deadlocked with many unconsumed blobs")
	}
}

// --- Read edge-case tests ---

// TestCatfileReader_ReadWithoutNext verifies that calling Read
// before calling Next returns io.EOF, not a panic or garbage data.
func TestCatfileReader_ReadWithoutNext(t *testing.T) {
	repoDir, blobs := createTestRepo(t)
	ids := []plumbing.Hash{blobs["hello.txt"]}

	cr, err := newCatfileReader(repoDir, ids)
	if err != nil {
		t.Fatal(err)
	}
	defer cr.Close()

	buf := make([]byte, 10)
	n, err := cr.Read(buf)
	if n != 0 || err != io.EOF {
		t.Fatalf("Read without Next: n=%d err=%v, want n=0 err=io.EOF", n, err)
	}
}

// TestCatfileReader_ReadAfterFullConsumption verifies that extra Read
// calls after a blob is fully consumed return io.EOF, not duplicate
// data or trailing LF bytes.
func TestCatfileReader_ReadAfterFullConsumption(t *testing.T) {
	repoDir, blobs := createTestRepo(t)
	ids := []plumbing.Hash{blobs["hello.txt"]}

	cr, err := newCatfileReader(repoDir, ids)
	if err != nil {
		t.Fatal(err)
	}
	defer cr.Close()

	size, _, _ := cr.Next()
	content := make([]byte, size)
	if _, err := io.ReadFull(cr, content); err != nil {
		t.Fatal(err)
	}

	// Blob is fully read — additional Reads must return EOF.
	for i := 0; i < 3; i++ {
		buf := make([]byte, 10)
		n, err := cr.Read(buf)
		if n != 0 || err != io.EOF {
			t.Fatalf("Read #%d after full consumption: n=%d err=%v, want n=0 err=io.EOF", i, n, err)
		}
	}
}

// TestCatfileReader_SmallBufferReads reads a blob one byte at a time
// and verifies the entire content is reconstructed correctly without
// any trailing LF leaking into user content.
func TestCatfileReader_SmallBufferReads(t *testing.T) {
	repoDir, blobs := createTestRepo(t)
	ids := []plumbing.Hash{blobs["hello.txt"]}

	cr, err := newCatfileReader(repoDir, ids)
	if err != nil {
		t.Fatal(err)
	}
	defer cr.Close()

	size, _, _ := cr.Next()

	var result []byte
	buf := make([]byte, 1)
	for {
		n, err := cr.Read(buf)
		if n > 0 {
			result = append(result, buf[:n]...)
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatal(err)
		}
	}

	if len(result) != size {
		t.Fatalf("read %d bytes, want %d", len(result), size)
	}
	if string(result) != "hello world\n" {
		t.Errorf("content = %q, want %q", result, "hello world\n")
	}
}

// TestCatfileReader_PartialReadThenNext reads only part of a blob's
// content, then advances to the next entry. Verifies that the discard
// of pending bytes doesn't corrupt the stream.
func TestCatfileReader_PartialReadThenNext(t *testing.T) {
	repoDir, blobs := createTestRepo(t)
	ids := []plumbing.Hash{
		blobs["hello.txt"],  // 12 bytes: "hello world\n"
		blobs["binary.bin"], // variable, starts with 0x00
	}

	cr, err := newCatfileReader(repoDir, ids)
	if err != nil {
		t.Fatal(err)
	}
	defer cr.Close()

	// Read only 5 of 12 bytes from hello.txt.
	size, _, _ := cr.Next()
	if size != 12 {
		t.Fatalf("hello.txt size = %d, want 12", size)
	}
	partial := make([]byte, 5)
	if _, err := io.ReadFull(cr, partial); err != nil {
		t.Fatal(err)
	}
	if string(partial) != "hello" {
		t.Fatalf("partial = %q, want %q", partial, "hello")
	}

	// Advance — must discard remaining 7 content bytes + trailing LF.
	size, _, err = cr.Next()
	if err != nil {
		t.Fatalf("Next binary.bin after partial read: %v", err)
	}

	// Verify binary.bin content is intact.
	content := make([]byte, size)
	if _, err := io.ReadFull(cr, content); err != nil {
		t.Fatal(err)
	}
	if content[0] != 0x00 {
		t.Errorf("binary.bin first byte = 0x%02x after partial-read skip, want 0x00", content[0])
	}
}

// TestCatfileReader_PartialReadExactlyOneByteShort reads size-1 bytes
// from a blob. The pending field should be exactly 2 (1 content byte +
// 1 trailing LF). This stresses the boundary between content and LF
// in the discard path.
func TestCatfileReader_PartialReadExactlyOneByteShort(t *testing.T) {
	repoDir, blobs := createTestRepo(t)
	ids := []plumbing.Hash{
		blobs["hello.txt"],  // 12 bytes
		blobs["binary.bin"], // starts with 0x00
	}

	cr, err := newCatfileReader(repoDir, ids)
	if err != nil {
		t.Fatal(err)
	}
	defer cr.Close()

	size, _, _ := cr.Next()
	// Read exactly size-1 bytes — leaves 1 content byte + trailing LF.
	buf := make([]byte, size-1)
	if _, err := io.ReadFull(cr, buf); err != nil {
		t.Fatal(err)
	}
	if string(buf) != "hello world" { // missing final \n
		t.Fatalf("partial = %q", buf)
	}

	// Advance — pending should be 2 (1 content byte + 1 LF). The
	// Discard call must handle this exact boundary correctly.
	size, missing, err := cr.Next()
	if err != nil {
		t.Fatalf("Next after size-1 partial read: %v", err)
	}
	if missing {
		t.Fatal("binary.bin unexpectedly missing")
	}

	// Read binary.bin to verify stream integrity.
	content := make([]byte, size)
	if _, err := io.ReadFull(cr, content); err != nil {
		t.Fatal(err)
	}
	if content[0] != 0x00 {
		t.Errorf("binary.bin[0] = 0x%02x after boundary skip, want 0x00", content[0])
	}
}

// --- Empty / degenerate input tests ---

// TestCatfileReader_EmptyIds verifies that an empty id slice produces
// immediate EOF without errors.
func TestCatfileReader_EmptyIds(t *testing.T) {
	repoDir, _ := createTestRepo(t)

	cr, err := newCatfileReader(repoDir, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer cr.Close()

	_, _, err = cr.Next()
	if err != io.EOF {
		t.Fatalf("expected io.EOF for empty ids, got %v", err)
	}
}

// TestCatfileReader_MultipleEmptyBlobs stresses the trailing-LF
// handling for size-0 blobs. Git still outputs a LF after a 0-byte
// blob body. Repeated empty blobs test the pending=1 discard path.
func TestCatfileReader_MultipleEmptyBlobs(t *testing.T) {
	repoDir, blobs := createTestRepo(t)

	// Send the empty blob SHA 5 times — git outputs each independently.
	emptyID := blobs["empty.txt"]
	ids := []plumbing.Hash{emptyID, emptyID, emptyID, emptyID, emptyID}

	cr, err := newCatfileReader(repoDir, ids)
	if err != nil {
		t.Fatal(err)
	}
	defer cr.Close()

	for i := range ids {
		size, missing, err := cr.Next()
		if err != nil {
			t.Fatalf("Next #%d: %v", i, err)
		}
		if missing {
			t.Fatalf("#%d unexpectedly missing", i)
		}
		if size != 0 {
			t.Fatalf("#%d size = %d, want 0", i, size)
		}
		// Don't read — Next should discard the trailing LF for us.
	}

	_, _, err = cr.Next()
	if err != io.EOF {
		t.Fatalf("expected EOF after %d empty blobs, got %v", len(ids), err)
	}
}

// TestCatfileReader_EmptyBlobRead verifies that reading a 0-byte blob
// through the io.Reader interface returns 0 bytes and io.EOF, and that
// the trailing LF is consumed transparently.
func TestCatfileReader_EmptyBlobRead(t *testing.T) {
	repoDir, blobs := createTestRepo(t)
	ids := []plumbing.Hash{
		blobs["empty.txt"], // 0 bytes
		blobs["hello.txt"], // 12 bytes — sentinel
	}

	cr, err := newCatfileReader(repoDir, ids)
	if err != nil {
		t.Fatal(err)
	}
	defer cr.Close()

	size, _, _ := cr.Next()
	if size != 0 {
		t.Fatalf("empty.txt size = %d", size)
	}

	// Explicitly Read on the 0-byte blob.
	buf := make([]byte, 10)
	n, err := cr.Read(buf)
	if n != 0 || err != io.EOF {
		t.Fatalf("Read empty blob: n=%d err=%v, want n=0 err=io.EOF", n, err)
	}

	// The trailing LF must have been consumed. Verify by reading the
	// next entry — if the LF leaked, the header parse would fail.
	size, _, err = cr.Next()
	if err != nil {
		t.Fatalf("Next hello.txt after empty blob Read: %v", err)
	}
	if size != 12 {
		t.Fatalf("hello.txt size = %d, want 12", size)
	}
	content := make([]byte, size)
	if _, err := io.ReadFull(cr, content); err != nil {
		t.Fatal(err)
	}
	if string(content) != "hello world\n" {
		t.Errorf("hello.txt = %q", content)
	}
}

// --- Missing object edge cases ---

// TestCatfileReader_AllMissing verifies that a sequence of entirely
// missing objects is handled gracefully — no errors, no panics, just
// missing=true for each followed by EOF.
func TestCatfileReader_AllMissing(t *testing.T) {
	repoDir, _ := createTestRepo(t)

	ids := []plumbing.Hash{
		plumbing.NewHash("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef"),
		plumbing.NewHash("1111111111111111111111111111111111111111"),
		plumbing.NewHash("2222222222222222222222222222222222222222"),
	}

	cr, err := newCatfileReader(repoDir, ids)
	if err != nil {
		t.Fatal(err)
	}
	defer cr.Close()

	for i, id := range ids {
		_, missing, err := cr.Next()
		if err != nil {
			t.Fatalf("Next #%d (%s): %v", i, id, err)
		}
		if !missing {
			t.Errorf("expected #%d (%s) to be missing", i, id)
		}
	}

	_, _, err = cr.Next()
	if err != io.EOF {
		t.Fatalf("expected EOF after all missing, got %v", err)
	}
}

// TestCatfileReader_AlternatingMissingPresent interleaves missing and
// present objects, verifying that stream alignment is maintained.
func TestCatfileReader_AlternatingMissingPresent(t *testing.T) {
	repoDir, blobs := createTestRepo(t)

	fake1 := plumbing.NewHash("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef")
	fake2 := plumbing.NewHash("1111111111111111111111111111111111111111")

	ids := []plumbing.Hash{
		fake1,
		blobs["hello.txt"],
		fake2,
		blobs["empty.txt"],
		blobs["binary.bin"],
	}

	cr, err := newCatfileReader(repoDir, ids)
	if err != nil {
		t.Fatal(err)
	}
	defer cr.Close()

	// fake1 — missing
	_, missing, err := cr.Next()
	if err != nil || !missing {
		t.Fatalf("fake1: err=%v missing=%v", err, missing)
	}

	// hello.txt — present, read it
	size, missing, err := cr.Next()
	if err != nil || missing {
		t.Fatalf("hello.txt: err=%v missing=%v", err, missing)
	}
	content := make([]byte, size)
	if _, err := io.ReadFull(cr, content); err != nil {
		t.Fatal(err)
	}
	if string(content) != "hello world\n" {
		t.Errorf("hello.txt = %q", content)
	}

	// fake2 — missing
	_, missing, err = cr.Next()
	if err != nil || !missing {
		t.Fatalf("fake2: err=%v missing=%v", err, missing)
	}

	// empty.txt — present, skip it
	size, missing, err = cr.Next()
	if err != nil || missing {
		t.Fatalf("empty.txt: err=%v missing=%v", err, missing)
	}
	if size != 0 {
		t.Errorf("empty.txt size = %d", size)
	}

	// binary.bin — present, read it
	size, missing, err = cr.Next()
	if err != nil || missing {
		t.Fatalf("binary.bin: err=%v missing=%v", err, missing)
	}
	binContent := make([]byte, size)
	if _, err := io.ReadFull(cr, binContent); err != nil {
		t.Fatal(err)
	}
	if binContent[0] != 0x00 {
		t.Errorf("binary.bin[0] = 0x%02x, want 0x00", binContent[0])
	}

	_, _, err = cr.Next()
	if err != io.EOF {
		t.Fatalf("expected EOF, got %v", err)
	}
}

// TestCatfileReader_MissingThenSkip verifies that a missing object
// followed by a present but skipped (unread) object doesn't corrupt
// the stream. Missing objects have no content body, so there must be
// no stale pending bytes interfering with the next header read.
func TestCatfileReader_MissingThenSkip(t *testing.T) {
	repoDir, blobs := createTestRepo(t)

	fake := plumbing.NewHash("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef")
	ids := []plumbing.Hash{
		fake,
		blobs["large.bin"], // 64KB — skip without reading
		blobs["hello.txt"], // sentinel — read to verify integrity
	}

	cr, err := newCatfileReader(repoDir, ids)
	if err != nil {
		t.Fatal(err)
	}
	defer cr.Close()

	// missing
	_, missing, _ := cr.Next()
	if !missing {
		t.Fatal("expected missing")
	}

	// large.bin — skip
	size, missing, err := cr.Next()
	if err != nil || missing {
		t.Fatalf("large.bin: err=%v missing=%v", err, missing)
	}
	if size != 64*1024 {
		t.Fatalf("large.bin size = %d", size)
	}
	// deliberately don't read

	// hello.txt — read after missing+skip
	size, missing, err = cr.Next()
	if err != nil || missing {
		t.Fatalf("hello.txt: err=%v missing=%v", err, missing)
	}
	content := make([]byte, size)
	if _, err := io.ReadFull(cr, content); err != nil {
		t.Fatal(err)
	}
	if string(content) != "hello world\n" {
		t.Errorf("hello.txt = %q", content)
	}
}

// --- Next() edge cases ---

// TestCatfileReader_RepeatedNextAfterEOF verifies that calling Next
// after EOF keeps returning EOF — not a panic, not a different error.
func TestCatfileReader_RepeatedNextAfterEOF(t *testing.T) {
	repoDir, blobs := createTestRepo(t)
	ids := []plumbing.Hash{blobs["hello.txt"]}

	cr, err := newCatfileReader(repoDir, ids)
	if err != nil {
		t.Fatal(err)
	}
	defer cr.Close()

	// Consume and skip the only entry.
	if _, _, err := cr.Next(); err != nil {
		t.Fatal(err)
	}

	// First EOF.
	_, _, err = cr.Next()
	if err != io.EOF {
		t.Fatalf("first post-exhaust Next: %v, want io.EOF", err)
	}

	// Second and third EOF — must be stable.
	for i := 0; i < 2; i++ {
		_, _, err = cr.Next()
		if err != io.EOF {
			t.Fatalf("Next #%d after EOF: %v, want io.EOF", i+2, err)
		}
	}
}

// --- Large blob precision tests ---

// TestCatfileReader_LargeBlobBytePrecision verifies that a 64KB blob
// is read with byte-exact precision — no off-by-one from trailing LF
// handling, no truncation, no extra bytes.
func TestCatfileReader_LargeBlobBytePrecision(t *testing.T) {
	repoDir, blobs := createTestRepo(t)
	ids := []plumbing.Hash{blobs["large.bin"]}

	cr, err := newCatfileReader(repoDir, ids)
	if err != nil {
		t.Fatal(err)
	}
	defer cr.Close()

	size, _, err := cr.Next()
	if err != nil {
		t.Fatal(err)
	}
	if size != 64*1024 {
		t.Fatalf("size = %d, want %d", size, 64*1024)
	}

	// Read the full blob content.
	content := make([]byte, size)
	n, err := io.ReadFull(cr, content)
	if err != nil {
		t.Fatalf("ReadFull: %v (read %d of %d)", err, n, size)
	}
	if n != size {
		t.Fatalf("read %d bytes, want %d", n, size)
	}

	// Verify git agrees on the content via cat-file -p.
648 + func TestCatfileReader_RepeatedNextAfterEOF(t *testing.T) { 649 + repoDir, blobs := createTestRepo(t) 650 + ids := []plumbing.Hash{blobs["hello.txt"]} 651 + 652 + cr, err := newCatfileReader(repoDir, ids) 653 + if err != nil { 654 + t.Fatal(err) 655 + } 656 + defer cr.Close() 657 + 658 + // Consume and skip the only entry. 659 + if _, _, err := cr.Next(); err != nil { 660 + t.Fatal(err) 661 + } 662 + 663 + // First EOF. 664 + _, _, err = cr.Next() 665 + if err != io.EOF { 666 + t.Fatalf("first post-exhaust Next: %v, want io.EOF", err) 667 + } 668 + 669 + // Second and third EOF — must be stable. 670 + for i := 0; i < 2; i++ { 671 + _, _, err = cr.Next() 672 + if err != io.EOF { 673 + t.Fatalf("Next #%d after EOF: %v, want io.EOF", i+2, err) 674 + } 675 + } 676 + } 677 + 678 + // --- Large blob precision tests --- 679 + 680 + // TestCatfileReader_LargeBlobBytePrecision verifies that a 64KB blob 681 + // is read with byte-exact precision — no off-by-one from trailing LF 682 + // handling, no truncation, no extra bytes. 683 + func TestCatfileReader_LargeBlobBytePrecision(t *testing.T) { 684 + repoDir, blobs := createTestRepo(t) 685 + ids := []plumbing.Hash{blobs["large.bin"]} 686 + 687 + cr, err := newCatfileReader(repoDir, ids) 688 + if err != nil { 689 + t.Fatal(err) 690 + } 691 + defer cr.Close() 692 + 693 + size, _, err := cr.Next() 694 + if err != nil { 695 + t.Fatal(err) 696 + } 697 + if size != 64*1024 { 698 + t.Fatalf("size = %d, want %d", size, 64*1024) 699 + } 700 + 701 + // Read the full blob content. 702 + content := make([]byte, size) 703 + n, err := io.ReadFull(cr, content) 704 + if err != nil { 705 + t.Fatalf("ReadFull: %v (read %d of %d)", err, n, size) 706 + } 707 + if n != size { 708 + t.Fatalf("read %d bytes, want %d", n, size) 709 + } 710 + 711 + // Verify git agrees on the content via cat-file -p. 
712 + expected, err := exec.Command("git", "-C", repoDir, "cat-file", "-p", blobs["large.bin"].String()).Output() 713 + if err != nil { 714 + t.Fatalf("git cat-file -p: %v", err) 715 + } 716 + if !bytes.Equal(content, expected) { 717 + t.Errorf("content mismatch: got %d bytes, git says %d bytes", len(content), len(expected)) 718 + // Find first divergence within the common prefix; a length mismatch is already reported above. 719 + for i := 0; i < len(content) && i < len(expected); i++ { 720 + if content[i] != expected[i] { 721 + t.Errorf("first diff at byte %d: got 0x%02x, want 0x%02x", i, content[i], expected[i]) 722 + break 723 + } 724 + } 725 + } 726 + } 727 + 728 + // TestCatfileReader_LargeBlobChunkedRead reads a 64KB blob in 997-byte 729 + // chunks (a prime number that doesn't align with any power-of-2 buffer) 730 + // to verify no byte is lost or duplicated across read boundaries. 731 + func TestCatfileReader_LargeBlobChunkedRead(t *testing.T) { 732 + repoDir, blobs := createTestRepo(t) 733 + ids := []plumbing.Hash{blobs["large.bin"]} 734 + 735 + cr, err := newCatfileReader(repoDir, ids) 736 + if err != nil { 737 + t.Fatal(err) 738 + } 739 + defer cr.Close() 740 + 741 + size, _, err := cr.Next() 742 + if err != nil || size != 64*1024 { 743 + t.Fatalf("err = %v, size = %d", err, size) 744 + } 745 + 746 + var result bytes.Buffer 747 + buf := make([]byte, 997) // prime-sized chunks 748 + for { 749 + n, err := cr.Read(buf) 750 + if n > 0 { 751 + result.Write(buf[:n]) 752 + } 753 + if err == io.EOF { 754 + break 755 + } 756 + if err != nil { 757 + t.Fatal(err) 758 + } 759 + } 760 + 761 + if result.Len() != size { 762 + t.Fatalf("total read = %d, want %d", result.Len(), size) 763 + } 764 + 765 + // Cross-check with git. 
766 + expected, err := exec.Command("git", "-C", repoDir, "cat-file", "-p", blobs["large.bin"].String()).Output() 767 + if err != nil || !bytes.Equal(result.Bytes(), expected) { 768 + t.Errorf("chunked read content differs from git cat-file -p output (git err=%v)", err) 769 + } 770 + } 771 + 772 + // --- Duplicate SHA test --- 773 + 774 + // TestCatfileReader_DuplicateSHAs verifies that requesting the same 775 + // SHA multiple times works — git cat-file --batch outputs the object 776 + // for each request independently. 777 + func TestCatfileReader_DuplicateSHAs(t *testing.T) { 778 + repoDir, blobs := createTestRepo(t) 779 + 780 + sha := blobs["hello.txt"] 781 + ids := []plumbing.Hash{sha, sha, sha} 782 + 783 + cr, err := newCatfileReader(repoDir, ids) 784 + if err != nil { 785 + t.Fatal(err) 786 + } 787 + defer cr.Close() 788 + 789 + for i := 0; i < 3; i++ { 790 + size, missing, err := cr.Next() 791 + if err != nil { 792 + t.Fatalf("Next #%d: %v", i, err) 793 + } 794 + if missing { 795 + t.Fatalf("#%d unexpectedly missing", i) 796 + } 797 + if size != 12 { 798 + t.Fatalf("#%d size = %d, want 12", i, size) 799 + } 800 + content := make([]byte, size) 801 + if _, err := io.ReadFull(cr, content); err != nil { 802 + t.Fatal(err) 803 + } 804 + if string(content) != "hello world\n" { 805 + t.Errorf("#%d content = %q", i, content) 806 + } 807 + } 808 + 809 + _, _, err = cr.Next() 810 + if err != io.EOF { 811 + t.Fatalf("expected EOF, got %v", err) 812 + } 813 + }
+230
gitindex/catfile_test.go
··· 1 + package gitindex 2 + 3 + import ( 4 + "io" 5 + "os" 6 + "os/exec" 7 + "path/filepath" 8 + "testing" 9 + 10 + "github.com/go-git/go-git/v5/plumbing" 11 + ) 12 + 13 + // createTestRepo creates a git repo with various test files and returns 14 + // the repo path and a map of filename -> blob SHA. 15 + func createTestRepo(t *testing.T) (string, map[string]plumbing.Hash) { 16 + t.Helper() 17 + dir := t.TempDir() 18 + repoDir := filepath.Join(dir, "repo") 19 + 20 + script := ` 21 + set -e 22 + git init -b main repo 23 + cd repo 24 + git config user.email "test@test.com" 25 + git config user.name "Test" 26 + 27 + # Normal text file 28 + echo "hello world" > hello.txt 29 + 30 + # Empty file 31 + touch empty.txt 32 + 33 + # Binary file with NUL bytes and embedded newlines (octal escapes; POSIX printf does not guarantee \x) 34 + printf '\000\001\002\nhello\nworld\n\003\004' > binary.bin 35 + 36 + # Large-ish file (64KB of data) 37 + dd if=/dev/urandom bs=1024 count=64 of=large.bin 2>/dev/null 38 + 39 + git add -A 40 + git commit -m "initial" 41 + ` 42 + cmd := exec.Command("/bin/sh", "-c", script) 43 + cmd.Dir = dir 44 + cmd.Stderr = os.Stderr 45 + if err := cmd.Run(); err != nil { 46 + t.Fatalf("create test repo: %v", err) 47 + } 48 + 49 + // Get blob SHAs for each file. 
50 + blobs := map[string]plumbing.Hash{} 51 + for _, name := range []string{"hello.txt", "empty.txt", "binary.bin", "large.bin"} { 52 + out, err := exec.Command("git", "-C", repoDir, "rev-parse", "HEAD:"+name).Output() 53 + if err != nil { 54 + t.Fatalf("rev-parse %s: %v", name, err) 55 + } 56 + sha := string(out[:len(out)-1]) // trim newline 57 + blobs[name] = plumbing.NewHash(sha) 58 + } 59 + 60 + return repoDir, blobs 61 + } 62 + 63 + func TestCatfileReader(t *testing.T) { 64 + repoDir, blobs := createTestRepo(t) 65 + 66 + ids := []plumbing.Hash{ 67 + blobs["hello.txt"], 68 + blobs["empty.txt"], 69 + blobs["binary.bin"], 70 + blobs["large.bin"], 71 + } 72 + 73 + cr, err := newCatfileReader(repoDir, ids) 74 + if err != nil { 75 + t.Fatalf("newCatfileReader: %v", err) 76 + } 77 + defer cr.Close() 78 + 79 + // hello.txt 80 + size, missing, err := cr.Next() 81 + if err != nil { 82 + t.Fatalf("Next hello.txt: %v", err) 83 + } 84 + if missing { 85 + t.Fatal("hello.txt unexpectedly missing") 86 + } 87 + if size != 12 { 88 + t.Errorf("hello.txt size = %d, want 12", size) 89 + } 90 + content := make([]byte, size) 91 + if _, err := io.ReadFull(cr, content); err != nil { 92 + t.Fatalf("ReadFull hello.txt: %v", err) 93 + } 94 + if string(content) != "hello world\n" { 95 + t.Errorf("hello.txt content = %q", content) 96 + } 97 + 98 + // empty.txt 99 + size, missing, err = cr.Next() 100 + if err != nil { 101 + t.Fatalf("Next empty.txt: %v", err) 102 + } 103 + if size != 0 { 104 + t.Errorf("empty.txt size = %d, want 0", size) 105 + } 106 + 107 + // binary.bin — read content and verify binary data survives. 
108 + size, missing, err = cr.Next() 109 + if err != nil { 110 + t.Fatalf("Next binary.bin: %v", err) 111 + } 112 + binContent := make([]byte, size) 113 + if _, err := io.ReadFull(cr, binContent); err != nil { 114 + t.Fatalf("ReadFull binary.bin: %v", err) 115 + } 116 + if binContent[0] != 0x00 || binContent[3] != '\n' { 117 + t.Errorf("binary.bin unexpected leading bytes: %x", binContent[:5]) 118 + } 119 + 120 + // large.bin 121 + size, missing, err = cr.Next() 122 + if err != nil { 123 + t.Fatalf("Next large.bin: %v", err) 124 + } 125 + if size != 64*1024 { 126 + t.Errorf("large.bin size = %d, want %d", size, 64*1024) 127 + } 128 + largeContent := make([]byte, size) 129 + if _, err := io.ReadFull(cr, largeContent); err != nil { 130 + t.Fatalf("ReadFull large.bin: %v", err) 131 + } 132 + 133 + // EOF after all entries. 134 + _, _, err = cr.Next() 135 + if err != io.EOF { 136 + t.Errorf("expected io.EOF after last entry, got %v", err) 137 + } 138 + } 139 + 140 + func TestCatfileReader_Skip(t *testing.T) { 141 + repoDir, blobs := createTestRepo(t) 142 + 143 + ids := []plumbing.Hash{ 144 + blobs["hello.txt"], 145 + blobs["large.bin"], 146 + blobs["binary.bin"], 147 + } 148 + 149 + cr, err := newCatfileReader(repoDir, ids) 150 + if err != nil { 151 + t.Fatalf("newCatfileReader: %v", err) 152 + } 153 + defer cr.Close() 154 + 155 + // Skip hello.txt by calling Next again without reading. 156 + _, _, err = cr.Next() 157 + if err != nil { 158 + t.Fatalf("Next hello.txt: %v", err) 159 + } 160 + 161 + // Skip large.bin too. 162 + size, _, err := cr.Next() 163 + if err != nil { 164 + t.Fatalf("Next large.bin: %v", err) 165 + } 166 + if size != 64*1024 { 167 + t.Errorf("large.bin size = %d, want %d", size, 64*1024) 168 + } 169 + 170 + // Read binary.bin after skipping two entries. 
171 + size, _, err = cr.Next() 172 + if err != nil { 173 + t.Fatalf("Next binary.bin: %v", err) 174 + } 175 + content := make([]byte, size) 176 + if _, err := io.ReadFull(cr, content); err != nil { 177 + t.Fatalf("ReadFull binary.bin: %v", err) 178 + } 179 + if content[0] != 0x00 { 180 + t.Errorf("binary.bin first byte = %x, want 0x00", content[0]) 181 + } 182 + } 183 + 184 + func TestCatfileReader_Missing(t *testing.T) { 185 + repoDir, blobs := createTestRepo(t) 186 + 187 + fakeHash := plumbing.NewHash("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef") 188 + ids := []plumbing.Hash{ 189 + blobs["hello.txt"], 190 + fakeHash, 191 + blobs["empty.txt"], 192 + } 193 + 194 + cr, err := newCatfileReader(repoDir, ids) 195 + if err != nil { 196 + t.Fatalf("newCatfileReader: %v", err) 197 + } 198 + defer cr.Close() 199 + 200 + // hello.txt — read normally. 201 + size, missing, err := cr.Next() 202 + if err != nil || missing { 203 + t.Fatalf("Next hello.txt: err=%v missing=%v", err, missing) 204 + } 205 + content := make([]byte, size) 206 + if _, err := io.ReadFull(cr, content); err != nil { 207 + t.Fatalf("ReadFull hello.txt: %v", err) 208 + } 209 + if string(content) != "hello world\n" { 210 + t.Errorf("hello.txt = %q", content) 211 + } 212 + 213 + // fakeHash — missing. 214 + _, missing, err = cr.Next() 215 + if err != nil { 216 + t.Fatalf("Next fakeHash: %v", err) 217 + } 218 + if !missing { 219 + t.Error("expected fakeHash to be missing") 220 + } 221 + 222 + // empty.txt — still works after missing entry. 223 + size, missing, err = cr.Next() 224 + if err != nil || missing { 225 + t.Fatalf("Next empty.txt: err=%v missing=%v", err, missing) 226 + } 227 + if size != 0 { 228 + t.Errorf("empty.txt size = %d, want 0", size) 229 + } 230 + }
+111 -17
gitindex/index.go
··· 585 585 sort.Strings(names) 586 586 names = uniq(names) 587 587 588 - log.Printf("attempting to index %d total files", totalFiles) 589 - for idx, name := range names { 590 - keys := fileKeys[name] 588 + // Separate main-repo keys from submodule keys, collecting blob SHAs 589 + // for the main repo so we can stream them via git cat-file --batch. 590 + // ZOEKT_DISABLE_CATFILE_BATCH=true falls back to the go-git path for 591 + // all files, useful as a kill switch if the cat-file path causes issues. 592 + catfileBatchDisabled := cmp.Or(os.Getenv("ZOEKT_DISABLE_CATFILE_BATCH"), "false") 593 + useCatfileBatch := true 594 + if disabled, _ := strconv.ParseBool(catfileBatchDisabled); disabled { 595 + useCatfileBatch = false 596 + log.Printf("cat-file batch disabled via ZOEKT_DISABLE_CATFILE_BATCH, using go-git") 597 + } 591 598 592 - for _, key := range keys { 593 - doc, err := createDocument(key, repos, opts.BuildOptions) 594 - if err != nil { 595 - return false, err 599 + mainRepoKeys := make([]fileKey, 0, totalFiles) 600 + mainRepoIDs := make([]plumbing.Hash, 0, totalFiles) 601 + var submoduleKeys []fileKey 602 + 603 + for _, name := range names { 604 + for _, key := range fileKeys[name] { 605 + if useCatfileBatch && key.SubRepoPath == "" { 606 + mainRepoKeys = append(mainRepoKeys, key) 607 + mainRepoIDs = append(mainRepoIDs, key.ID) 608 + } else { 609 + submoduleKeys = append(submoduleKeys, key) 596 610 } 611 + } 612 + } 597 613 598 - if err := builder.Add(doc); err != nil { 599 - return false, fmt.Errorf("error adding document with name %s: %w", key.FullPath(), err) 614 + log.Printf("attempting to index %d total files (%d via cat-file, %d via go-git)", totalFiles, len(mainRepoIDs), len(submoduleKeys)) 615 + 616 + // Stream main-repo blobs via pipelined cat-file --batch --buffer. 617 + // Large blobs are skipped without reading content into memory. 
618 + if len(mainRepoIDs) > 0 { 619 + cr, err := newCatfileReader(opts.RepoDir, mainRepoIDs) 620 + if err != nil { 621 + return false, fmt.Errorf("newCatfileReader: %w", err) 622 + } 623 + 624 + if err := indexCatfileBlobs(cr, mainRepoKeys, repos, opts, builder); err != nil { 625 + return false, err 626 + } 627 + } 628 + 629 + // Index submodule blobs via go-git. 630 + for idx, key := range submoduleKeys { 631 + doc, err := createDocument(key, repos, opts.BuildOptions) 632 + if err != nil { 633 + return false, err 634 + } 635 + 636 + if err := builder.Add(doc); err != nil { 637 + return false, fmt.Errorf("error adding document with name %s: %w", key.FullPath(), err) 638 + } 639 + 640 + if idx%10_000 == 0 { 641 + builder.CheckMemoryUsage() 642 + } 643 + } 644 + 645 + return true, builder.Finish() 646 + } 647 + 648 + // indexCatfileBlobs streams main-repo blobs from the catfileReader into the 649 + // builder. Large blobs are skipped without reading content into memory. 650 + // keys must correspond 1:1 (in order) with the ids passed to newCatfileReader. 651 + // The reader is always closed when this function returns. 652 + func indexCatfileBlobs(cr *catfileReader, keys []fileKey, repos map[fileKey]BlobLocation, opts Options, builder *index.Builder) error { 653 + defer cr.Close() 654 + 655 + for idx, key := range keys { 656 + size, missing, err := cr.Next() 657 + if err != nil { 658 + return fmt.Errorf("cat-file next for %s: %w", key.FullPath(), err) 659 + } 660 + 661 + branches := repos[key].Branches 662 + var doc index.Document 663 + 664 + if missing { 665 + // Unexpected for local repos — may indicate corruption, shallow 666 + // clone, or a race with git gc. Log a warning and skip. 
667 + log.Printf("warning: blob %s missing for %s", key.ID, key.FullPath()) 668 + doc = skippedDoc(key, branches, index.SkipReasonMissing) 669 + } else { 670 + keyFullPath := key.FullPath() 671 + if size > opts.BuildOptions.SizeMax && !opts.BuildOptions.IgnoreSizeMax(keyFullPath) { 672 + // Skip without reading content into memory. 673 + doc = skippedDoc(key, branches, index.SkipReasonTooLarge) 674 + } else { 675 + // Pre-allocate and read the full blob content in one call. 676 + // io.ReadFull is preferred over io.LimitedReader here as it 677 + // avoids the intermediate allocation and the size is known. 678 + content := make([]byte, size) 679 + if _, err := io.ReadFull(cr, content); err != nil { 680 + return fmt.Errorf("read blob %s: %w", keyFullPath, err) 681 + } 682 + doc = index.Document{ 683 + SubRepositoryPath: key.SubRepoPath, 684 + Name: keyFullPath, 685 + Content: content, 686 + Branches: branches, 687 + } 600 688 } 689 + } 601 690 602 - if idx%10_000 == 0 { 603 - builder.CheckMemoryUsage() 604 - } 691 + if err := builder.Add(doc); err != nil { 692 + return fmt.Errorf("error adding document with name %s: %w", key.FullPath(), err) 693 + } 694 + 695 + if idx%10_000 == 0 { 696 + builder.CheckMemoryUsage() 605 697 } 606 698 } 607 - return true, builder.Finish() 699 + 700 + return nil 608 701 } 609 702 610 703 // openRepo opens a git repository in a way that's optimized for indexing. ··· 987 1080 988 1081 // We filter out large documents when fetching the repo. So if an object is too large, it will not be found. 
989 1082 if errors.Is(err, plumbing.ErrObjectNotFound) { 990 - return skippedLargeDoc(key, branches), nil 1083 + return skippedDoc(key, branches, index.SkipReasonTooLarge), nil 991 1084 } 992 1085 993 1086 if err != nil { ··· 996 1089 997 1090 keyFullPath := key.FullPath() 998 1091 if blob.Size > int64(opts.SizeMax) && !opts.IgnoreSizeMax(keyFullPath) { 999 - return skippedLargeDoc(key, branches), nil 1092 + return skippedDoc(key, branches, index.SkipReasonTooLarge), nil 1000 1093 } 1001 1094 1002 1095 contents, err := blobContents(blob) ··· 1012 1105 }, nil 1013 1106 } 1014 1107 1015 - func skippedLargeDoc(key fileKey, branches []string) index.Document { 1108 + // skippedDoc creates a Document placeholder for a blob that was not indexed. 1109 + func skippedDoc(key fileKey, branches []string, reason index.SkipReason) index.Document { 1016 1110 return index.Document{ 1017 - SkipReason: index.SkipReasonTooLarge, 1111 + SkipReason: reason, 1018 1112 Name: key.FullPath(), 1019 1113 Branches: branches, 1020 1114 SubRepositoryPath: key.SubRepoPath,
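The ZOEKT_DISABLE_CATFILE_BATCH kill switch in gitindex/index.go relies on strconv.ParseBool, which accepts only a fixed set of spellings (1, t, T, TRUE, true, True and their false counterparts). Because the diff discards the parse error, any other value, such as `yes`, leaves the cat-file path enabled. A minimal sketch of that behaviour follows; the helper name `catfileBatchEnabled` and the injected `getenv` parameter are illustrative assumptions, and the `cmp.Or` default from the diff is omitted here because parsing an empty string also yields false.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// catfileBatchEnabled is a hypothetical helper that mirrors the kill-switch
// decision: the cat-file path stays on unless the variable parses as true.
func catfileBatchEnabled(getenv func(string) string) bool {
	// A parse error (empty or unrecognised value) leaves disabled == false.
	disabled, _ := strconv.ParseBool(getenv("ZOEKT_DISABLE_CATFILE_BATCH"))
	return !disabled
}

func main() {
	fmt.Println("cat-file batch enabled:", catfileBatchEnabled(os.Getenv))
}
```

Taking the lookup function as a parameter keeps the decision testable without mutating the process environment.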
+3
index/document.go
··· 26 26 SkipReasonTooSmall 27 27 SkipReasonBinary 28 28 SkipReasonTooManyTrigrams 29 + SkipReasonMissing 29 30 ) 30 31 31 32 func (s SkipReason) explanation() string { ··· 40 41 return "contains binary content" 41 42 case SkipReasonTooManyTrigrams: 42 43 return "contains too many trigrams" 44 + case SkipReasonMissing: 45 + return "object missing from repository" 43 46 default: 44 47 return "unknown skip reason" 45 48 }
+4 -3
index/file_category.go
··· 35 35 name := doc.Name 36 36 content := doc.Content 37 37 38 - // If this document was skipped because it was too large, just guess the category based on the filename to avoid 39 - // examining the contents. Note: passing nil content is allowed by the go-enry contract. 40 - if doc.SkipReason == SkipReasonTooLarge || doc.SkipReason == SkipReasonBinary { 38 + // If this document was skipped (too large, binary, or missing from the repo), 39 + // guess the category based on the filename to avoid examining the contents. 40 + // Note: passing nil content is allowed by the go-enry contract. 41 + if doc.SkipReason == SkipReasonTooLarge || doc.SkipReason == SkipReasonBinary || doc.SkipReason == SkipReasonMissing { 41 42 content = nil 42 43 } 43 44