diff --git a/.env.example b/.env.example index da83f57..94fbece 100644 --- a/.env.example +++ b/.env.example @@ -15,10 +15,6 @@ SOLA_DB_USER=sola SOLA_DB_PWD=sola SOLA_DB_NAME=sola -# Redis setting -REDIS_HOST=127.0.0.1 -REDIS_PORT=6379 - # Solr setting SOLA_SOLR_LIST=http://127.0.0.1:8983/solr/ SOLA_SOLR_SIZE=1 diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 2cdfdf7..bd88d18 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -16,10 +16,6 @@ jobs: # See supported Node.js release schedule at https://nodejs.org/en/about/releases/ services: - redis: - image: redis - ports: - - 6379:6379 mariadb: image: mariadb env: @@ -45,8 +41,6 @@ jobs: - run: npm ci - run: npm run jest env: - REDIS_HOST: 127.0.0.1 - REDIS_PORT: 6379 SOLA_DB_HOST: 127.0.0.1 SOLA_DB_PORT: 3306 SOLA_DB_USER: root diff --git a/README.md b/README.md index a5d5d53..c1c9974 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,6 @@ API server for [trace.moe](https://github.com/soruly/trace.moe) - mariaDB 10.4.x - ffmpeg 4.x - java (openjdk 17) -- redis - [liresolr](https://github.com/soruly/liresolr) - g++, cmake (if you need to compile OpenCV) diff --git a/docker-compose.yml b/docker-compose.yml index f1b8c12..acba2af 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,13 +11,6 @@ services: networks: trace_moe_net: - redis: - image: redis:latest - ports: - - ${REDIS_PORT}:6379 - networks: - trace_moe_net: - liresolr: image: ghcr.io/soruly/liresolr:latest command: solr-precreate cl_0 /opt/solr/server/solr/configsets/liresolr diff --git a/package-lock.json b/package-lock.json index e5b6b67..98ea899 100644 --- a/package-lock.json +++ b/package-lock.json @@ -22,7 +22,6 @@ "multer": "^1.4.5-lts.1", "mysql": "^2.18.1", "nodemailer": "^6.9.12", - "redis": "^4.6.13", "rss": "^1.2.2", "xmldoc": "^1.3.0" }, @@ -1266,57 +1265,6 @@ "node": ">=14" } }, - "node_modules/@redis/bloom": { - "version": "1.2.0", - "license": "MIT", - "peerDependencies": { - "@redis/client": "^1.0.0" - } - }, - "node_modules/@redis/client": { - "version": "1.5.14", - "license": "MIT", - "dependencies": { - "cluster-key-slot": "1.1.2", - "generic-pool": "3.9.0", - "yallist": "4.0.0" - }, - "engines": { - "node": ">=14" - } - }, - "node_modules/@redis/client/node_modules/yallist": { - "version": "4.0.0", - "license": "ISC" - }, - "node_modules/@redis/graph": { - "version": "1.1.1", - "license": "MIT", - "peerDependencies": { - "@redis/client": "^1.0.0" - } - }, - "node_modules/@redis/json": { - "version": "1.0.6", - "license": "MIT", - "peerDependencies": { - "@redis/client": "^1.0.0" - } - }, - "node_modules/@redis/search": { - "version": "1.1.6", - "license": "MIT", - "peerDependencies": { - "@redis/client": "^1.0.0" - } - }, - "node_modules/@redis/time-series": { - "version": "1.0.5", - "license": "MIT", - "peerDependencies": { - "@redis/client": "^1.0.0" - } - }, "node_modules/@sinclair/typebox": { "version": "0.27.8", "dev": true, @@ -2178,13 +2126,6 @@ "node": ">=12" } }, - "node_modules/cluster-key-slot": { - "version": "1.1.2", - "license": "Apache-2.0", - "engines": { - "node": ">=0.10.0" - } - }, "node_modules/co": { "version": "4.6.0", "dev": true, @@ -2903,13 +2844,6 @@ "url": "https://github.com/sponsors/isaacs" } }, - "node_modules/generic-pool": { - "version": "3.9.0", - "license": "MIT", - "engines": { - "node": ">= 4" - } - }, "node_modules/gensync": { "version": "1.0.0-beta.2", "dev": true, @@ -5256,21 +5190,6 @@ "node": ">= 10.13.0" } }, - "node_modules/redis": { - "version": "4.6.13", - "license": "MIT", - "workspaces": [ - "./packages/*" - ], - "dependencies": { - "@redis/bloom": "1.2.0", - "@redis/client": "1.5.14", - "@redis/graph": "1.1.1", - "@redis/json": "1.0.6", - "@redis/search": "1.1.6", - "@redis/time-series": "1.0.5" - } - }, "node_modules/require-directory": { "version": "2.1.1", "dev": true, diff --git a/package.json b/package.json index 8c13d93..d1560d0 100644 --- a/package.json +++ b/package.json @@ -46,7 +46,6 @@ "multer": "^1.4.5-lts.1", "mysql": "^2.18.1", "nodemailer": "^6.9.12", - "redis": "^4.6.13", "rss": "^1.2.2", "xmldoc": "^1.3.0" } diff --git a/script/redis.js b/script/redis.js deleted file mode 100644 index c673971..0000000 --- a/script/redis.js +++ /dev/null @@ -1,34 +0,0 @@ -import "dotenv/config"; -import { createClient } from "redis"; - -const { REDIS_HOST, REDIS_PORT } = process.env; - -const redis = createClient({ - url: `redis://${REDIS_HOST}:${REDIS_PORT}`, -}); -await redis.connect(); - -const keys = await redis.keys("*"); - -console.table( - await Promise.all( - keys.filter((key) => key.startsWith("q:")).map((key) => Promise.all([key, redis.get(key)])), - ), -); -console.table( - await Promise.all( - keys.filter((key) => key.startsWith("c:")).map((key) => Promise.all([key, redis.get(key)])), - ), -); -console.log(await redis.get("queue")); - -const priority = 0; - -const queueKeys = await redis.keys("q:*"); -const higherPriorityKeys = queueKeys.filter((e) => Number(e.split(":")[1]) >= priority); -const higherPriorityQueues = higherPriorityKeys.length ? await redis.mGet(higherPriorityKeys) : []; -const higherPriorityQueuesLength = higherPriorityQueues - .map((e) => Number(e)) - .reduce((a, b) => a + b, 0); -console.log(higherPriorityQueuesLength); -process.exit(); diff --git a/server.js b/server.js index ee816c4..a1e791e 100644 --- a/server.js +++ b/server.js @@ -1,7 +1,6 @@ import "dotenv/config"; import fs from "fs-extra"; import Knex from "knex"; -import { createClient } from "redis"; import app from "./src/app.js"; @@ -18,8 +17,6 @@ const { SOLA_DB_PWD, SOLA_DB_NAME, SERVER_PORT, - REDIS_HOST, - REDIS_PORT, } = process.env; console.log("Creating SQL database if not exist"); @@ -33,12 +30,6 @@ await Knex({ }, }).raw(`CREATE DATABASE IF NOT EXISTS ${SOLA_DB_NAME} CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;`); -app.locals.redis = createClient({ - url: `redis://${REDIS_HOST}:${REDIS_PORT}`, -}); -await app.locals.redis.connect(); -await app.locals.redis.flushAll(); - app.locals.knex = Knex({ client: "mysql", connection: { @@ -60,6 +51,14 @@ await app.locals.knex.raw(fs.readFileSync("sql/data.sql", "utf8")); app.locals.workerCount = 0; app.locals.mutex = false; app.locals.mediaQueue = 0; +app.locals.searchQueue = []; +app.locals.searchConcurrent = new Map(); +setInterval(() => app.locals.searchConcurrent.clear(), 60 * 60 * 1000); +setInterval(() => (app.locals.searchQueue = []), 60 * 60 * 1000); +setInterval(() => { + console.log("queue", app.locals.searchQueue); + console.log("concurrent", app.locals.searchConcurrent); +}, 60 * 1000); const server = app.listen(SERVER_PORT, "0.0.0.0", () => console.log(`API server listening on port ${server.address().port}`), diff --git a/src/search.js b/src/search.js index cb1b94f..c7efb7c 100644 --- a/src/search.js +++ b/src/search.js @@ -30,8 +30,11 @@ const search = (image, candidates, anilistID) => ), ); -const logAndDequeue = async (knex, redis, uid, priority, status, searchTime, accuracy) => { +const logAndDequeue = async (locals, uid, priority, status, searchTime, accuracy) => { + const knex = locals.knex; if (status === 200) { + while (locals.mut) await new Promise((resolve) => setTimeout(resolve, 0)); + locals.mut = true; const searchCountCache = await knex("search_count").where({ uid: `${uid}` }); if (searchCountCache.length) { await knex("search_count") @@ -40,6 +43,7 @@ const logAndDequeue = async (knex, redis, uid, priority, status, searchTime, acc } else { await knex("search_count").insert({ uid, count: 1 }); } + locals.mut = false; } if (searchTime && accuracy) { await knex("log").insert({ @@ -56,19 +60,16 @@ const logAndDequeue = async (knex, redis, uid, priority, status, searchTime, acc } else { await knex("log").insert({ time: knex.fn.now(), uid, status }); } - const c = await redis.decr(`c:${uid}`); - if (c < 0) { - await redis.del(`c:${uid}`); - } - const q = await redis.decr(`q:${priority}`); - if (q < 0) { - await redis.del(`q:${priority}`); - } + const concurrentCount = locals.searchConcurrent.get(uid) ?? 0; + if (concurrentCount <= 1) locals.searchConcurrent.delete(uid); + else locals.searchConcurrent.set(uid, concurrentCount - 1); + + locals.searchQueue[priority] = (locals.searchQueue[priority] || 1) - 1; }; export default async (req, res) => { + const locals = req.app.locals; const knex = req.app.locals.knex; - const redis = req.app.locals.redis; const rows = await knex("tier").select("concurrency", "quota", "priority").where("id", 0); let quota = rows[0].quota; @@ -107,24 +108,19 @@ export default async (req, res) => { }); } - const concurrentCount = await redis.incr(`c:${uid}`); - await redis.expire(`c:${uid}`, 60); - if (concurrentCount > concurrency) { - await logAndDequeue(knex, redis, uid, priority, 402); + locals.searchConcurrent.set(uid, (locals.searchConcurrent.get(uid) ?? 0) + 1); + if (locals.searchConcurrent.get(uid) > concurrency) { + await logAndDequeue(locals, uid, priority, 402); return res.status(402).json({ error: "Concurrency limit exceeded", }); } - await redis.incr(`q:${priority}`); - await redis.expire(`q:${priority}`, 60); - const queueKeys = await redis.keys("q:*"); - const priorityKeys = queueKeys.filter((e) => Number(e.split(":")[1]) >= priority); - const priorityQueues = priorityKeys.length ? await redis.mGet(priorityKeys) : []; - const priorityQueuesLength = priorityQueues.map((e) => Number(e)).reduce((a, b) => a + b, 0); + locals.searchQueue[priority] = (locals.searchQueue[priority] ?? 0) + 1; + const queueSize = locals.searchQueue.reduce((acc, cur, i) => (i >= priority ? acc + cur : 0), 0); - if (priorityQueuesLength >= 5) { - await logAndDequeue(knex, redis, uid, priority, 503); + if (queueSize > 8) { + await logAndDequeue(locals, uid, priority, 503); return res.status(503).json({ error: `Error: Search queue is full`, }); @@ -136,7 +132,7 @@ export default async (req, res) => { try { new URL(req.query.url); } catch (e) { - await logAndDequeue(knex, redis, uid, priority, 400); + await logAndDequeue(locals, uid, priority, 400); return res.status(400).json({ error: `Invalid image url ${req.query.url}`, }); @@ -159,7 +155,7 @@ export default async (req, res) => { return { status: 400 }; }); if (response.status >= 400) { - await logAndDequeue(knex, redis, uid, priority, 400); + await logAndDequeue(locals, uid, priority, 400); return res.status(response.status).json({ error: `Failed to fetch image ${req.query.url}`, }); @@ -170,7 +166,7 @@ export default async (req, res) => { } else if (req.rawBody?.length) { searchFile = req.rawBody; } else { - await logAndDequeue(knex, redis, uid, priority, 405); + await logAndDequeue(locals, uid, priority, 405); return res.status(405).json({ error: "Method Not Allowed", }); @@ -201,7 +197,7 @@ export default async (req, res) => { ]); await fs.remove(tempFilePath); if (!ffmpeg.stdout.length) { - await logAndDequeue(knex, redis, uid, priority, 400); + await logAndDequeue(locals, uid, priority, 400); return res.status(400).json({ error: `Failed to process image. ${ffmpeg.stderr.toString()}`, }); @@ -251,7 +247,7 @@ export default async (req, res) => { searchImage = cv.imencode(".jpg", image.getRegion(new cv.Rect(x, y, w, h))); } } catch (e) { - await logAndDequeue(knex, redis, uid, priority, 400); + await logAndDequeue(locals, uid, priority, 400); return res.status(400).json({ error: "OpenCV: Failed to detect and cut borders", }); @@ -264,14 +260,14 @@ export default async (req, res) => { try { solrResponse = await search(searchImage, candidates, Number(req.query.anilistID)); } catch (e) { - await logAndDequeue(knex, redis, uid, priority, 503); + await logAndDequeue(locals, uid, priority, 503); return res.status(503).json({ error: `Error: Database is not responding`, }); } if (solrResponse.find((e) => e.status >= 500)) { const r = solrResponse.find((e) => e.status >= 500); - await logAndDequeue(knex, redis, uid, priority, r.status); + await logAndDequeue(locals, uid, priority, r.status); return res.status(r.status).json({ error: `Database is ${r.status === 504 ? "overloaded" : "offline"}`, }); @@ -286,7 +282,7 @@ export default async (req, res) => { solrResponse = await search(searchImage, candidates, Number(req.query.anilistID)); if (solrResponse.find((e) => e.status >= 500)) { const r = solrResponse.find((e) => e.status >= 500); - await logAndDequeue(knex, redis, uid, priority, r.status); + await logAndDequeue(locals, uid, priority, r.status); return res.status(r.status).json({ error: `Database is ${r.status === 504 ? "overloaded" : "offline"}`, }); @@ -302,7 +298,7 @@ export default async (req, res) => { if (solrResults.find((e) => e.Error)) { console.log(solrResults.find((e) => e.Error)); - await logAndDequeue(knex, redis, uid, priority, 500); + await logAndDequeue(locals, uid, priority, 500); return res.status(500).json({ error: solrResults.find((e) => e.Error).Error, }); @@ -415,7 +411,7 @@ export default async (req, res) => { } } - await logAndDequeue(knex, redis, uid, priority, 200, searchTime, result[0]?.similarity); + await logAndDequeue(locals, uid, priority, 200, searchTime, result[0]?.similarity); res.json({ frameCount: frameCountList.reduce((prev, curr) => prev + curr, 0), error: "", diff --git a/src/search.test.js b/src/search.test.js index 2558e04..8f5c757 100644 --- a/src/search.test.js +++ b/src/search.test.js @@ -1,7 +1,6 @@ import "dotenv/config"; import { default as request } from "supertest"; import Knex from "knex"; -import { createClient } from "redis"; import fs from "fs-extra"; import app from "./app.js"; @@ -13,17 +12,9 @@ const { SOLA_DB_NAME, SOLA_SOLR_LIST, TRACE_ALGO, - REDIS_HOST, - REDIS_PORT, } = process.env; beforeAll(async () => { - app.locals.redis = createClient({ - url: `redis://${REDIS_HOST}:${REDIS_PORT}`, - }); - await app.locals.redis.connect(); - // await app.locals.redis.flushAll(); - app.locals.knex = Knex({ client: "mysql", connection: { @@ -80,13 +71,17 @@ beforeAll(async () => { created: new Date(), updated: new Date(), }); + await app.locals.knex("search_count").truncate(); + app.locals.searchQueue = []; + app.locals.searchConcurrent = new Map(); }); afterAll(async () => { await app.locals.knex(TRACE_ALGO).truncate(); + await app.locals.knex("search_count").truncate(); await app.locals.knex("user").where("email", "test@trace.moe").del(); - await app.locals.redis.disconnect(); await app.locals.knex.destroy(); + if (fs.existsSync("32B15UXxymfSMwKGTObY5e.jpg")) await fs.remove("32B15UXxymfSMwKGTObY5e.jpg"); }); describe("without API Key", () => { @@ -292,7 +287,7 @@ describe("with system Tier 9 API Key", () => { ); } const res = await Promise.all( - [...new Array(8)].map((_) => + [...new Array(10)].map((_) => request(app) .post("/search") .query({ key: app.locals.apiKeyTier9 })