From f0a6327edc84864948007bf17d0a131fccda565c Mon Sep 17 00:00:00 2001 From: Mariell Hoversholm Date: Mon, 31 Mar 2025 15:06:31 +0200 Subject: [PATCH] Unified Storage: Don't read before create (#102906) * Unified Storage: Don't read before create * test: use the existing test infra * fix: support pq We use pgx, but it seems to be wrapped in a pq driver shim, causing the errors to be remapped to pq's type. Weird situation. * feat: support CDK backend * revert: there is a postgres_tests block * fix(CDK): only check existence on ADDED updates * fix(CDK): use ReadResource to deal with deleted files --- pkg/apiserver/storage/testing/store_tests.go | 15 ++++++ pkg/storage/unified/apistore/store_test.go | 7 +++ pkg/storage/unified/resource/cdk_backend.go | 12 +++++ pkg/storage/unified/resource/errors.go | 4 +- pkg/storage/unified/resource/server.go | 12 +---- pkg/storage/unified/sql/backend.go | 7 +++ .../unified/testing/storage_backend.go | 54 ++++++++++++++++--- 7 files changed, 93 insertions(+), 18 deletions(-) diff --git a/pkg/apiserver/storage/testing/store_tests.go b/pkg/apiserver/storage/testing/store_tests.go index a58df4b4010..4e57011234c 100644 --- a/pkg/apiserver/storage/testing/store_tests.go +++ b/pkg/apiserver/storage/testing/store_tests.go @@ -2456,6 +2456,21 @@ func RunTestGuaranteedUpdateChecksStoredData(ctx context.Context, t *testing.T, } } +func RunTestValidUpdate(ctx context.Context, t *testing.T, store storage.Interface) { + pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}} + key, pod := testPropagateStore(ctx, t, store, pod) + + err := store.GuaranteedUpdate(ctx, key, &example.Pod{}, false, nil, + storage.SimpleUpdate(func(o runtime.Object) (runtime.Object, error) { + pod := o.(*example.Pod) + pod.Spec.Hostname = "example" + return pod, nil + }), pod) + if err != nil { + t.Errorf("got error on update: %v", err) + } +} + func RunTestGuaranteedUpdateWithConflict(ctx context.Context, t *testing.T, store storage.Interface) { key, _ := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}) diff --git a/pkg/storage/unified/apistore/store_test.go b/pkg/storage/unified/apistore/store_test.go index 3c66beda27d..82207f11928 100644 --- a/pkg/storage/unified/apistore/store_test.go +++ b/pkg/storage/unified/apistore/store_test.go @@ -117,6 +117,13 @@ func TestCreateWithKeyExist(t *testing.T) { storagetesting.RunTestCreateWithKeyExist(ctx, t, store) } +func TestValidUpdate(t *testing.T) { + ctx, store, destroyFunc, err := testSetup(t) + defer destroyFunc() + assert.NoError(t, err) + storagetesting.RunTestValidUpdate(ctx, t, store) +} + func TestGet(t *testing.T) { ctx, store, destroyFunc, err := testSetup(t) defer destroyFunc() diff --git a/pkg/storage/unified/resource/cdk_backend.go b/pkg/storage/unified/resource/cdk_backend.go index 19a7385a3f5..f784802146d 100644 --- a/pkg/storage/unified/resource/cdk_backend.go +++ b/pkg/storage/unified/resource/cdk_backend.go @@ -23,6 +23,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "github.com/grafana/grafana/pkg/apimachinery/utils" + "github.com/grafana/grafana/pkg/storage/unified/backend" ) type CDKBackendOptions struct { @@ -116,6 +117,17 @@ func (s *cdkBackend) GetResourceStats(ctx context.Context, namespace string, min } func (s *cdkBackend) WriteEvent(ctx context.Context, event WriteEvent) (rv int64, err error) { + if event.Type == WatchEvent_ADDED { + // ReadResource deals with deleted values (i.e. a file exists but has generation -999). + resp := s.ReadResource(ctx, &ReadRequest{Key: event.Key}) + if resp.Error != nil && resp.Error.Code != http.StatusNotFound { + return 0, GetError(resp.Error) + } + if resp.Value != nil { + return 0, backend.ErrResourceAlreadyExists + } + } + // Scope the lock { s.mutex.Lock() diff --git a/pkg/storage/unified/resource/errors.go b/pkg/storage/unified/resource/errors.go index 48faa27a303..69625f99b94 100644 --- a/pkg/storage/unified/resource/errors.go +++ b/pkg/storage/unified/resource/errors.go @@ -43,8 +43,8 @@ func AsErrorResult(err error) *ErrorResult { return nil } - apistatus, ok := err.(apierrors.APIStatus) - if ok { + var apistatus apierrors.APIStatus + if errors.As(err, &apistatus) { s := apistatus.Status() res := &ErrorResult{ Message: s.Message, diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go index c0992e8e1ec..03066765a8e 100644 --- a/pkg/storage/unified/resource/server.go +++ b/pkg/storage/unified/resource/server.go @@ -476,22 +476,14 @@ func (s *server) Create(ctx context.Context, req *CreateRequest) (*CreateRespons return rsp, nil } - found := s.backend.ReadResource(ctx, &ReadRequest{Key: req.Key}) - if found != nil && len(found.Value) > 0 { - rsp.Error = &ErrorResult{ - Code: http.StatusConflict, - Reason: string(metav1.StatusReasonAlreadyExists), - Message: "key already exists", // TODO?? soft delete replace? - } - return rsp, nil - } - event, e := s.newEvent(ctx, user, req.Key, req.Value, nil) if e != nil { rsp.Error = e return rsp, nil } + // If the resource already exists, the create will return an already exists error that is remapped appropriately by AsErrorResult. + // This also benefits from ACID behaviours on our databases, so we avoid race conditions. var err error rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, *event) if err != nil { diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go index ec754fc5c51..f69f2166789 100644 --- a/pkg/storage/unified/sql/backend.go +++ b/pkg/storage/unified/sql/backend.go @@ -13,6 +13,7 @@ import ( "github.com/google/uuid" unifiedbackend "github.com/grafana/grafana/pkg/storage/unified/backend" "github.com/jackc/pgx/v5/pgconn" + "github.com/lib/pq" "github.com/mattn/go-sqlite3" "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/trace" @@ -397,6 +398,12 @@ func isRowAlreadyExistsError(err error) bool { return pg.Code == "23505" // unique_violation } + var pqerr *pq.Error + if errors.As(err, &pqerr) { + // https://www.postgresql.org/docs/current/errcodes-appendix.html + return pqerr.Code == "23505" // unique_violation + } + var mysqlerr *mysql.MySQLError if errors.As(err, &mysqlerr) { // https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html diff --git a/pkg/storage/unified/testing/storage_backend.go b/pkg/storage/unified/testing/storage_backend.go index a5fd66b1230..a81a69a95fa 100644 --- a/pkg/storage/unified/testing/storage_backend.go +++ b/pkg/storage/unified/testing/storage_backend.go @@ -3,6 +3,7 @@ package test import ( "context" "fmt" + "net/http" "slices" "strings" "testing" @@ -10,8 +11,11 @@ import ( "github.com/go-jose/go-jose/v3/jwt" "github.com/google/uuid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apiserver/pkg/endpoints/request" "github.com/grafana/authlib/authn" "github.com/grafana/authlib/types" @@ -22,12 +26,13 @@ import ( // Test names for the storage backend test suite const ( - TestHappyPath = "happy path" - TestWatchWriteEvents = "watch write events from latest" - TestList = "list" - TestBlobSupport = "blob support" - TestGetResourceStats = "get resource stats" - TestListHistory = "list history" + TestHappyPath = "happy path" + TestWatchWriteEvents = "watch write events from latest" + TestList = "list" + TestBlobSupport = "blob support" + TestGetResourceStats = "get resource stats" + TestListHistory = "list history" + TestCreateNewResource = "create new resource" ) type NewBackendFunc func(ctx context.Context) resource.StorageBackend @@ -70,6 +75,7 @@ func RunStorageBackendTest(t *testing.T, newBackend NewBackendFunc, opts *TestOp {TestBlobSupport, runTestIntegrationBlobSupport}, {TestGetResourceStats, runTestIntegrationBackendGetResourceStats}, {TestListHistory, runTestIntegrationBackendListHistory}, + {TestCreateNewResource, runTestIntegrationBackendCreateNewResource}, } for _, tc := range cases { @@ -907,6 +913,42 @@ func runTestIntegrationBlobSupport(t *testing.T, backend resource.StorageBackend }) } +func runTestIntegrationBackendCreateNewResource(t *testing.T, backend resource.StorageBackend, nsPrefix string) { + ctx := types.WithAuthInfo(t.Context(), authn.NewAccessTokenAuthInfo(authn.Claims[authn.AccessTokenClaims]{ + Claims: jwt.Claims{ + Subject: "testuser", + }, + Rest: authn.AccessTokenClaims{}, + })) + + server := newServer(t, backend) + ns := nsPrefix + "-create-resource" + ctx = request.WithNamespace(ctx, ns) + + request := &resource.CreateRequest{ + Key: &resource.ResourceKey{ + Namespace: "default", + Group: "test.grafana", + Resource: "Test", + Name: "test", + }, + Value: []byte(`{"apiVersion":"test.grafana/v0alpha1","kind":"Test","metadata":{"name":"test","namespace":"default"}}`), + } + + response, err := server.Create(ctx, request) + require.NoError(t, err, "create resource") + require.Nil(t, response.Error, "create resource response.Error") + + t.Run("gracefully handles resource already exists error", func(t *testing.T) { + response, err := server.Create(ctx, request) + require.NoError(t, err, "create resource") + require.NotNil(t, response.GetError(), "create resource response.Error") + assert.Equal(t, int32(http.StatusConflict), response.GetError().GetCode(), "create resource response.Error.Code") + assert.Equal(t, string(metav1.StatusReasonAlreadyExists), response.GetError().GetReason(), "create resource response.Error.Reason") + t.Logf("Error: %v", response.GetError()) // only prints on failure, so this is fine + }) +} + // WriteEventOption is a function that modifies WriteEventOptions type WriteEventOption func(*WriteEventOptions)