fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package gitindex 16 17import ( 18 "bufio" 19 "bytes" 20 "encoding/hex" 21 "fmt" 22 "io" 23 "os/exec" 24 "strconv" 25 "sync" 26 "syscall" 27 28 "github.com/go-git/go-git/v5/plumbing" 29) 30 31type catfileReaderOptions struct { 32 filterSpec string 33} 34 35// catfileReader provides streaming access to git blob objects via a pipelined 36// "git cat-file --batch --buffer" process. A writer goroutine feeds all blob 37// SHAs to stdin while the caller reads responses one at a time, similar to 38// archive/tar.Reader. 39// 40// The --buffer flag switches git's output from per-object flush (write_or_die) 41// to libc stdio buffering (fwrite), reducing syscalls. After stdin EOF, git 42// calls fflush(stdout) to deliver any remaining output. 43// 44// Usage: 45// 46// cr, err := newCatfileReader(repoDir, ids, catfileReaderOptions{}) 47// if err != nil { ... } 48// defer cr.Close() 49// 50// for { 51// size, missing, excluded, err := cr.Next() 52// if err == io.EOF { break } 53// if missing { continue } 54// if excluded { continue } 55// if size > maxSize { continue } // unread bytes auto-skipped 56// content := make([]byte, size) 57// io.ReadFull(cr, content) 58// } 59type catfileReader struct { 60 cmd *exec.Cmd 61 reader *bufio.Reader 62 writeErr <-chan error 63 64 // pending tracks unread content bytes + trailing LF for the current 65 // entry. Next() discards any pending bytes before reading the next header. 66 pending int 67 68 closeOnce sync.Once 69 closeErr error 70} 71 72// newCatfileReader starts a "git cat-file --batch --buffer" process and feeds 73// all ids to its stdin via a background goroutine. The caller must call Close 74// when done. Pass a zero-value catfileReaderOptions when no options are needed. 75func newCatfileReader(repoDir string, ids []plumbing.Hash, opts catfileReaderOptions) (*catfileReader, error) { 76 args := []string{"cat-file", "--batch", "--buffer"} 77 if opts.filterSpec != "" { 78 args = append(args, "--filter="+opts.filterSpec) 79 } 80 81 cmd := exec.Command("git", args...) 82 cmd.Dir = repoDir 83 84 stdin, err := cmd.StdinPipe() 85 if err != nil { 86 return nil, fmt.Errorf("stdin pipe: %w", err) 87 } 88 89 stdout, err := cmd.StdoutPipe() 90 if err != nil { 91 stdin.Close() 92 return nil, fmt.Errorf("stdout pipe: %w", err) 93 } 94 95 if err := cmd.Start(); err != nil { 96 stdin.Close() 97 stdout.Close() 98 return nil, fmt.Errorf("start git cat-file: %w", err) 99 } 100 101 // Writer goroutine: feed all SHAs then close stdin to trigger flush. 102 writeErr := make(chan error, 1) 103 go func() { 104 defer close(writeErr) 105 defer stdin.Close() 106 bw := bufio.NewWriterSize(stdin, 64*1024) 107 var hexBuf [41]byte 108 hexBuf[40] = '\n' 109 for _, id := range ids { 110 hex.Encode(hexBuf[:40], id[:]) 111 if _, err := bw.Write(hexBuf[:]); err != nil { 112 writeErr <- err 113 return 114 } 115 } 116 writeErr <- bw.Flush() 117 }() 118 119 return &catfileReader{ 120 cmd: cmd, 121 reader: bufio.NewReaderSize(stdout, 512*1024), 122 writeErr: writeErr, 123 }, nil 124} 125 126// Next advances to the next blob entry. It returns the blob's size and whether 127// it is missing or excluded by the configured filter. Any unread content from 128// the previous entry is automatically discarded. Returns io.EOF when all 129// entries have been consumed. 130// 131// After Next returns successfully with missing=false and excluded=false, call 132// Read to consume the blob content, or call Next again to skip it. 133func (cr *catfileReader) Next() (size int, missing bool, excluded bool, err error) { 134 // Discard unread content from the previous entry. 135 if cr.pending > 0 { 136 if _, err := cr.reader.Discard(cr.pending); err != nil { 137 return 0, false, false, fmt.Errorf("discard pending bytes: %w", err) 138 } 139 cr.pending = 0 140 } 141 142 headerBytes, err := cr.reader.ReadBytes('\n') 143 if err != nil { 144 if err == io.EOF { 145 return 0, false, false, io.EOF 146 } 147 return 0, false, false, fmt.Errorf("read header: %w", err) 148 } 149 header := headerBytes[:len(headerBytes)-1] // trim \n 150 151 if bytes.HasSuffix(header, []byte(" missing")) { 152 return 0, true, false, nil 153 } 154 155 if bytes.HasSuffix(header, []byte(" excluded")) { 156 return 0, false, true, nil 157 } 158 159 // Parse size from "<oid> <type> <size>". 160 lastSpace := bytes.LastIndexByte(header, ' ') 161 if lastSpace == -1 { 162 return 0, false, false, fmt.Errorf("unexpected header: %q", header) 163 } 164 size, err = strconv.Atoi(string(header[lastSpace+1:])) 165 if err != nil { 166 return 0, false, false, fmt.Errorf("parse size from %q: %w", header, err) 167 } 168 169 // Track pending bytes: content + trailing LF. 170 cr.pending = size + 1 171 return size, false, false, nil 172} 173 174// Read reads from the current blob's content. Implements io.Reader. Returns 175// io.EOF when the blob's content has been fully read (the trailing LF 176// delimiter is consumed automatically). 177func (cr *catfileReader) Read(p []byte) (int, error) { 178 if cr.pending <= 0 { 179 return 0, io.EOF 180 } 181 182 // Don't read into the trailing LF byte — reserve it. 183 contentRemaining := cr.pending - 1 184 if contentRemaining <= 0 { 185 // Only the trailing LF remains; consume it and signal EOF. 186 if _, err := cr.reader.ReadByte(); err != nil { 187 return 0, fmt.Errorf("read trailing LF: %w", err) 188 } 189 cr.pending = 0 190 return 0, io.EOF 191 } 192 193 // Limit the read to the remaining content bytes. 194 if len(p) > contentRemaining { 195 p = p[:contentRemaining] 196 } 197 n, err := cr.reader.Read(p) 198 cr.pending -= n 199 if err != nil { 200 return n, err 201 } 202 203 // If we've consumed all content bytes, also consume the trailing LF. 204 if cr.pending == 1 { 205 if _, err := cr.reader.ReadByte(); err != nil { 206 return n, fmt.Errorf("read trailing LF: %w", err) 207 } 208 cr.pending = 0 209 } 210 211 return n, nil 212} 213 214// Close shuts down the cat-file process and waits for it to exit. 215// It is safe to call Close multiple times or concurrently. 216func (cr *catfileReader) Close() error { 217 cr.closeOnce.Do(func() { 218 // Kill first to avoid blocking on drain when there are many 219 // unconsumed entries. Gitaly uses the same kill-first pattern. 220 _ = cr.cmd.Process.Kill() 221 // Drain any buffered stdout so the pipe closes cleanly. 222 // Must complete before cmd.Wait(), which closes the pipe. 223 _, _ = io.Copy(io.Discard, cr.reader) 224 // Wait for writer goroutine (unblocks via broken pipe from Kill). 225 <-cr.writeErr 226 err := cr.cmd.Wait() 227 // Suppress the expected "signal: killed" error from our own Kill(). 228 if isKilledErr(err) { 229 err = nil 230 } 231 cr.closeErr = err 232 }) 233 return cr.closeErr 234} 235 236// isKilledErr reports whether err is an exec.ExitError caused by SIGKILL. 237func isKilledErr(err error) bool { 238 exitErr, ok := err.(*exec.ExitError) 239 if !ok { 240 return false 241 } 242 ws, ok := exitErr.Sys().(syscall.WaitStatus) 243 return ok && ws.Signal() == syscall.SIGKILL 244}