From 8050352ad88798be222f735a3217367acaee277f Mon Sep 17 00:00:00 2001
From: syuilo <Syuilotan@yahoo.co.jp>
Date: Sun, 21 Mar 2021 15:14:03 +0900
Subject: [PATCH] =?UTF-8?q?perf:=20=E5=90=84=E3=82=B9=E3=83=88=E3=83=AA?=
 =?UTF-8?q?=E3=83=BC=E3=83=9F=E3=83=B3=E3=82=B0=E6=8E=A5=E7=B6=9A=E3=81=94?=
 =?UTF-8?q?=E3=81=A8=E3=81=AB=E3=83=9D=E3=83=BC=E3=83=AA=E3=83=B3=E3=82=B0?=
 =?UTF-8?q?=E3=81=97=E3=81=AA=E3=81=84=E3=82=88=E3=81=86=E3=81=AB?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/server/api/endpoints/channels/follow.ts   |  3 +
 src/server/api/endpoints/channels/unfollow.ts |  3 +
 src/server/api/endpoints/i/update.ts          |  3 +-
 src/server/api/endpoints/mute/create.ts       |  3 +
 src/server/api/endpoints/mute/delete.ts       |  3 +
 src/server/api/stream/index.ts                | 57 +++++++++++++------
 src/services/blocking/create.ts               | 12 +++-
 src/services/following/create.ts              |  7 ++-
 src/services/following/delete.ts              |  7 ++-
 src/services/following/requests/reject.ts     |  7 ++-
 src/services/stream.ts                        |  5 ++
 11 files changed, 83 insertions(+), 27 deletions(-)

diff --git a/src/server/api/endpoints/channels/follow.ts b/src/server/api/endpoints/channels/follow.ts
index bf2f2bbb57..11c6e37ff7 100644
--- a/src/server/api/endpoints/channels/follow.ts
+++ b/src/server/api/endpoints/channels/follow.ts
@@ -4,6 +4,7 @@ import define from '../../define';
 import { ApiError } from '../../error';
 import { Channels, ChannelFollowings } from '../../../../models';
 import { genId } from '../../../../misc/gen-id';
+import { publishUserEvent } from '../../../../services/stream';
 
 export const meta = {
 	tags: ['channels'],
@@ -42,4 +43,6 @@ export default define(meta, async (ps, user) => {
 		followerId: user.id,
 		followeeId: channel.id,
 	});
+
+	publishUserEvent(user.id, 'followChannel', channel);
 });
diff --git a/src/server/api/endpoints/channels/unfollow.ts b/src/server/api/endpoints/channels/unfollow.ts
index 8cab5c36a6..3eb0f1519b 100644
--- a/src/server/api/endpoints/channels/unfollow.ts
+++ b/src/server/api/endpoints/channels/unfollow.ts
@@ -3,6 +3,7 @@ import { ID } from '../../../../misc/cafy-id';
 import define from '../../define';
 import { ApiError } from '../../error';
 import { Channels, ChannelFollowings } from '../../../../models';
+import { publishUserEvent } from '../../../../services/stream';
 
 export const meta = {
 	tags: ['channels'],
@@ -39,4 +40,6 @@ export default define(meta, async (ps, user) => {
 		followerId: user.id,
 		followeeId: channel.id,
 	});
+
+	publishUserEvent(user.id, 'unfollowChannel', channel);
 });
diff --git a/src/server/api/endpoints/i/update.ts b/src/server/api/endpoints/i/update.ts
index a1faf8f1c2..92be2e9e6d 100644
--- a/src/server/api/endpoints/i/update.ts
+++ b/src/server/api/endpoints/i/update.ts
@@ -1,6 +1,6 @@
 import $ from 'cafy';
 import { ID } from '../../../../misc/cafy-id';
-import { publishMainStream } from '../../../../services/stream';
+import { publishMainStream, publishUserEvent } from '../../../../services/stream';
 import acceptAllFollowRequests from '../../../../services/following/requests/accept-all';
 import { publishToFollowers } from '../../../../services/i/update';
 import define from '../../define';
@@ -317,6 +317,7 @@ export default define(meta, async (ps, user, token) => {
 
 	// Publish meUpdated event
 	publishMainStream(user.id, 'meUpdated', iObj);
+	publishUserEvent(user.id, 'updateUserProfile', await UserProfiles.findOne(user.id));
 
 	// 鍵垢を解除したとき、溜まっていたフォローリクエストがあるならすべて承認
 	if (user.isLocked && ps.isLocked === false) {
diff --git a/src/server/api/endpoints/mute/create.ts b/src/server/api/endpoints/mute/create.ts
index 437ad96107..ebfc6028ed 100644
--- a/src/server/api/endpoints/mute/create.ts
+++ b/src/server/api/endpoints/mute/create.ts
@@ -6,6 +6,7 @@ import { getUser } from '../../common/getters';
 import { genId } from '../../../../misc/gen-id';
 import { Mutings, NoteWatchings } from '../../../../models';
 import { Muting } from '../../../../models/entities/muting';
+import { publishUserEvent } from '../../../../services/stream';
 
 export const meta = {
 	desc: {
@@ -82,6 +83,8 @@ export default define(meta, async (ps, user) => {
 		muteeId: mutee.id,
 	} as Muting);
 
+	publishUserEvent(user.id, 'mute', mutee);
+
 	NoteWatchings.delete({
 		userId: muter.id,
 		noteUserId: mutee.id
diff --git a/src/server/api/endpoints/mute/delete.ts b/src/server/api/endpoints/mute/delete.ts
index 217352acb4..67a59e3ae4 100644
--- a/src/server/api/endpoints/mute/delete.ts
+++ b/src/server/api/endpoints/mute/delete.ts
@@ -4,6 +4,7 @@ import define from '../../define';
 import { ApiError } from '../../error';
 import { getUser } from '../../common/getters';
 import { Mutings } from '../../../../models';
+import { publishUserEvent } from '../../../../services/stream';
 
 export const meta = {
 	desc: {
@@ -76,4 +77,6 @@ export default define(meta, async (ps, user) => {
 	await Mutings.delete({
 		id: exist.id
 	});
+
+	publishUserEvent(user.id, 'unmute', mutee);
 });
diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts
index a94923484d..748e894f83 100644
--- a/src/server/api/stream/index.ts
+++ b/src/server/api/stream/index.ts
@@ -30,10 +30,6 @@ export default class Connection {
 	public subscriber: EventEmitter;
 	private channels: Channel[] = [];
 	private subscribingNotes: any = {};
-	private followingClock: ReturnType<typeof setInterval>;
-	private mutingClock: ReturnType<typeof setInterval>;
-	private followingChannelsClock: ReturnType<typeof setInterval>;
-	private userProfileClock: ReturnType<typeof setInterval>;
 
 	constructor(
 		wsConnection: websocket.connection,
@@ -52,19 +48,51 @@ export default class Connection {
 			this.onBroadcastMessage(type, body);
 		});
 
-		// TODO: reidsでイベントをもらうようにし、ポーリングはしないようにする
 		if (this.user) {
 			this.updateFollowing();
-			this.followingClock = setInterval(this.updateFollowing, 5000);
-
 			this.updateMuting();
-			this.mutingClock = setInterval(this.updateMuting, 5000);
-
 			this.updateFollowingChannels();
-			this.followingChannelsClock = setInterval(this.updateFollowingChannels, 5000);
-
 			this.updateUserProfile();
-			this.userProfileClock = setInterval(this.updateUserProfile, 5000);
+
+			this.subscriber.on(`user:${this.user.id}`, ({ type, body }) => {
+				this.onUserEvent(type, body);
+			});
+		}
+	}
+
+	@autobind
+	private onUserEvent(type: string, body: any) {
+		switch (type) {
+			case 'follow':
+				this.following.add(body.id);
+				break;
+
+			case 'unfollow':
+				this.following.delete(body.id);
+				break;
+
+			case 'mute':
+				this.muting.add(body.id);
+				break;
+
+			case 'unmute':
+				this.muting.delete(body.id);
+				break;
+
+			case 'followChannel':
+				this.followingChannels.add(body.id);
+				break;
+
+			case 'unfollowChannel':
+				this.followingChannels.delete(body.id);
+				break;
+
+			case 'updateUserProfile':
+				this.userProfile = body;
+				break;
+
+			default:
+				break;
 		}
 	}
 
@@ -354,10 +382,5 @@ export default class Connection {
 		for (const c of this.channels.filter(c => c.dispose)) {
 			if (c.dispose) c.dispose();
 		}
-
-		if (this.followingClock) clearInterval(this.followingClock);
-		if (this.mutingClock) clearInterval(this.mutingClock);
-		if (this.followingChannelsClock) clearInterval(this.followingChannelsClock);
-		if (this.userProfileClock) clearInterval(this.userProfileClock);
 	}
 }
diff --git a/src/services/blocking/create.ts b/src/services/blocking/create.ts
index def4f33585..4f0238db91 100644
--- a/src/services/blocking/create.ts
+++ b/src/services/blocking/create.ts
@@ -1,4 +1,4 @@
-import { publishMainStream } from '../stream';
+import { publishMainStream, publishUserEvent } from '../stream';
 import { renderActivity } from '../../remote/activitypub/renderer';
 import renderFollow from '../../remote/activitypub/renderer/follow';
 import renderUndo from '../../remote/activitypub/renderer/undo';
@@ -55,7 +55,10 @@ async function cancelRequest(follower: User, followee: User) {
 	if (Users.isLocalUser(follower)) {
 		Users.pack(followee, follower, {
 			detail: true
-		}).then(packed => publishMainStream(follower.id, 'unfollow', packed));
+		}).then(packed => {
+			publishUserEvent(follower.id, 'unfollow', packed);
+			publishMainStream(follower.id, 'unfollow', packed);
+		});
 	}
 
 	// リモートにフォローリクエストをしていたらUndoFollow送信
@@ -97,7 +100,10 @@ async function unFollow(follower: User, followee: User) {
 	if (Users.isLocalUser(follower)) {
 		Users.pack(followee, follower, {
 			detail: true
-		}).then(packed => publishMainStream(follower.id, 'unfollow', packed));
+		}).then(packed => {
+			publishUserEvent(follower.id, 'unfollow', packed);
+			publishMainStream(follower.id, 'unfollow', packed);
+		});
 	}
 
 	// リモートにフォローをしていたらUndoFollow送信
diff --git a/src/services/following/create.ts b/src/services/following/create.ts
index 6bc98aee87..eb6699b0bf 100644
--- a/src/services/following/create.ts
+++ b/src/services/following/create.ts
@@ -1,4 +1,4 @@
-import { publishMainStream } from '../stream';
+import { publishMainStream, publishUserEvent } from '../stream';
 import { renderActivity } from '../../remote/activitypub/renderer';
 import renderFollow from '../../remote/activitypub/renderer/follow';
 import renderAccept from '../../remote/activitypub/renderer/accept';
@@ -88,7 +88,10 @@ export async function insertFollowingDoc(followee: User, follower: User) {
 	if (Users.isLocalUser(follower)) {
 		Users.pack(followee, follower, {
 			detail: true
-		}).then(packed => publishMainStream(follower.id, 'follow', packed));
+		}).then(packed => {
+			publishUserEvent(follower.id, 'follow', packed);
+			publishMainStream(follower.id, 'follow', packed);
+		});
 	}
 
 	// Publish followed event
diff --git a/src/services/following/delete.ts b/src/services/following/delete.ts
index 8821611515..32c47ea7f4 100644
--- a/src/services/following/delete.ts
+++ b/src/services/following/delete.ts
@@ -1,4 +1,4 @@
-import { publishMainStream } from '../stream';
+import { publishMainStream, publishUserEvent } from '../stream';
 import { renderActivity } from '../../remote/activitypub/renderer';
 import renderFollow from '../../remote/activitypub/renderer/follow';
 import renderUndo from '../../remote/activitypub/renderer/undo';
@@ -30,7 +30,10 @@ export default async function(follower: User, followee: User, silent = false) {
 	if (!silent && Users.isLocalUser(follower)) {
 		Users.pack(followee, follower, {
 			detail: true
-		}).then(packed => publishMainStream(follower.id, 'unfollow', packed));
+		}).then(packed => {
+			publishUserEvent(follower.id, 'unfollow', packed);
+			publishMainStream(follower.id, 'unfollow', packed);
+		});
 	}
 
 	if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) {
diff --git a/src/services/following/requests/reject.ts b/src/services/following/requests/reject.ts
index 9a8b14bbfd..d8d3788088 100644
--- a/src/services/following/requests/reject.ts
+++ b/src/services/following/requests/reject.ts
@@ -2,7 +2,7 @@ import { renderActivity } from '../../../remote/activitypub/renderer';
 import renderFollow from '../../../remote/activitypub/renderer/follow';
 import renderReject from '../../../remote/activitypub/renderer/reject';
 import { deliver } from '../../../queue';
-import { publishMainStream } from '../../stream';
+import { publishMainStream, publishUserEvent } from '../../stream';
 import { User, ILocalUser } from '../../../models/entities/user';
 import { Users, FollowRequests, Followings } from '../../../models';
 import { decrementFollowing } from '../delete';
@@ -39,5 +39,8 @@ export default async function(followee: User, follower: User) {
 
 	Users.pack(followee, follower, {
 		detail: true
-	}).then(packed => publishMainStream(follower.id, 'unfollow', packed));
+	}).then(packed => {
+		publishUserEvent(follower.id, 'unfollow', packed);
+		publishMainStream(follower.id, 'unfollow', packed);
+	});
 }
diff --git a/src/services/stream.ts b/src/services/stream.ts
index d833d700fe..75385847ce 100644
--- a/src/services/stream.ts
+++ b/src/services/stream.ts
@@ -20,6 +20,10 @@ class Publisher {
 		}));
 	}
 
+	public publishUserEvent = (userId: User['id'], type: string, value?: any): void => {
+		this.publish(`user:${userId}`, type, typeof value === 'undefined' ? null : value);
+	}
+
 	public publishBroadcastStream = (type: string, value?: any): void => {
 		this.publish('broadcast', type, typeof value === 'undefined' ? null : value);
 	}
@@ -84,6 +88,7 @@ const publisher = new Publisher();
 
 export default publisher;
 
+export const publishUserEvent = publisher.publishUserEvent;
 export const publishBroadcastStream = publisher.publishBroadcastStream;
 export const publishMainStream = publisher.publishMainStream;
 export const publishDriveStream = publisher.publishDriveStream;
-- 
GitLab