Skip to content

update mq export;perf: redis cache #4465

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 4 commits into from
Apr 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion packages/global/core/dataset/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ export const DatasetTypeMap = {
export enum DatasetStatusEnum {
active = 'active',
syncing = 'syncing',
waiting = 'waiting'
waiting = 'waiting',
error = 'error'
}
export const DatasetStatusMap = {
[DatasetStatusEnum.active]: {
Expand All @@ -62,6 +63,9 @@ export const DatasetStatusMap = {
},
[DatasetStatusEnum.waiting]: {
label: i18nT('common:core.dataset.status.waiting')
},
[DatasetStatusEnum.error]: {
label: i18nT('dataset:status_error')
}
};

Expand Down
1 change: 1 addition & 0 deletions packages/global/core/dataset/type.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ export type DatasetListItemType = {

export type DatasetItemType = Omit<DatasetSchemaType, 'vectorModel' | 'agentModel' | 'vlmModel'> & {
status: `${DatasetStatusEnum}`;
errorMsg?: string;
vectorModel: EmbeddingModelItemType;
agentModel: LLMModelItemType;
vlmModel?: LLMModelItemType;
Expand Down
5 changes: 5 additions & 0 deletions packages/service/common/bullmq/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ export function getWorker<DataType, ReturnType = void>(
newWorker.on('error', (error) => {
addLog.error(`MQ Worker [${name}]: ${error.message}`, error);
});
newWorker.on('failed', (jobId, error) => {
addLog.error(`MQ Worker [${name}]: ${error.message}`, error);
});
workers.set(name, newWorker);
return newWorker;
}

export * from 'bullmq';
10 changes: 8 additions & 2 deletions packages/service/common/vectorStore/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { EmbeddingModelItemType } from '@fastgpt/global/core/ai/model.d';
import { MILVUS_ADDRESS, PG_ADDRESS, OCEANBASE_ADDRESS } from './constants';
import { MilvusCtrl } from './milvus/class';
import { setRedisCache, getRedisCache, delRedisCache, CacheKeyEnum } from '../redis/cache';
import { throttle } from 'lodash';

const getVectorObj = () => {
if (PG_ADDRESS) return new PgVectorCtrl();
Expand All @@ -15,7 +16,12 @@ const getVectorObj = () => {

return new PgVectorCtrl();
};

const getChcheKey = (teamId: string) => `${CacheKeyEnum.team_vector_count}:${teamId}`;
const onDelCache = throttle((teamId: string) => delRedisCache(getChcheKey(teamId)), 30000, {
leading: true,
trailing: true
});

const Vector = getVectorObj();

Expand Down Expand Up @@ -59,7 +65,7 @@ export const insertDatasetDataVector = async ({
vector: vectors[0]
});

delRedisCache(getChcheKey(props.teamId));
onDelCache(props.teamId);

return {
tokens,
Expand All @@ -69,6 +75,6 @@ export const insertDatasetDataVector = async ({

export const deleteDatasetDataVector = async (props: DelDatasetVectorCtrlProps) => {
const result = await Vector.delete(props);
delRedisCache(getChcheKey(props.teamId));
onDelCache(props.teamId);
return result;
};
31 changes: 26 additions & 5 deletions packages/service/core/dataset/websiteSync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,44 @@ export const addWebsiteSyncJob = (data: WebsiteSyncJobData) => {
export const getWebsiteSyncDatasetStatus = async (datasetId: string) => {
const jobId = await websiteSyncQueue.getDeduplicationJobId(datasetId);
if (!jobId) {
return DatasetStatusEnum.active;
return {
status: DatasetStatusEnum.active,
errorMsg: undefined
};
}
const job = await websiteSyncQueue.getJob(jobId);
if (!job) {
return DatasetStatusEnum.active;
return {
status: DatasetStatusEnum.active,
errorMsg: undefined
};
}

const jobState = await job.getState();

if (jobState === 'failed' || jobState === 'unknown') {
return {
status: DatasetStatusEnum.error,
errorMsg: job.failedReason
};
}
if (['waiting-children', 'waiting'].includes(jobState)) {
return DatasetStatusEnum.waiting;
return {
status: DatasetStatusEnum.waiting,
errorMsg: undefined
};
}
if (jobState === 'active') {
return DatasetStatusEnum.syncing;
return {
status: DatasetStatusEnum.syncing,
errorMsg: undefined
};
}

return DatasetStatusEnum.active;
return {
status: DatasetStatusEnum.active,
errorMsg: undefined
};
};

// Scheduler setting
Expand Down
1 change: 1 addition & 0 deletions packages/web/i18n/en/dataset.json
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
"split_sign_question": "question mark",
"split_sign_semicolon": "semicolon",
"start_sync_website_tip": "Confirm to start synchronizing data? \nThe old data will be deleted and retrieved again, please confirm!",
"status_error": "Running exception",
"sync_collection_failed": "Synchronization collection error, please check whether the source file can be accessed normally",
"sync_schedule": "Timing synchronization",
"sync_schedule_tip": "Only existing collections will be synchronized. \nIncludes linked collections and all collections in the API knowledge base. \nThe system will poll for updates every day, and the specific update time cannot be determined.",
Expand Down
1 change: 1 addition & 0 deletions packages/web/i18n/zh-CN/dataset.json
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
"split_sign_question": "问号",
"split_sign_semicolon": "分号",
"start_sync_website_tip": "确认开始同步数据?将会删除旧数据后重新获取,请确认!",
"status_error": "运行异常",
"sync_collection_failed": "同步集合错误,请检查是否能正常访问源文件",
"sync_schedule": "定时同步",
"sync_schedule_tip": "仅会同步已存在的集合。包括链接集合以及 API 知识库里所有集合。系统会每天进行轮询更新,无法确定具体的更新时间。",
Expand Down
1 change: 1 addition & 0 deletions packages/web/i18n/zh-Hant/dataset.json
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
"split_sign_question": "問號",
"split_sign_semicolon": "分號",
"start_sync_website_tip": "確認開始同步資料?\n將會刪除舊資料後重新獲取,請確認!",
"status_error": "運行異常",
"sync_collection_failed": "同步集合錯誤,請檢查是否能正常存取來源文件",
"sync_schedule": "定時同步",
"sync_schedule_tip": "只會同步已存在的集合。\n包括連結集合以及 API 知識庫裡所有集合。\n系統會每天進行輪詢更新,無法確定特定的更新時間。",
Expand Down
1 change: 1 addition & 0 deletions projects/app/next.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ const nextConfig = {
serverComponentsExternalPackages: [
'mongoose',
'pg',
'bullmq',
'@zilliz/milvus2-sdk-node',
"tiktoken",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import HeaderTagPopOver from './HeaderTagPopOver';
import MyBox from '@fastgpt/web/components/common/MyBox';
import Icon from '@fastgpt/web/components/common/Icon';
import MyTag from '@fastgpt/web/components/common/Tag/index';
import QuestionTip from '@fastgpt/web/components/common/MyTooltip/QuestionTip';

const FileSourceSelector = dynamic(() => import('../Import/components/FileSourceSelector'));

Expand Down Expand Up @@ -321,6 +322,14 @@ const Header = ({}: {}) => {
{t('common:core.dataset.status.waiting')}
</MyTag>
)}
{datasetDetail.status === DatasetStatusEnum.error && (
<MyTag colorSchema="red" showDot px={3} h={'36px'}>
<HStack spacing={1}>
<Box>{t('dataset:status_error')}</Box>
<QuestionTip color={'red.500'} label={datasetDetail.errorMsg} />
</HStack>
</MyTag>
)}
</>
) : (
<Button
Expand Down
8 changes: 6 additions & 2 deletions projects/app/src/pages/api/core/dataset/detail.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,21 @@ async function handler(req: ApiRequestProps<Query>): Promise<DatasetItemType> {
per: ReadPermissionVal
});

const status = await (async () => {
const { status, errorMsg } = await (async () => {
if (dataset.type === DatasetTypeEnum.websiteDataset) {
return await getWebsiteSyncDatasetStatus(datasetId);
}

return DatasetStatusEnum.active;
return {
status: DatasetStatusEnum.active,
errorMsg: undefined
};
})();

return {
...dataset,
status,
errorMsg,
apiServer: dataset.apiServer
? {
baseUrl: dataset.apiServer.baseUrl,
Expand Down
Loading