fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package gitindex
16
17import (
18 "bufio"
19 "bytes"
20 "encoding/hex"
21 "fmt"
22 "io"
23 "os/exec"
24 "strconv"
25 "sync"
26 "syscall"
27
28 "github.com/go-git/go-git/v5/plumbing"
29)
30
// catfileReaderOptions configures newCatfileReader. The zero value requests
// no optional behavior.
type catfileReaderOptions struct {
	// filterSpec, when non-empty, is forwarded to git as "--filter=<filterSpec>".
	filterSpec string
}
34
// catfileReader streams blob contents out of a single
// "git cat-file --batch --buffer" process. A background goroutine writes every
// requested object ID to git's stdin up front, while the caller pulls entries
// from git's stdout one at a time, in the spirit of archive/tar.Reader: Next
// moves to the following entry and Read returns that entry's content.
//
// With --buffer, git stages its output in stdio buffers (fwrite) instead of
// flushing after every object (write_or_die), which saves syscalls. Anything
// still buffered is flushed by git once it sees EOF on stdin.
//
// Typical use:
//
//	cr, err := newCatfileReader(repoDir, ids, catfileReaderOptions{})
//	if err != nil {
//		return err
//	}
//	defer cr.Close()
//
//	for {
//		size, missing, excluded, err := cr.Next()
//		if err == io.EOF {
//			break
//		}
//		if missing || excluded || size > maxSize {
//			continue // the next call to Next skips unread content
//		}
//		content := make([]byte, size)
//		io.ReadFull(cr, content)
//	}
type catfileReader struct {
	cmd      *exec.Cmd     // the running git cat-file process
	reader   *bufio.Reader // buffered view of git's stdout
	writeErr <-chan error  // outcome of the stdin-feeding goroutine

	// pending is how many bytes of the current entry have not been consumed
	// yet: the unread content plus the one LF git writes after it. Next
	// throws these away before parsing the following header.
	pending int

	closeOnce sync.Once // guarantees the shutdown logic in Close runs once
	closeErr  error     // result recorded by that single shutdown
}
71
72// newCatfileReader starts a "git cat-file --batch --buffer" process and feeds
73// all ids to its stdin via a background goroutine. The caller must call Close
74// when done. Pass a zero-value catfileReaderOptions when no options are needed.
75func newCatfileReader(repoDir string, ids []plumbing.Hash, opts catfileReaderOptions) (*catfileReader, error) {
76 args := []string{"cat-file", "--batch", "--buffer"}
77 if opts.filterSpec != "" {
78 args = append(args, "--filter="+opts.filterSpec)
79 }
80
81 cmd := exec.Command("git", args...)
82 cmd.Dir = repoDir
83
84 stdin, err := cmd.StdinPipe()
85 if err != nil {
86 return nil, fmt.Errorf("stdin pipe: %w", err)
87 }
88
89 stdout, err := cmd.StdoutPipe()
90 if err != nil {
91 stdin.Close()
92 return nil, fmt.Errorf("stdout pipe: %w", err)
93 }
94
95 if err := cmd.Start(); err != nil {
96 stdin.Close()
97 stdout.Close()
98 return nil, fmt.Errorf("start git cat-file: %w", err)
99 }
100
101 // Writer goroutine: feed all SHAs then close stdin to trigger flush.
102 writeErr := make(chan error, 1)
103 go func() {
104 defer close(writeErr)
105 defer stdin.Close()
106 bw := bufio.NewWriterSize(stdin, 64*1024)
107 var hexBuf [41]byte
108 hexBuf[40] = '\n'
109 for _, id := range ids {
110 hex.Encode(hexBuf[:40], id[:])
111 if _, err := bw.Write(hexBuf[:]); err != nil {
112 writeErr <- err
113 return
114 }
115 }
116 writeErr <- bw.Flush()
117 }()
118
119 return &catfileReader{
120 cmd: cmd,
121 reader: bufio.NewReaderSize(stdout, 512*1024),
122 writeErr: writeErr,
123 }, nil
124}
125
126// Next advances to the next blob entry. It returns the blob's size and whether
127// it is missing or excluded by the configured filter. Any unread content from
128// the previous entry is automatically discarded. Returns io.EOF when all
129// entries have been consumed.
130//
131// After Next returns successfully with missing=false and excluded=false, call
132// Read to consume the blob content, or call Next again to skip it.
133func (cr *catfileReader) Next() (size int, missing bool, excluded bool, err error) {
134 // Discard unread content from the previous entry.
135 if cr.pending > 0 {
136 if _, err := cr.reader.Discard(cr.pending); err != nil {
137 return 0, false, false, fmt.Errorf("discard pending bytes: %w", err)
138 }
139 cr.pending = 0
140 }
141
142 headerBytes, err := cr.reader.ReadBytes('\n')
143 if err != nil {
144 if err == io.EOF {
145 return 0, false, false, io.EOF
146 }
147 return 0, false, false, fmt.Errorf("read header: %w", err)
148 }
149 header := headerBytes[:len(headerBytes)-1] // trim \n
150
151 if bytes.HasSuffix(header, []byte(" missing")) {
152 return 0, true, false, nil
153 }
154
155 if bytes.HasSuffix(header, []byte(" excluded")) {
156 return 0, false, true, nil
157 }
158
159 // Parse size from "<oid> <type> <size>".
160 lastSpace := bytes.LastIndexByte(header, ' ')
161 if lastSpace == -1 {
162 return 0, false, false, fmt.Errorf("unexpected header: %q", header)
163 }
164 size, err = strconv.Atoi(string(header[lastSpace+1:]))
165 if err != nil {
166 return 0, false, false, fmt.Errorf("parse size from %q: %w", header, err)
167 }
168
169 // Track pending bytes: content + trailing LF.
170 cr.pending = size + 1
171 return size, false, false, nil
172}
173
174// Read reads from the current blob's content. Implements io.Reader. Returns
175// io.EOF when the blob's content has been fully read (the trailing LF
176// delimiter is consumed automatically).
177func (cr *catfileReader) Read(p []byte) (int, error) {
178 if cr.pending <= 0 {
179 return 0, io.EOF
180 }
181
182 // Don't read into the trailing LF byte — reserve it.
183 contentRemaining := cr.pending - 1
184 if contentRemaining <= 0 {
185 // Only the trailing LF remains; consume it and signal EOF.
186 if _, err := cr.reader.ReadByte(); err != nil {
187 return 0, fmt.Errorf("read trailing LF: %w", err)
188 }
189 cr.pending = 0
190 return 0, io.EOF
191 }
192
193 // Limit the read to the remaining content bytes.
194 if len(p) > contentRemaining {
195 p = p[:contentRemaining]
196 }
197 n, err := cr.reader.Read(p)
198 cr.pending -= n
199 if err != nil {
200 return n, err
201 }
202
203 // If we've consumed all content bytes, also consume the trailing LF.
204 if cr.pending == 1 {
205 if _, err := cr.reader.ReadByte(); err != nil {
206 return n, fmt.Errorf("read trailing LF: %w", err)
207 }
208 cr.pending = 0
209 }
210
211 return n, nil
212}
213
214// Close shuts down the cat-file process and waits for it to exit.
215// It is safe to call Close multiple times or concurrently.
216func (cr *catfileReader) Close() error {
217 cr.closeOnce.Do(func() {
218 // Kill first to avoid blocking on drain when there are many
219 // unconsumed entries. Gitaly uses the same kill-first pattern.
220 _ = cr.cmd.Process.Kill()
221 // Drain any buffered stdout so the pipe closes cleanly.
222 // Must complete before cmd.Wait(), which closes the pipe.
223 _, _ = io.Copy(io.Discard, cr.reader)
224 // Wait for writer goroutine (unblocks via broken pipe from Kill).
225 <-cr.writeErr
226 err := cr.cmd.Wait()
227 // Suppress the expected "signal: killed" error from our own Kill().
228 if isKilledErr(err) {
229 err = nil
230 }
231 cr.closeErr = err
232 })
233 return cr.closeErr
234}
235
// isKilledErr reports whether err is an *exec.ExitError whose process was
// terminated by SIGKILL. This is the expected outcome when we killed the
// process ourselves. Any other error, including nil, yields false.
func isKilledErr(err error) bool {
	if exitErr, isExit := err.(*exec.ExitError); isExit {
		if status, hasStatus := exitErr.Sys().(syscall.WaitStatus); hasStatus {
			return status.Signal() == syscall.SIGKILL
		}
	}
	return false
}