Compare commits

..

2 Commits

Author SHA1 Message Date
Nathan Verzemnieks 79a61a2b63 CloudWatch: fix error source for some query errors 2026-01-02 14:57:50 +01:00
Mustafa Sencer Özcan dc4c106e91 fix: use memory index if index file already open (#115720)
* feat: add lock structure into bleve index files

* fix: another approach

* fix: new check

* fix: build in memory if index file already open

* fix: update workspace

* fix: add test

* refactor: update func signature

* fix: address comments

* fix: make const
2026-01-02 13:51:51 +01:00
5 changed files with 135 additions and 33 deletions
+1 -1
View File
@@ -181,6 +181,7 @@ require (
github.com/xlab/treeprint v1.2.0 // @grafana/observability-traces-and-profiling
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // @grafana/grafana-operator-experience-squad
github.com/yudai/gojsondiff v1.0.0 // @grafana/grafana-backend-group
go.etcd.io/bbolt v1.4.2 // @grafana/grafana-search-and-storage
go.opentelemetry.io/collector/pdata v1.44.0 // @grafana/grafana-backend-group
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.64.0 // @grafana/plugins-platform-backend
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.63.0 // @grafana/grafana-operator-experience-squad
@@ -603,7 +604,6 @@ require (
github.com/yuin/gopher-lua v1.1.1 // indirect
github.com/zclconf/go-cty v1.16.3 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.etcd.io/bbolt v1.4.2 // indirect
go.etcd.io/etcd/api/v3 v3.6.6 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.6.6 // indirect
go.etcd.io/etcd/client/v3 v3.6.6 // indirect
+7 -9
View File
@@ -89,7 +89,6 @@ func RegisterAPIService(
}
builder, err = NewDataSourceAPIBuilder(
pluginJSON.ID+".datasource.grafana.app",
pluginJSON,
client,
datasources.GetDatasourceProvider(pluginJSON),
@@ -120,7 +119,6 @@ type PluginClient interface {
}
func NewDataSourceAPIBuilder(
groupName string,
plugin plugins.JSONData,
client PluginClient,
datasources PluginDatasourceProvider,
@@ -129,13 +127,13 @@ func NewDataSourceAPIBuilder(
loadQueryTypes bool,
configCrudUseNewApis bool,
) (*DataSourceAPIBuilder, error) {
// group, err := plugins.GetDatasourceGroupNameFromPluginID(plugin.ID)
// if err != nil {
// return nil, err
// }
var err error
group, err := plugins.GetDatasourceGroupNameFromPluginID(plugin.ID)
if err != nil {
return nil, err
}
builder := &DataSourceAPIBuilder{
datasourceResourceInfo: datasourceV0.DataSourceResourceInfo.WithGroupAndShortName(groupName, plugin.ID),
datasourceResourceInfo: datasourceV0.DataSourceResourceInfo.WithGroupAndShortName(group, plugin.ID),
pluginJSON: plugin,
client: client,
datasources: datasources,
@@ -145,7 +143,7 @@ func NewDataSourceAPIBuilder(
}
if loadQueryTypes {
// In the future, this will somehow come from the plugin
builder.queryTypes, err = getHardcodedQueryTypes(groupName)
builder.queryTypes, err = getHardcodedQueryTypes(group)
}
return builder, err
}
+52 -21
View File
@@ -25,6 +25,7 @@ import (
bleveSearch "github.com/blevesearch/bleve/v2/search/searcher"
index "github.com/blevesearch/bleve_index_api"
"github.com/prometheus/client_golang/prometheus"
bolterrors "go.etcd.io/bbolt/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/atomic"
@@ -44,6 +45,7 @@ import (
// Index storage kinds, plus the timeout option used when opening an existing file-based index.
const (
// indexStorageMemory identifies an index that is built in memory.
indexStorageMemory = "memory"
// indexStorageFile identifies an index that is built on the filesystem.
indexStorageFile = "file"
// boltTimeout is the value passed as the "bolt_timeout" option when opening an
// existing file-based index. The caller treats a resulting bolt timeout error as
// "index is already opened by another process".
// NOTE(review): presumably interpreted as a Go duration string — confirm against bleve.
boltTimeout = "500ms"
)
// Keys used to store internal data in index.
@@ -415,14 +417,25 @@ func (b *bleveBackend) BuildIndex(
// This happens on startup, or when memory-based index has expired. (We don't expire file-based indexes)
// If we do have an unexpired cached index already, we always build a new index from scratch.
if cachedIndex == nil && !rebuild {
index, fileIndexName, indexRV = b.findPreviousFileBasedIndex(resourceDir)
result := b.findPreviousFileBasedIndex(resourceDir)
if result != nil && result.IsOpen {
// Index file exists but is opened by another process, fall back to memory.
// Keep the name so we can skip cleanup of that directory.
newIndexType = indexStorageMemory
fileIndexName = result.Name
} else if result != nil && result.Index != nil {
// Found and opened existing index successfully
index = result.Index
fileIndexName = result.Name
indexRV = result.RV
}
}
if index != nil {
if newIndexType == indexStorageFile && index != nil {
build = false
logWithDetails.Debug("Existing index found on filesystem", "indexRV", indexRV, "directory", filepath.Join(resourceDir, fileIndexName))
defer closeIndexOnExit(index, "") // Close index, but don't delete directory.
} else {
} else if newIndexType == indexStorageFile {
// Building index from scratch. Index name has a time component in it to be unique, but if
// we happen to create non-unique name, we bump the time and try again.
@@ -449,7 +462,9 @@ func (b *bleveBackend) BuildIndex(
logWithDetails.Info("Building index using filesystem", "directory", indexDir)
defer closeIndexOnExit(index, indexDir) // Close index, and delete new index directory.
}
} else {
}
if newIndexType == indexStorageMemory {
index, err = newBleveIndex("", mapper, time.Now(), b.opts.BuildVersion)
if err != nil {
return nil, fmt.Errorf("error creating new in-memory bleve index: %w", err)
@@ -552,30 +567,30 @@ func cleanFileSegment(input string) string {
return input
}
// cleanOldIndexes deletes all subdirectories inside dir, skipping directory with "skipName".
// cleanOldIndexes deletes all subdirectories inside resourceDir, skipping directory with "skipName".
// "skipName" can be empty.
func (b *bleveBackend) cleanOldIndexes(dir string, skipName string) {
files, err := os.ReadDir(dir)
func (b *bleveBackend) cleanOldIndexes(resourceDir string, skipName string) {
entries, err := os.ReadDir(resourceDir)
if err != nil {
if os.IsNotExist(err) {
return
}
b.log.Warn("error cleaning folders from", "directory", dir, "error", err)
b.log.Warn("error cleaning folders from", "directory", resourceDir, "error", err)
return
}
for _, file := range files {
if file.IsDir() && file.Name() != skipName {
fpath := filepath.Join(dir, file.Name())
if !isPathWithinRoot(fpath, b.opts.Root) {
b.log.Warn("Skipping cleanup of directory", "directory", fpath)
for _, ent := range entries {
if ent.IsDir() && ent.Name() != skipName {
indexDir := filepath.Join(resourceDir, ent.Name())
if !isPathWithinRoot(indexDir, b.opts.Root) {
b.log.Warn("Skipping cleanup of directory", "directory", indexDir)
continue
}
err = os.RemoveAll(fpath)
err = os.RemoveAll(indexDir)
if err != nil {
b.log.Error("Unable to remove old index folder", "directory", fpath, "error", err)
b.log.Error("Unable to remove old index folder", "directory", indexDir, "error", err)
} else {
b.log.Info("Removed old index folder", "directory", fpath)
b.log.Info("Removed old index folder", "directory", indexDir)
}
}
}
@@ -622,10 +637,17 @@ func formatIndexName(now time.Time) string {
return now.Format("20060102-150405")
}
func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string) (bleve.Index, string, int64) {
// fileIndex describes the outcome of looking for a previously built file-based index.
type fileIndex struct {
// Index is the successfully opened index. It is left nil when the open attempt
// timed out (see IsOpen).
Index bleve.Index
// Name is the name of the index directory inside the resource directory.
Name string
// RV carries the indexRV value found for the opened index.
RV int64
// IsOpen is true when opening the index failed with a bolt timeout error, which is
// treated as the index being held open by another process.
IsOpen bool
}
func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string) *fileIndex {
entries, err := os.ReadDir(resourceDir)
if err != nil {
return nil, "", 0
return nil
}
for _, ent := range entries {
@@ -635,8 +657,13 @@ func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string) (bleve.Ind
indexName := ent.Name()
indexDir := filepath.Join(resourceDir, indexName)
idx, err := bleve.Open(indexDir)
idx, err := bleve.OpenUsing(indexDir, map[string]interface{}{"bolt_timeout": boltTimeout})
if err != nil {
if errors.Is(err, bolterrors.ErrTimeout) {
b.log.Debug("Index is opened by another process (timeout), skipping", "indexDir", indexDir)
return &fileIndex{Name: indexName, IsOpen: true}
}
b.log.Debug("error opening index", "indexDir", indexDir, "err", err)
continue
}
@@ -648,10 +675,14 @@ func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string) (bleve.Ind
continue
}
return idx, indexName, indexRV
return &fileIndex{
Index: idx,
Name: indexName,
RV: indexRV,
}
}
return nil, "", 0
return nil
}
// Stop closes all indexes and stops background tasks.
+73
View File
@@ -1583,3 +1583,76 @@ func docCount(t *testing.T, idx resource.ResourceIndex) int {
require.NoError(t, err)
return int(cnt)
}
// TestBleveBackendFallsBackToMemory checks that a second backend sharing the same root
// directory ends up with an in-memory index while the first backend still holds the
// file-based index open.
func TestBleveBackendFallsBackToMemory(t *testing.T) {
	// Size passed to BuildIndex; large enough to request a file-based index.
	const fileBasedSize = 100

	ctx := context.Background()
	rootDir := t.TempDir()
	key := resource.NamespacedResource{
		Namespace: "test",
		Group:     "group",
		Resource:  "resource",
	}

	// The owner backend builds the file-based index and keeps it open.
	owner, ownerReg := setupBleveBackend(t, withRootDir(rootDir))
	ownerBuilt, err := owner.BuildIndex(ctx, key, fileBasedSize, nil, "test", indexTestDocs(key, 10, 100), nil, false)
	require.NoError(t, err)
	require.NotNil(t, ownerBuilt)

	// The owner's index must really be file-based.
	ownerIdx, isBleve := ownerBuilt.(*bleveIndex)
	require.True(t, isBleve)
	require.Equal(t, indexStorageFile, ownerIdx.indexStorage)
	checkOpenIndexes(t, ownerReg, 0, 1)

	// A second backend on the same directory stands in for another instance
	// trying to open the very same index.
	contender, contenderReg := setupBleveBackend(t, withRootDir(rootDir))

	// BuildIndex is expected to notice the locked file and fall back to memory.
	contenderBuilt, err := contender.BuildIndex(ctx, key, fileBasedSize, nil, "test", indexTestDocs(key, 10, 100), nil, false)
	require.NoError(t, err)
	require.NotNil(t, contenderBuilt)

	// Despite the size being above the file threshold, the index lives in memory.
	contenderIdx, isBleve := contenderBuilt.(*bleveIndex)
	require.True(t, isBleve)
	require.Equal(t, indexStorageMemory, contenderIdx.indexStorage)

	// Metrics for the second backend: one memory index, no file indexes.
	checkOpenIndexes(t, contenderReg, 1, 0)

	// The in-memory index holds all the documents.
	require.Equal(t, 10, docCount(t, contenderBuilt))

	// Release the file lock held by the owner backend.
	owner.Stop()
}
// TestBleveSkipCleanOldIndexesOnMemoryFallback checks that falling back to an in-memory
// index does not remove the index directory that another backend still holds open.
func TestBleveSkipCleanOldIndexesOnMemoryFallback(t *testing.T) {
	// Size passed to BuildIndex; large enough to request a file-based index.
	const fileBasedSize = 100

	ctx := context.Background()
	rootDir := t.TempDir()
	key := resource.NamespacedResource{
		Namespace: "test",
		Group:     "group",
		Resource:  "resource",
	}

	// The owner backend creates the file-based index and keeps it open.
	owner, _ := setupBleveBackend(t, withRootDir(rootDir))
	_, err := owner.BuildIndex(ctx, key, fileBasedSize, nil, "test", indexTestDocs(key, 10, 100), nil, false)
	require.NoError(t, err)

	// A second backend on the same directory stands in for another instance
	// trying to open the very same index.
	contender, _ := setupBleveBackend(t, withRootDir(rootDir))

	// BuildIndex is expected to notice the locked file and fall back to memory.
	_, err = contender.BuildIndex(ctx, key, fileBasedSize, nil, "test", indexTestDocs(key, 10, 100), nil, false)
	require.NoError(t, err)

	// The owner's index directory must still be there, i.e. cleanup left it alone.
	verifyDirEntriesCount(t, contender.getResourceDir(key), 1)

	// Release the file lock held by the owner backend.
	owner.Stop()
}
+2 -2
View File
@@ -33,11 +33,11 @@ func (ds *DataSource) parseResponse(ctx context.Context, metricDataOutputs []*cl
dataRes := backend.DataResponse{}
if response.HasArithmeticError {
dataRes.Error = fmt.Errorf("ArithmeticError in query %q: %s", queryRow.RefId, response.ArithmeticErrorMessage)
dataRes.Error = backend.DownstreamErrorf("ArithmeticError in query %q: %s", queryRow.RefId, response.ArithmeticErrorMessage)
}
if response.HasPermissionError {
dataRes.Error = fmt.Errorf("PermissionError in query %q: %s", queryRow.RefId, response.PermissionErrorMessage)
dataRes.Error = backend.DownstreamErrorf("PermissionError in query %q: %s", queryRow.RefId, response.PermissionErrorMessage)
}
var err error