Files
grafana/pkg/storage/secret/metadata/secure_value_store.go
T

707 lines
22 KiB
Go

package metadata
import (
"context"
"fmt"
"strconv"
"time"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"github.com/grafana/grafana-app-sdk/logging"
secretv1beta1 "github.com/grafana/grafana/apps/secret/pkg/apis/secret/v1beta1"
"github.com/grafana/grafana/pkg/registry/apis/secret/contracts"
"github.com/grafana/grafana/pkg/registry/apis/secret/xkube"
"github.com/grafana/grafana/pkg/storage/secret/metadata/metrics"
"github.com/grafana/grafana/pkg/storage/unified/sql"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
"go.opentelemetry.io/otel/codes"
)
var _ contracts.SecureValueMetadataStorage = (*secureValueMetadataStorage)(nil)
func ProvideSecureValueMetadataStorage(
clock contracts.Clock,
db contracts.Database,
tracer trace.Tracer,
reg prometheus.Registerer,
) (contracts.SecureValueMetadataStorage, error) {
return &secureValueMetadataStorage{
clock: clock,
db: db,
dialect: sqltemplate.DialectForDriver(db.DriverName()),
metrics: metrics.NewStorageMetrics(reg),
tracer: tracer,
}, nil
}
// secureValueMetadataStorage is the actual implementation of the secure value (metadata) storage.
type secureValueMetadataStorage struct {
clock contracts.Clock
db contracts.Database
dialect sqltemplate.Dialect
metrics *metrics.StorageMetrics
tracer trace.Tracer
}
func (s *secureValueMetadataStorage) Create(ctx context.Context, keeper string, sv *secretv1beta1.SecureValue, actorUID string) (_ *secretv1beta1.SecureValue, svmCreateErr error) {
start := s.clock.Now()
name := sv.GetName()
namespace := sv.GetNamespace()
ctx, span := s.tracer.Start(ctx, "SecureValueMetadataStorage.Create", trace.WithAttributes(
attribute.String("name", name),
attribute.String("namespace", namespace),
attribute.String("keeper", keeper),
attribute.String("actorUID", actorUID),
))
defer span.End()
defer func() {
success := svmCreateErr == nil
args := []any{
"name", name,
"namespace", namespace,
"keeper", keeper,
"actorUID", actorUID,
}
args = append(args, "success", success)
if !success {
span.SetStatus(codes.Error, "SecureValueMetadataStorage.Create failed")
span.RecordError(svmCreateErr)
args = append(args, "error", svmCreateErr)
}
logging.FromContext(ctx).Info("SecureValueMetadataStorage.Create", args...)
s.metrics.SecureValueMetadataCreateDuration.WithLabelValues(strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
}()
// Set inside the transaction callback
var row *secureValueDB
err := s.db.Transaction(ctx, func(ctx context.Context) error {
latest, err := s.getLatestVersionAndCreated(ctx, xkube.Namespace(sv.Namespace), sv.Name)
if err != nil {
return fmt.Errorf("fetching latest secure value version: %w", err)
}
version := int64(1)
if latest.version > 0 {
version = latest.version + 1
}
// Some other concurrent request may have created the version we're trying to create,
// if that's the case, we'll retry with a new version up to max attempts.
maxAttempts := 3
attempts := 0
for {
sv.Status.Version = version
now := s.clock.Now().UTC().Unix()
createdAt := now
if latest.createdAt > 0 {
createdAt = latest.createdAt
}
updatedAt := now
createdBy := actorUID
if latest.createdBy != "" {
createdBy = latest.createdBy
}
updatedBy := actorUID
row, err = toCreateRow(createdAt, updatedAt, keeper, sv, createdBy, updatedBy)
if err != nil {
return fmt.Errorf("to create row: %w", err)
}
req := createSecureValue{
SQLTemplate: sqltemplate.New(s.dialect),
Row: row,
}
query, err := sqltemplate.Execute(sqlSecureValueCreate, req)
if err != nil {
return fmt.Errorf("execute template %q: %w", sqlSecureValueCreate.Name(), err)
}
res, err := s.db.ExecContext(ctx, query, req.GetArgs()...)
if err != nil {
if sql.IsRowAlreadyExistsError(err) {
if attempts < maxAttempts {
attempts += 1
version += 1
continue
}
return fmt.Errorf("namespace=%+v name=%+v %w", sv.Namespace, sv.Name, contracts.ErrSecureValueAlreadyExists)
}
return fmt.Errorf("inserting row: %w", err)
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("getting rows affected: %w", err)
}
if rowsAffected != 1 {
return fmt.Errorf("expected 1 row affected, got %d for %s on %s", rowsAffected, row.Name, row.Namespace)
}
return nil
}
})
if err != nil {
return nil, fmt.Errorf("db failure: %w", err)
}
createdSecureValue, err := row.toKubernetes()
if err != nil {
return nil, fmt.Errorf("convert to kubernetes object: %w", err)
}
return createdSecureValue, nil
}
type versionAndCreated struct {
createdAt int64
createdBy string
version int64
}
func (s *secureValueMetadataStorage) getLatestVersionAndCreated(ctx context.Context, namespace xkube.Namespace, name string) (versionAndCreated, error) {
ctx, span := s.tracer.Start(ctx, "SecureValueMetadataStorage.getLatestVersionAndCreated", trace.WithAttributes(
attribute.String("name", name),
attribute.String("namespace", namespace.String()),
))
defer span.End()
req := getLatestSecureValueVersionAndCreatedAt{
SQLTemplate: sqltemplate.New(s.dialect),
Namespace: namespace.String(),
Name: name,
}
q, err := sqltemplate.Execute(sqlGetLatestSecureValueVersionAndCreatedAt, req)
if err != nil {
return versionAndCreated{}, fmt.Errorf("execute template %q: %w", sqlGetLatestSecureValueVersionAndCreatedAt.Name(), err)
}
rows, err := s.db.QueryContext(ctx, q, req.GetArgs()...)
if err != nil {
return versionAndCreated{}, fmt.Errorf("fetching latest version for secure value: namespace=%+v name=%+v %w", namespace, name, err)
}
defer func() { _ = rows.Close() }()
if err := rows.Err(); err != nil {
return versionAndCreated{}, fmt.Errorf("error executing query: %w", err)
}
if !rows.Next() {
return versionAndCreated{}, nil
}
var (
createdAt int64
createdBy string
version int64
active bool
namespaceFromDB string
nameFromDB string
)
if err := rows.Scan(&createdAt, &createdBy, &version, &active, &namespaceFromDB, &nameFromDB); err != nil {
return versionAndCreated{}, fmt.Errorf("scanning version and created from returned rows: %w", err)
}
if namespaceFromDB != namespace.String() || nameFromDB != name {
return versionAndCreated{}, fmt.Errorf("bug: expected to find version and created for namespace=%+v name=%+v but got for namespace=%+v name=%+v",
namespace, name, namespaceFromDB, nameFromDB)
}
if !active {
createdAt = 0
createdBy = ""
}
return versionAndCreated{
createdAt: createdAt,
createdBy: createdBy,
version: version,
}, nil
}
func (s *secureValueMetadataStorage) readActiveVersion(ctx context.Context, namespace xkube.Namespace, name string, opts contracts.ReadOpts) (secureValueDB, error) {
req := readSecureValue{
SQLTemplate: sqltemplate.New(s.dialect),
Namespace: namespace.String(),
Name: name,
IsForUpdate: opts.ForUpdate,
}
query, err := sqltemplate.Execute(sqlSecureValueRead, req)
if err != nil {
return secureValueDB{}, fmt.Errorf("execute template %q: %w", sqlSecureValueRead.Name(), err)
}
res, err := s.db.QueryContext(ctx, query, req.GetArgs()...)
if err != nil {
return secureValueDB{}, fmt.Errorf("reading row: %w", err)
}
defer func() { _ = res.Close() }()
var secureValue secureValueDB
if !res.Next() {
return secureValueDB{}, contracts.ErrSecureValueNotFound
}
if err := res.Scan(
&secureValue.GUID, &secureValue.Name, &secureValue.Namespace,
&secureValue.Annotations, &secureValue.Labels,
&secureValue.Created, &secureValue.CreatedBy,
&secureValue.Updated, &secureValue.UpdatedBy,
&secureValue.Description, &secureValue.Keeper, &secureValue.Decrypters, &secureValue.Ref, &secureValue.ExternalID, &secureValue.Active, &secureValue.Version,
&secureValue.OwnerReferenceAPIGroup, &secureValue.OwnerReferenceAPIVersion, &secureValue.OwnerReferenceKind, &secureValue.OwnerReferenceName,
); err != nil {
return secureValueDB{}, fmt.Errorf("failed to scan secure value row: %w", err)
}
if err := res.Err(); err != nil {
return secureValueDB{}, fmt.Errorf("read rows error: %w", err)
}
if secureValue.Namespace != namespace.String() || secureValue.Name != name {
return secureValueDB{}, fmt.Errorf("bug: expected to read secure value %+v from namespace %+v, but got a different row", name, namespace)
}
return secureValue, nil
}
func (s *secureValueMetadataStorage) Read(ctx context.Context, namespace xkube.Namespace, name string, opts contracts.ReadOpts) (_ *secretv1beta1.SecureValue, readErr error) {
start := s.clock.Now()
ctx, span := s.tracer.Start(ctx, "SecureValueMetadataStorage.Read", trace.WithAttributes(
attribute.String("name", name),
attribute.String("namespace", namespace.String()),
attribute.Bool("isForUpdate", opts.ForUpdate),
))
defer span.End()
defer func() {
success := readErr == nil
args := []any{
"name", name,
"namespace", namespace.String(),
"success", success,
}
if !success {
span.SetStatus(codes.Error, "SecureValueMetadataStorage.Read failed")
span.RecordError(readErr)
args = append(args, "error", readErr)
}
logging.FromContext(ctx).Info("SecureValueMetadataStorage.Read", args...)
s.metrics.SecureValueMetadataGetDuration.WithLabelValues(strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
}()
secureValue, err := s.readActiveVersion(ctx, namespace, name, opts)
if err != nil {
return nil, err
}
secureValueKub, err := secureValue.toKubernetes()
if err != nil {
return nil, fmt.Errorf("convert to kubernetes object: %w", err)
}
return secureValueKub, nil
}
func (s *secureValueMetadataStorage) List(ctx context.Context, namespace xkube.Namespace) (svList []secretv1beta1.SecureValue, listErr error) {
start := s.clock.Now()
ctx, span := s.tracer.Start(ctx, "SecureValueMetadataStorage.List", trace.WithAttributes(
attribute.String("namespace", namespace.String()),
))
defer span.End()
defer func() {
success := listErr == nil
span.SetAttributes(attribute.Int("returnedList.count", len(svList)))
args := []any{
"namespace", namespace.String(),
"success", success,
}
if !success {
span.SetStatus(codes.Error, "SecureValueMetadataStorage.List failed")
span.RecordError(listErr)
args = append(args, "error", listErr)
}
logging.FromContext(ctx).Info("SecureValueMetadataStorage.List", args...)
s.metrics.SecureValueMetadataListDuration.WithLabelValues(strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
}()
req := listSecureValue{
SQLTemplate: sqltemplate.New(s.dialect),
Namespace: namespace.String(),
}
q, err := sqltemplate.Execute(sqlSecureValueList, req)
if err != nil {
return nil, fmt.Errorf("execute template %q: %w", sqlSecureValueList.Name(), err)
}
rows, err := s.db.QueryContext(ctx, q, req.GetArgs()...)
if err != nil {
return nil, fmt.Errorf("listing secure values: %w", err)
}
defer func() { _ = rows.Close() }()
secureValues := make([]secretv1beta1.SecureValue, 0)
for rows.Next() {
row := secureValueDB{}
err = rows.Scan(&row.GUID,
&row.Name, &row.Namespace, &row.Annotations,
&row.Labels,
&row.Created, &row.CreatedBy,
&row.Updated, &row.UpdatedBy,
&row.Description, &row.Keeper, &row.Decrypters,
&row.Ref, &row.ExternalID, &row.Version, &row.Active,
&row.OwnerReferenceAPIGroup, &row.OwnerReferenceAPIVersion, &row.OwnerReferenceKind, &row.OwnerReferenceName,
)
if err != nil {
return nil, fmt.Errorf("error reading secure value row: %w", err)
}
if !row.Active {
return nil, fmt.Errorf("bug: read an inactive version: row=%+v", row)
}
if row.Namespace != namespace.String() {
return nil, fmt.Errorf("bug: expected to list secure values from namespace %+v but got one from namespace %+v", namespace.String(), row.Namespace)
}
secureValue, err := row.toKubernetes()
if err != nil {
return nil, fmt.Errorf("convert to kubernetes object: %w", err)
}
secureValues = append(secureValues, *secureValue)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("read rows error: %w", err)
}
return secureValues, nil
}
func (s *secureValueMetadataStorage) SetVersionToActive(ctx context.Context, namespace xkube.Namespace, name string, version int64) error {
ctx, span := s.tracer.Start(ctx, "SecureValueMetadataStorage.SetVersionToActive", trace.WithAttributes(
attribute.String("name", name),
attribute.String("namespace", namespace.String()),
attribute.Int64("version", version),
))
defer span.End()
req := secureValueSetVersionToActive{
SQLTemplate: sqltemplate.New(s.dialect),
Namespace: namespace.String(),
Name: name,
Version: version,
}
q, err := sqltemplate.Execute(sqlSecureValueSetVersionToActive, req)
if err != nil {
return fmt.Errorf("execute template %q: %w", sqlSecureValueSetVersionToActive.Name(), err)
}
res, err := s.db.ExecContext(ctx, q, req.GetArgs()...)
if err != nil {
return fmt.Errorf("setting secure value version to active: namespace=%+v name=%+v version=%+v %w", namespace, name, version, err)
}
// validate modified count
modifiedCount, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("fetching number of modified rows: %w", err)
}
if modifiedCount == 0 {
return fmt.Errorf("expected to modify at least one row but modified 0: modifiedCount=%d", modifiedCount)
}
return nil
}
func (s *secureValueMetadataStorage) SetVersionToInactive(ctx context.Context, namespace xkube.Namespace, name string, version int64) error {
ctx, span := s.tracer.Start(ctx, "SecureValueMetadataStorage.SetVersionToInactive", trace.WithAttributes(
attribute.String("name", name),
attribute.String("namespace", namespace.String()),
attribute.Int64("version", version),
))
defer span.End()
req := secureValueSetVersionToInactive{
SQLTemplate: sqltemplate.New(s.dialect),
Namespace: namespace.String(),
Name: name,
Version: version,
}
q, err := sqltemplate.Execute(sqlSecureValueSetVersionToInactive, req)
if err != nil {
return fmt.Errorf("execute template %q: %w", sqlSecureValueSetVersionToInactive.Name(), err)
}
res, err := s.db.ExecContext(ctx, q, req.GetArgs()...)
if err != nil {
return fmt.Errorf("setting secure value version to active: namespace=%+v name=%+v version=%+v %w", namespace, name, version, err)
}
modifiedCount, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("fetching number of modified rows: %w", err)
}
if modifiedCount > 1 {
return fmt.Errorf("expected to modify at at most one row but modified more: modifiedCount=%d", modifiedCount)
}
return nil
}
func (s *secureValueMetadataStorage) SetExternalID(ctx context.Context, namespace xkube.Namespace, name string, version int64, externalID contracts.ExternalID) (setExtIDErr error) {
start := s.clock.Now()
ctx, span := s.tracer.Start(ctx, "SecureValueMetadataStorage.SetExternalID", trace.WithAttributes(
attribute.String("name", name),
attribute.String("namespace", namespace.String()),
attribute.String("externalID", externalID.String()),
attribute.Int64("version", version),
))
defer span.End()
defer func() {
success := setExtIDErr == nil
args := []any{
"name", name,
"namespace", namespace.String(),
"success", success,
"version", strconv.FormatInt(version, 10),
"externalID", externalID.String(),
}
if !success {
span.SetStatus(codes.Error, "SecureValueMetadataStorage.SetExternalID failed")
span.RecordError(setExtIDErr)
args = append(args, "error", setExtIDErr)
}
logging.FromContext(ctx).Info("SecureValueMetadataStorage.SetExternalID", args...)
s.metrics.SecureValueSetExternalIDDuration.WithLabelValues(strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
}()
req := updateExternalIdSecureValue{
SQLTemplate: sqltemplate.New(s.dialect),
Namespace: namespace.String(),
Name: name,
Version: version,
ExternalID: externalID.String(),
}
q, err := sqltemplate.Execute(sqlSecureValueUpdateExternalId, req)
if err != nil {
return fmt.Errorf("execute template %q: %w", sqlSecureValueUpdateExternalId.Name(), err)
}
res, err := s.db.ExecContext(ctx, q, req.GetArgs()...)
if err != nil {
return fmt.Errorf("setting secure value external id: namespace=%+v name=%+v externalID=%+v %w", namespace, name, externalID, err)
}
// validate modified cound
modifiedCount, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("getting updated rows update external id secure value: %w", err)
}
if modifiedCount > 1 {
return fmt.Errorf("secureValueMetadataStorage.SetExternalID: modified more than one secret, this is a bug, check the where condition: modifiedCount=%d", modifiedCount)
}
return nil
}
func (s *secureValueMetadataStorage) Delete(ctx context.Context, namespace xkube.Namespace, name string, version int64) (err error) {
start := s.clock.Now()
ctx, span := s.tracer.Start(ctx, "SecureValueMetadataStorage.Delete", trace.WithAttributes(
attribute.String("name", name),
attribute.String("namespace", namespace.String()),
attribute.Int64("version", version),
))
defer span.End()
defer func() {
success := err == nil
args := []any{
"namespace", namespace.String(),
"name", name,
"version", strconv.FormatInt(version, 10),
"success", success,
}
if !success {
span.SetStatus(codes.Error, "SecureValueMetadataStorage.Delete failed")
span.RecordError(err)
args = append(args, "error", err)
}
logging.FromContext(ctx).Info("SecureValueMetadataStorage.Delete", args...)
s.metrics.SecureValueDeleteDuration.WithLabelValues(strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
}()
req := deleteSecureValue{
SQLTemplate: sqltemplate.New(s.dialect),
Namespace: namespace.String(),
Name: name,
Version: version,
}
q, err := sqltemplate.Execute(sqlSecureValueDelete, req)
if err != nil {
return fmt.Errorf("execute template %q: %w", sqlSecureValueDelete.Name(), err)
}
res, err := s.db.ExecContext(ctx, q, req.GetArgs()...)
if err != nil {
return fmt.Errorf("deleting secure value: namespace=%+v name=%+v version=%+v %w", namespace, name, version, err)
}
modifiedCount, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("getting rows affected: %w", err)
}
// Deleting is idempotent so modifiedCunt must be in {0, 1}
if modifiedCount > 1 {
return fmt.Errorf("secureValueMetadataStorage.Delete: delete more than one secret, this is a bug, check the where condition: modifiedCount=%d", modifiedCount)
}
return nil
}
func (s *secureValueMetadataStorage) LeaseInactiveSecureValues(ctx context.Context, maxBatchSize uint16) (out []secretv1beta1.SecureValue, err error) {
start := s.clock.Now()
ctx, span := s.tracer.Start(ctx, "SecureValueMetadataStorage.LeaseInactiveSecureValues", trace.WithAttributes(
attribute.Int("maxBatchSize", int(maxBatchSize)),
))
defer span.End()
defer func() {
success := err == nil
if !success {
span.SetStatus(codes.Error, "SecureValueMetadataStorage.LeaseInactiveSecureValues failed")
span.RecordError(err)
}
s.metrics.SecureValueDeleteDuration.WithLabelValues(strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
}()
leaseToken := uuid.NewString()
if err := s.acquireLeases(ctx, leaseToken, maxBatchSize); err != nil {
return nil, fmt.Errorf("acquiring leases for inactive secure values: %w", err)
}
secureValues, err := s.listByLeaseToken(ctx, leaseToken)
if err != nil {
return nil, fmt.Errorf("fetching secure values by lease token: %w", err)
}
return secureValues, nil
}
func (s *secureValueMetadataStorage) acquireLeases(ctx context.Context, leaseToken string, maxBatchSize uint16) error {
req := leaseInactiveSecureValues{
SQLTemplate: sqltemplate.New(s.dialect),
LeaseToken: leaseToken,
MaxBatchSize: maxBatchSize,
MinAge: int64((300 * time.Second).Seconds()),
LeaseTTL: int64((30 * time.Second).Seconds()),
Now: s.clock.Now().UTC().Unix(),
}
q, err := sqltemplate.Execute(sqlSecureValueLeaseInactive, req)
if err != nil {
return fmt.Errorf("execute template %q: %w", sqlSecureValueLeaseInactive.Name(), err)
}
if _, err := s.db.ExecContext(ctx, q, req.GetArgs()...); err != nil {
return fmt.Errorf("leasing inactive secure values: %w", err)
}
return nil
}
func (s *secureValueMetadataStorage) listByLeaseToken(ctx context.Context, leaseToken string) ([]secretv1beta1.SecureValue, error) {
req := listSecureValuesByLeaseToken{
SQLTemplate: sqltemplate.New(s.dialect),
LeaseToken: leaseToken,
}
q, err := sqltemplate.Execute(sqlSecureValueListByLeaseToken, req)
if err != nil {
return nil, fmt.Errorf("execute template %q: %w", sqlSecureValueListByLeaseToken.Name(), err)
}
rows, err := s.db.QueryContext(ctx, q, req.GetArgs()...)
if err != nil {
return nil, fmt.Errorf("listing secure values: %w", err)
}
defer func() { _ = rows.Close() }()
secureValues := make([]secretv1beta1.SecureValue, 0)
for rows.Next() {
row := secureValueDB{}
var leaseTokenDB string
err = rows.Scan(&row.GUID,
&row.Name, &row.Namespace, &row.Annotations,
&row.Labels,
&row.Created, &row.CreatedBy,
&row.Updated, &row.UpdatedBy,
&row.Description, &row.Keeper, &row.Decrypters,
&row.Ref, &row.ExternalID, &row.Version, &row.Active,
&row.OwnerReferenceAPIGroup, &row.OwnerReferenceAPIVersion, &row.OwnerReferenceKind, &row.OwnerReferenceName,
&leaseTokenDB,
)
if err != nil {
return nil, fmt.Errorf("error reading secure value row: %w", err)
}
if leaseTokenDB != leaseToken {
return nil, fmt.Errorf("bug: expected to list secure values with lease token %+v but got a secure value with another lease token %+v", leaseToken, leaseToken)
}
secureValue, err := row.toKubernetes()
if err != nil {
return nil, fmt.Errorf("convert to kubernetes object: %w", err)
}
secureValues = append(secureValues, *secureValue)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("read rows error: %w", err)
}
return secureValues, nil
}