diff --git a/pkg/apiserver/storage/testing/watcher_tests.go b/pkg/apiserver/storage/testing/watcher_tests.go index 213b7553faa..1684f15819f 100644 --- a/pkg/apiserver/storage/testing/watcher_tests.go +++ b/pkg/apiserver/storage/testing/watcher_tests.go @@ -1407,25 +1407,22 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, }, - // Not Supported by unistore because there is no way to differentiate between: - // - SendInitialEvents=nil && resourceVersion=0 - // - sendInitialEvents=false && resourceVersion=0 - // This is a Legacy feature in k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go#196 - // { - // name: "legacy, RV=0", - // resourceVersion: "0", - // initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, - // expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, - // podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, - // expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, - // }, - // { - // name: "legacy, RV=unset", - // initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, - // expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, - // podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, - // expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, - // }, + + { + name: "legacy, RV=0", + resourceVersion: "0", + initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, + expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, + podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, + expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, + }, + { + name: "legacy, RV=unset", + initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, + expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, + podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, + expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, + }, } for idx, scenario := range scenarios { t.Run(scenario.name, func(t *testing.T) { diff --git a/pkg/storage/unified/apistore/store.go b/pkg/storage/unified/apistore/store.go index d4911da167c..4d2947ff54b 100644 --- a/pkg/storage/unified/apistore/store.go +++ b/pkg/storage/unified/apistore/store.go @@ -26,6 +26,7 @@ import ( "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/storagebackend/factory" "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" "github.com/grafana/grafana/pkg/apimachinery/utils" grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic" @@ -50,6 +51,7 @@ type Storage struct { store resource.ResourceClient getKey func(string) (*resource.ResourceKey, error) + watchSet *WatchSet versioner storage.Versioner } @@ -82,7 +84,8 @@ func NewStorage( trigger: trigger, indexers: indexers, - getKey: keyParser, + watchSet: NewWatchSet(), + getKey: keyParser, versioner: &storage.APIObjectVersioner{}, } @@ -109,7 +112,9 @@ func NewStorage( } } - return s, func() {}, nil + return s, func() { + s.watchSet.cleanupWatchers() + }, nil } func (s *Storage) Versioner() storage.Versioner { @@ -160,6 +165,11 @@ func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, ou }) } + s.watchSet.notifyWatchers(watch.Event{ + Object: out.DeepCopyObject(), + Type: watch.Added, + }, nil) + return nil } @@ -216,11 +226,16 @@ func (s *Storage) Delete( if err := s.versioner.UpdateObject(out, uint64(rsp.ResourceVersion)); err != nil { return err } + + s.watchSet.notifyWatchers(watch.Event{ + Object: out.DeepCopyObject(), + Type: watch.Deleted, + }, nil) return nil } // This version is not yet passing the watch tests -func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { +func (s *Storage) WatchNEXT(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { k, err := s.getKey(key) if err != nil { return watch.NewEmptyWatch(), nil @@ -240,11 +255,10 @@ func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOption if opts.SendInitialEvents != nil { cmd.SendInitialEvents = *opts.SendInitialEvents } - ctx, cancelWatch := context.WithCancel(ctx) + client, err := s.store.Watch(ctx, cmd) if err != nil { // if the context was canceled, just return a new empty watch - cancelWatch() if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, io.EOF) { return watch.NewEmptyWatch(), nil } @@ -252,11 +266,138 @@ func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOption } reporter := apierrors.NewClientErrorReporter(500, "WATCH", "") - decoder := newStreamDecoder(client, s.newFunc, predicate, s.codec, cancelWatch) + decoder := &streamDecoder{ + client: client, + newFunc: s.newFunc, + predicate: predicate, + codec: s.codec, + } return watch.NewStreamWatcher(decoder, reporter), nil } +// Watch begins watching the specified key. Events are decoded into API objects, +// and any items selected by the predicate are sent down to returned watch.Interface. +// resourceVersion may be used to specify what version to begin watching, +// which should be the current resourceVersion, and no longer rv+1 +// (e.g. reconnecting without missing any updates). +// If resource version is "0", this interface will get current object at given key +// and send it in an "ADDED" event, before watch starts. +func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { + k, err := s.getKey(key) + if err != nil { + return watch.NewEmptyWatch(), nil + } + + req, predicate, err := toListRequest(k, opts) + if err != nil { + return watch.NewEmptyWatch(), nil + } + + listObj := s.newListFunc() + + var namespace *string + if k.Namespace != "" { + namespace = &k.Namespace + } + + if ctx.Err() != nil { + return watch.NewEmptyWatch(), nil + } + + if (opts.SendInitialEvents == nil && req.ResourceVersion == 0) || (opts.SendInitialEvents != nil && *opts.SendInitialEvents) { + if err := s.GetList(ctx, key, opts, listObj); err != nil { + return nil, err + } + + listAccessor, err := meta.ListAccessor(listObj) + if err != nil { + klog.Errorf("could not determine new list accessor in watch") + return nil, err + } + // Updated if requesting RV was either "0" or "" + maybeUpdatedRV, err := s.versioner.ParseResourceVersion(listAccessor.GetResourceVersion()) + if err != nil { + klog.Errorf("could not determine new list RV in watch") + return nil, err + } + + jw := s.watchSet.newWatch(ctx, maybeUpdatedRV, predicate, s.versioner, namespace) + + initEvents := make([]watch.Event, 0) + listPtr, err := meta.GetItemsPtr(listObj) + if err != nil { + return nil, err + } + v, err := conversion.EnforcePtr(listPtr) + if err != nil || v.Kind() != reflect.Slice { + return nil, fmt.Errorf("need pointer to slice: %v", err) + } + + for i := 0; i < v.Len(); i++ { + obj, ok := v.Index(i).Addr().Interface().(runtime.Object) + if !ok { + return nil, fmt.Errorf("need item to be a runtime.Object: %v", err) + } + + initEvents = append(initEvents, watch.Event{ + Type: watch.Added, + Object: obj.DeepCopyObject(), + }) + } + + if predicate.AllowWatchBookmarks && len(initEvents) > 0 { + listRV, err := s.versioner.ParseResourceVersion(listAccessor.GetResourceVersion()) + if err != nil { + return nil, fmt.Errorf("could not get last init event's revision for bookmark: %v", err) + } + + bookmarkEvent := watch.Event{ + Type: watch.Bookmark, + Object: s.newFunc(), + } + + if err := s.versioner.UpdateObject(bookmarkEvent.Object, listRV); err != nil { + return nil, err + } + + bookmarkObject, err := meta.Accessor(bookmarkEvent.Object) + if err != nil { + return nil, fmt.Errorf("could not get bookmark object's acccesor: %v", err) + } + bookmarkObject.SetAnnotations(map[string]string{"k8s.io/initial-events-end": "true"}) + initEvents = append(initEvents, bookmarkEvent) + } + + jw.Start(initEvents...) + return jw, nil + } + + maybeUpdatedRV := uint64(req.ResourceVersion) + if maybeUpdatedRV == 0 { + rsp, err := s.store.List(ctx, &resource.ListRequest{ + Options: &resource.ListOptions{ + Key: k, + }, + Limit: 1, // we ignore the results, just look at the RV + }) + if err != nil { + return nil, err + } + if rsp.Error != nil { + return nil, resource.GetError(rsp.Error) + } + maybeUpdatedRV = uint64(rsp.ResourceVersion) + if maybeUpdatedRV < 1 { + return nil, fmt.Errorf("expecting a non-zero resource version") + } + } + jw := s.watchSet.newWatch(ctx, maybeUpdatedRV, predicate, s.versioner, namespace) + + jw.Start() + return jw, nil +} + // Get unmarshals object found at key into objPtr. On a not found error, will either // return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'. // Treats empty responses and nil response nodes exactly like a not found error. @@ -527,6 +668,17 @@ func (s *Storage) GuaranteedUpdate( return err } + if created { + s.watchSet.notifyWatchers(watch.Event{ + Object: destination.DeepCopyObject(), + Type: watch.Added, + }, nil) + } else { + s.watchSet.notifyWatchers(watch.Event{ + Object: destination.DeepCopyObject(), + Type: watch.Modified, + }, existingObj.DeepCopyObject()) + } return nil } diff --git a/pkg/storage/unified/apistore/store_test.go b/pkg/storage/unified/apistore/store_test.go index 287aeea5c41..8977693c966 100644 --- a/pkg/storage/unified/apistore/store_test.go +++ b/pkg/storage/unified/apistore/store_test.go @@ -92,13 +92,12 @@ func TestCreate(t *testing.T) { storagetesting.RunTestCreate(ctx, t, store, checkStorageInvariants(store)) } -// No TTL support in unifed storage -// func TestCreateWithTTL(t *testing.T) { -// ctx, store, destroyFunc, err := testSetup(t) -// defer destroyFunc() -// assert.NoError(t, err) -// storagetesting.RunTestCreateWithTTL(ctx, t, store) -// } +func TestCreateWithTTL(t *testing.T) { + ctx, store, destroyFunc, err := testSetup(t) + defer destroyFunc() + assert.NoError(t, err) + storagetesting.RunTestCreateWithTTL(ctx, t, store) +} func TestCreateWithKeyExist(t *testing.T) { ctx, store, destroyFunc, err := testSetup(t) diff --git a/pkg/storage/unified/apistore/stream.go b/pkg/storage/unified/apistore/stream.go index 9546e3e8b64..a425279185a 100644 --- a/pkg/storage/unified/apistore/stream.go +++ b/pkg/storage/unified/apistore/stream.go @@ -1,11 +1,9 @@ package apistore import ( - "context" "errors" "fmt" "io" - "sync" grpcCodes "google.golang.org/grpc/codes" grpcStatus "google.golang.org/grpc/status" @@ -19,23 +17,12 @@ import ( ) type streamDecoder struct { - client resource.ResourceStore_WatchClient - newFunc func() runtime.Object - predicate storage.SelectionPredicate - codec runtime.Codec - cancelWatch context.CancelFunc - done sync.WaitGroup + client resource.ResourceStore_WatchClient + newFunc func() runtime.Object + predicate storage.SelectionPredicate + codec runtime.Codec } -func newStreamDecoder(client resource.ResourceStore_WatchClient, newFunc func() runtime.Object, predicate storage.SelectionPredicate, codec runtime.Codec, cancelWatch context.CancelFunc) *streamDecoder { - return &streamDecoder{ - client: client, - newFunc: newFunc, - predicate: predicate, - codec: codec, - cancelWatch: cancelWatch, - } -} func (d *streamDecoder) toObject(w *resource.WatchEvent_Resource) (runtime.Object, error) { obj, _, err := d.codec.Decode(w.Value, nil, d.newFunc()) if err == nil { @@ -48,30 +35,25 @@ func (d *streamDecoder) toObject(w *resource.WatchEvent_Resource) (runtime.Objec return obj, err } -// nolint: gocyclo // we may be able to simplify this in the future, but this is a complex function by nature func (d *streamDecoder) Decode() (action watch.EventType, object runtime.Object, err error) { - d.done.Add(1) - defer d.done.Done() decode: for { - var evt *resource.WatchEvent - var err error - select { - case <-d.client.Context().Done(): - default: - evt, err = d.client.Recv() + err := d.client.Context().Err() + if err != nil { + klog.Errorf("client: context error: %s\n", err) + return watch.Error, nil, err } - switch { - case errors.Is(d.client.Context().Err(), context.Canceled): - return watch.Error, nil, io.EOF - case d.client.Context().Err() != nil: - return watch.Error, nil, d.client.Context().Err() - case errors.Is(err, io.EOF): - return watch.Error, nil, io.EOF - case grpcStatus.Code(err) == grpcCodes.Canceled: + evt, err := d.client.Recv() + if errors.Is(err, io.EOF) { return watch.Error, nil, err - case err != nil: + } + + if grpcStatus.Code(err) == grpcCodes.Canceled { + return watch.Error, nil, err + } + + if err != nil { klog.Errorf("client: error receiving result: %s", err) return watch.Error, nil, err } @@ -212,15 +194,10 @@ decode: } func (d *streamDecoder) Close() { - // Close the send stream err := d.client.CloseSend() if err != nil { klog.Errorf("error closing watch stream: %s", err) } - // Cancel the send context - d.cancelWatch() - // Wait for all decode operations to finish - d.done.Wait() } var _ watch.Decoder = (*streamDecoder)(nil) diff --git a/pkg/storage/unified/apistore/watcher_test.go b/pkg/storage/unified/apistore/watcher_test.go index 203d14c0729..fb4deb11811 100644 --- a/pkg/storage/unified/apistore/watcher_test.go +++ b/pkg/storage/unified/apistore/watcher_test.go @@ -7,9 +7,9 @@ package apistore import ( "context" + "fmt" "os" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -29,20 +29,7 @@ import ( "k8s.io/apiserver/pkg/storage/storagebackend/factory" storagetesting "github.com/grafana/grafana/pkg/apiserver/storage/testing" - infraDB "github.com/grafana/grafana/pkg/infra/db" - "github.com/grafana/grafana/pkg/services/featuremgmt" - "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/storage/unified/resource" - "github.com/grafana/grafana/pkg/storage/unified/sql" - "github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl" - "github.com/grafana/grafana/pkg/tests/testsuite" -) - -type StorageType string - -const ( - StorageTypeFile StorageType = "file" - StorageTypeUnified StorageType = "unified" ) var scheme = runtime.NewScheme() @@ -61,7 +48,6 @@ type setupOptions struct { prefix string resourcePrefix string groupResource schema.GroupResource - storageType StorageType } type setupOption func(*setupOptions, testing.TB) @@ -73,20 +59,10 @@ func withDefaults(options *setupOptions, t testing.TB) { options.prefix = t.TempDir() options.resourcePrefix = storagetesting.KeyFunc("", "") options.groupResource = schema.GroupResource{Resource: "pods"} - options.storageType = StorageTypeFile -} -func withStorageType(storageType StorageType) setupOption { - return func(options *setupOptions, t testing.TB) { - options.storageType = storageType - } } var _ setupOption = withDefaults -func TestMain(m *testing.M) { - testsuite.Run(m) -} - func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Interface, factory.DestroyFunc, error) { setupOpts := setupOptions{} opts = append([]setupOption{withDefaults}, opts...) @@ -109,56 +85,18 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Inte Metadata: fileblob.MetadataDontWrite, // skip }) require.NoError(t, err) + fmt.Printf("ROOT: %s\n\n", tmp) } ctx := storagetesting.NewContext() + backend, err := resource.NewCDKBackend(ctx, resource.CDKBackendOptions{ + Bucket: bucket, + }) + require.NoError(t, err) - var server resource.ResourceServer - switch setupOpts.storageType { - case StorageTypeFile: - backend, err := resource.NewCDKBackend(ctx, resource.CDKBackendOptions{ - Bucket: bucket, - }) - require.NoError(t, err) - - server, err = resource.NewResourceServer(resource.ResourceServerOptions{ - Backend: backend, - }) - require.NoError(t, err) - - // Issue a health check to ensure the server is initialized - _, err = server.IsHealthy(ctx, &resource.HealthCheckRequest{}) - require.NoError(t, err) - case StorageTypeUnified: - if testing.Short() { - t.Skip("skipping integration test") - } - dbstore := infraDB.InitTestDB(t) - cfg := setting.NewCfg() - features := featuremgmt.WithFeatures() - - eDB, err := dbimpl.ProvideResourceDB(dbstore, cfg, features, nil) - require.NoError(t, err) - require.NotNil(t, eDB) - - ret, err := sql.NewBackend(sql.BackendOptions{ - DBProvider: eDB, - PollingInterval: time.Millisecond, // Keep this fast - }) - require.NoError(t, err) - require.NotNil(t, ret) - ctx := storagetesting.NewContext() - err = ret.Init(ctx) - require.NoError(t, err) - - server, err = resource.NewResourceServer(resource.ResourceServerOptions{ - Backend: ret, - Diagnostics: ret, - Lifecycle: ret, - }) - require.NoError(t, err) - default: - t.Fatalf("unsupported storage type: %s", setupOpts.storageType) - } + server, err := resource.NewResourceServer(resource.ResourceServerOptions{ + Backend: backend, + }) + require.NoError(t, err) client := resource.NewLocalResourceClient(server) config := storagebackend.NewDefaultConfig(setupOpts.prefix, setupOpts.codec) @@ -186,82 +124,55 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Inte } func TestWatch(t *testing.T) { - for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} { - t.Run(string(s), func(t *testing.T) { - ctx, store, destroyFunc, err := testSetup(t, withStorageType(s)) - defer destroyFunc() - assert.NoError(t, err) - storagetesting.RunTestWatch(ctx, t, store) - }) - } + ctx, store, destroyFunc, err := testSetup(t) + defer destroyFunc() + assert.NoError(t, err) + storagetesting.RunTestWatch(ctx, t, store) } func TestClusterScopedWatch(t *testing.T) { - for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} { - t.Run(string(s), func(t *testing.T) { - ctx, store, destroyFunc, err := testSetup(t) - defer destroyFunc() - assert.NoError(t, err) - storagetesting.RunTestClusterScopedWatch(ctx, t, store) - }) - } + ctx, store, destroyFunc, err := testSetup(t) + defer destroyFunc() + assert.NoError(t, err) + storagetesting.RunTestClusterScopedWatch(ctx, t, store) } func TestNamespaceScopedWatch(t *testing.T) { - for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} { - t.Run(string(s), func(t *testing.T) { - ctx, store, destroyFunc, err := testSetup(t) - defer destroyFunc() - assert.NoError(t, err) - storagetesting.RunTestNamespaceScopedWatch(ctx, t, store) - }) - } + ctx, store, destroyFunc, err := testSetup(t) + defer destroyFunc() + assert.NoError(t, err) + storagetesting.RunTestNamespaceScopedWatch(ctx, t, store) } func TestDeleteTriggerWatch(t *testing.T) { - for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} { - t.Run(string(s), func(t *testing.T) { - ctx, store, destroyFunc, err := testSetup(t) - defer destroyFunc() - assert.NoError(t, err) - storagetesting.RunTestDeleteTriggerWatch(ctx, t, store) - }) - } + ctx, store, destroyFunc, err := testSetup(t) + defer destroyFunc() + assert.NoError(t, err) + storagetesting.RunTestDeleteTriggerWatch(ctx, t, store) } -// Not Supported by unistore because there is no way to differentiate between: -// - SendInitialEvents=nil && resourceVersion=0 -// - sendInitialEvents=false && resourceVersion=0 -// This is a Legacy feature in k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go#196 -// func TestWatchFromZero(t *testing.T) { -// ctx, store, destroyFunc, err := testSetup(t) -// defer destroyFunc() -// assert.NoError(t, err) -// storagetesting.RunTestWatchFromZero(ctx, t, store, nil) -// } +func TestWatchFromZero(t *testing.T) { + ctx, store, destroyFunc, err := testSetup(t) + defer destroyFunc() + assert.NoError(t, err) + storagetesting.RunTestWatchFromZero(ctx, t, store, nil) +} // TestWatchFromNonZero tests that // - watch from non-0 should just watch changes after given version func TestWatchFromNonZero(t *testing.T) { - for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} { - t.Run(string(s), func(t *testing.T) { - ctx, store, destroyFunc, err := testSetup(t) - defer destroyFunc() - assert.NoError(t, err) - storagetesting.RunTestWatchFromNonZero(ctx, t, store) - }) - } + ctx, store, destroyFunc, err := testSetup(t) + defer destroyFunc() + assert.NoError(t, err) + storagetesting.RunTestWatchFromNonZero(ctx, t, store) } -/* -Only valid when using a cached storage func TestDelayedWatchDelivery(t *testing.T) { ctx, store, destroyFunc, err := testSetup(t) defer destroyFunc() assert.NoError(t, err) storagetesting.RunTestDelayedWatchDelivery(ctx, t, store) } -/* /* func TestWatchError(t *testing.T) { @@ -271,36 +182,24 @@ func TestWatchError(t *testing.T) { */ func TestWatchContextCancel(t *testing.T) { - for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} { - t.Run(string(s), func(t *testing.T) { - ctx, store, destroyFunc, err := testSetup(t) - defer destroyFunc() - assert.NoError(t, err) - storagetesting.RunTestWatchContextCancel(ctx, t, store) - }) - } + ctx, store, destroyFunc, err := testSetup(t) + defer destroyFunc() + assert.NoError(t, err) + storagetesting.RunTestWatchContextCancel(ctx, t, store) } func TestWatcherTimeout(t *testing.T) { - for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} { - t.Run(string(s), func(t *testing.T) { - ctx, store, destroyFunc, err := testSetup(t) - defer destroyFunc() - assert.NoError(t, err) - storagetesting.RunTestWatcherTimeout(ctx, t, store) - }) - } + ctx, store, destroyFunc, err := testSetup(t) + defer destroyFunc() + assert.NoError(t, err) + storagetesting.RunTestWatcherTimeout(ctx, t, store) } func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { - for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} { - t.Run(string(s), func(t *testing.T) { - ctx, store, destroyFunc, err := testSetup(t) - defer destroyFunc() - assert.NoError(t, err) - storagetesting.RunTestWatchDeleteEventObjectHaveLatestRV(ctx, t, store) - }) - } + ctx, store, destroyFunc, err := testSetup(t) + defer destroyFunc() + assert.NoError(t, err) + storagetesting.RunTestWatchDeleteEventObjectHaveLatestRV(ctx, t, store) } // TODO: enable when we support flow control and priority fairness @@ -322,47 +221,31 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { // setting allowWatchBookmarks query param against // etcd implementation doesn't have any effect. func TestWatchDispatchBookmarkEvents(t *testing.T) { - for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} { - t.Run(string(s), func(t *testing.T) { - ctx, store, destroyFunc, err := testSetup(t) - defer destroyFunc() - assert.NoError(t, err) - storagetesting.RunTestWatchDispatchBookmarkEvents(ctx, t, store, false) - }) - } + ctx, store, destroyFunc, err := testSetup(t) + defer destroyFunc() + assert.NoError(t, err) + storagetesting.RunTestWatchDispatchBookmarkEvents(ctx, t, store, false) } func TestSendInitialEventsBackwardCompatibility(t *testing.T) { - for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} { - t.Run(string(s), func(t *testing.T) { - ctx, store, destroyFunc, err := testSetup(t) - defer destroyFunc() - assert.NoError(t, err) - storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store) - }) - } + ctx, store, destroyFunc, err := testSetup(t) + defer destroyFunc() + assert.NoError(t, err) + storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store) } func TestEtcdWatchSemantics(t *testing.T) { - for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} { - t.Run(string(s), func(t *testing.T) { - ctx, store, destroyFunc, err := testSetup(t) - defer destroyFunc() - assert.NoError(t, err) - storagetesting.RunWatchSemantics(ctx, t, store) - }) - } + ctx, store, destroyFunc, err := testSetup(t) + defer destroyFunc() + assert.NoError(t, err) + storagetesting.RunWatchSemantics(ctx, t, store) } func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) { - for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} { - t.Run(string(s), func(t *testing.T) { - ctx, store, destroyFunc, err := testSetup(t) - defer destroyFunc() - assert.NoError(t, err) - storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store) - }) - } + ctx, store, destroyFunc, err := testSetup(t) + defer destroyFunc() + assert.NoError(t, err) + storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store) } func newPod() runtime.Object { diff --git a/pkg/storage/unified/apistore/watchset.go b/pkg/storage/unified/apistore/watchset.go new file mode 100644 index 00000000000..9c9d214b4b6 --- /dev/null +++ b/pkg/storage/unified/apistore/watchset.go @@ -0,0 +1,376 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/tilt-dev/tilt-apiserver/blob/main/pkg/storage/filepath/watchset.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Kubernetes Authors. + +package apistore + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/storage" + "k8s.io/klog/v2" +) + +const ( + UpdateChannelSize = 25 + InitialWatchNodesSize = 20 + InitialBufferedEventsSize = 25 +) + +type eventWrapper struct { + ev watch.Event + // optional: oldObject is only set for modifications for determining their type as necessary (when using predicate filtering) + oldObject runtime.Object +} + +type watchNode struct { + ctx context.Context + s *WatchSet + id uint64 + updateCh chan eventWrapper + outCh chan watch.Event + requestedRV uint64 + // the watch may or may not be namespaced for a namespaced resource. This is always nil for cluster-scoped kinds + watchNamespace *string + predicate storage.SelectionPredicate + versioner storage.Versioner +} + +// Keeps track of which watches need to be notified +type WatchSet struct { + mu sync.RWMutex + // mu protects both nodes and counter + nodes map[uint64]*watchNode + counter atomic.Uint64 + buffered []eventWrapper + bufferedMutex sync.RWMutex +} + +func NewWatchSet() *WatchSet { + return &WatchSet{ + buffered: make([]eventWrapper, 0, InitialBufferedEventsSize), + nodes: make(map[uint64]*watchNode, InitialWatchNodesSize), + } +} + +// Creates a new watch with a unique id, but +// does not start sending events to it until start() is called. +func (s *WatchSet) newWatch(ctx context.Context, requestedRV uint64, p storage.SelectionPredicate, versioner storage.Versioner, namespace *string) *watchNode { + s.counter.Add(1) + + node := &watchNode{ + ctx: ctx, + requestedRV: requestedRV, + id: s.counter.Load(), + s: s, + // updateCh size needs to be > 1 to allow slower clients to not block passing new events + updateCh: make(chan eventWrapper, UpdateChannelSize), + // outCh size needs to be > 1 for single process use-cases such as tests where watch and event seeding from CUD + // events is happening on the same thread + outCh: make(chan watch.Event, UpdateChannelSize), + predicate: p, + watchNamespace: namespace, + versioner: versioner, + } + + return node +} + +func (s *WatchSet) cleanupWatchers() { + s.mu.Lock() + defer s.mu.Unlock() + for _, w := range s.nodes { + w.stop() + } +} + +// oldObject is only passed in the event of a modification +// in case a predicate filtered watch is impacted as a result of modification +// NOTE: this function gives one the misperception that a newly added node will never +// get a double event, one from buffered and one from the update channel +// That perception is not true. Even though this function maintains the lock throughout the function body +// it is not true of the Start function. So basically, the Start function running after this function +// fully stands the chance of another future notifyWatchers double sending it the event through the two means mentioned +func (s *WatchSet) notifyWatchers(ev watch.Event, oldObject runtime.Object) { + s.mu.RLock() + defer s.mu.RUnlock() + + updateEv := eventWrapper{ + ev: ev, + } + if oldObject != nil { + updateEv.oldObject = oldObject + } + + // Events are always buffered. + // this is because of an inadvertent delay which is built into the watch process + // Watch() from storage returns Watch.Interface with a async start func. + // The only way to guarantee that we can interpret the passed RV correctly is to play it against missed events + // (notice the loop below over s.nodes isn't exactly going to work on a new node + // unless start is called on it) + s.bufferedMutex.Lock() + s.buffered = append(s.buffered, updateEv) + s.bufferedMutex.Unlock() + + for _, w := range s.nodes { + w.updateCh <- updateEv + } +} + +// isValid is not necessary to be called on oldObject in UpdateEvents - assuming the Watch pushes correctly setup eventWrapper our way +// first bool is whether the event is valid for current watcher +// second bool is whether checking the old value against the predicate may be valuable to the caller +// second bool may be a helpful aid to establish context around MODIFIED events +// (note that this second bool is only marked true if we pass other checks first, namely RV and namespace) +func (w *watchNode) isValid(e eventWrapper) (bool, bool, error) { + obj, err := meta.Accessor(e.ev.Object) + if err != nil { + klog.Error("Could not get accessor to object in event") + return false, false, nil + } + + eventRV, err := w.getResourceVersionAsInt(e.ev.Object) + if err != nil { + return false, false, err + } + + if eventRV < w.requestedRV { + return false, false, nil + } + + if w.watchNamespace != nil && *w.watchNamespace != obj.GetNamespace() { + return false, false, err + } + + valid, err := w.predicate.Matches(e.ev.Object) + if err != nil { + return false, false, err + } + + return valid, e.ev.Type == watch.Modified, nil +} + +// Only call this method if current object matches the predicate +func (w *watchNode) handleAddedForFilteredList(e eventWrapper) (*watch.Event, error) { + if e.oldObject == nil { + return nil, fmt.Errorf("oldObject should be set for modified events") + } + + ok, err := w.predicate.Matches(e.oldObject) + if err != nil { + return nil, err + } + + if !ok { + e.ev.Type = watch.Added + return &e.ev, nil + } + + return nil, nil +} + +func (w *watchNode) handleDeletedForFilteredList(e eventWrapper) (*watch.Event, error) { + if e.oldObject == nil { + return nil, fmt.Errorf("oldObject should be set for modified events") + } + + ok, err := w.predicate.Matches(e.oldObject) + if err != nil { + return nil, err + } + + if !ok { + return nil, nil + } + + // isn't a match but used to be + e.ev.Type = watch.Deleted + + oldObjectAccessor, err := meta.Accessor(e.oldObject) + if err != nil { + klog.Errorf("Could not get accessor to correct the old RV of filtered out object") + return nil, err + } + + currentRV, err := getResourceVersion(e.ev.Object) + if err != nil { + klog.Errorf("Could not get accessor to object in event") + return nil, err + } + + oldObjectAccessor.SetResourceVersion(currentRV) + e.ev.Object = e.oldObject + + return &e.ev, nil +} + +func (w *watchNode) processEvent(e eventWrapper, isInitEvent bool) error { + if isInitEvent { + // Init events have already been vetted against the predicate and other RV behavior + // Let them pass through + w.outCh <- e.ev + return nil + } + + valid, runDeleteFromFilteredListHandler, err := w.isValid(e) + if err != nil { + klog.Errorf("Could not determine validity of the event: %v", err) + return err + } + if valid { + if e.ev.Type == watch.Modified { + ev, err := w.handleAddedForFilteredList(e) + if err != nil { + return err + } + if ev != nil { + w.outCh <- *ev + } else { + // forward the original event if add handling didn't signal any impact + w.outCh <- e.ev + } + } else { + w.outCh <- e.ev + } + return nil + } + + if runDeleteFromFilteredListHandler { + if e.ev.Type == watch.Modified { + ev, err := w.handleDeletedForFilteredList(e) + if err != nil { + return err + } + if ev != nil { + w.outCh <- *ev + } + } // explicitly doesn't have an event forward for the else case here + return nil + } + + return nil +} + +// Start sending events to this watch. +func (w *watchNode) Start(initEvents ...watch.Event) { + w.s.mu.Lock() + w.s.nodes[w.id] = w + w.s.mu.Unlock() + + go func() { + maxRV := uint64(0) + for _, ev := range initEvents { + currentRV, err := w.getResourceVersionAsInt(ev.Object) + if err != nil { + klog.Errorf("Could not determine init event RV for deduplication of buffered events: %v", err) + continue + } + + if maxRV < currentRV { + maxRV = currentRV + } + + if err := w.processEvent(eventWrapper{ev: ev}, true); err != nil { + klog.Errorf("Could not process event: %v", err) + } + } + + // If we had no init events, simply rely on the passed RV + if maxRV == 0 { + maxRV = w.requestedRV + } + + w.s.bufferedMutex.RLock() + for _, e := range w.s.buffered { + eventRV, err := w.getResourceVersionAsInt(e.ev.Object) + if err != nil { + klog.Errorf("Could not determine RV for deduplication of buffered events: %v", err) + continue + } + + if maxRV >= eventRV { + continue + } else { + maxRV = eventRV + } + + if err := w.processEvent(e, false); err != nil { + klog.Errorf("Could not process event: %v", err) + } + } + w.s.bufferedMutex.RUnlock() + + for { + select { + case e, ok := <-w.updateCh: + if !ok { + close(w.outCh) + return + } + + eventRV, err := w.getResourceVersionAsInt(e.ev.Object) + if err != nil { + klog.Errorf("Could not determine RV for deduplication of channel events: %v", err) + continue + } + + if maxRV >= eventRV { + continue + } else { + maxRV = eventRV + } + + if err := w.processEvent(e, false); err != nil { + klog.Errorf("Could not process event: %v", err) + } + case <-w.ctx.Done(): + close(w.outCh) + return + } + } + }() +} + +func (w *watchNode) Stop() { + w.s.mu.Lock() + defer w.s.mu.Unlock() + w.stop() +} + +// Unprotected func: ensure mutex on the parent watch set is locked before calling +func (w *watchNode) stop() { + if _, ok := w.s.nodes[w.id]; ok { + delete(w.s.nodes, w.id) + close(w.updateCh) + } +} + +func (w *watchNode) ResultChan() <-chan watch.Event { + return w.outCh +} + +func getResourceVersion(obj runtime.Object) (string, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + klog.Error("Could not get accessor to object in event") + return "", err + } + return accessor.GetResourceVersion(), nil +} + +func (w *watchNode) getResourceVersionAsInt(obj runtime.Object) (uint64, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + klog.Error("Could not get accessor to object in event") + return 0, err + } + + return w.versioner.ParseResourceVersion(accessor.GetResourceVersion()) +} diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go index c44c1ac1ebc..8484653049a 100644 --- a/pkg/storage/unified/resource/server.go +++ b/pkg/storage/unified/resource/server.go @@ -7,7 +7,6 @@ import ( "log/slog" "net/http" "sync" - "sync/atomic" "time" "go.opentelemetry.io/otel/trace" @@ -162,15 +161,14 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) { var _ ResourceServer = &server{} type server struct { - tracer trace.Tracer - log *slog.Logger - backend StorageBackend - index ResourceIndexServer - diagnostics DiagnosticsServer - access WriteAccessHooks - lifecycle LifecycleHooks - now func() int64 - mostRecentRV atomic.Int64 // The most recent resource version seen by the server + tracer trace.Tracer + log *slog.Logger + backend StorageBackend + index ResourceIndexServer + diagnostics DiagnosticsServer + access WriteAccessHooks + lifecycle LifecycleHooks + now func() int64 // Background watch task -- this has permissions for everything ctx context.Context @@ -345,12 +343,12 @@ func (s *server) Create(ctx context.Context, req *CreateRequest) (*CreateRespons rsp.Error = e return rsp, nil } + var err error rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, *event) if err != nil { rsp.Error = AsErrorResult(err) } - s.log.Debug("server.WriteEvent", "type", event.Type, "rv", rsp.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "name", event.Key.Name, "resource", event.Key.Resource) return rsp, nil } @@ -556,8 +554,6 @@ func (s *server) initWatcher() error { for { // pipe all events v := <-events - s.log.Debug("Server. Streaming Event", "type", v.Type, "previousRV", v.PreviousRV, "group", v.Key.Group, "namespace", v.Key.Namespace, "resource", v.Key.Resource, "name", v.Key.Name) - s.mostRecentRV.Store(v.ResourceVersion) out <- v } }() @@ -573,67 +569,23 @@ func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error { return err } - // Start listening -- this will buffer any changes that happen while we backfill. - // If events are generated faster than we can process them, then some events will be dropped. - // TODO: Think of a way to allow the client to catch up. + // Start listening -- this will buffer any changes that happen while we backfill stream, err := s.broadcaster.Subscribe(ctx) if err != nil { return err } defer s.broadcaster.Unsubscribe(stream) - if !req.SendInitialEvents && req.Since == 0 { - // This is a temporary hack only relevant for tests to ensure that the first events are sent. - // This is required because the SQL backend polls the database every 100ms. - // TODO: Implement a getLatestResourceVersion method in the backend. - time.Sleep(10 * time.Millisecond) - } - - mostRecentRV := s.mostRecentRV.Load() // get the latest resource version - var initialEventsRV int64 // resource version coming from the initial events + since := req.Since if req.SendInitialEvents { - // Backfill the stream by adding every existing entities. - initialEventsRV, err = s.backend.ListIterator(ctx, &ListRequest{Options: req.Options}, func(iter ListIterator) error { - for iter.Next() { - if err := iter.Error(); err != nil { - return err - } - if err := srv.Send(&WatchEvent{ - Type: WatchEvent_ADDED, - Resource: &WatchEvent_Resource{ - Value: iter.Value(), - Version: iter.ResourceVersion(), - }, - }); err != nil { - return err - } - } - return nil - }) - if err != nil { - return err - } - } - if req.SendInitialEvents && req.AllowWatchBookmarks { - if err := srv.Send(&WatchEvent{ - Type: WatchEvent_BOOKMARK, - Resource: &WatchEvent_Resource{ - Version: initialEventsRV, - }, - }); err != nil { - return err + fmt.Printf("TODO... query\n") + // All initial events are CREATE + + if req.AllowWatchBookmarks { + fmt.Printf("TODO... send bookmark\n") } } - var since int64 // resource version to start watching from - switch { - case req.SendInitialEvents: - since = initialEventsRV - case req.Since == 0: - since = mostRecentRV - default: - since = req.Since - } for { select { case <-ctx.Done(): @@ -644,39 +596,23 @@ func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error { s.log.Debug("watch events closed") return nil } - s.log.Debug("Server Broadcasting", "type", event.Type, "rv", event.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "resource", event.Key.Resource, "name", event.Key.Name) + if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) { - value := event.Value - // remove the delete marker stored in the value for deleted objects - if event.Type == WatchEvent_DELETED { - value = []byte{} - } - resp := &WatchEvent{ + // Currently sending *every* event + // if req.Options.Labels != nil { + // // match *either* the old or new object + // } + // TODO: return values that match either the old or the new + + if err := srv.Send(&WatchEvent{ Timestamp: event.Timestamp, Type: event.Type, Resource: &WatchEvent_Resource{ - Value: value, + Value: event.Value, Version: event.ResourceVersion, }, - } - if event.PreviousRV > 0 { - prevObj, err := s.Read(ctx, &ReadRequest{Key: event.Key, ResourceVersion: event.PreviousRV}) - if err != nil { - // This scenario should never happen, but if it does, we should log it and continue - // sending the event without the previous object. The client will decide what to do. - s.log.Error("error reading previous object", "key", event.Key, "resource_version", event.PreviousRV, "error", prevObj.Error) - } else { - if prevObj.ResourceVersion != event.PreviousRV { - s.log.Error("resource version mismatch", "key", event.Key, "resource_version", event.PreviousRV, "actual", prevObj.ResourceVersion) - return fmt.Errorf("resource version mismatch") - } - resp.Previous = &WatchEvent_Resource{ - Value: prevObj.Value, - Version: prevObj.ResourceVersion, - } - } - } - if err := srv.Send(resp); err != nil { + // TODO... previous??? + }); err != nil { return err } } diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go index 913d105babb..ae70e141219 100644 --- a/pkg/storage/unified/sql/backend.go +++ b/pkg/storage/unified/sql/backend.go @@ -22,7 +22,6 @@ import ( ) const trace_prefix = "sql.resource." -const defaultPollingInterval = 100 * time.Millisecond type Backend interface { resource.StorageBackend @@ -31,9 +30,8 @@ type Backend interface { } type BackendOptions struct { - DBProvider db.DBProvider - Tracer trace.Tracer - PollingInterval time.Duration + DBProvider db.DBProvider + Tracer trace.Tracer } func NewBackend(opts BackendOptions) (Backend, error) { @@ -45,17 +43,12 @@ func NewBackend(opts BackendOptions) (Backend, error) { } ctx, cancel := context.WithCancel(context.Background()) - pollingInterval := opts.PollingInterval - if pollingInterval == 0 { - pollingInterval = defaultPollingInterval - } return &backend{ - done: ctx.Done(), - cancel: cancel, - log: log.New("sql-resource-server"), - tracer: opts.Tracer, - dbProvider: opts.DBProvider, - pollingInterval: pollingInterval, + done: ctx.Done(), + cancel: cancel, + log: log.New("sql-resource-server"), + tracer: opts.Tracer, + dbProvider: opts.DBProvider, }, nil } @@ -77,7 +70,6 @@ type backend struct { // watch streaming //stream chan *resource.WatchEvent - pollingInterval time.Duration } func (b *backend) Init(ctx context.Context) error { @@ -188,6 +180,7 @@ func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64, return nil }) + return newVersion, err } @@ -519,7 +512,8 @@ func (b *backend) WatchWriteEvents(ctx context.Context) (<-chan *resource.Writte } func (b *backend) poller(ctx context.Context, since groupResourceRV, stream chan<- *resource.WrittenEvent) { - t := time.NewTicker(b.pollingInterval) + interval := 100 * time.Millisecond // TODO make this configurable + t := time.NewTicker(interval) defer close(stream) defer t.Stop() @@ -532,7 +526,7 @@ func (b *backend) poller(ctx context.Context, since groupResourceRV, stream chan grv, err := b.listLatestRVs(ctx) if err != nil { b.log.Error("get the latest resource version", "err", err) - t.Reset(b.pollingInterval) + t.Reset(interval) continue } for group, items := range grv { @@ -549,7 +543,7 @@ func (b *backend) poller(ctx context.Context, since groupResourceRV, stream chan next, err := b.poll(ctx, group, resource, since[group][resource], stream) if err != nil { b.log.Error("polling for resource", "err", err) - t.Reset(b.pollingInterval) + t.Reset(interval) continue } if next > since[group][resource] { @@ -558,7 +552,7 @@ func (b *backend) poller(ctx context.Context, since groupResourceRV, stream chan } } - t.Reset(b.pollingInterval) + t.Reset(interval) } } } @@ -642,8 +636,7 @@ func (b *backend) poll(ctx context.Context, grp string, res string, since int64, Resource: rec.Key.Resource, Name: rec.Key.Name, }, - Type: resource.WatchEvent_Type(rec.Action), - PreviousRV: rec.PreviousRV, + Type: resource.WatchEvent_Type(rec.Action), }, ResourceVersion: rec.ResourceVersion, // Timestamp: , // TODO: add timestamp @@ -670,16 +663,15 @@ func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemp if errors.Is(err, sql.ErrNoRows) { // if there wasn't a row associated with the given resource, we create one with - // version 2 to match the etcd behavior. + // version 1 if _, err = dbutil.Exec(ctx, x, sqlResourceVersionInsert, sqlResourceVersionRequest{ - SQLTemplate: sqltemplate.New(d), - Group: key.Group, - Resource: key.Resource, - resourceVersion: &resourceVersion{1}, + SQLTemplate: sqltemplate.New(d), + Group: key.Group, + Resource: key.Resource, }); err != nil { return 0, fmt.Errorf("insert into resource_version: %w", err) } - return 2, nil + return 1, nil } if err != nil { diff --git a/pkg/storage/unified/sql/backend_test.go b/pkg/storage/unified/sql/backend_test.go index 33b7bab7d6a..b24024aef90 100644 --- a/pkg/storage/unified/sql/backend_test.go +++ b/pkg/storage/unified/sql/backend_test.go @@ -227,7 +227,7 @@ func TestResourceVersionAtomicInc(t *testing.T) { v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey) require.NoError(t, err) - require.Equal(t, int64(2), v) + require.Equal(t, int64(1), v) }) t.Run("happy path - update existing row", func(t *testing.T) { @@ -304,7 +304,7 @@ func TestBackend_create(t *testing.T) { v, err := b.create(ctx, event) require.NoError(t, err) - require.Equal(t, int64(2), v) + require.Equal(t, int64(1), v) }) t.Run("error inserting into resource", func(t *testing.T) { @@ -409,7 +409,7 @@ func TestBackend_update(t *testing.T) { v, err := b.update(ctx, event) require.NoError(t, err) - require.Equal(t, int64(2), v) + require.Equal(t, int64(1), v) }) t.Run("error in first update to resource", func(t *testing.T) { @@ -513,7 +513,7 @@ func TestBackend_delete(t *testing.T) { v, err := b.delete(ctx, event) require.NoError(t, err) - require.Equal(t, int64(2), v) + require.Equal(t, int64(1), v) }) t.Run("error deleting resource", func(t *testing.T) { diff --git a/pkg/storage/unified/sql/data/resource_history_insert.sql b/pkg/storage/unified/sql/data/resource_history_insert.sql index 2669ef82447..018b65739d8 100644 --- a/pkg/storage/unified/sql/data/resource_history_insert.sql +++ b/pkg/storage/unified/sql/data/resource_history_insert.sql @@ -6,7 +6,6 @@ INSERT INTO {{ .Ident "resource_history" }} {{ .Ident "namespace" }}, {{ .Ident "name" }}, - {{ .Ident "previous_resource_version"}}, {{ .Ident "value" }}, {{ .Ident "action" }} ) @@ -18,7 +17,6 @@ INSERT INTO {{ .Ident "resource_history" }} {{ .Arg .WriteEvent.Key.Namespace }}, {{ .Arg .WriteEvent.Key.Name }}, - {{ .Arg .WriteEvent.PreviousRV }}, {{ .Arg .WriteEvent.Value }}, {{ .Arg .WriteEvent.Type }} ) diff --git a/pkg/storage/unified/sql/data/resource_history_poll.sql b/pkg/storage/unified/sql/data/resource_history_poll.sql index 8e4a7374fdb..bebfab9286d 100644 --- a/pkg/storage/unified/sql/data/resource_history_poll.sql +++ b/pkg/storage/unified/sql/data/resource_history_poll.sql @@ -5,8 +5,7 @@ SELECT {{ .Ident "resource" | .Into .Response.Key.Resource }}, {{ .Ident "name" | .Into .Response.Key.Name }}, {{ .Ident "value" | .Into .Response.Value }}, - {{ .Ident "action" | .Into .Response.Action }}, - {{ .Ident "previous_resource_version" | .Into .Response.PreviousRV }} + {{ .Ident "action" | .Into .Response.Action }} FROM {{ .Ident "resource_history" }} WHERE 1 = 1 diff --git a/pkg/storage/unified/sql/data/resource_insert.sql b/pkg/storage/unified/sql/data/resource_insert.sql index ccaca2f12f7..e127901ae50 100644 --- a/pkg/storage/unified/sql/data/resource_insert.sql +++ b/pkg/storage/unified/sql/data/resource_insert.sql @@ -7,7 +7,6 @@ INSERT INTO {{ .Ident "resource" }} {{ .Ident "namespace" }}, {{ .Ident "name" }}, - {{ .Ident "previous_resource_version" }}, {{ .Ident "value" }}, {{ .Ident "action" }} ) @@ -18,7 +17,6 @@ INSERT INTO {{ .Ident "resource" }} {{ .Arg .WriteEvent.Key.Namespace }}, {{ .Arg .WriteEvent.Key.Name }}, - {{ .Arg .WriteEvent.PreviousRV }}, {{ .Arg .WriteEvent.Value }}, {{ .Arg .WriteEvent.Type }} ) diff --git a/pkg/storage/unified/sql/data/resource_version_insert.sql b/pkg/storage/unified/sql/data/resource_version_insert.sql index 6c3aab0dcd4..6c2342905da 100644 --- a/pkg/storage/unified/sql/data/resource_version_insert.sql +++ b/pkg/storage/unified/sql/data/resource_version_insert.sql @@ -8,6 +8,6 @@ INSERT INTO {{ .Ident "resource_version" }} VALUES ( {{ .Arg .Group }}, {{ .Arg .Resource }}, - 2 + 1 ) ; diff --git a/pkg/storage/unified/sql/db/migrations/resource_mig.go b/pkg/storage/unified/sql/db/migrations/resource_mig.go index 38824569a05..adfd75a0b73 100644 --- a/pkg/storage/unified/sql/db/migrations/resource_mig.go +++ b/pkg/storage/unified/sql/db/migrations/resource_mig.go @@ -10,7 +10,8 @@ func initResourceTables(mg *migrator.Migrator) string { marker := "Initialize resource tables" mg.AddMigration(marker, &migrator.RawSQLMigration{}) - resource_table := migrator.Table{ + tables := []migrator.Table{} + tables = append(tables, migrator.Table{ Name: "resource", Columns: []*migrator.Column{ // primary identifier @@ -32,8 +33,9 @@ func initResourceTables(mg *migrator.Migrator) string { Indices: []*migrator.Index{ {Cols: []string{"namespace", "group", "resource", "name"}, Type: migrator.UniqueIndex}, }, - } - resource_history_table := migrator.Table{ + }) + + tables = append(tables, migrator.Table{ Name: "resource_history", Columns: []*migrator.Column{ // primary identifier @@ -60,9 +62,7 @@ func initResourceTables(mg *migrator.Migrator) string { // index to support watch poller {Cols: []string{"resource_version"}, Type: migrator.IndexType}, }, - } - - tables := []migrator.Table{resource_table, resource_history_table} + }) // tables = append(tables, migrator.Table{ // Name: "resource_label_set", @@ -97,13 +97,5 @@ func initResourceTables(mg *migrator.Migrator) string { } } - mg.AddMigration("Add column previous_resource_version in resource_history", migrator.NewAddColumnMigration(resource_history_table, &migrator.Column{ - Name: "previous_resource_version", Type: migrator.DB_BigInt, Nullable: false, - })) - - mg.AddMigration("Add column previous_resource_version in resource", migrator.NewAddColumnMigration(resource_table, &migrator.Column{ - Name: "previous_resource_version", Type: migrator.DB_BigInt, Nullable: false, - })) - return marker } diff --git a/pkg/storage/unified/sql/queries.go b/pkg/storage/unified/sql/queries.go index 11882f17cb2..893169c3f3a 100644 --- a/pkg/storage/unified/sql/queries.go +++ b/pkg/storage/unified/sql/queries.go @@ -70,7 +70,6 @@ func (r sqlResourceRequest) Validate() error { type historyPollResponse struct { Key resource.ResourceKey ResourceVersion int64 - PreviousRV int64 Value []byte Action int } @@ -102,7 +101,6 @@ func (r *sqlResourceHistoryPollRequest) Results() (*historyPollResponse, error) Name: r.Response.Key.Name, }, ResourceVersion: r.Response.ResourceVersion, - PreviousRV: r.Response.PreviousRV, Value: r.Response.Value, Action: r.Response.Action, }, nil diff --git a/pkg/storage/unified/sql/queries_test.go b/pkg/storage/unified/sql/queries_test.go index df7ed9167f7..b5ac7f57217 100644 --- a/pkg/storage/unified/sql/queries_test.go +++ b/pkg/storage/unified/sql/queries_test.go @@ -104,18 +104,6 @@ func TestUnifiedStorageQueries(t *testing.T) { }, }, }, - sqlResourceHistoryPoll: { - { - Name: "single path", - Data: &sqlResourceHistoryPollRequest{ - SQLTemplate: mocks.NewTestingSQLTemplate(), - Resource: "res", - Group: "group", - SinceResourceVersion: 1234, - Response: new(historyPollResponse), - }, - }, - }, sqlResourceUpdateRV: { { @@ -155,8 +143,7 @@ func TestUnifiedStorageQueries(t *testing.T) { Data: &sqlResourceRequest{ SQLTemplate: mocks.NewTestingSQLTemplate(), WriteEvent: resource.WriteEvent{ - Key: &resource.ResourceKey{}, - PreviousRV: 1234, + Key: &resource.ResourceKey{}, }, }, }, diff --git a/pkg/storage/unified/sql/testdata/mysql--resource_history_insert-insert into resource_history.sql b/pkg/storage/unified/sql/testdata/mysql--resource_history_insert-insert into resource_history.sql index d76132ae625..27f5000fc9f 100755 --- a/pkg/storage/unified/sql/testdata/mysql--resource_history_insert-insert into resource_history.sql +++ b/pkg/storage/unified/sql/testdata/mysql--resource_history_insert-insert into resource_history.sql @@ -5,7 +5,6 @@ INSERT INTO `resource_history` `resource`, `namespace`, `name`, - `previous_resource_version`, `value`, `action` ) @@ -15,7 +14,6 @@ INSERT INTO `resource_history` '', '', '', - 1234, '[]', 'UNKNOWN' ) diff --git a/pkg/storage/unified/sql/testdata/mysql--resource_history_poll-single path.sql b/pkg/storage/unified/sql/testdata/mysql--resource_history_poll-single path.sql deleted file mode 100755 index a29cf35d4da..00000000000 --- a/pkg/storage/unified/sql/testdata/mysql--resource_history_poll-single path.sql +++ /dev/null @@ -1,16 +0,0 @@ -SELECT - `resource_version`, - `namespace`, - `group`, - `resource`, - `name`, - `value`, - `action`, - `previous_resource_version` - FROM `resource_history` - WHERE 1 = 1 - AND `group` = 'group' - AND `resource` = 'res' - AND `resource_version` > 1234 - ORDER BY `resource_version` ASC -; diff --git a/pkg/storage/unified/sql/testdata/mysql--resource_insert-simple.sql b/pkg/storage/unified/sql/testdata/mysql--resource_insert-simple.sql index 5bf3424e55b..0897963b19c 100755 --- a/pkg/storage/unified/sql/testdata/mysql--resource_insert-simple.sql +++ b/pkg/storage/unified/sql/testdata/mysql--resource_insert-simple.sql @@ -5,7 +5,6 @@ INSERT INTO `resource` `resource`, `namespace`, `name`, - `previous_resource_version`, `value`, `action` ) @@ -15,7 +14,6 @@ INSERT INTO `resource` 'rr', 'nn', 'name', - 123, '[]', 'ADDED' ) diff --git a/pkg/storage/unified/sql/testdata/mysql--resource_version_insert-single path.sql b/pkg/storage/unified/sql/testdata/mysql--resource_version_insert-single path.sql index f99b2b00148..350f77472ab 100755 --- a/pkg/storage/unified/sql/testdata/mysql--resource_version_insert-single path.sql +++ b/pkg/storage/unified/sql/testdata/mysql--resource_version_insert-single path.sql @@ -7,6 +7,6 @@ INSERT INTO `resource_version` VALUES ( '', '', - 2 + 1 ) ; diff --git a/pkg/storage/unified/sql/testdata/postgres--resource_history_insert-insert into resource_history.sql b/pkg/storage/unified/sql/testdata/postgres--resource_history_insert-insert into resource_history.sql index a15a8db4b1e..643741bc3b1 100755 --- a/pkg/storage/unified/sql/testdata/postgres--resource_history_insert-insert into resource_history.sql +++ b/pkg/storage/unified/sql/testdata/postgres--resource_history_insert-insert into resource_history.sql @@ -5,7 +5,6 @@ INSERT INTO "resource_history" "resource", "namespace", "name", - "previous_resource_version", "value", "action" ) @@ -15,7 +14,6 @@ INSERT INTO "resource_history" '', '', '', - 1234, '[]', 'UNKNOWN' ) diff --git a/pkg/storage/unified/sql/testdata/postgres--resource_history_poll-single path.sql b/pkg/storage/unified/sql/testdata/postgres--resource_history_poll-single path.sql deleted file mode 100755 index d038317381a..00000000000 --- a/pkg/storage/unified/sql/testdata/postgres--resource_history_poll-single path.sql +++ /dev/null @@ -1,16 +0,0 @@ -SELECT - "resource_version", - "namespace", - "group", - "resource", - "name", - "value", - "action", - "previous_resource_version" - FROM "resource_history" - WHERE 1 = 1 - AND "group" = 'group' - AND "resource" = 'res' - AND "resource_version" > 1234 - ORDER BY "resource_version" ASC -; diff --git a/pkg/storage/unified/sql/testdata/postgres--resource_insert-simple.sql b/pkg/storage/unified/sql/testdata/postgres--resource_insert-simple.sql index fc2d22be1c4..9150eb59fef 100755 --- a/pkg/storage/unified/sql/testdata/postgres--resource_insert-simple.sql +++ b/pkg/storage/unified/sql/testdata/postgres--resource_insert-simple.sql @@ -5,7 +5,6 @@ INSERT INTO "resource" "resource", "namespace", "name", - "previous_resource_version", "value", "action" ) @@ -15,7 +14,6 @@ INSERT INTO "resource" 'rr', 'nn', 'name', - 123, '[]', 'ADDED' ) diff --git a/pkg/storage/unified/sql/testdata/postgres--resource_version_insert-single path.sql b/pkg/storage/unified/sql/testdata/postgres--resource_version_insert-single path.sql index 14b25955585..99003d5fefe 100755 --- a/pkg/storage/unified/sql/testdata/postgres--resource_version_insert-single path.sql +++ b/pkg/storage/unified/sql/testdata/postgres--resource_version_insert-single path.sql @@ -7,6 +7,6 @@ INSERT INTO "resource_version" VALUES ( '', '', - 2 + 1 ) ; diff --git a/pkg/storage/unified/sql/testdata/sqlite--resource_history_insert-insert into resource_history.sql b/pkg/storage/unified/sql/testdata/sqlite--resource_history_insert-insert into resource_history.sql index a15a8db4b1e..643741bc3b1 100755 --- a/pkg/storage/unified/sql/testdata/sqlite--resource_history_insert-insert into resource_history.sql +++ b/pkg/storage/unified/sql/testdata/sqlite--resource_history_insert-insert into resource_history.sql @@ -5,7 +5,6 @@ INSERT INTO "resource_history" "resource", "namespace", "name", - "previous_resource_version", "value", "action" ) @@ -15,7 +14,6 @@ INSERT INTO "resource_history" '', '', '', - 1234, '[]', 'UNKNOWN' ) diff --git a/pkg/storage/unified/sql/testdata/sqlite--resource_history_poll-single path.sql b/pkg/storage/unified/sql/testdata/sqlite--resource_history_poll-single path.sql deleted file mode 100755 index d038317381a..00000000000 --- a/pkg/storage/unified/sql/testdata/sqlite--resource_history_poll-single path.sql +++ /dev/null @@ -1,16 +0,0 @@ -SELECT - "resource_version", - "namespace", - "group", - "resource", - "name", - "value", - "action", - "previous_resource_version" - FROM "resource_history" - WHERE 1 = 1 - AND "group" = 'group' - AND "resource" = 'res' - AND "resource_version" > 1234 - ORDER BY "resource_version" ASC -; diff --git a/pkg/storage/unified/sql/testdata/sqlite--resource_insert-simple.sql b/pkg/storage/unified/sql/testdata/sqlite--resource_insert-simple.sql index fc2d22be1c4..9150eb59fef 100755 --- a/pkg/storage/unified/sql/testdata/sqlite--resource_insert-simple.sql +++ b/pkg/storage/unified/sql/testdata/sqlite--resource_insert-simple.sql @@ -5,7 +5,6 @@ INSERT INTO "resource" "resource", "namespace", "name", - "previous_resource_version", "value", "action" ) @@ -15,7 +14,6 @@ INSERT INTO "resource" 'rr', 'nn', 'name', - 123, '[]', 'ADDED' ) diff --git a/pkg/storage/unified/sql/testdata/sqlite--resource_version_insert-single path.sql b/pkg/storage/unified/sql/testdata/sqlite--resource_version_insert-single path.sql index 14b25955585..99003d5fefe 100755 --- a/pkg/storage/unified/sql/testdata/sqlite--resource_version_insert-single path.sql +++ b/pkg/storage/unified/sql/testdata/sqlite--resource_version_insert-single path.sql @@ -7,6 +7,6 @@ INSERT INTO "resource_version" VALUES ( '', '', - 2 + 1 ) ;