From d4dbaa08a03a05757a2546df34bc372f86f3305d Mon Sep 17 00:00:00 2001 From: Sonui Date: Tue, 25 Feb 2025 18:18:26 +0800 Subject: [PATCH 1/4] feat(plugin): add key-value storage support for plugins --- internal/entity/plugin_kv_storage_entity.go | 32 ++ internal/migrations/init_data.go | 1 + internal/migrations/migrations.go | 1 + internal/migrations/v25.go | 31 ++ .../plugin_common/plugin_common_service.go | 5 + plugin/kv_storage.go | 344 ++++++++++++++++++ plugin/plugin.go | 12 + 7 files changed, 426 insertions(+) create mode 100644 internal/entity/plugin_kv_storage_entity.go create mode 100644 internal/migrations/v25.go create mode 100644 plugin/kv_storage.go diff --git a/internal/entity/plugin_kv_storage_entity.go b/internal/entity/plugin_kv_storage_entity.go new file mode 100644 index 000000000..c7e6efbe3 --- /dev/null +++ b/internal/entity/plugin_kv_storage_entity.go @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package entity + +type PluginKVStorage struct { + ID int `xorm:"not null pk autoincr INT(11) id"` + PluginSlugName string `xorm:"not null VARCHAR(128) UNIQUE(uk_psg) plugin_slug_name"` + Group string `xorm:"not null VARCHAR(128) UNIQUE(uk_psg) 'group'"` + Key string `xorm:"not null VARCHAR(128) UNIQUE(uk_psg) 'key'"` + Value string `xorm:"not null TEXT value"` +} + +func (PluginKVStorage) TableName() string { + return "plugin_kv_storage" +} diff --git a/internal/migrations/init_data.go b/internal/migrations/init_data.go index 7322f7388..8b853c4dc 100644 --- a/internal/migrations/init_data.go +++ b/internal/migrations/init_data.go @@ -74,6 +74,7 @@ var ( &entity.Badge{}, &entity.BadgeGroup{}, &entity.BadgeAward{}, + &entity.PluginKVStorage{}, } roles = []*entity.Role{ diff --git a/internal/migrations/migrations.go b/internal/migrations/migrations.go index bc2ecda70..f14895b9b 100644 --- a/internal/migrations/migrations.go +++ b/internal/migrations/migrations.go @@ -100,6 +100,7 @@ var migrations = []Migration{ NewMigration("v1.4.0", "add badge/badge_group/badge_award table", addBadges, true), NewMigration("v1.4.1", "add question link", addQuestionLink, true), NewMigration("v1.4.2", "add the number of question links", addQuestionLinkedCount, true), + NewMigration("v1.4.3", "add plugin kv storage", addPluginKVStorage, true), } func GetMigrations() []Migration { diff --git a/internal/migrations/v25.go b/internal/migrations/v25.go new file mode 100644 index 000000000..008a094a4 --- /dev/null +++ b/internal/migrations/v25.go @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package migrations + +import ( + "context" + + "github.com/apache/answer/internal/entity" + "xorm.io/xorm" +) + +func addPluginKVStorage(ctx context.Context, x *xorm.Engine) error { + return x.Context(ctx).Sync(new(entity.PluginKVStorage)) +} diff --git a/internal/service/plugin_common/plugin_common_service.go b/internal/service/plugin_common/plugin_common_service.go index c1b0ad442..de6d981d5 100644 --- a/internal/service/plugin_common/plugin_common_service.go +++ b/internal/service/plugin_common/plugin_common_service.go @@ -135,6 +135,11 @@ func (ps *PluginCommonService) GetUserPluginConfig(ctx context.Context, req *sch } func (ps *PluginCommonService) initPluginData() { + plugin.SetKVStorageDB(&plugin.Data{ + DB: ps.data.DB, + Cache: ps.data.Cache, + }) + // init plugin status pluginStatus, err := ps.configService.GetStringValue(context.TODO(), constant.PluginStatus) if err != nil { diff --git a/plugin/kv_storage.go b/plugin/kv_storage.go new file mode 100644 index 000000000..9714487fe --- /dev/null +++ b/plugin/kv_storage.go @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package plugin + +import ( + "context" + "encoding/json" + "fmt" + "math/rand" + "time" + + "github.com/apache/answer/internal/entity" + "github.com/segmentfault/pacman/log" + "xorm.io/builder" + "xorm.io/xorm" +) + +// define error +var ( + ErrKVKeyNotFound = fmt.Errorf("key not found in KV storage") + ErrKVGroupEmpty = fmt.Errorf("group name is empty") + ErrKVKeyEmpty = fmt.Errorf("key name is empty") + ErrKVKeyAndGroupEmpty = fmt.Errorf("both key and group are empty") + ErrKVTransactionFailed = fmt.Errorf("KV storage transaction failed") + ErrKVDataNotInitialized = fmt.Errorf("KV storage data not initialized") + ErrKVDBNotInitialized = fmt.Errorf("KV storage database connection not initialized") +) + +type KVOperator struct { + data *Data + session *xorm.Session + pluginSlugName string +} + +func (kv *KVOperator) checkDB() error { + if kv.data == nil { + return ErrKVDataNotInitialized + } + if kv.data.DB == nil { + return ErrKVDBNotInitialized + } + return nil +} + +func (kv *KVOperator) getSession(ctx context.Context) (session *xorm.Session, close func()) { + if kv.session != nil { + session = kv.session + } else { + session = kv.data.DB.NewSession().Context(ctx) + close = func() { + if session != nil { + session.Close() + } + } + } + return +} + +func (kv *KVOperator) getCacheTTL() time.Duration { + return 30*time.Minute + time.Duration(rand.Intn(300))*time.Second +} + +func (kv *KVOperator) getCacheKey(group, key string) string { + if group == "" { + return fmt.Sprintf("plugin_kv_storage:%s:key:%s", kv.pluginSlugName, key) + } + if key == "" { + return fmt.Sprintf("plugin_kv_storage:%s:group:%s", kv.pluginSlugName, group) + } + return fmt.Sprintf("plugin_kv_storage:%s:group:%s:key:%s", kv.pluginSlugName, group, key) +} + +func (kv *KVOperator) Get(ctx context.Context, group, key string) (string, error) { + // validate + if err := kv.checkDB(); err != nil { + return "", err + } + if key == "" { + return "", ErrKVKeyEmpty + } + + cacheKey := kv.getCacheKey(group, key) + if value, exist, err := kv.data.Cache.GetString(ctx, cacheKey); err == nil && exist { + return value, nil + } + + // query + data := entity.PluginKVStorage{} + query, close := kv.getSession(ctx) + defer close() + + query.Where(builder.Eq{ + "plugin_slug_name": kv.pluginSlugName, + "`group`": group, + "`key`": key, + }) + + has, err := query.Get(&data) + if err != nil { + return "", err + } + if !has { + return "", ErrKVKeyNotFound + } + + if err := kv.data.Cache.SetString(ctx, cacheKey, data.Value, kv.getCacheTTL()); err != nil { + log.Error(err) + } + + return data.Value, nil +} + +func (kv *KVOperator) Set(ctx context.Context, group, key, value string) error { + if err := kv.checkDB(); err != nil { + return err + } + + if key == "" { + return ErrKVKeyEmpty + } + + query, close := kv.getSession(ctx) + if close != nil { + defer close() + } + + data := &entity.PluginKVStorage{ + PluginSlugName: kv.pluginSlugName, + Group: group, + Key: key, + Value: value, + } + + kv.cleanCache(ctx, group, key) + + affected, err := query.Where(builder.Eq{ + "plugin_slug_name": kv.pluginSlugName, + "`group`": group, + "`key`": key, + }).Cols("value").Update(data) + if err != nil { + return err + } + + if affected == 0 { + _, err = query.Insert(data) + if err != nil { + return err + } + } + return nil +} + +func (kv *KVOperator) Del(ctx context.Context, group, key string) error { + if err := kv.checkDB(); err != nil { + return err + } + + if key == "" && group == "" { + return ErrKVKeyAndGroupEmpty + } + + kv.cleanCache(ctx, group, key) + + session, close := kv.getSession(ctx) + defer close() + + session.Where(builder.Eq{ + "plugin_slug_name": kv.pluginSlugName, + }) + if group != "" { + session.Where(builder.Eq{"`group`": group}) + } + if key != "" { + session.Where(builder.Eq{"`key`": key}) + } + + _, err := session.Delete(&entity.PluginKVStorage{}) + return err +} + +func (kv *KVOperator) cleanCache(ctx context.Context, group, key string) { + if key != "" { + if err := kv.data.Cache.Del(ctx, kv.getCacheKey("", key)); err != nil { + log.Warnf("Failed to delete cache for key %s: %v", key, err) + } + + if group != "" { + if err := kv.data.Cache.Del(ctx, kv.getCacheKey(group, key)); err != nil { + log.Warnf("Failed to delete cache for group %s, key %s: %v", group, key, err) + } + } + } + + if group != "" { + if err := kv.data.Cache.Del(ctx, kv.getCacheKey(group, "")); err != nil { + log.Warnf("Failed to delete cache for group %s: %v", group, err) + } + } +} + +func (kv *KVOperator) GetByGroup(ctx context.Context, group string, page, pageSize int) (map[string]string, error) { + if err := kv.checkDB(); err != nil { + return nil, err + } + + if group == "" { + return nil, ErrKVGroupEmpty + } + + if page < 1 { + page = 1 + } + if pageSize < 1 { + pageSize = 10 + } + + if pageSize > 100 { + pageSize = 100 + } + + cacheKey := kv.getCacheKey(group, "") + if value, exist, err := kv.data.Cache.GetString(ctx, cacheKey); err == nil && exist { + result := make(map[string]string) + if err := json.Unmarshal([]byte(value), &result); err == nil { + return result, nil + } + } + + query, close := kv.getSession(ctx) + defer close() + + var items []entity.PluginKVStorage + err := query.Where(builder.Eq{"plugin_slug_name": kv.pluginSlugName, "`group`": group}). + Limit(pageSize, (page-1)*pageSize). + OrderBy("id ASC"). + Find(&items) + if err != nil { + return nil, err + } + + result := make(map[string]string, len(items)) + for _, item := range items { + result[item.Key] = item.Value + if err := kv.data.Cache.SetString(ctx, kv.getCacheKey(group, item.Key), item.Value, kv.getCacheTTL()); err != nil { + log.Warnf("Failed to set cache for group %s, key %s: %v", group, item.Key, err) + } + } + + if resultJSON, err := json.Marshal(result); err == nil { + _ = kv.data.Cache.SetString(ctx, cacheKey, string(resultJSON), kv.getCacheTTL()) + } + + return result, nil +} + +func (kv *KVOperator) Tx(ctx context.Context, fn func(ctx context.Context, kv *KVOperator) error) error { + if err := kv.checkDB(); err != nil { + return fmt.Errorf("%w: %v", ErrKVTransactionFailed, err) + } + + var ( + txKv = kv + shouldCommit bool + ) + + if kv.session == nil { + session := kv.data.DB.NewSession().Context(ctx) + if err := session.Begin(); err != nil { + session.Close() + return fmt.Errorf("%w: begin transaction failed: %v", ErrKVTransactionFailed, err) + } + + defer func() { + if !shouldCommit { + if rollbackErr := session.Rollback(); rollbackErr != nil { + log.Errorf("rollback failed: %v", rollbackErr) + } + } + session.Close() + }() + + txKv = &KVOperator{ + session: session, + data: kv.data, + pluginSlugName: kv.pluginSlugName, + } + shouldCommit = true + } + + if err := fn(ctx, txKv); err != nil { + return fmt.Errorf("%w: %v", ErrKVTransactionFailed, err) + } + + if shouldCommit { + if err := txKv.session.Commit(); err != nil { + return fmt.Errorf("%w: commit failed: %v", ErrKVTransactionFailed, err) + } + } + return nil +} + +// PluginData defines the interface for plugins that need data storage capabilities +type KVStorage interface { + Info() Info + SetOperator(operator *KVOperator) +} + +var ( + _, + registerPluginKVStorage = func() (CallFn[KVStorage], RegisterFn[KVStorage]) { + callFn, registerFn := MakePlugin[KVStorage](false) + return callFn, func(p KVStorage) { + registerFn(p) + kvStoragePluginStack.plugins = append(kvStoragePluginStack.plugins, p) + } + }() + kvStoragePluginStack = &Stack[KVStorage]{} +) + +func SetKVStorageDB(data *Data) { + for _, p := range kvStoragePluginStack.plugins { + p.SetOperator(&KVOperator{ + data: data, + pluginSlugName: p.Info().SlugName, + }) + } +} diff --git a/plugin/plugin.go b/plugin/plugin.go index 36087c547..dc8c35ea1 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -23,13 +23,21 @@ import ( "encoding/json" "sync" + "github.com/segmentfault/pacman/cache" "github.com/segmentfault/pacman/i18n" + "xorm.io/xorm" "github.com/apache/answer/internal/base/handler" "github.com/apache/answer/internal/base/translator" "github.com/gin-gonic/gin" ) +// Data is defined here to avoid circular dependency with internal/base/data +type Data struct { + DB *xorm.Engine + Cache cache.Cache +} + // GinContext is a wrapper of gin.Context // We export it to make it easy to use in plugins type GinContext = gin.Context @@ -114,6 +122,10 @@ func Register(p Base) { if _, ok := p.(Importer); ok { registerImporter(p.(Importer)) } + + if _, ok := p.(KVStorage); ok { + registerPluginKVStorage(p.(KVStorage)) + } } type Stack[T Base] struct { From d95ec86f8f5b8fd6a6d2cd0c0a1d472314d42bcd Mon Sep 17 00:00:00 2001 From: Sonui Date: Sat, 1 Mar 2025 11:22:40 +0800 Subject: [PATCH 2/4] refactor(plugin): improved initialization of the KV storage plugin --- .../plugin_common/plugin_common_service.go | 10 +++++--- plugin/kv_storage.go | 24 +++++++------------ plugin/plugin.go | 2 +- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/internal/service/plugin_common/plugin_common_service.go b/internal/service/plugin_common/plugin_common_service.go index de6d981d5..d3aa839b2 100644 --- a/internal/service/plugin_common/plugin_common_service.go +++ b/internal/service/plugin_common/plugin_common_service.go @@ -135,9 +135,13 @@ func (ps *PluginCommonService) GetUserPluginConfig(ctx context.Context, req *sch } func (ps *PluginCommonService) initPluginData() { - plugin.SetKVStorageDB(&plugin.Data{ - DB: ps.data.DB, - Cache: ps.data.Cache, + _ = plugin.CallKVStorage(func(k plugin.KVStorage) error { + k.SetOperator(plugin.NewKVOperator( + ps.data.DB, + ps.data.Cache, + k.Info().SlugName, + )) + return nil }) // init plugin status diff --git a/plugin/kv_storage.go b/plugin/kv_storage.go index 9714487fe..b4d940b80 100644 --- a/plugin/kv_storage.go +++ b/plugin/kv_storage.go @@ -323,22 +323,16 @@ type KVStorage interface { } var ( - _, - registerPluginKVStorage = func() (CallFn[KVStorage], RegisterFn[KVStorage]) { - callFn, registerFn := MakePlugin[KVStorage](false) - return callFn, func(p KVStorage) { - registerFn(p) - kvStoragePluginStack.plugins = append(kvStoragePluginStack.plugins, p) - } - }() - kvStoragePluginStack = &Stack[KVStorage]{} + CallKVStorage, + registerKVStorage = MakePlugin[KVStorage](true) ) -func SetKVStorageDB(data *Data) { - for _, p := range kvStoragePluginStack.plugins { - p.SetOperator(&KVOperator{ - data: data, - pluginSlugName: p.Info().SlugName, - }) +// NewKVOperator creates a new KV storage operator with the specified database engine, cache and plugin name. +// It returns a KVOperator instance that can be used to interact with the plugin's storage. +func NewKVOperator(db *xorm.Engine, cache cache.Cache, pluginSlugName string) *KVOperator { + return &KVOperator{ + data: &Data{DB: db, Cache: cache}, + pluginSlugName: pluginSlugName, + cacheTTL: 30 * time.Minute, } } diff --git a/plugin/plugin.go b/plugin/plugin.go index dc8c35ea1..a9e173100 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -124,7 +124,7 @@ func Register(p Base) { } if _, ok := p.(KVStorage); ok { - registerPluginKVStorage(p.(KVStorage)) + registerKVStorage(p.(KVStorage)) } } From 6c20ab23d62c388bbc04bfdecc2492451f12b499 Mon Sep 17 00:00:00 2001 From: Sonui Date: Sat, 1 Mar 2025 11:56:32 +0800 Subject: [PATCH 3/4] perf(plugin): remove unnecessary KV storage checks --- plugin/kv_storage.go | 33 --------------------------------- 1 file changed, 33 deletions(-) diff --git a/plugin/kv_storage.go b/plugin/kv_storage.go index b4d940b80..a3ab46d03 100644 --- a/plugin/kv_storage.go +++ b/plugin/kv_storage.go @@ -48,16 +48,6 @@ type KVOperator struct { pluginSlugName string } -func (kv *KVOperator) checkDB() error { - if kv.data == nil { - return ErrKVDataNotInitialized - } - if kv.data.DB == nil { - return ErrKVDBNotInitialized - } - return nil -} - func (kv *KVOperator) getSession(ctx context.Context) (session *xorm.Session, close func()) { if kv.session != nil { session = kv.session @@ -88,9 +78,6 @@ func (kv *KVOperator) getCacheKey(group, key string) string { func (kv *KVOperator) Get(ctx context.Context, group, key string) (string, error) { // validate - if err := kv.checkDB(); err != nil { - return "", err - } if key == "" { return "", ErrKVKeyEmpty } @@ -127,10 +114,6 @@ func (kv *KVOperator) Get(ctx context.Context, group, key string) (string, error } func (kv *KVOperator) Set(ctx context.Context, group, key, value string) error { - if err := kv.checkDB(); err != nil { - return err - } - if key == "" { return ErrKVKeyEmpty } @@ -168,10 +151,6 @@ func (kv *KVOperator) Set(ctx context.Context, group, key, value string) error { } func (kv *KVOperator) Del(ctx context.Context, group, key string) error { - if err := kv.checkDB(); err != nil { - return err - } - if key == "" && group == "" { return ErrKVKeyAndGroupEmpty } @@ -216,10 +195,6 @@ func (kv *KVOperator) cleanCache(ctx context.Context, group, key string) { } func (kv *KVOperator) GetByGroup(ctx context.Context, group string, page, pageSize int) (map[string]string, error) { - if err := kv.checkDB(); err != nil { - return nil, err - } - if group == "" { return nil, ErrKVGroupEmpty } @@ -231,10 +206,6 @@ func (kv *KVOperator) GetByGroup(ctx context.Context, group string, page, pageSi pageSize = 10 } - if pageSize > 100 { - pageSize = 100 - } - cacheKey := kv.getCacheKey(group, "") if value, exist, err := kv.data.Cache.GetString(ctx, cacheKey); err == nil && exist { result := make(map[string]string) @@ -271,10 +242,6 @@ func (kv *KVOperator) GetByGroup(ctx context.Context, group string, page, pageSi } func (kv *KVOperator) Tx(ctx context.Context, fn func(ctx context.Context, kv *KVOperator) error) error { - if err := kv.checkDB(); err != nil { - return fmt.Errorf("%w: %v", ErrKVTransactionFailed, err) - } - var ( txKv = kv shouldCommit bool From a2233709c7991490297a4b8e6a81a60399a3b61b Mon Sep 17 00:00:00 2001 From: Sonui Date: Sat, 1 Mar 2025 13:55:36 +0800 Subject: [PATCH 4/4] fix(plugin): rename 'close' to avoid builtin collision --- plugin/kv_storage.go | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/plugin/kv_storage.go b/plugin/kv_storage.go index a3ab46d03..0ec3d6409 100644 --- a/plugin/kv_storage.go +++ b/plugin/kv_storage.go @@ -48,18 +48,18 @@ type KVOperator struct { pluginSlugName string } -func (kv *KVOperator) getSession(ctx context.Context) (session *xorm.Session, close func()) { - if kv.session != nil { - session = kv.session - } else { +func (kv *KVOperator) getSession(ctx context.Context) (*xorm.Session, func()) { + session := kv.session + cleanup := func() {} + if session == nil { session = kv.data.DB.NewSession().Context(ctx) - close = func() { + cleanup = func() { if session != nil { session.Close() } } } - return + return session, cleanup } func (kv *KVOperator) getCacheTTL() time.Duration { @@ -89,8 +89,8 @@ func (kv *KVOperator) Get(ctx context.Context, group, key string) (string, error // query data := entity.PluginKVStorage{} - query, close := kv.getSession(ctx) - defer close() + query, cleanup := kv.getSession(ctx) + defer cleanup() query.Where(builder.Eq{ "plugin_slug_name": kv.pluginSlugName, @@ -118,10 +118,8 @@ func (kv *KVOperator) Set(ctx context.Context, group, key, value string) error { return ErrKVKeyEmpty } - query, close := kv.getSession(ctx) - if close != nil { - defer close() - } + query, cleanup := kv.getSession(ctx) + defer cleanup() data := &entity.PluginKVStorage{ PluginSlugName: kv.pluginSlugName, @@ -157,8 +155,8 @@ func (kv *KVOperator) Del(ctx context.Context, group, key string) error { kv.cleanCache(ctx, group, key) - session, close := kv.getSession(ctx) - defer close() + session, cleanup := kv.getSession(ctx) + defer cleanup() session.Where(builder.Eq{ "plugin_slug_name": kv.pluginSlugName, @@ -214,8 +212,8 @@ func (kv *KVOperator) GetByGroup(ctx context.Context, group string, page, pageSi } } - query, close := kv.getSession(ctx) - defer close() + query, cleanup := kv.getSession(ctx) + defer cleanup() var items []entity.PluginKVStorage err := query.Where(builder.Eq{"plugin_slug_name": kv.pluginSlugName, "`group`": group}).