Skip to content

Commit

Permalink
全sharedInbox配信の最適化 (mei23#2527)
Browse files Browse the repository at this point in the history
* 全sharedInbox配信の最適化

* streaming

* Tune


 Co-authored-by: まっちゃとーにゅ <17376330+u1-liquid@users.noreply.github.com>
  • Loading branch information
mei23 authored Apr 6, 2024
1 parent ab14775 commit 8a854f7
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 38 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
"parse5": "6.0.1",
"parsimmon": "1.18.1",
"pg": "8.8.0",
"pg-query-stream": "4.5.5",
"portscanner": "2.2.0",
"postcss": "8.4.31",
"postcss-loader": "7.0.2",
Expand Down
25 changes: 23 additions & 2 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions src/misc/process-streaming-rows.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { SelectQueryBuilder } from 'typeorm';
import { ReadStream } from 'typeorm/platform/PlatformTools';

export async function processStreamingRows<T> (query: SelectQueryBuilder<T>, callback: (row: Record<string, unknown>) => Promise<void>) {
return new Promise(async (res, rej) => {
// query and get stream
let stream: ReadStream;
try {
stream = await query.stream();
} catch (e) {
return rej(e);
}

stream
.on('data', async (data: any) => { // Buffer | string のはずだけどobjectが返ってくる
try {
await callback(data);
} catch (e) {
rej(e);
}
})
.on('end', () => res('end'))
.on('error', err => rej(err));
});
}
44 changes: 27 additions & 17 deletions src/services/suspend-user.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,41 @@ import { deliver } from '../queue';
import config from '../config';
import { User } from '../models/entities/user';
import { Users, Followings } from '../models';
import { Not, IsNull } from 'typeorm';
import { Not, IsNull, Brackets } from 'typeorm';
import { processStreamingRows } from '../misc/process-streaming-rows';

export async function doPostSuspend(user: User) {
if (Users.isLocalUser(user)) {
// 知り得る全SharedInboxにDelete配信
const content = renderActivity(renderDelete(`${config.url}/users/${user.id}`, user));

const queue: string[] = [];
const query = Followings.createQueryBuilder('following')
.select('distinct coalesce(following.followerSharedInbox, following.followeeSharedInbox) as inbox')
.where(new Brackets((qb) =>
qb.where({ followerHost: Not(IsNull()) })
.orWhere({ followeeHost: Not(IsNull()) })
))
.andWhere(new Brackets((qb) =>
qb.where({ followerSharedInbox: Not(IsNull()) })
.orWhere({ followeeSharedInbox: Not(IsNull()) })
));

const followings = await Followings.find({
where: [
{ followerSharedInbox: Not(IsNull()) },
{ followeeSharedInbox: Not(IsNull()) }
],
select: ['followerSharedInbox', 'followeeSharedInbox']
});

const inboxes = followings.map(x => x.followerSharedInbox || x.followeeSharedInbox);

for (const inbox of inboxes) {
if (inbox != null && !queue.includes(inbox)) queue.push(inbox);
/* streamingしない版
for (const row of await query.getRawMany()) {
deliver(user as any, content, row.inbox);
}
*/

for (const inbox of queue) {
deliver(user as any, content, inbox);
}
await processStreamingRows(query, async (row: Record<string, unknown>) => {
if (typeof row.inbox === 'string') {
try {
await deliver(user as any, content, row.inbox);
} catch (e) {
console.warn(`deliver error ${e}`);
}
} else {
console.warn(`invalid row.inbox`);
}
});
}
}
42 changes: 23 additions & 19 deletions src/services/unsuspend-user.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,35 @@ import { deliver } from '../queue';
import config from '../config';
import { User } from '../models/entities/user';
import { Users, Followings } from '../models';
import { Not, IsNull } from 'typeorm';
import { Not, IsNull, Brackets } from 'typeorm';
import { processStreamingRows } from '../misc/process-streaming-rows';

export async function doPostUnsuspend(user: User) {
if (Users.isLocalUser(user)) {
// 知り得る全SharedInboxにUndo Delete配信
const content = renderActivity(renderUndo(renderDelete(`${config.url}/users/${user.id}`, user), user));

const queue: string[] = [];
const query = Followings.createQueryBuilder('following')
.select('distinct coalesce(following.followerSharedInbox, following.followeeSharedInbox) as inbox')
.where(new Brackets((qb) =>
qb.where({ followerHost: Not(IsNull()) })
.orWhere({ followeeHost: Not(IsNull()) })
))
.andWhere(new Brackets((qb) =>
qb.where({ followerSharedInbox: Not(IsNull()) })
.orWhere({ followeeSharedInbox: Not(IsNull()) })
));

const followings = await Followings.find({
where: [
{ followerSharedInbox: Not(IsNull()) },
{ followeeSharedInbox: Not(IsNull()) }
],
select: ['followerSharedInbox', 'followeeSharedInbox']
});

const inboxes = followings.map(x => x.followerSharedInbox || x.followeeSharedInbox);

for (const inbox of inboxes) {
if (inbox != null && !queue.includes(inbox)) queue.push(inbox);
}

for (const inbox of queue) {
deliver(user as any, content, inbox);
}
await processStreamingRows(query, async (row: Record<string, unknown>) => {
if (typeof row.inbox === 'string') {
try {
await deliver(user as any, content, row.inbox);
} catch (e) {
console.warn(`deliver error ${e}`);
}
} else {
console.warn(`invalid row.inbox`);
}
});
}
}

0 comments on commit 8a854f7

Please # to comment.