Revert "Unistore : Ensure Watch works in HA mode." (#94097)

Revert "Unistore : Ensure Watch works in HA mode. (#93428)"

This reverts commit 0a26c9e9ae.
This commit is contained in:
Georges Chaudy
2024-10-01 14:45:47 -04:00
committed by GitHub
parent 78290301f4
commit 7c3fc2f261
28 changed files with 699 additions and 475 deletions
+19 -27
View File
@@ -22,7 +22,6 @@ import (
)
const trace_prefix = "sql.resource."
const defaultPollingInterval = 100 * time.Millisecond
type Backend interface {
resource.StorageBackend
@@ -31,9 +30,8 @@ type Backend interface {
}
type BackendOptions struct {
DBProvider db.DBProvider
Tracer trace.Tracer
PollingInterval time.Duration
DBProvider db.DBProvider
Tracer trace.Tracer
}
func NewBackend(opts BackendOptions) (Backend, error) {
@@ -45,17 +43,12 @@ func NewBackend(opts BackendOptions) (Backend, error) {
}
ctx, cancel := context.WithCancel(context.Background())
pollingInterval := opts.PollingInterval
if pollingInterval == 0 {
pollingInterval = defaultPollingInterval
}
return &backend{
done: ctx.Done(),
cancel: cancel,
log: log.New("sql-resource-server"),
tracer: opts.Tracer,
dbProvider: opts.DBProvider,
pollingInterval: pollingInterval,
done: ctx.Done(),
cancel: cancel,
log: log.New("sql-resource-server"),
tracer: opts.Tracer,
dbProvider: opts.DBProvider,
}, nil
}
@@ -77,7 +70,6 @@ type backend struct {
// watch streaming
//stream chan *resource.WatchEvent
pollingInterval time.Duration
}
func (b *backend) Init(ctx context.Context) error {
@@ -188,6 +180,7 @@ func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64,
return nil
})
return newVersion, err
}
@@ -519,7 +512,8 @@ func (b *backend) WatchWriteEvents(ctx context.Context) (<-chan *resource.Writte
}
func (b *backend) poller(ctx context.Context, since groupResourceRV, stream chan<- *resource.WrittenEvent) {
t := time.NewTicker(b.pollingInterval)
interval := 100 * time.Millisecond // TODO make this configurable
t := time.NewTicker(interval)
defer close(stream)
defer t.Stop()
@@ -532,7 +526,7 @@ func (b *backend) poller(ctx context.Context, since groupResourceRV, stream chan
grv, err := b.listLatestRVs(ctx)
if err != nil {
b.log.Error("get the latest resource version", "err", err)
t.Reset(b.pollingInterval)
t.Reset(interval)
continue
}
for group, items := range grv {
@@ -549,7 +543,7 @@ func (b *backend) poller(ctx context.Context, since groupResourceRV, stream chan
next, err := b.poll(ctx, group, resource, since[group][resource], stream)
if err != nil {
b.log.Error("polling for resource", "err", err)
t.Reset(b.pollingInterval)
t.Reset(interval)
continue
}
if next > since[group][resource] {
@@ -558,7 +552,7 @@ func (b *backend) poller(ctx context.Context, since groupResourceRV, stream chan
}
}
t.Reset(b.pollingInterval)
t.Reset(interval)
}
}
}
@@ -642,8 +636,7 @@ func (b *backend) poll(ctx context.Context, grp string, res string, since int64,
Resource: rec.Key.Resource,
Name: rec.Key.Name,
},
Type: resource.WatchEvent_Type(rec.Action),
PreviousRV: rec.PreviousRV,
Type: resource.WatchEvent_Type(rec.Action),
},
ResourceVersion: rec.ResourceVersion,
// Timestamp: , // TODO: add timestamp
@@ -670,16 +663,15 @@ func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemp
if errors.Is(err, sql.ErrNoRows) {
// if there wasn't a row associated with the given resource, we create one with
// version 2 to match the etcd behavior.
// version 1
if _, err = dbutil.Exec(ctx, x, sqlResourceVersionInsert, sqlResourceVersionRequest{
SQLTemplate: sqltemplate.New(d),
Group: key.Group,
Resource: key.Resource,
resourceVersion: &resourceVersion{1},
SQLTemplate: sqltemplate.New(d),
Group: key.Group,
Resource: key.Resource,
}); err != nil {
return 0, fmt.Errorf("insert into resource_version: %w", err)
}
return 2, nil
return 1, nil
}
if err != nil {
+4 -4
View File
@@ -227,7 +227,7 @@ func TestResourceVersionAtomicInc(t *testing.T) {
v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey)
require.NoError(t, err)
require.Equal(t, int64(2), v)
require.Equal(t, int64(1), v)
})
t.Run("happy path - update existing row", func(t *testing.T) {
@@ -304,7 +304,7 @@ func TestBackend_create(t *testing.T) {
v, err := b.create(ctx, event)
require.NoError(t, err)
require.Equal(t, int64(2), v)
require.Equal(t, int64(1), v)
})
t.Run("error inserting into resource", func(t *testing.T) {
@@ -409,7 +409,7 @@ func TestBackend_update(t *testing.T) {
v, err := b.update(ctx, event)
require.NoError(t, err)
require.Equal(t, int64(2), v)
require.Equal(t, int64(1), v)
})
t.Run("error in first update to resource", func(t *testing.T) {
@@ -513,7 +513,7 @@ func TestBackend_delete(t *testing.T) {
v, err := b.delete(ctx, event)
require.NoError(t, err)
require.Equal(t, int64(2), v)
require.Equal(t, int64(1), v)
})
t.Run("error deleting resource", func(t *testing.T) {
@@ -6,7 +6,6 @@ INSERT INTO {{ .Ident "resource_history" }}
{{ .Ident "namespace" }},
{{ .Ident "name" }},
{{ .Ident "previous_resource_version"}},
{{ .Ident "value" }},
{{ .Ident "action" }}
)
@@ -18,7 +17,6 @@ INSERT INTO {{ .Ident "resource_history" }}
{{ .Arg .WriteEvent.Key.Namespace }},
{{ .Arg .WriteEvent.Key.Name }},
{{ .Arg .WriteEvent.PreviousRV }},
{{ .Arg .WriteEvent.Value }},
{{ .Arg .WriteEvent.Type }}
)
@@ -5,8 +5,7 @@ SELECT
{{ .Ident "resource" | .Into .Response.Key.Resource }},
{{ .Ident "name" | .Into .Response.Key.Name }},
{{ .Ident "value" | .Into .Response.Value }},
{{ .Ident "action" | .Into .Response.Action }},
{{ .Ident "previous_resource_version" | .Into .Response.PreviousRV }}
{{ .Ident "action" | .Into .Response.Action }}
FROM {{ .Ident "resource_history" }}
WHERE 1 = 1
@@ -7,7 +7,6 @@ INSERT INTO {{ .Ident "resource" }}
{{ .Ident "namespace" }},
{{ .Ident "name" }},
{{ .Ident "previous_resource_version" }},
{{ .Ident "value" }},
{{ .Ident "action" }}
)
@@ -18,7 +17,6 @@ INSERT INTO {{ .Ident "resource" }}
{{ .Arg .WriteEvent.Key.Namespace }},
{{ .Arg .WriteEvent.Key.Name }},
{{ .Arg .WriteEvent.PreviousRV }},
{{ .Arg .WriteEvent.Value }},
{{ .Arg .WriteEvent.Type }}
)
@@ -8,6 +8,6 @@ INSERT INTO {{ .Ident "resource_version" }}
VALUES (
{{ .Arg .Group }},
{{ .Arg .Resource }},
2
1
)
;
@@ -10,7 +10,8 @@ func initResourceTables(mg *migrator.Migrator) string {
marker := "Initialize resource tables"
mg.AddMigration(marker, &migrator.RawSQLMigration{})
resource_table := migrator.Table{
tables := []migrator.Table{}
tables = append(tables, migrator.Table{
Name: "resource",
Columns: []*migrator.Column{
// primary identifier
@@ -32,8 +33,9 @@ func initResourceTables(mg *migrator.Migrator) string {
Indices: []*migrator.Index{
{Cols: []string{"namespace", "group", "resource", "name"}, Type: migrator.UniqueIndex},
},
}
resource_history_table := migrator.Table{
})
tables = append(tables, migrator.Table{
Name: "resource_history",
Columns: []*migrator.Column{
// primary identifier
@@ -60,9 +62,7 @@ func initResourceTables(mg *migrator.Migrator) string {
// index to support watch poller
{Cols: []string{"resource_version"}, Type: migrator.IndexType},
},
}
tables := []migrator.Table{resource_table, resource_history_table}
})
// tables = append(tables, migrator.Table{
// Name: "resource_label_set",
@@ -97,13 +97,5 @@ func initResourceTables(mg *migrator.Migrator) string {
}
}
mg.AddMigration("Add column previous_resource_version in resource_history", migrator.NewAddColumnMigration(resource_history_table, &migrator.Column{
Name: "previous_resource_version", Type: migrator.DB_BigInt, Nullable: false,
}))
mg.AddMigration("Add column previous_resource_version in resource", migrator.NewAddColumnMigration(resource_table, &migrator.Column{
Name: "previous_resource_version", Type: migrator.DB_BigInt, Nullable: false,
}))
return marker
}
-2
View File
@@ -70,7 +70,6 @@ func (r sqlResourceRequest) Validate() error {
type historyPollResponse struct {
Key resource.ResourceKey
ResourceVersion int64
PreviousRV int64
Value []byte
Action int
}
@@ -102,7 +101,6 @@ func (r *sqlResourceHistoryPollRequest) Results() (*historyPollResponse, error)
Name: r.Response.Key.Name,
},
ResourceVersion: r.Response.ResourceVersion,
PreviousRV: r.Response.PreviousRV,
Value: r.Response.Value,
Action: r.Response.Action,
}, nil
+1 -14
View File
@@ -104,18 +104,6 @@ func TestUnifiedStorageQueries(t *testing.T) {
},
},
},
sqlResourceHistoryPoll: {
{
Name: "single path",
Data: &sqlResourceHistoryPollRequest{
SQLTemplate: mocks.NewTestingSQLTemplate(),
Resource: "res",
Group: "group",
SinceResourceVersion: 1234,
Response: new(historyPollResponse),
},
},
},
sqlResourceUpdateRV: {
{
@@ -155,8 +143,7 @@ func TestUnifiedStorageQueries(t *testing.T) {
Data: &sqlResourceRequest{
SQLTemplate: mocks.NewTestingSQLTemplate(),
WriteEvent: resource.WriteEvent{
Key: &resource.ResourceKey{},
PreviousRV: 1234,
Key: &resource.ResourceKey{},
},
},
},
@@ -5,7 +5,6 @@ INSERT INTO `resource_history`
`resource`,
`namespace`,
`name`,
`previous_resource_version`,
`value`,
`action`
)
@@ -15,7 +14,6 @@ INSERT INTO `resource_history`
'',
'',
'',
1234,
'[]',
'UNKNOWN'
)
@@ -1,16 +0,0 @@
SELECT
`resource_version`,
`namespace`,
`group`,
`resource`,
`name`,
`value`,
`action`,
`previous_resource_version`
FROM `resource_history`
WHERE 1 = 1
AND `group` = 'group'
AND `resource` = 'res'
AND `resource_version` > 1234
ORDER BY `resource_version` ASC
;
@@ -5,7 +5,6 @@ INSERT INTO `resource`
`resource`,
`namespace`,
`name`,
`previous_resource_version`,
`value`,
`action`
)
@@ -15,7 +14,6 @@ INSERT INTO `resource`
'rr',
'nn',
'name',
123,
'[]',
'ADDED'
)
@@ -7,6 +7,6 @@ INSERT INTO `resource_version`
VALUES (
'',
'',
2
1
)
;
@@ -5,7 +5,6 @@ INSERT INTO "resource_history"
"resource",
"namespace",
"name",
"previous_resource_version",
"value",
"action"
)
@@ -15,7 +14,6 @@ INSERT INTO "resource_history"
'',
'',
'',
1234,
'[]',
'UNKNOWN'
)
@@ -1,16 +0,0 @@
SELECT
"resource_version",
"namespace",
"group",
"resource",
"name",
"value",
"action",
"previous_resource_version"
FROM "resource_history"
WHERE 1 = 1
AND "group" = 'group'
AND "resource" = 'res'
AND "resource_version" > 1234
ORDER BY "resource_version" ASC
;
@@ -5,7 +5,6 @@ INSERT INTO "resource"
"resource",
"namespace",
"name",
"previous_resource_version",
"value",
"action"
)
@@ -15,7 +14,6 @@ INSERT INTO "resource"
'rr',
'nn',
'name',
123,
'[]',
'ADDED'
)
@@ -7,6 +7,6 @@ INSERT INTO "resource_version"
VALUES (
'',
'',
2
1
)
;
@@ -5,7 +5,6 @@ INSERT INTO "resource_history"
"resource",
"namespace",
"name",
"previous_resource_version",
"value",
"action"
)
@@ -15,7 +14,6 @@ INSERT INTO "resource_history"
'',
'',
'',
1234,
'[]',
'UNKNOWN'
)
@@ -1,16 +0,0 @@
SELECT
"resource_version",
"namespace",
"group",
"resource",
"name",
"value",
"action",
"previous_resource_version"
FROM "resource_history"
WHERE 1 = 1
AND "group" = 'group'
AND "resource" = 'res'
AND "resource_version" > 1234
ORDER BY "resource_version" ASC
;
@@ -5,7 +5,6 @@ INSERT INTO "resource"
"resource",
"namespace",
"name",
"previous_resource_version",
"value",
"action"
)
@@ -15,7 +14,6 @@ INSERT INTO "resource"
'rr',
'nn',
'name',
123,
'[]',
'ADDED'
)
@@ -7,6 +7,6 @@ INSERT INTO "resource_version"
VALUES (
'',
'',
2
1
)
;