fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package gitindex 16 17import ( 18 "bufio" 19 "bytes" 20 "encoding/hex" 21 "fmt" 22 "io" 23 "os/exec" 24 "strconv" 25 "sync" 26 "syscall" 27 28 "github.com/go-git/go-git/v5/plumbing" 29) 30 31// catfileReader provides streaming access to git blob objects via a pipelined 32// "git cat-file --batch --buffer" process. A writer goroutine feeds all blob 33// SHAs to stdin while the caller reads responses one at a time, similar to 34// archive/tar.Reader. 35// 36// The --buffer flag switches git's output from per-object flush (write_or_die) 37// to libc stdio buffering (fwrite), reducing syscalls. After stdin EOF, git 38// calls fflush(stdout) to deliver any remaining output. 39// 40// Usage: 41// 42// cr, err := newCatfileReader(repoDir, ids) 43// if err != nil { ... } 44// defer cr.Close() 45// 46// for { 47// size, missing, err := cr.Next() 48// if err == io.EOF { break } 49// if missing { continue } 50// if size > maxSize { continue } // unread bytes auto-skipped 51// content := make([]byte, size) 52// io.ReadFull(cr, content) 53// } 54type catfileReader struct { 55 cmd *exec.Cmd 56 reader *bufio.Reader 57 writeErr <-chan error 58 59 // pending tracks unread content bytes + trailing LF for the current 60 // entry. Next() discards any pending bytes before reading the next header. 61 pending int 62 63 closeOnce sync.Once 64 closeErr error 65} 66 67// newCatfileReader starts a "git cat-file --batch --buffer" process and feeds 68// all ids to its stdin via a background goroutine. The caller must call Close 69// when done. 70func newCatfileReader(repoDir string, ids []plumbing.Hash) (*catfileReader, error) { 71 cmd := exec.Command("git", "cat-file", "--batch", "--buffer") 72 cmd.Dir = repoDir 73 74 stdin, err := cmd.StdinPipe() 75 if err != nil { 76 return nil, fmt.Errorf("stdin pipe: %w", err) 77 } 78 79 stdout, err := cmd.StdoutPipe() 80 if err != nil { 81 stdin.Close() 82 return nil, fmt.Errorf("stdout pipe: %w", err) 83 } 84 85 if err := cmd.Start(); err != nil { 86 stdin.Close() 87 stdout.Close() 88 return nil, fmt.Errorf("start git cat-file: %w", err) 89 } 90 91 // Writer goroutine: feed all SHAs then close stdin to trigger flush. 92 writeErr := make(chan error, 1) 93 go func() { 94 defer close(writeErr) 95 defer stdin.Close() 96 bw := bufio.NewWriterSize(stdin, 64*1024) 97 var hexBuf [41]byte 98 hexBuf[40] = '\n' 99 for _, id := range ids { 100 hex.Encode(hexBuf[:40], id[:]) 101 if _, err := bw.Write(hexBuf[:]); err != nil { 102 writeErr <- err 103 return 104 } 105 } 106 writeErr <- bw.Flush() 107 }() 108 109 return &catfileReader{ 110 cmd: cmd, 111 reader: bufio.NewReaderSize(stdout, 512*1024), 112 writeErr: writeErr, 113 }, nil 114} 115 116// Next advances to the next blob entry. It returns the blob's size and whether 117// it is missing. Any unread content from the previous entry is automatically 118// discarded. Returns io.EOF when all entries have been consumed. 119// 120// After Next returns successfully with missing=false, call Read to consume the 121// blob content, or call Next again to skip it. 122func (cr *catfileReader) Next() (size int, missing bool, err error) { 123 // Discard unread content from the previous entry. 124 if cr.pending > 0 { 125 if _, err := cr.reader.Discard(cr.pending); err != nil { 126 return 0, false, fmt.Errorf("discard pending bytes: %w", err) 127 } 128 cr.pending = 0 129 } 130 131 headerBytes, err := cr.reader.ReadBytes('\n') 132 if err != nil { 133 if err == io.EOF { 134 return 0, false, io.EOF 135 } 136 return 0, false, fmt.Errorf("read header: %w", err) 137 } 138 header := headerBytes[:len(headerBytes)-1] // trim \n 139 140 if bytes.HasSuffix(header, []byte(" missing")) { 141 return 0, true, nil 142 } 143 144 // Parse size from "<oid> <type> <size>". 145 lastSpace := bytes.LastIndexByte(header, ' ') 146 if lastSpace == -1 { 147 return 0, false, fmt.Errorf("unexpected header: %q", header) 148 } 149 size, err = strconv.Atoi(string(header[lastSpace+1:])) 150 if err != nil { 151 return 0, false, fmt.Errorf("parse size from %q: %w", header, err) 152 } 153 154 // Track pending bytes: content + trailing LF. 155 cr.pending = size + 1 156 return size, false, nil 157} 158 159// Read reads from the current blob's content. Implements io.Reader. Returns 160// io.EOF when the blob's content has been fully read (the trailing LF 161// delimiter is consumed automatically). 162func (cr *catfileReader) Read(p []byte) (int, error) { 163 if cr.pending <= 0 { 164 return 0, io.EOF 165 } 166 167 // Don't read into the trailing LF byte — reserve it. 168 contentRemaining := cr.pending - 1 169 if contentRemaining <= 0 { 170 // Only the trailing LF remains; consume it and signal EOF. 171 if _, err := cr.reader.ReadByte(); err != nil { 172 return 0, fmt.Errorf("read trailing LF: %w", err) 173 } 174 cr.pending = 0 175 return 0, io.EOF 176 } 177 178 // Limit the read to the remaining content bytes. 179 if len(p) > contentRemaining { 180 p = p[:contentRemaining] 181 } 182 n, err := cr.reader.Read(p) 183 cr.pending -= n 184 if err != nil { 185 return n, err 186 } 187 188 // If we've consumed all content bytes, also consume the trailing LF. 189 if cr.pending == 1 { 190 if _, err := cr.reader.ReadByte(); err != nil { 191 return n, fmt.Errorf("read trailing LF: %w", err) 192 } 193 cr.pending = 0 194 } 195 196 return n, nil 197} 198 199// Close shuts down the cat-file process and waits for it to exit. 200// It is safe to call Close multiple times or concurrently. 201func (cr *catfileReader) Close() error { 202 cr.closeOnce.Do(func() { 203 // Kill first to avoid blocking on drain when there are many 204 // unconsumed entries. Gitaly uses the same kill-first pattern. 205 _ = cr.cmd.Process.Kill() 206 // Drain any buffered stdout so the pipe closes cleanly. 207 // Must complete before cmd.Wait(), which closes the pipe. 208 _, _ = io.Copy(io.Discard, cr.reader) 209 // Wait for writer goroutine (unblocks via broken pipe from Kill). 210 <-cr.writeErr 211 err := cr.cmd.Wait() 212 // Suppress the expected "signal: killed" error from our own Kill(). 213 if isKilledErr(err) { 214 err = nil 215 } 216 cr.closeErr = err 217 }) 218 return cr.closeErr 219} 220 221// isKilledErr reports whether err is an exec.ExitError caused by SIGKILL. 222func isKilledErr(err error) bool { 223 exitErr, ok := err.(*exec.ExitError) 224 if !ok { 225 return false 226 } 227 ws, ok := exitErr.Sys().(syscall.WaitStatus) 228 return ok && ws.Signal() == syscall.SIGKILL 229}