fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package gitindex
16
import (
	"bufio"
	"bytes"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"os/exec"
	"strconv"
	"sync"
	"syscall"

	"github.com/go-git/go-git/v5/plumbing"
)
30
// catfileReader provides streaming access to git blob objects via a pipelined
// "git cat-file --batch --buffer" process. A writer goroutine feeds all blob
// SHAs to stdin while the caller reads responses one at a time, similar to
// archive/tar.Reader.
//
// The --buffer flag switches git's output from per-object flush (write_or_die)
// to libc stdio buffering (fwrite), reducing syscalls. After stdin EOF, git
// calls fflush(stdout) to deliver any remaining output.
//
// Next and Read mutate unsynchronized state, and Close drains the same
// buffered reader they use, so Next, Read and Close must not run concurrently
// with one another. Calling Close more than once (or from several goroutines
// at the same time) is safe.
//
// Usage:
//
//	cr, err := newCatfileReader(repoDir, ids)
//	if err != nil {
//		return err
//	}
//	defer cr.Close()
//
//	for {
//		size, missing, err := cr.Next()
//		if err == io.EOF {
//			break
//		}
//		if err != nil {
//			return err // stream is broken; do not keep calling Next
//		}
//		if missing {
//			continue
//		}
//		if size > maxSize {
//			continue // unread bytes are skipped by the next call to Next
//		}
//		content := make([]byte, size)
//		if _, err := io.ReadFull(cr, content); err != nil {
//			return err
//		}
//	}
type catfileReader struct {
	cmd      *exec.Cmd     // the started "git cat-file" process
	reader   *bufio.Reader // buffered view of the process's stdout
	writeErr <-chan error  // single result of the stdin writer goroutine; closed when it exits

	// pending tracks unread content bytes + trailing LF for the current
	// entry. Next() discards any pending bytes before reading the next header.
	pending int

	closeOnce sync.Once
	closeErr  error // result of the first Close, returned by every call
}
66
67// newCatfileReader starts a "git cat-file --batch --buffer" process and feeds
68// all ids to its stdin via a background goroutine. The caller must call Close
69// when done.
70func newCatfileReader(repoDir string, ids []plumbing.Hash) (*catfileReader, error) {
71 cmd := exec.Command("git", "cat-file", "--batch", "--buffer")
72 cmd.Dir = repoDir
73
74 stdin, err := cmd.StdinPipe()
75 if err != nil {
76 return nil, fmt.Errorf("stdin pipe: %w", err)
77 }
78
79 stdout, err := cmd.StdoutPipe()
80 if err != nil {
81 stdin.Close()
82 return nil, fmt.Errorf("stdout pipe: %w", err)
83 }
84
85 if err := cmd.Start(); err != nil {
86 stdin.Close()
87 stdout.Close()
88 return nil, fmt.Errorf("start git cat-file: %w", err)
89 }
90
91 // Writer goroutine: feed all SHAs then close stdin to trigger flush.
92 writeErr := make(chan error, 1)
93 go func() {
94 defer close(writeErr)
95 defer stdin.Close()
96 bw := bufio.NewWriterSize(stdin, 64*1024)
97 var hexBuf [41]byte
98 hexBuf[40] = '\n'
99 for _, id := range ids {
100 hex.Encode(hexBuf[:40], id[:])
101 if _, err := bw.Write(hexBuf[:]); err != nil {
102 writeErr <- err
103 return
104 }
105 }
106 writeErr <- bw.Flush()
107 }()
108
109 return &catfileReader{
110 cmd: cmd,
111 reader: bufio.NewReaderSize(stdout, 512*1024),
112 writeErr: writeErr,
113 }, nil
114}
115
116// Next advances to the next blob entry. It returns the blob's size and whether
117// it is missing. Any unread content from the previous entry is automatically
118// discarded. Returns io.EOF when all entries have been consumed.
119//
120// After Next returns successfully with missing=false, call Read to consume the
121// blob content, or call Next again to skip it.
122func (cr *catfileReader) Next() (size int, missing bool, err error) {
123 // Discard unread content from the previous entry.
124 if cr.pending > 0 {
125 if _, err := cr.reader.Discard(cr.pending); err != nil {
126 return 0, false, fmt.Errorf("discard pending bytes: %w", err)
127 }
128 cr.pending = 0
129 }
130
131 headerBytes, err := cr.reader.ReadBytes('\n')
132 if err != nil {
133 if err == io.EOF {
134 return 0, false, io.EOF
135 }
136 return 0, false, fmt.Errorf("read header: %w", err)
137 }
138 header := headerBytes[:len(headerBytes)-1] // trim \n
139
140 if bytes.HasSuffix(header, []byte(" missing")) {
141 return 0, true, nil
142 }
143
144 // Parse size from "<oid> <type> <size>".
145 lastSpace := bytes.LastIndexByte(header, ' ')
146 if lastSpace == -1 {
147 return 0, false, fmt.Errorf("unexpected header: %q", header)
148 }
149 size, err = strconv.Atoi(string(header[lastSpace+1:]))
150 if err != nil {
151 return 0, false, fmt.Errorf("parse size from %q: %w", header, err)
152 }
153
154 // Track pending bytes: content + trailing LF.
155 cr.pending = size + 1
156 return size, false, nil
157}
158
159// Read reads from the current blob's content. Implements io.Reader. Returns
160// io.EOF when the blob's content has been fully read (the trailing LF
161// delimiter is consumed automatically).
162func (cr *catfileReader) Read(p []byte) (int, error) {
163 if cr.pending <= 0 {
164 return 0, io.EOF
165 }
166
167 // Don't read into the trailing LF byte — reserve it.
168 contentRemaining := cr.pending - 1
169 if contentRemaining <= 0 {
170 // Only the trailing LF remains; consume it and signal EOF.
171 if _, err := cr.reader.ReadByte(); err != nil {
172 return 0, fmt.Errorf("read trailing LF: %w", err)
173 }
174 cr.pending = 0
175 return 0, io.EOF
176 }
177
178 // Limit the read to the remaining content bytes.
179 if len(p) > contentRemaining {
180 p = p[:contentRemaining]
181 }
182 n, err := cr.reader.Read(p)
183 cr.pending -= n
184 if err != nil {
185 return n, err
186 }
187
188 // If we've consumed all content bytes, also consume the trailing LF.
189 if cr.pending == 1 {
190 if _, err := cr.reader.ReadByte(); err != nil {
191 return n, fmt.Errorf("read trailing LF: %w", err)
192 }
193 cr.pending = 0
194 }
195
196 return n, nil
197}
198
199// Close shuts down the cat-file process and waits for it to exit.
200// It is safe to call Close multiple times or concurrently.
201func (cr *catfileReader) Close() error {
202 cr.closeOnce.Do(func() {
203 // Kill first to avoid blocking on drain when there are many
204 // unconsumed entries. Gitaly uses the same kill-first pattern.
205 _ = cr.cmd.Process.Kill()
206 // Drain any buffered stdout so the pipe closes cleanly.
207 // Must complete before cmd.Wait(), which closes the pipe.
208 _, _ = io.Copy(io.Discard, cr.reader)
209 // Wait for writer goroutine (unblocks via broken pipe from Kill).
210 <-cr.writeErr
211 err := cr.cmd.Wait()
212 // Suppress the expected "signal: killed" error from our own Kill().
213 if isKilledErr(err) {
214 err = nil
215 }
216 cr.closeErr = err
217 })
218 return cr.closeErr
219}
220
// isKilledErr reports whether err is, or wraps, an *exec.ExitError for a
// process that was terminated by SIGKILL. It returns false for nil and for
// any other error.
func isKilledErr(err error) bool {
	// errors.As (rather than a type assertion) also matches ExitErrors that a
	// caller has wrapped with fmt.Errorf("...: %w", err).
	var exitErr *exec.ExitError
	if !errors.As(err, &exitErr) {
		return false
	}
	ws, ok := exitErr.Sys().(syscall.WaitStatus)
	return ok && ws.Signal() == syscall.SIGKILL
}