fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

sourcegraph-indexserver: GRPC, implement DeleteAllData (#923)

Relates to #920
Relates to SPLF-874

This implements DeleteAllData. We hold the global lock while deleting all simple shards belonging to a tenant. We also handle compound shards by disassembling them first.

Note that this "only" deletes persisted data. Updating the queue, for example, seems fragile because it might immediately get updated by Sourcegraph. This implies that Sourcegraph first has to delete the tenant in the Sourcegraph DB first and then call this new endpoint. Even if the queue still has a reference to a deleted tenant, indexserver won't be able to retrieve index options or clone the repo from gitserver.

Test plan:
- new unit tests
- manual testing:
I ran this together with Sourcegraph and triggered a delete by calling DeleteAllData directly. I confirmed that all shards, including compound shards are deleted.

+392 -68
+1 -52
cmd/zoekt-merge-index/main.go
··· 87 87 return merge(filepath.Dir(paths[0]), paths) 88 88 } 89 89 90 - // explode splits the input shard into individual shards and places them in dstDir. 91 - // Temporary files created in the process are removed on a best effort basis. 92 - func explode(dstDir string, inputShard string) error { 93 - f, err := os.Open(inputShard) 94 - if err != nil { 95 - return err 96 - } 97 - defer f.Close() 98 - 99 - indexFile, err := index.NewIndexFile(f) 100 - if err != nil { 101 - return err 102 - } 103 - defer indexFile.Close() 104 - 105 - exploded, err := index.Explode(dstDir, indexFile) 106 - defer func() { 107 - // best effort removal of tmp files. If os.Remove fails, indexserver will delete 108 - // the leftover tmp files during the next cleanup. 109 - for tmpFn := range exploded { 110 - os.Remove(tmpFn) 111 - } 112 - }() 113 - if err != nil { 114 - return fmt.Errorf("zoekt.Explode: %w", err) 115 - } 116 - 117 - // remove the input shard first to avoid duplicate indexes. In the worst case, 118 - // the process is interrupted just after we delete the compound shard, in which 119 - // case we have to reindex the lost repos. 120 - paths, err := index.IndexFilePaths(inputShard) 121 - if err != nil { 122 - return err 123 - } 124 - for _, path := range paths { 125 - err = os.Remove(path) 126 - if err != nil { 127 - return err 128 - } 129 - } 130 - 131 - // best effort rename shards. 132 - for tmpFn, dstFn := range exploded { 133 - if err := os.Rename(tmpFn, dstFn); err != nil { 134 - log.Printf("explode: rename failed: %s", err) 135 - } 136 - } 137 - 138 - return nil 139 - } 140 - 141 90 func explodeCmd(path string) error { 142 - return explode(filepath.Dir(path), path) 91 + return index.Explode(filepath.Dir(path), path) 143 92 } 144 93 145 94 func main() {
+4 -2
cmd/zoekt-merge-index/main_test.go
··· 8 8 "sort" 9 9 "testing" 10 10 11 + "github.com/stretchr/testify/require" 12 + 11 13 "github.com/sourcegraph/zoekt" 14 + "github.com/sourcegraph/zoekt/index" 12 15 "github.com/sourcegraph/zoekt/internal/shards" 13 16 "github.com/sourcegraph/zoekt/query" 14 - "github.com/stretchr/testify/require" 15 17 ) 16 18 17 19 func TestMerge(t *testing.T) { ··· 62 64 63 65 cs, err := filepath.Glob(filepath.Join(dir, "compound-*.zoekt")) 64 66 require.NoError(t, err) 65 - err = explode(dir, cs[0]) 67 + err = index.Explode(dir, cs[0]) 66 68 require.NoError(t, err) 67 69 68 70 cs, err = filepath.Glob(filepath.Join(dir, "compound-*.zoekt"))
+50 -5
cmd/zoekt-sourcegraph-indexserver/main.go
··· 65 65 "github.com/sourcegraph/zoekt/internal/tenant" 66 66 67 67 "go.uber.org/automaxprocs/maxprocs" 68 + "go.uber.org/multierr" 68 69 "golang.org/x/net/trace" 69 70 "golang.org/x/sys/unix" 70 71 "google.golang.org/grpc" ··· 1046 1047 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 1047 1048 } 1048 1049 1049 - // DeleteAllData deletes all shards in the index and trash belonging to the 1050 - // tenant associated with the request. This is stubbed out for now. 1051 - func (s *Server) DeleteAllData(_ context.Context, _ *indexserverv1.DeleteAllDataRequest) (*indexserverv1.DeleteAllDataResponse, error) { 1052 - s.logger.Warn("DeleteAllData") 1053 - return &indexserverv1.DeleteAllDataResponse{}, nil 1050 + // DeleteAllData deletes all shards in the index and trash dir belonging to the 1051 + // tenant associated with the request. The deletion is best-effort, which means 1052 + // we will delete as much as possible. If no error is returned, the caller can 1053 + // be certain that all data has been deleted. 1054 + func (s *Server) DeleteAllData(ctx context.Context, _ *indexserverv1.DeleteAllDataRequest) (*indexserverv1.DeleteAllDataResponse, error) { 1055 + tnt, err := tenant.FromContext(ctx) 1056 + if err != nil { 1057 + return nil, err 1058 + } 1059 + s.logger.Warn("DeleteAllData", sglog.Int("tenant_id", tnt.ID())) 1060 + 1061 + var merr error 1062 + s.muIndexDir.Global(func() { 1063 + // First, explode all compound shards that have repos from the tenant in 1064 + // question. Because we hold the global lock, we can be sure that no new 1065 + // merges start while we do this. 1066 + if err := s.explodeTenantCompoundShards(ctx, func(path string) error { 1067 + // We call explode in a separate process to protect indexserver. 1068 + cmd := defaultExplodeCmd(path) 1069 + 1070 + stdoutBuf := &bytes.Buffer{} 1071 + stderrBuf := &bytes.Buffer{} 1072 + cmd.Stdout = stdoutBuf 1073 + cmd.Stderr = stderrBuf 1074 + 1075 + err := cmd.Run() 1076 + if err != nil { 1077 + errorLog.Printf("explode failed: %v (stderr: %s)", err, stderrBuf.String()) 1078 + return err 1079 + } 1080 + 1081 + infoLog.Printf("exploded shard: %s", stdoutBuf.String()) 1082 + 1083 + return nil 1084 + }); err != nil { 1085 + merr = multierr.Append(merr, err) 1086 + } 1087 + 1088 + // Invariant: all shards from the tenant are simple shards. 1089 + 1090 + if err := purgeTenantShards(ctx, s.IndexDir); err != nil { 1091 + merr = multierr.Append(merr, err) 1092 + } 1093 + if err := purgeTenantShards(ctx, filepath.Join(s.IndexDir, ".trash")); err != nil { 1094 + merr = multierr.Append(merr, err) 1095 + } 1096 + }) 1097 + 1098 + return &indexserverv1.DeleteAllDataResponse{}, merr 1054 1099 } 1055 1100 1056 1101 func listIndexed(indexDir string) []uint32 {
+46
cmd/zoekt-sourcegraph-indexserver/merge.go
··· 2 2 3 3 import ( 4 4 "bytes" 5 + "context" 5 6 "os" 6 7 "os/exec" 7 8 "path/filepath" ··· 12 13 "github.com/prometheus/client_golang/prometheus" 13 14 "github.com/prometheus/client_golang/prometheus/promauto" 14 15 "github.com/sourcegraph/zoekt/index" 16 + "github.com/sourcegraph/zoekt/internal/tenant" 15 17 "go.uber.org/atomic" 16 18 ) 17 19 ··· 41 43 42 44 func defaultMergeCmd(args ...string) *exec.Cmd { 43 45 cmd := exec.Command("zoekt-merge-index", "merge") 46 + cmd.Args = append(cmd.Args, args...) 47 + return cmd 48 + } 49 + 50 + func defaultExplodeCmd(args ...string) *exec.Cmd { 51 + cmd := exec.Command("zoekt-merge-index", "explode") 44 52 cmd.Args = append(cmd.Args, args...) 45 53 return cmd 46 54 } ··· 233 241 c.shards = append(c.shards, cand) 234 242 c.size += cand.sizeBytes 235 243 } 244 + 245 + // explodeTenantCompoundShards explodes all compound shards that have repos from 246 + // the tenant in question. The caller must hold the global lock. 247 + func (s *Server) explodeTenantCompoundShards(ctx context.Context, explodeFunc func(path string) error) error { 248 + tnt, err := tenant.FromContext(ctx) 249 + if err != nil { 250 + return err 251 + } 252 + 253 + paths, err := filepath.Glob(filepath.Join(s.IndexDir, "compound-*")) 254 + if err != nil { 255 + return err 256 + } 257 + if len(paths) == 0 { 258 + return nil 259 + } 260 + 261 + nextCompoundShard: 262 + for _, path := range paths { 263 + // We don't use ReadMetadataPathAlive because we want to detect 264 + // tombstoned repos, too. 265 + repos, _, err := index.ReadMetadataPath(path) 266 + if err != nil { 267 + return err 268 + } 269 + for _, repo := range repos { 270 + if repo.TenantID == tnt.ID() { 271 + err := explodeFunc(path) 272 + if err != nil { 273 + return err 274 + } 275 + 276 + continue nextCompoundShard 277 + } 278 + } 279 + } 280 + return nil 281 + }
+62
cmd/zoekt-sourcegraph-indexserver/merge_test.go
··· 10 10 "strings" 11 11 "testing" 12 12 13 + "github.com/stretchr/testify/require" 14 + 13 15 "github.com/sourcegraph/zoekt" 14 16 "github.com/sourcegraph/zoekt/index" 17 + "github.com/sourcegraph/zoekt/internal/tenant/tenanttest" 15 18 ) 16 19 17 20 func TestHasMultipleShards(t *testing.T) { ··· 196 199 checkCount(dir, "*_v16.00000.zoekt", tc.wantSimple) 197 200 }) 198 201 } 202 + } 203 + 204 + func TestExplodeTenantCompoundShards(t *testing.T) { 205 + tenanttest.MockEnforce(t) 206 + dir := t.TempDir() 207 + s := &Server{IndexDir: dir} 208 + 209 + // Create two compound shards: 210 + // 1. One with repos from tenant 1 and 2 211 + // 2. One with repos from tenant 2 and 3 212 + cs1 := createCompoundShard(t, dir, []uint32{1, 2}, func(in *zoekt.Repository) { 213 + if in.ID == 1 { 214 + in.TenantID = 1 215 + } else { 216 + in.TenantID = 2 217 + } 218 + }) 219 + 220 + cs2 := createCompoundShard(t, dir, []uint32{3, 4}, func(in *zoekt.Repository) { 221 + if in.ID == 3 { 222 + in.TenantID = 2 223 + } else { 224 + in.TenantID = 3 225 + } 226 + }) 227 + 228 + // Create context with tenant 1 229 + ctx := tenanttest.NewTestContext() 230 + 231 + // Explode shards for tenant 1 232 + err := s.explodeTenantCompoundShards(ctx, func(path string) error { 233 + // For this test we call explode directly instead of calling it in a 234 + // separate process. 235 + return index.Explode(dir, path) 236 + }) 237 + require.NoError(t, err) 238 + 239 + // Check that only cs1 was exploded (since it contained a repo from tenant 240 + // 1) and cs2 remains untouched 241 + require.NoFileExists(t, cs1) 242 + require.FileExists(t, cs2) 243 + 244 + // Check that we have 2 simple shards (from cs1) and 1 compound shard (cs2) 245 + simpleShards, err := filepath.Glob(filepath.Join(dir, "*_v16.00000.zoekt")) 246 + require.NoError(t, err) 247 + require.Len(t, simpleShards, 2, "expected 2 simple shards") 248 + 249 + // check that the simple shards are from tenant 1 and 2 250 + for _, shard := range simpleShards { 251 + repos, _, err := index.ReadMetadataPath(shard) 252 + require.NoError(t, err) 253 + for _, repo := range repos { 254 + require.Contains(t, []int{1, 2}, repo.TenantID, "expected tenant 1 or 2, but got %d", repo.TenantID) 255 + } 256 + } 257 + 258 + compoundShards, err := filepath.Glob(filepath.Join(dir, "compound-*")) 259 + require.NoError(t, err) 260 + require.Len(t, compoundShards, 1, "expected 1 compound shard") 199 261 } 200 262 201 263 func copyTestShards(dstDir string, srcShards []string) ([]string, error) {
+73
cmd/zoekt-sourcegraph-indexserver/purge.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "os" 6 + "path/filepath" 7 + "strings" 8 + 9 + "go.uber.org/multierr" 10 + 11 + "github.com/sourcegraph/zoekt/index" 12 + "github.com/sourcegraph/zoekt/internal/tenant" 13 + ) 14 + 15 + // purgeTenantShards removes all simple shards from dir on a best-effort basis. 16 + // It returns an error if there is no tenant in the context or if it encounters 17 + // an error while removing a shard. 18 + func purgeTenantShards(ctx context.Context, dir string) error { 19 + tnt, err := tenant.FromContext(ctx) 20 + if err != nil { 21 + return err 22 + } 23 + 24 + d, err := os.Open(dir) 25 + if err != nil { 26 + return err 27 + } 28 + defer d.Close() 29 + 30 + names, err := d.Readdirnames(-1) 31 + if err != nil { 32 + return err 33 + } 34 + 35 + var merr error 36 + for _, n := range names { 37 + path := filepath.Join(dir, n) 38 + fi, err := os.Stat(path) 39 + if err != nil { 40 + merr = multierr.Append(merr, err) 41 + continue 42 + } 43 + if fi.IsDir() || filepath.Ext(path) != ".zoekt" { 44 + continue 45 + } 46 + 47 + // Skip compound shards. 48 + if strings.HasPrefix(filepath.Base(path), "compound-") { 49 + continue 50 + } 51 + 52 + repos, _, err := index.ReadMetadataPath(path) 53 + if err != nil { 54 + merr = multierr.Append(merr, err) 55 + continue 56 + } 57 + // Since we excluded compound shards, we know there is exactly one repo 58 + if repos[0].TenantID == tnt.ID() { 59 + paths, err := index.IndexFilePaths(path) 60 + if err != nil { 61 + merr = multierr.Append(merr, err) 62 + continue 63 + } 64 + for _, p := range paths { 65 + if err := os.Remove(p); err != nil { 66 + merr = multierr.Append(merr, err) 67 + } 68 + } 69 + } 70 + } 71 + 72 + return merr 73 + }
+101
cmd/zoekt-sourcegraph-indexserver/purge_test.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "path/filepath" 6 + "testing" 7 + 8 + "github.com/sourcegraph/zoekt" 9 + "github.com/sourcegraph/zoekt/internal/tenant/tenanttest" 10 + "github.com/stretchr/testify/require" 11 + ) 12 + 13 + func TestPurgeTenantShards(t *testing.T) { 14 + // TestPurgeTenantShards verifies both the basic shard purging functionality 15 + // and proper isolation between tenants. It ensures that: 16 + // 1. Shards are only purged when a valid tenant context is provided 17 + // 2. Only shards belonging to the specified tenant are purged 18 + // 3. Compound shards are preserved regardless of tenant 19 + // 4. Other tenants' shards remain untouched 20 + dir := t.TempDir() 21 + 22 + // Create test shards for different tenants 23 + tenant1Ctx := tenanttest.NewTestContext() 24 + tenant2Ctx := tenanttest.NewTestContext() 25 + 26 + // Helper to set tenant ID for test shards 27 + setTenantID := func(id int) func(in *zoekt.Repository) { 28 + return func(in *zoekt.Repository) { 29 + in.TenantID = id 30 + } 31 + } 32 + 33 + // Create test shards for tenant 1 34 + tenant1Shard1 := filepath.Join(dir, "tenant1_repo1.zoekt") 35 + tenant1Shard2 := filepath.Join(dir, "tenant1_repo2.zoekt") 36 + createTestShard(t, "tenant1_repo1", 1, tenant1Shard1, setTenantID(1)) 37 + createTestShard(t, "tenant1_repo2", 2, tenant1Shard2, setTenantID(1)) 38 + 39 + // Create test shards for tenant 2 40 + tenant2Shard := filepath.Join(dir, "tenant2_repo1.zoekt") 41 + createTestShard(t, "tenant2_repo1", 3, tenant2Shard, setTenantID(2)) 42 + 43 + // Create a compound shard (should be skipped) 44 + compoundShard := filepath.Join(dir, "compound-1234.zoekt") 45 + createTestShard(t, "compound_repo", 4, compoundShard, setTenantID(1)) 46 + 47 + // Test cases 48 + tests := []struct { 49 + name string 50 + ctx context.Context 51 + wantErr bool 52 + check func(t *testing.T, dir string) 53 + }{ 54 + { 55 + name: "no tenant in context", 56 + ctx: context.Background(), 57 + wantErr: true, 58 + check: func(t *testing.T, dir string) { 59 + // All files should still exist 60 + require.FileExists(t, tenant1Shard1) 61 + require.FileExists(t, tenant1Shard2) 62 + require.FileExists(t, tenant2Shard) 63 + require.FileExists(t, compoundShard) 64 + }, 65 + }, 66 + { 67 + name: "purge tenant 1 shards", 68 + ctx: tenant1Ctx, 69 + check: func(t *testing.T, dir string) { 70 + // Tenant 1 shards should be deleted 71 + require.NoFileExists(t, tenant1Shard1) 72 + require.NoFileExists(t, tenant1Shard2) 73 + // Other shards should still exist 74 + require.FileExists(t, tenant2Shard) 75 + require.FileExists(t, compoundShard) 76 + }, 77 + }, 78 + { 79 + name: "purge tenant 2 shards", 80 + ctx: tenant2Ctx, 81 + check: func(t *testing.T, dir string) { 82 + // Tenant 2 shard should be deleted 83 + require.NoFileExists(t, tenant2Shard) 84 + // Compound shard should still exist 85 + require.FileExists(t, compoundShard) 86 + }, 87 + }, 88 + } 89 + 90 + for _, tt := range tests { 91 + t.Run(tt.name, func(t *testing.T) { 92 + err := purgeTenantShards(tt.ctx, dir) 93 + if tt.wantErr { 94 + require.Error(t, err) 95 + return 96 + } 97 + require.NoError(t, err) 98 + tt.check(t, dir) 99 + }) 100 + } 101 + }
+1 -1
go.mod
··· 130 130 github.com/yusufpapurcu/wmi v1.2.4 // indirect 131 131 go.opentelemetry.io/otel/metric v1.33.0 // indirect 132 132 go.opentelemetry.io/proto/otlp v1.5.0 // indirect 133 - go.uber.org/multierr v1.11.0 // indirect 133 + go.uber.org/multierr v1.11.0 134 134 go.uber.org/zap v1.27.0 // indirect 135 135 golang.org/x/crypto v0.32.0 // indirect 136 136 golang.org/x/text v0.21.0 // indirect
+54 -8
index/merge.go
··· 134 134 return sb, nil 135 135 } 136 136 137 - // Explode takes an IndexFile f and creates 1 simple shard per repository 138 - // contained in f. Explode returns a map of tmpName -> dstName. It is the 139 - // responsibility of the caller to rename the temporary shard(s) and delete the 140 - // input shard. 141 - func Explode(dstDir string, f IndexFile) (map[string]string, error) { 142 - return explode(dstDir, f) 137 + // Explode takes an input shard and creates 1 simple shard per repository. It is 138 + // a wrapper around explode that takes care of removing the input shard and 139 + // renaming the temporary shards. 140 + func Explode(dstDir string, inputShard string) error { 141 + f, err := os.Open(inputShard) 142 + if err != nil { 143 + return err 144 + } 145 + defer f.Close() 146 + 147 + indexFile, err := NewIndexFile(f) 148 + if err != nil { 149 + return err 150 + } 151 + defer indexFile.Close() 152 + 153 + exploded, err := explode(dstDir, indexFile) 154 + defer func() { 155 + // best effort removal of tmp files. If os.Remove fails, indexserver will delete 156 + // the leftover tmp files during the next cleanup. 157 + for tmpFn := range exploded { 158 + os.Remove(tmpFn) 159 + } 160 + }() 161 + if err != nil { 162 + return fmt.Errorf("zoekt.Explode: %w", err) 163 + } 164 + 165 + // remove the input shard first to avoid duplicate indexes. In the worst case, 166 + // the process is interrupted just after we delete the compound shard, in which 167 + // case we have to reindex the lost repos. 168 + paths, err := IndexFilePaths(inputShard) 169 + if err != nil { 170 + return err 171 + } 172 + for _, path := range paths { 173 + err = os.Remove(path) 174 + if err != nil { 175 + return err 176 + } 177 + } 178 + 179 + // best effort rename shards. 180 + for tmpFn, dstFn := range exploded { 181 + if err := os.Rename(tmpFn, dstFn); err != nil { 182 + log.Printf("explode: rename failed: %s", err) 183 + } 184 + } 185 + 186 + return nil 143 187 } 144 188 145 189 type shardBuilderFunc func(ib *ShardBuilder) 146 190 147 - // explode offers a richer signature compared to Explode for testing. You 148 - // probably want to call Explode instead. 191 + // explode takes an IndexFile f and creates 1 simple shard per repository 192 + // contained in f. explode returns a map of tmpName -> dstName. It is the 193 + // responsibility of the caller to rename the temporary shard(s) and delete the 194 + // input shard. 149 195 func explode(dstDir string, f IndexFile, ibFuncs ...shardBuilderFunc) (map[string]string, error) { 150 196 searcher, err := NewSearcher(f) 151 197 if err != nil {