Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 79a61a2b63 | |||
| dc4c106e91 |
@@ -181,6 +181,7 @@ require (
|
||||
github.com/xlab/treeprint v1.2.0 // @grafana/observability-traces-and-profiling
|
||||
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // @grafana/grafana-operator-experience-squad
|
||||
github.com/yudai/gojsondiff v1.0.0 // @grafana/grafana-backend-group
|
||||
go.etcd.io/bbolt v1.4.2 // @grafana/grafana-search-and-storage
|
||||
go.opentelemetry.io/collector/pdata v1.44.0 // @grafana/grafana-backend-group
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.64.0 // @grafana/plugins-platform-backend
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.63.0 // @grafana/grafana-operator-experience-squad
|
||||
@@ -603,7 +604,6 @@ require (
|
||||
github.com/yuin/gopher-lua v1.1.1 // indirect
|
||||
github.com/zclconf/go-cty v1.16.3 // indirect
|
||||
github.com/zeebo/xxh3 v1.0.2 // indirect
|
||||
go.etcd.io/bbolt v1.4.2 // indirect
|
||||
go.etcd.io/etcd/api/v3 v3.6.6 // indirect
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.6.6 // indirect
|
||||
go.etcd.io/etcd/client/v3 v3.6.6 // indirect
|
||||
|
||||
@@ -89,7 +89,6 @@ func RegisterAPIService(
|
||||
}
|
||||
|
||||
builder, err = NewDataSourceAPIBuilder(
|
||||
pluginJSON.ID+".datasource.grafana.app",
|
||||
pluginJSON,
|
||||
client,
|
||||
datasources.GetDatasourceProvider(pluginJSON),
|
||||
@@ -120,7 +119,6 @@ type PluginClient interface {
|
||||
}
|
||||
|
||||
func NewDataSourceAPIBuilder(
|
||||
groupName string,
|
||||
plugin plugins.JSONData,
|
||||
client PluginClient,
|
||||
datasources PluginDatasourceProvider,
|
||||
@@ -129,13 +127,13 @@ func NewDataSourceAPIBuilder(
|
||||
loadQueryTypes bool,
|
||||
configCrudUseNewApis bool,
|
||||
) (*DataSourceAPIBuilder, error) {
|
||||
// group, err := plugins.GetDatasourceGroupNameFromPluginID(plugin.ID)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
var err error
|
||||
group, err := plugins.GetDatasourceGroupNameFromPluginID(plugin.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
builder := &DataSourceAPIBuilder{
|
||||
datasourceResourceInfo: datasourceV0.DataSourceResourceInfo.WithGroupAndShortName(groupName, plugin.ID),
|
||||
datasourceResourceInfo: datasourceV0.DataSourceResourceInfo.WithGroupAndShortName(group, plugin.ID),
|
||||
pluginJSON: plugin,
|
||||
client: client,
|
||||
datasources: datasources,
|
||||
@@ -145,7 +143,7 @@ func NewDataSourceAPIBuilder(
|
||||
}
|
||||
if loadQueryTypes {
|
||||
// In the future, this will somehow come from the plugin
|
||||
builder.queryTypes, err = getHardcodedQueryTypes(groupName)
|
||||
builder.queryTypes, err = getHardcodedQueryTypes(group)
|
||||
}
|
||||
return builder, err
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
bleveSearch "github.com/blevesearch/bleve/v2/search/searcher"
|
||||
index "github.com/blevesearch/bleve_index_api"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
bolterrors "go.etcd.io/bbolt/errors"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.uber.org/atomic"
|
||||
@@ -44,6 +45,7 @@ import (
|
||||
const (
|
||||
indexStorageMemory = "memory"
|
||||
indexStorageFile = "file"
|
||||
boltTimeout = "500ms"
|
||||
)
|
||||
|
||||
// Keys used to store internal data in index.
|
||||
@@ -415,14 +417,25 @@ func (b *bleveBackend) BuildIndex(
|
||||
// This happens on startup, or when memory-based index has expired. (We don't expire file-based indexes)
|
||||
// If we do have an unexpired cached index already, we always build a new index from scratch.
|
||||
if cachedIndex == nil && !rebuild {
|
||||
index, fileIndexName, indexRV = b.findPreviousFileBasedIndex(resourceDir)
|
||||
result := b.findPreviousFileBasedIndex(resourceDir)
|
||||
if result != nil && result.IsOpen {
|
||||
// Index file exists but is opened by another process, fallback to memory.
|
||||
// Keep the name so we can skip cleanup of that directory.
|
||||
newIndexType = indexStorageMemory
|
||||
fileIndexName = result.Name
|
||||
} else if result != nil && result.Index != nil {
|
||||
// Found and opened existing index successfully
|
||||
index = result.Index
|
||||
fileIndexName = result.Name
|
||||
indexRV = result.RV
|
||||
}
|
||||
}
|
||||
|
||||
if index != nil {
|
||||
if newIndexType == indexStorageFile && index != nil {
|
||||
build = false
|
||||
logWithDetails.Debug("Existing index found on filesystem", "indexRV", indexRV, "directory", filepath.Join(resourceDir, fileIndexName))
|
||||
defer closeIndexOnExit(index, "") // Close index, but don't delete directory.
|
||||
} else {
|
||||
} else if newIndexType == indexStorageFile {
|
||||
// Building index from scratch. Index name has a time component in it to be unique, but if
|
||||
// we happen to create non-unique name, we bump the time and try again.
|
||||
|
||||
@@ -449,7 +462,9 @@ func (b *bleveBackend) BuildIndex(
|
||||
logWithDetails.Info("Building index using filesystem", "directory", indexDir)
|
||||
defer closeIndexOnExit(index, indexDir) // Close index, and delete new index directory.
|
||||
}
|
||||
} else {
|
||||
}
|
||||
|
||||
if newIndexType == indexStorageMemory {
|
||||
index, err = newBleveIndex("", mapper, time.Now(), b.opts.BuildVersion)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating new in-memory bleve index: %w", err)
|
||||
@@ -552,30 +567,30 @@ func cleanFileSegment(input string) string {
|
||||
return input
|
||||
}
|
||||
|
||||
// cleanOldIndexes deletes all subdirectories inside dir, skipping directory with "skipName".
|
||||
// cleanOldIndexes deletes all subdirectories inside resourceDir, skipping directory with "skipName".
|
||||
// "skipName" can be empty.
|
||||
func (b *bleveBackend) cleanOldIndexes(dir string, skipName string) {
|
||||
files, err := os.ReadDir(dir)
|
||||
func (b *bleveBackend) cleanOldIndexes(resourceDir string, skipName string) {
|
||||
entries, err := os.ReadDir(resourceDir)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return
|
||||
}
|
||||
b.log.Warn("error cleaning folders from", "directory", dir, "error", err)
|
||||
b.log.Warn("error cleaning folders from", "directory", resourceDir, "error", err)
|
||||
return
|
||||
}
|
||||
for _, file := range files {
|
||||
if file.IsDir() && file.Name() != skipName {
|
||||
fpath := filepath.Join(dir, file.Name())
|
||||
if !isPathWithinRoot(fpath, b.opts.Root) {
|
||||
b.log.Warn("Skipping cleanup of directory", "directory", fpath)
|
||||
for _, ent := range entries {
|
||||
if ent.IsDir() && ent.Name() != skipName {
|
||||
indexDir := filepath.Join(resourceDir, ent.Name())
|
||||
if !isPathWithinRoot(indexDir, b.opts.Root) {
|
||||
b.log.Warn("Skipping cleanup of directory", "directory", indexDir)
|
||||
continue
|
||||
}
|
||||
|
||||
err = os.RemoveAll(fpath)
|
||||
err = os.RemoveAll(indexDir)
|
||||
if err != nil {
|
||||
b.log.Error("Unable to remove old index folder", "directory", fpath, "error", err)
|
||||
b.log.Error("Unable to remove old index folder", "directory", indexDir, "error", err)
|
||||
} else {
|
||||
b.log.Info("Removed old index folder", "directory", fpath)
|
||||
b.log.Info("Removed old index folder", "directory", indexDir)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -622,10 +637,17 @@ func formatIndexName(now time.Time) string {
|
||||
return now.Format("20060102-150405")
|
||||
}
|
||||
|
||||
func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string) (bleve.Index, string, int64) {
|
||||
type fileIndex struct {
|
||||
Index bleve.Index
|
||||
Name string
|
||||
RV int64
|
||||
IsOpen bool
|
||||
}
|
||||
|
||||
func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string) *fileIndex {
|
||||
entries, err := os.ReadDir(resourceDir)
|
||||
if err != nil {
|
||||
return nil, "", 0
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, ent := range entries {
|
||||
@@ -635,8 +657,13 @@ func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string) (bleve.Ind
|
||||
|
||||
indexName := ent.Name()
|
||||
indexDir := filepath.Join(resourceDir, indexName)
|
||||
idx, err := bleve.Open(indexDir)
|
||||
|
||||
idx, err := bleve.OpenUsing(indexDir, map[string]interface{}{"bolt_timeout": boltTimeout})
|
||||
if err != nil {
|
||||
if errors.Is(err, bolterrors.ErrTimeout) {
|
||||
b.log.Debug("Index is opened by another process (timeout), skipping", "indexDir", indexDir)
|
||||
return &fileIndex{Name: indexName, IsOpen: true}
|
||||
}
|
||||
b.log.Debug("error opening index", "indexDir", indexDir, "err", err)
|
||||
continue
|
||||
}
|
||||
@@ -648,10 +675,14 @@ func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string) (bleve.Ind
|
||||
continue
|
||||
}
|
||||
|
||||
return idx, indexName, indexRV
|
||||
return &fileIndex{
|
||||
Index: idx,
|
||||
Name: indexName,
|
||||
RV: indexRV,
|
||||
}
|
||||
}
|
||||
|
||||
return nil, "", 0
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop closes all indexes and stops background tasks.
|
||||
|
||||
@@ -1583,3 +1583,76 @@ func docCount(t *testing.T, idx resource.ResourceIndex) int {
|
||||
require.NoError(t, err)
|
||||
return int(cnt)
|
||||
}
|
||||
|
||||
func TestBleveBackendFallsBackToMemory(t *testing.T) {
|
||||
ns := resource.NamespacedResource{
|
||||
Namespace: "test",
|
||||
Group: "group",
|
||||
Resource: "resource",
|
||||
}
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
// First, create a file-based index with one backend and keep it open
|
||||
backend1, reg1 := setupBleveBackend(t, withRootDir(tmpDir))
|
||||
index1, err := backend1.BuildIndex(context.Background(), ns, 100 /* file based */, nil, "test", indexTestDocs(ns, 10, 100), nil, false)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, index1)
|
||||
|
||||
// Verify first index is file-based
|
||||
bleveIdx1, ok := index1.(*bleveIndex)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, indexStorageFile, bleveIdx1.indexStorage)
|
||||
checkOpenIndexes(t, reg1, 0, 1)
|
||||
|
||||
// Now create a second backend using the same directory
|
||||
// This simulates another instance trying to open the same index
|
||||
backend2, reg2 := setupBleveBackend(t, withRootDir(tmpDir))
|
||||
|
||||
// BuildIndex should detect the file is locked and fallback to memory
|
||||
index2, err := backend2.BuildIndex(context.Background(), ns, 100 /* file based */, nil, "test", indexTestDocs(ns, 10, 100), nil, false)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, index2)
|
||||
|
||||
// Verify second index fell back to in-memory despite size being above file threshold
|
||||
bleveIdx2, ok := index2.(*bleveIndex)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, indexStorageMemory, bleveIdx2.indexStorage)
|
||||
|
||||
// Verify metrics show 1 memory index and 0 file indexes for backend2
|
||||
checkOpenIndexes(t, reg2, 1, 0)
|
||||
|
||||
// Verify the in-memory index works correctly
|
||||
require.Equal(t, 10, docCount(t, index2))
|
||||
|
||||
// Clean up: close first backend to release the file lock
|
||||
backend1.Stop()
|
||||
}
|
||||
|
||||
func TestBleveSkipCleanOldIndexesOnMemoryFallback(t *testing.T) {
|
||||
ns := resource.NamespacedResource{
|
||||
Namespace: "test",
|
||||
Group: "group",
|
||||
Resource: "resource",
|
||||
}
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
backend1, _ := setupBleveBackend(t, withRootDir(tmpDir))
|
||||
_, err := backend1.BuildIndex(context.Background(), ns, 100 /* file based */, nil, "test", indexTestDocs(ns, 10, 100), nil, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Now create a second backend using the same directory
|
||||
// This simulates another instance trying to open the same index
|
||||
backend2, _ := setupBleveBackend(t, withRootDir(tmpDir))
|
||||
|
||||
// BuildIndex should detect the file is locked and fallback to memory
|
||||
_, err = backend2.BuildIndex(context.Background(), ns, 100 /* file based */, nil, "test", indexTestDocs(ns, 10, 100), nil, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify that the index directory still exists (i.e., cleanOldIndexes was skipped)
|
||||
verifyDirEntriesCount(t, backend2.getResourceDir(ns), 1)
|
||||
|
||||
// Clean up: close first backend to release the file lock
|
||||
backend1.Stop()
|
||||
}
|
||||
|
||||
@@ -33,11 +33,11 @@ func (ds *DataSource) parseResponse(ctx context.Context, metricDataOutputs []*cl
|
||||
dataRes := backend.DataResponse{}
|
||||
|
||||
if response.HasArithmeticError {
|
||||
dataRes.Error = fmt.Errorf("ArithmeticError in query %q: %s", queryRow.RefId, response.ArithmeticErrorMessage)
|
||||
dataRes.Error = backend.DownstreamErrorf("ArithmeticError in query %q: %s", queryRow.RefId, response.ArithmeticErrorMessage)
|
||||
}
|
||||
|
||||
if response.HasPermissionError {
|
||||
dataRes.Error = fmt.Errorf("PermissionError in query %q: %s", queryRow.RefId, response.PermissionErrorMessage)
|
||||
dataRes.Error = backend.DownstreamErrorf("PermissionError in query %q: %s", queryRow.RefId, response.PermissionErrorMessage)
|
||||
}
|
||||
|
||||
var err error
|
||||
|
||||
Reference in New Issue
Block a user