Don't use transaction in ListModifiedSince. (#110392)
* Don't use transaction in ListModifiedSince. To guarantee that we don't include events with RV > LatestRV, we include the check in SQL query instead. * Fix integration test by converting SQL comments into template comments.
This commit is contained in:
@@ -636,24 +636,11 @@ func (b *backend) listLatest(ctx context.Context, req *resourcepb.ListRequest, c
|
||||
// ListModifiedSince will return all resources that have changed since the given resource version.
|
||||
// If a resource has changes, only the latest change will be returned.
|
||||
func (b *backend) ListModifiedSince(ctx context.Context, key resource.NamespacedResource, sinceRv int64) (int64, iter.Seq2[*resource.ModifiedResource, error]) {
|
||||
tx, err := b.db.BeginTx(ctx, RepeatableRead)
|
||||
if err != nil {
|
||||
return 0, func(yield func(*resource.ModifiedResource, error) bool) {
|
||||
yield(nil, err)
|
||||
}
|
||||
}
|
||||
// We don't use an explicit transaction for fetching LatestRV and subsequent fetching of resources.
|
||||
// To guarantee that we don't include events with RV > LatestRV, we include the check in SQL query.
|
||||
|
||||
rollbackOnDefer := true
|
||||
defer func() {
|
||||
if rollbackOnDefer {
|
||||
if terr := tx.Rollback(); terr != nil {
|
||||
b.log.Warn("Error rolling back transaction in ListModifiedSince", "error", terr)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Fetch latest RV within the transaction
|
||||
latestRv, err := b.fetchLatestRV(ctx, tx, b.dialect, key.Group, key.Resource)
|
||||
// Fetch latest RV.
|
||||
latestRv, err := b.fetchLatestRV(ctx, b.db, b.dialect, key.Group, key.Resource)
|
||||
if err != nil {
|
||||
return 0, func(yield func(*resource.ModifiedResource, error) bool) {
|
||||
yield(nil, err)
|
||||
@@ -668,35 +655,17 @@ func (b *backend) ListModifiedSince(ctx context.Context, key resource.Namespaced
|
||||
// since results are sorted by name ASC and rv DESC, we can get away with tracking the last seen
|
||||
lastSeen := ""
|
||||
|
||||
// We will rollback after iteration has finished.
|
||||
rollbackOnDefer = false
|
||||
|
||||
// rollback transaction if iterator not called within 30 seconds
|
||||
rollbackTimer := time.AfterFunc(30*time.Second, func() {
|
||||
if err := tx.Rollback(); err != nil && !errors.Is(err, sql.ErrTxDone) {
|
||||
b.log.Warn("rollback timer error", "err", err)
|
||||
}
|
||||
})
|
||||
|
||||
seq := func(yield func(*resource.ModifiedResource, error) bool) {
|
||||
rollbackTimer.Stop()
|
||||
|
||||
defer func() {
|
||||
// Always rollback the read-only transaction when iterator is done
|
||||
if rollbackErr := tx.Rollback(); rollbackErr != nil {
|
||||
b.log.Warn("Error rolling back transaction in ListModifiedSince", "error", rollbackErr)
|
||||
}
|
||||
}()
|
||||
|
||||
query := sqlResourceListModifiedSinceRequest{
|
||||
SQLTemplate: sqltemplate.New(b.dialect),
|
||||
Namespace: key.Namespace,
|
||||
Group: key.Group,
|
||||
Resource: key.Resource,
|
||||
SinceRv: sinceRv,
|
||||
LatestRv: latestRv,
|
||||
}
|
||||
|
||||
rows, err := dbutil.QueryRows(ctx, tx, sqlResourceHistoryListModifiedSince, query)
|
||||
rows, err := dbutil.QueryRows(ctx, b.db, sqlResourceHistoryListModifiedSince, query)
|
||||
if err != nil {
|
||||
yield(nil, err)
|
||||
return
|
||||
|
||||
@@ -10,5 +10,6 @@ FROM resource_history
|
||||
WHERE {{.Ident "namespace" }} = {{.Arg .Namespace }}
|
||||
AND {{.Ident "group" }} = {{.Arg .Group }}
|
||||
AND {{.Ident "resource" }} = {{.Arg .Resource }}
|
||||
AND {{.Ident "resource_version" }} > {{.Arg .SinceRv }} -- needs to be exclusive of the sinceRv
|
||||
AND {{.Ident "resource_version" }} > {{.Arg .SinceRv }} {{/* needs to exclude SinceRv */}}
|
||||
AND {{.Ident "resource_version" }} <= {{.Arg .LatestRv }} {{/* needs to include LatestRv */}}
|
||||
ORDER BY {{.Ident "name" }} ASC, {{.Ident "resource_version" }} DESC
|
||||
|
||||
@@ -432,7 +432,8 @@ type sqlResourceListModifiedSinceRequest struct {
|
||||
Namespace string
|
||||
Group string
|
||||
Resource string
|
||||
SinceRv int64
|
||||
SinceRv int64 // Exclusive
|
||||
LatestRv int64 // Inclusive
|
||||
}
|
||||
|
||||
func (r sqlResourceListModifiedSinceRequest) Validate() error {
|
||||
@@ -448,5 +449,8 @@ func (r sqlResourceListModifiedSinceRequest) Validate() error {
|
||||
if r.SinceRv < 0 {
|
||||
return fmt.Errorf("since resource version must be greater than or equal to zero")
|
||||
}
|
||||
if r.LatestRv < r.SinceRv {
|
||||
return fmt.Errorf("latest resource version must be greater or equal to since resource version")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -129,6 +129,7 @@ func TestUnifiedStorageQueries(t *testing.T) {
|
||||
Group: "group",
|
||||
Resource: "res",
|
||||
SinceRv: 10000,
|
||||
LatestRv: 20000,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
Vendored
+2
-1
@@ -10,5 +10,6 @@ FROM resource_history
|
||||
WHERE `namespace` = 'ns'
|
||||
AND `group` = 'group'
|
||||
AND `resource` = 'res'
|
||||
AND `resource_version` > 10000 -- needs to be exclusive of the sinceRv
|
||||
AND `resource_version` > 10000
|
||||
AND `resource_version` <= 20000
|
||||
ORDER BY `name` ASC, `resource_version` DESC
|
||||
|
||||
Vendored
+2
-1
@@ -10,5 +10,6 @@ FROM resource_history
|
||||
WHERE "namespace" = 'ns'
|
||||
AND "group" = 'group'
|
||||
AND "resource" = 'res'
|
||||
AND "resource_version" > 10000 -- needs to be exclusive of the sinceRv
|
||||
AND "resource_version" > 10000
|
||||
AND "resource_version" <= 20000
|
||||
ORDER BY "name" ASC, "resource_version" DESC
|
||||
|
||||
Vendored
+2
-1
@@ -10,5 +10,6 @@ FROM resource_history
|
||||
WHERE "namespace" = 'ns'
|
||||
AND "group" = 'group'
|
||||
AND "resource" = 'res'
|
||||
AND "resource_version" > 10000 -- needs to be exclusive of the sinceRv
|
||||
AND "resource_version" > 10000
|
||||
AND "resource_version" <= 20000
|
||||
ORDER BY "name" ASC, "resource_version" DESC
|
||||
|
||||
Reference in New Issue
Block a user