Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

全sharedInbox配信の最適化 #2527

Merged
merged 3 commits into from
Apr 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
"parse5": "6.0.1",
"parsimmon": "1.18.1",
"pg": "8.8.0",
"pg-query-stream": "4.5.5",
"portscanner": "2.2.0",
"postcss": "8.4.31",
"postcss-loader": "7.0.2",
Expand Down
25 changes: 23 additions & 2 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 48 additions & 15 deletions src/services/suspend-user.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,64 @@ import { deliver } from '../queue';
import config from '../config';
import { User } from '../models/entities/user';
import { Users, Followings } from '../models';
import { Not, IsNull } from 'typeorm';
import { Not, IsNull, Brackets, SelectQueryBuilder } from 'typeorm';
import { ReadStream } from 'typeorm/platform/PlatformTools';

export async function doPostSuspend(user: User) {
if (Users.isLocalUser(user)) {
// 知り得る全SharedInboxにDelete配信
const content = renderActivity(renderDelete(`${config.url}/users/${user.id}`, user));

const queue: string[] = [];
const query = Followings.createQueryBuilder('following')
.select('distinct coalesce(following.followerSharedInbox, following.followeeSharedInbox) as inbox')
.where(new Brackets((qb) =>
qb.where({ followerHost: Not(IsNull()) })
.orWhere({ followeeHost: Not(IsNull()) })
))
.andWhere(new Brackets((qb) =>
qb.where({ followerSharedInbox: Not(IsNull()) })
.orWhere({ followeeSharedInbox: Not(IsNull()) })
));

const followings = await Followings.find({
where: [
{ followerSharedInbox: Not(IsNull()) },
{ followeeSharedInbox: Not(IsNull()) }
],
select: ['followerSharedInbox', 'followeeSharedInbox']
});
/*
for (const row of await query.getRawMany()) {
deliver(user as any, content, row.inbox);
}
*/

const inboxes = followings.map(x => x.followerSharedInbox || x.followeeSharedInbox);
async function ProcessStreamingRows<T> (query: SelectQueryBuilder<T>, callback: (row: Record<string, unknown>) => Promise<void>) {
return new Promise(async (res, rej) => {
// query and get stream
let stream: ReadStream;
try {
stream = await query.stream();
} catch (e) {
return rej(e);
}

for (const inbox of inboxes) {
if (inbox != null && !queue.includes(inbox)) queue.push(inbox);
stream
.on('data', async (data: any) => { // Buffer | string のはずだけどobjectが返ってくる
try {
await callback(data);
} catch (e) {
rej(e);
}
})
.on('end', () => res('end'))
.on('error', err => rej(err));
});
}

for (const inbox of queue) {
deliver(user as any, content, inbox);
}
await ProcessStreamingRows(query, async (row: Record<string, unknown>) => {
if (typeof row.inbox === 'string') {
try {
await deliver(user as any, content, row.inbox);
} catch (e) {
console.warn('mmm');
}
} else {
console.warn('nnn');
}
});
}
}
31 changes: 13 additions & 18 deletions src/services/unsuspend-user.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,26 @@ import { deliver } from '../queue';
import config from '../config';
import { User } from '../models/entities/user';
import { Users, Followings } from '../models';
import { Not, IsNull } from 'typeorm';
import { Not, IsNull, Brackets } from 'typeorm';

export async function doPostUnsuspend(user: User) {
if (Users.isLocalUser(user)) {
// 知り得る全SharedInboxにUndo Delete配信
const content = renderActivity(renderUndo(renderDelete(`${config.url}/users/${user.id}`, user), user));

const queue: string[] = [];
const query = Followings.createQueryBuilder('following')
.select('distinct coalesce(following.followerSharedInbox, following.followeeSharedInbox) as inbox')
.where(new Brackets((qb) =>
qb.where({ followerHost: Not(IsNull()) })
.orWhere({ followeeHost: Not(IsNull()) })
))
.andWhere(new Brackets((qb) =>
qb.where({ followerSharedInbox: Not(IsNull()) })
.orWhere({ followeeSharedInbox: Not(IsNull()) })
));

const followings = await Followings.find({
where: [
{ followerSharedInbox: Not(IsNull()) },
{ followeeSharedInbox: Not(IsNull()) }
],
select: ['followerSharedInbox', 'followeeSharedInbox']
});

const inboxes = followings.map(x => x.followerSharedInbox || x.followeeSharedInbox);

for (const inbox of inboxes) {
if (inbox != null && !queue.includes(inbox)) queue.push(inbox);
}

for (const inbox of queue) {
deliver(user as any, content, inbox);
for (const row of await query.getRawMany()) {
mei23 marked this conversation as resolved.
Show resolved Hide resolved
deliver(user as any, content, row.inbox);
}
}
}
Loading