diff --git a/package.json b/package.json index 985e4b670b..01710dcf3e 100644 --- a/package.json +++ b/package.json @@ -122,6 +122,7 @@ "parse5": "6.0.1", "parsimmon": "1.18.1", "pg": "8.8.0", + "pg-query-stream": "4.5.5", "portscanner": "2.2.0", "postcss": "8.4.31", "postcss-loader": "7.0.2", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 10268cfddf..dce5a65da0 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -276,6 +276,9 @@ dependencies: pg: specifier: 8.8.0 version: 8.8.0 + pg-query-stream: + specifier: 4.5.5 + version: 4.5.5(pg@8.8.0) portscanner: specifier: 2.2.0 version: 2.2.0 @@ -410,7 +413,7 @@ dependencies: version: 10.9.1(@swc/core@1.3.107)(@types/node@18.11.18)(typescript@4.9.5) typeorm: specifier: 0.2.38 - version: 0.2.38(pg@8.8.0)(redis@3.1.2) + version: 0.2.38(pg-query-stream@4.5.5)(pg@8.8.0)(redis@3.1.2) typescript: specifier: 4.9.5 version: 4.9.5 @@ -7903,6 +7906,14 @@ packages: resolution: {integrity: sha512-r5o/V/ORTA6TmUnyWZR9nCj1klXCO2CEKNRlVuJptZe85QuhFayC7WeMic7ndayT5IRIR0S0xFxFi2ousartlQ==} dev: false + /pg-cursor@2.10.5(pg@8.8.0): + resolution: {integrity: sha512-wzgmyk+k9mwuYe30ylLA6qRWw2TBFSee4Bw23oTz66YL9RdRJjDi2TaROMMF+V3QB6QWB3FFCju22loDftjKkw==} + peerDependencies: + pg: ^8 + dependencies: + pg: 8.8.0 + dev: false + /pg-int8@1.0.1: resolution: {integrity: sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==} engines: {node: '>=4.0.0'} @@ -7920,6 +7931,15 @@ packages: resolution: {integrity: sha512-muRttij7H8TqRNu/DxrAJQITO4Ac7RmX3Klyr/9mJEOBeIpgnF8f9jAfRz5d3XwQZl5qBjF9gLsUtMPJE0vezQ==} dev: false + /pg-query-stream@4.5.5(pg@8.8.0): + resolution: {integrity: sha512-mBGxVdiR9O6SdkOcXBoZuxtHUQ0nSFIWcFauGMUteko+9rZcu97vE15JX/w8pdijQ+diLbiw8ijpV/V+VCUUtA==} + peerDependencies: + pg: ^8 + dependencies: + pg: 8.8.0 + pg-cursor: 2.10.5(pg@8.8.0) + dev: false + /pg-types@2.2.0: resolution: {integrity: sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==} engines: {node: '>=4'} @@ -10271,7 +10291,7 @@ packages: resolution: {integrity: sha512-/aCDEGatGvZ2BIk+HmLf4ifCJFwvKFNb9/JeZPMulfgFracn9QFcAf5GO8B/mweUjSoblS5In0cWhqpfs/5PQA==} dev: false - /typeorm@0.2.38(pg@8.8.0)(redis@3.1.2): + /typeorm@0.2.38(pg-query-stream@4.5.5)(pg@8.8.0)(redis@3.1.2): resolution: {integrity: sha512-M6Y3KQcAREQcphOVJciywf4mv6+A0I/SeR+lWNjKsjnQ+a3XcMwGYMGL0Jonsx3H0Cqlf/3yYqVki1jIXSK/xg==} hasBin: true peerDependencies: @@ -10333,6 +10353,7 @@ packages: js-yaml: 4.1.0 mkdirp: 1.0.4 pg: 8.8.0 + pg-query-stream: 4.5.5(pg@8.8.0) redis: 3.1.2 reflect-metadata: 0.1.13 sha.js: 2.4.11 diff --git a/src/misc/process-streaming-rows.ts b/src/misc/process-streaming-rows.ts new file mode 100644 index 0000000000..70e4c9ec17 --- /dev/null +++ b/src/misc/process-streaming-rows.ts @@ -0,0 +1,25 @@ +import { SelectQueryBuilder } from 'typeorm'; +import { ReadStream } from 'typeorm/platform/PlatformTools'; + +export async function processStreamingRows (query: SelectQueryBuilder, callback: (row: Record) => Promise) { + return new Promise(async (res, rej) => { + // query and get stream + let stream: ReadStream; + try { + stream = await query.stream(); + } catch (e) { + return rej(e); + } + + stream + .on('data', async (data: any) => { // Buffer | string のはずだけどobjectが返ってくる + try { + await callback(data); + } catch (e) { + rej(e); + } + }) + .on('end', () => res('end')) + .on('error', err => rej(err)); + }); +} diff --git a/src/services/suspend-user.ts b/src/services/suspend-user.ts index a85188acbe..dadc229d14 100644 --- a/src/services/suspend-user.ts +++ b/src/services/suspend-user.ts @@ -4,31 +4,41 @@ import { deliver } from '../queue'; import config from '../config'; import { User } from '../models/entities/user'; import { Users, Followings } from '../models'; -import { Not, IsNull } from 'typeorm'; +import { Not, IsNull, Brackets } from 'typeorm'; +import { processStreamingRows } from '../misc/process-streaming-rows'; export async function doPostSuspend(user: User) { if (Users.isLocalUser(user)) { // 知り得る全SharedInboxにDelete配信 const content = renderActivity(renderDelete(`${config.url}/users/${user.id}`, user)); - const queue: string[] = []; + const query = Followings.createQueryBuilder('following') + .select('distinct coalesce(following.followerSharedInbox, following.followeeSharedInbox) as inbox') + .where(new Brackets((qb) => + qb.where({ followerHost: Not(IsNull()) }) + .orWhere({ followeeHost: Not(IsNull()) }) + )) + .andWhere(new Brackets((qb) => + qb.where({ followerSharedInbox: Not(IsNull()) }) + .orWhere({ followeeSharedInbox: Not(IsNull()) }) + )); - const followings = await Followings.find({ - where: [ - { followerSharedInbox: Not(IsNull()) }, - { followeeSharedInbox: Not(IsNull()) } - ], - select: ['followerSharedInbox', 'followeeSharedInbox'] - }); - - const inboxes = followings.map(x => x.followerSharedInbox || x.followeeSharedInbox); - - for (const inbox of inboxes) { - if (inbox != null && !queue.includes(inbox)) queue.push(inbox); + /* streamingしない版 + for (const row of await query.getRawMany()) { + deliver(user as any, content, row.inbox); } + */ - for (const inbox of queue) { - deliver(user as any, content, inbox); - } + await processStreamingRows(query, async (row: Record) => { + if (typeof row.inbox === 'string') { + try { + await deliver(user as any, content, row.inbox); + } catch (e) { + console.warn(`deliver error ${e}`); + } + } else { + console.warn(`invalid row.inbox`); + } + }); } } diff --git a/src/services/unsuspend-user.ts b/src/services/unsuspend-user.ts index 6cab375821..47643af05f 100644 --- a/src/services/unsuspend-user.ts +++ b/src/services/unsuspend-user.ts @@ -5,31 +5,35 @@ import { deliver } from '../queue'; import config from '../config'; import { User } from '../models/entities/user'; import { Users, Followings } from '../models'; -import { Not, IsNull } from 'typeorm'; +import { Not, IsNull, Brackets } from 'typeorm'; +import { processStreamingRows } from '../misc/process-streaming-rows'; export async function doPostUnsuspend(user: User) { if (Users.isLocalUser(user)) { // 知り得る全SharedInboxにUndo Delete配信 const content = renderActivity(renderUndo(renderDelete(`${config.url}/users/${user.id}`, user), user)); - const queue: string[] = []; + const query = Followings.createQueryBuilder('following') + .select('distinct coalesce(following.followerSharedInbox, following.followeeSharedInbox) as inbox') + .where(new Brackets((qb) => + qb.where({ followerHost: Not(IsNull()) }) + .orWhere({ followeeHost: Not(IsNull()) }) + )) + .andWhere(new Brackets((qb) => + qb.where({ followerSharedInbox: Not(IsNull()) }) + .orWhere({ followeeSharedInbox: Not(IsNull()) }) + )); - const followings = await Followings.find({ - where: [ - { followerSharedInbox: Not(IsNull()) }, - { followeeSharedInbox: Not(IsNull()) } - ], - select: ['followerSharedInbox', 'followeeSharedInbox'] - }); - - const inboxes = followings.map(x => x.followerSharedInbox || x.followeeSharedInbox); - - for (const inbox of inboxes) { - if (inbox != null && !queue.includes(inbox)) queue.push(inbox); - } - - for (const inbox of queue) { - deliver(user as any, content, inbox); - } + await processStreamingRows(query, async (row: Record) => { + if (typeof row.inbox === 'string') { + try { + await deliver(user as any, content, row.inbox); + } catch (e) { + console.warn(`deliver error ${e}`); + } + } else { + console.warn(`invalid row.inbox`); + } + }); } }