From 18655386f3013512ae543e6cf161dcf471fa6a68 Mon Sep 17 00:00:00 2001
From: Hazelnoot <acomputerdog@gmail.com>
Date: Thu, 27 Mar 2025 10:55:22 -0400
Subject: [PATCH 01/12] convert streaming rate limit to bucket

---
 .../server/api/StreamingApiServerService.ts   | 35 ++++++++-----------
 1 file changed, 14 insertions(+), 21 deletions(-)

diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts
index 0954744f81..1c2569bf8d 100644
--- a/packages/backend/src/server/api/StreamingApiServerService.ts
+++ b/packages/backend/src/server/api/StreamingApiServerService.ts
@@ -10,7 +10,9 @@ import * as WebSocket from 'ws';
 import proxyAddr from 'proxy-addr';
 import ms from 'ms';
 import { DI } from '@/di-symbols.js';
-import type { UsersRepository, MiAccessToken } from '@/models/_.js';
+import type { UsersRepository, MiAccessToken, MiUser } from '@/models/_.js';
+import type { Config } from '@/config.js';
+import type { Keyed, RateLimit } from '@/misc/rate-limit-utils.js';
 import { NoteReadService } from '@/core/NoteReadService.js';
 import { NotificationService } from '@/core/NotificationService.js';
 import { bindThis } from '@/decorators.js';
@@ -25,8 +27,6 @@ import { AuthenticateService, AuthenticationError } from './AuthenticateService.
 import MainStreamConnection from './stream/Connection.js';
 import { ChannelsService } from './stream/ChannelsService.js';
 import type * as http from 'node:http';
-import type { IEndpointMeta } from './endpoints.js';
-import type { Config } from "@/config.js";
 
 @Injectable()
 export class StreamingApiServerService {
@@ -58,17 +58,9 @@ export class StreamingApiServerService {
 
 	@bindThis
 	private async rateLimitThis(
-		user: MiLocalUser | null | undefined,
-		requestIp: string,
-		limit: IEndpointMeta['limit'] & { key: NonNullable<string> },
+		limitActor: MiUser | string,
+		limit: Keyed<RateLimit>,
 	) : Promise<boolean> {
-		let limitActor: string | MiLocalUser;
-		if (user) {
-			limitActor = user;
-		} else {
-			limitActor = getIpHash(requestIp);
-		}
-
 		// Rate limit
 		const rateLimit = await this.rateLimiterService.limit(limit, limitActor);
 		return rateLimit.blocked;
@@ -93,7 +85,8 @@ export class StreamingApiServerService {
 			// so we do the same
 			const requestIp = proxyAddr(request, () => { return true; } );
 
-			if (await this.rateLimitThis(null, requestIp, {
+			const limitActor = getIpHash(requestIp);
+			if (await this.rateLimitThis(limitActor, {
 				key: 'wsconnect',
 				duration: ms('5min'),
 				max: 32,
@@ -141,14 +134,14 @@ export class StreamingApiServerService {
 			}
 
 			const rateLimiter = () => {
-				// rather high limit, because when catching up at the top of a
-				// timeline, the frontend may render many many notes, each of
-				// which causes a message via `useNoteCapture` to ask for
-				// realtime updates of that note
-				return this.rateLimitThis(user, requestIp, {
+				const limitActor = user ?? getIpHash(requestIp);
+
+				// Rather high limit because when catching up at the top of a timeline, the frontend may render many many notes.
+				// Each of which causes a message via `useNoteCapture` to ask for realtime updates of that note.
+				return this.rateLimitThis(limitActor, {
 					key: 'wsmessage',
-					duration: ms('2sec'),
-					max: 4096,
+					max: 4096, // Allow spikes of up to 4096
+					dripRate: 50, // Then once every 50ms (20/second rate)
 				});
 			};
 
-- 
GitLab


From bf1c9b67d63e1d9263be3f6e6d7184606c992696 Mon Sep 17 00:00:00 2001
From: Hazelnoot <acomputerdog@gmail.com>
Date: Thu, 27 Mar 2025 10:55:46 -0400
Subject: [PATCH 02/12] close websocket when rate limit exceeded

---
 .../src/server/api/stream/Connection.ts       | 31 +++++--------------
 1 file changed, 8 insertions(+), 23 deletions(-)

diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts
index e98e2a2f3f..9ca209f08b 100644
--- a/packages/backend/src/server/api/stream/Connection.ts
+++ b/packages/backend/src/server/api/stream/Connection.ts
@@ -31,7 +31,6 @@ const MAX_CHANNELS_PER_CONNECTION = 32;
 export default class Connection {
 	public user?: MiUser;
 	public token?: MiAccessToken;
-	private rateLimiter?: () => Promise<boolean>;
 	private wsConnection: WebSocket.WebSocket;
 	public subscriber: StreamEventEmitter;
 	private channels: Channel[] = [];
@@ -45,7 +44,6 @@ export default class Connection {
 	public userIdsWhoMeMutingRenotes: Set<string> = new Set();
 	public userMutedInstances: Set<string> = new Set();
 	private fetchIntervalId: NodeJS.Timeout | null = null;
-	private activeRateLimitRequests = 0;
 	private closingConnection = false;
 	private logger: Logger;
 
@@ -60,11 +58,10 @@ export default class Connection {
 		user: MiUser | null | undefined,
 		token: MiAccessToken | null | undefined,
 		private ip: string,
-		rateLimiter: () => Promise<boolean>,
+		private readonly rateLimiter: () => Promise<boolean>,
 	) {
 		if (user) this.user = user;
 		if (token) this.token = token;
-		if (rateLimiter) this.rateLimiter = rateLimiter;
 
 		this.logger = loggerService.getLogger('streaming', 'coral');
 	}
@@ -121,25 +118,13 @@ export default class Connection {
 
 		if (this.closingConnection) return;
 
-		if (this.rateLimiter) {
-			// this 4096 should match the `max` of the `rateLimiter`, see
-			// StreamingApiServerService
-			if (this.activeRateLimitRequests <= 4096) {
-				this.activeRateLimitRequests++;
-				const shouldRateLimit = await this.rateLimiter();
-				this.activeRateLimitRequests--;
-
-				if (shouldRateLimit) return;
-				if (this.closingConnection) return;
-			} else {
-				let connectionInfo = `IP ${this.ip}`;
-				if (this.user) connectionInfo += `, user ID ${this.user.id}`;
-
-				this.logger.warn(`Closing a connection (${connectionInfo}) due to an excessive influx of messages.`);
-				this.closingConnection = true;
-				this.wsConnection.close(1008, 'Please stop spamming the streaming API.');
-				return;
-			}
+		// The rate limit is very high, so we can safely disconnect any client that hits it.
+		if (await this.rateLimiter()) {
+			this.logger.warn(`Closing a connection from ${this.ip} (user=${this.user?.id}) due to an excessive influx of messages.`);
+
+			this.closingConnection = true;
+			this.wsConnection.close(1008, 'Disconnected - too many requests');
+			return;
 		}
 
 		try {
-- 
GitLab


From 831329499dd6b29638a6390a860cded30c448215 Mon Sep 17 00:00:00 2001
From: Hazelnoot <acomputerdog@gmail.com>
Date: Thu, 27 Mar 2025 11:07:26 -0400
Subject: [PATCH 03/12] limit the number of note subscriptions per connection

---
 .../src/server/api/stream/Connection.ts       | 25 ++++++++++++++-----
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts
index 9ca209f08b..36e5769216 100644
--- a/packages/backend/src/server/api/stream/Connection.ts
+++ b/packages/backend/src/server/api/stream/Connection.ts
@@ -23,6 +23,7 @@ import type { EventEmitter } from 'events';
 import type Channel from './channel.js';
 
 const MAX_CHANNELS_PER_CONNECTION = 32;
+const MAX_SUBSCRIPTIONS_PER_CONNECTION = 256;
 
 /**
  * Main stream connection
@@ -34,7 +35,7 @@ export default class Connection {
 	private wsConnection: WebSocket.WebSocket;
 	public subscriber: StreamEventEmitter;
 	private channels: Channel[] = [];
-	private subscribingNotes: Partial<Record<string, number>> = {};
+	private subscribingNotes = new Map<string, number>();
 	private cachedNotes: Packed<'Note'>[] = [];
 	public userProfile: MiUserProfile | null = null;
 	public following: Record<string, Pick<MiFollowing, 'withReplies'> | undefined> = {};
@@ -200,9 +201,21 @@ export default class Connection {
 		if (!isJsonObject(payload)) return;
 		if (!payload.id || typeof payload.id !== 'string') return;
 
-		const current = this.subscribingNotes[payload.id] ?? 0;
+		const current = this.subscribingNotes.get(payload.id) ?? 0;
+
+		// Limit the number of distinct notes that can be subscribed to.
+		// If current is-zero, then this is a new note and we need to check the limit
+		if (current === 0 && this.subscribingNotes.size >= MAX_SUBSCRIPTIONS_PER_CONNECTION) {
+			// Map maintains insertion order, so first key is always the oldest
+			// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+			const oldestKey = this.subscribingNotes.keys().next().value!;
+
+			this.subscribingNotes.delete(oldestKey);
+			this.subscriber.off(`noteStream:${oldestKey}`, this.onNoteStreamMessage);
+		}
+
 		const updated = current + 1;
-		this.subscribingNotes[payload.id] = updated;
+		this.subscribingNotes.set(payload.id, updated);
 
 		if (updated === 1) {
 			this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage);
@@ -217,12 +230,12 @@ export default class Connection {
 		if (!isJsonObject(payload)) return;
 		if (!payload.id || typeof payload.id !== 'string') return;
 
-		const current = this.subscribingNotes[payload.id];
+		const current = this.subscribingNotes.get(payload.id);
 		if (current == null) return;
 		const updated = current - 1;
-		this.subscribingNotes[payload.id] = updated;
+		this.subscribingNotes.set(payload.id, updated);
 		if (updated <= 0) {
-			delete this.subscribingNotes[payload.id];
+			this.subscribingNotes.delete(payload.id);
 			this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage);
 		}
 	}
-- 
GitLab


From b8fd9d0bc04656d30628e5f7a3c7c74632014154 Mon Sep 17 00:00:00 2001
From: Hazelnoot <acomputerdog@gmail.com>
Date: Thu, 27 Mar 2025 12:17:23 -0400
Subject: [PATCH 04/12] clear subscriptions when connection closes

---
 packages/backend/src/server/api/stream/Connection.ts | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts
index 36e5769216..691ce54feb 100644
--- a/packages/backend/src/server/api/stream/Connection.ts
+++ b/packages/backend/src/server/api/stream/Connection.ts
@@ -373,5 +373,12 @@ export default class Connection {
 		for (const c of this.channels.filter(c => c.dispose)) {
 			if (c.dispose) c.dispose();
 		}
+		for (const k of this.subscribingNotes.keys()) {
+			this.subscriber.off(`noteStream:${k}`, this.onNoteStreamMessage);
+		}
+
+		this.fetchIntervalId = null;
+		this.channels = [];
+		this.subscribingNotes.clear();
 	}
 }
-- 
GitLab


From 045ff5d2c0037fe0770d619b9f7a21d79a9b668d Mon Sep 17 00:00:00 2001
From: Hazelnoot <acomputerdog@gmail.com>
Date: Thu, 27 Mar 2025 12:23:14 -0400
Subject: [PATCH 05/12] make sure that note subscriptions can't stay above
 limit

---
 packages/backend/src/server/api/stream/Connection.ts | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts
index 691ce54feb..96b968d890 100644
--- a/packages/backend/src/server/api/stream/Connection.ts
+++ b/packages/backend/src/server/api/stream/Connection.ts
@@ -202,10 +202,11 @@ export default class Connection {
 		if (!payload.id || typeof payload.id !== 'string') return;
 
 		const current = this.subscribingNotes.get(payload.id) ?? 0;
+		const updated = current + 1;
+		this.subscribingNotes.set(payload.id, updated);
 
 		// Limit the number of distinct notes that can be subscribed to.
-		// If current is-zero, then this is a new note and we need to check the limit
-		if (current === 0 && this.subscribingNotes.size >= MAX_SUBSCRIPTIONS_PER_CONNECTION) {
+		while (this.subscribingNotes.size > MAX_SUBSCRIPTIONS_PER_CONNECTION) {
 			// Map maintains insertion order, so first key is always the oldest
 			// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
 			const oldestKey = this.subscribingNotes.keys().next().value!;
@@ -214,9 +215,6 @@ export default class Connection {
 			this.subscriber.off(`noteStream:${oldestKey}`, this.onNoteStreamMessage);
 		}
 
-		const updated = current + 1;
-		this.subscribingNotes.set(payload.id, updated);
-
 		if (updated === 1) {
 			this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage);
 		}
-- 
GitLab


From 14a7309cfbc35cb8e448ef8ee6e8e3cd13b62013 Mon Sep 17 00:00:00 2001
From: Hazelnoot <acomputerdog@gmail.com>
Date: Thu, 27 Mar 2025 12:28:42 -0400
Subject: [PATCH 06/12] avoid leaking cached notes in WS connection

---
 .../src/server/api/stream/Connection.ts       | 22 +++++++++----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts
index 96b968d890..7cc6157999 100644
--- a/packages/backend/src/server/api/stream/Connection.ts
+++ b/packages/backend/src/server/api/stream/Connection.ts
@@ -24,6 +24,7 @@ import type Channel from './channel.js';
 
 const MAX_CHANNELS_PER_CONNECTION = 32;
 const MAX_SUBSCRIPTIONS_PER_CONNECTION = 256;
+const MAX_CACHED_NOTES_PER_CONNECTION = 32;
 
 /**
  * Main stream connection
@@ -36,7 +37,7 @@ export default class Connection {
 	public subscriber: StreamEventEmitter;
 	private channels: Channel[] = [];
 	private subscribingNotes = new Map<string, number>();
-	private cachedNotes: Packed<'Note'>[] = [];
+	private cachedNotes = new Map<string, Packed<'Note'>>();
 	public userProfile: MiUserProfile | null = null;
 	public following: Record<string, Pick<MiFollowing, 'withReplies'> | undefined> = {};
 	public followingChannels: Set<string> = new Set();
@@ -158,15 +159,13 @@ export default class Connection {
 	@bindThis
 	public cacheNote(note: Packed<'Note'>) {
 		const add = (note: Packed<'Note'>) => {
-			const existIndex = this.cachedNotes.findIndex(n => n.id === note.id);
-			if (existIndex > -1) {
-				this.cachedNotes[existIndex] = note;
-				return;
-			}
+			this.cachedNotes.set(note.id, note);
 
-			this.cachedNotes.unshift(note);
-			if (this.cachedNotes.length > 32) {
-				this.cachedNotes.splice(32);
+			while (this.cachedNotes.size > MAX_CACHED_NOTES_PER_CONNECTION) {
+				// Map maintains insertion order, so first key is always the oldest
+				// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+				const oldestKey = this.cachedNotes.keys().next().value!;
+				this.cachedNotes.delete(oldestKey);
 			}
 		};
 
@@ -178,9 +177,9 @@ export default class Connection {
 	@bindThis
 	private readNote(body: JsonValue | undefined) {
 		if (!isJsonObject(body)) return;
-		const id = body.id;
+		const id = body.id as string;
 
-		const note = this.cachedNotes.find(n => n.id === id);
+		const note = this.cachedNotes.get(id);
 		if (note == null) return;
 
 		if (this.user && (note.userId !== this.user.id)) {
@@ -378,5 +377,6 @@ export default class Connection {
 		this.fetchIntervalId = null;
 		this.channels = [];
 		this.subscribingNotes.clear();
+		this.cachedNotes.clear();
 	}
 }
-- 
GitLab


From eff73218604089148a3382c1c8530afc652880a4 Mon Sep 17 00:00:00 2001
From: Hazelnoot <acomputerdog@gmail.com>
Date: Thu, 27 Mar 2025 12:33:38 -0400
Subject: [PATCH 07/12] avoid duplicate channels in WS connection

---
 .../src/server/api/stream/Connection.ts       | 28 ++++++++++++-------
 1 file changed, 18 insertions(+), 10 deletions(-)

diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts
index 7cc6157999..4617fba9d1 100644
--- a/packages/backend/src/server/api/stream/Connection.ts
+++ b/packages/backend/src/server/api/stream/Connection.ts
@@ -35,7 +35,7 @@ export default class Connection {
 	public token?: MiAccessToken;
 	private wsConnection: WebSocket.WebSocket;
 	public subscriber: StreamEventEmitter;
-	private channels: Channel[] = [];
+	private channels = new Map<string, Channel>();
 	private subscribingNotes = new Map<string, number>();
 	private cachedNotes = new Map<string, Packed<'Note'>>();
 	public userProfile: MiUserProfile | null = null;
@@ -299,7 +299,11 @@ export default class Connection {
 	 */
 	@bindThis
 	public connectChannel(id: string, params: JsonObject | undefined, channel: string, pong = false) {
-		if (this.channels.length >= MAX_CHANNELS_PER_CONNECTION) {
+		if (this.channels.has(id)) {
+			this.disconnectChannel(id);
+		}
+
+		if (this.channels.size >= MAX_CHANNELS_PER_CONNECTION) {
 			return;
 		}
 
@@ -315,12 +319,16 @@ export default class Connection {
 		}
 
 		// 共有可能チャンネルに接続しようとしていて、かつそのチャンネルに既に接続していたら無意味なので無視
-		if (channelService.shouldShare && this.channels.some(c => c.chName === channel)) {
-			return;
+		if (channelService.shouldShare) {
+			for (const c of this.channels.values()) {
+				if (c.chName === channel) {
+					return;
+				}
+			}
 		}
 
 		const ch: Channel = channelService.create(id, this);
-		this.channels.push(ch);
+		this.channels.set(ch.id, ch);
 		ch.init(params ?? {});
 
 		if (pong) {
@@ -336,11 +344,11 @@ export default class Connection {
 	 */
 	@bindThis
 	public disconnectChannel(id: string) {
-		const channel = this.channels.find(c => c.id === id);
+		const channel = this.channels.get(id);
 
 		if (channel) {
 			if (channel.dispose) channel.dispose();
-			this.channels = this.channels.filter(c => c.id !== id);
+			this.channels.delete(id);
 		}
 	}
 
@@ -355,7 +363,7 @@ export default class Connection {
 		if (typeof data.type !== 'string') return;
 		if (typeof data.body === 'undefined') return;
 
-		const channel = this.channels.find(c => c.id === data.id);
+		const channel = this.channels.get(data.id);
 		if (channel != null && channel.onMessage != null) {
 			channel.onMessage(data.type, data.body);
 		}
@@ -367,7 +375,7 @@ export default class Connection {
 	@bindThis
 	public dispose() {
 		if (this.fetchIntervalId) clearInterval(this.fetchIntervalId);
-		for (const c of this.channels.filter(c => c.dispose)) {
+		for (const c of this.channels.values()) {
 			if (c.dispose) c.dispose();
 		}
 		for (const k of this.subscribingNotes.keys()) {
@@ -375,7 +383,7 @@ export default class Connection {
 		}
 
 		this.fetchIntervalId = null;
-		this.channels = [];
+		this.channels.clear();
 		this.subscribingNotes.clear();
 		this.cachedNotes.clear();
 	}
-- 
GitLab


From c41d617e6364d34021ea10f7ee9bc081b6d3a244 Mon Sep 17 00:00:00 2001
From: Hazelnoot <acomputerdog@gmail.com>
Date: Thu, 27 Mar 2025 19:33:32 -0400
Subject: [PATCH 08/12] limit the number of active connections per client, and
 limit upgrade requests by user

---
 .../server/api/StreamingApiServerService.ts   | 70 ++++++++++++++-----
 1 file changed, 52 insertions(+), 18 deletions(-)

diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts
index 1c2569bf8d..c7db4549d3 100644
--- a/packages/backend/src/server/api/StreamingApiServerService.ts
+++ b/packages/backend/src/server/api/StreamingApiServerService.ts
@@ -28,10 +28,15 @@ import MainStreamConnection from './stream/Connection.js';
 import { ChannelsService } from './stream/ChannelsService.js';
 import type * as http from 'node:http';
 
+// Maximum number of simultaneous connections by client (user ID or IP address).
+// Excess connections will be closed automatically.
+const MAX_CONNECTIONS_PER_CLIENT = 32;
+
 @Injectable()
 export class StreamingApiServerService {
 	#wss: WebSocket.WebSocketServer;
 	#connections = new Map<WebSocket.WebSocket, number>();
+	#connectionsByClient = new Map<string, Set<WebSocket.WebSocket>>(); // key: IP / user ID -> value: connection
 	#cleanConnectionsIntervalId: NodeJS.Timeout | null = null;
 
 	constructor(
@@ -80,22 +85,6 @@ export class StreamingApiServerService {
 				return;
 			}
 
-			// ServerServices sets `trustProxy: true`, which inside
-			// fastify/request.js ends up calling `proxyAddr` in this way,
-			// so we do the same
-			const requestIp = proxyAddr(request, () => { return true; } );
-
-			const limitActor = getIpHash(requestIp);
-			if (await this.rateLimitThis(limitActor, {
-				key: 'wsconnect',
-				duration: ms('5min'),
-				max: 32,
-			})) {
-				socket.write('HTTP/1.1 429 Rate Limit Exceeded\r\n\r\n');
-				socket.destroy();
-				return;
-			}
-
 			const q = new URL(request.url, `http://${request.headers.host}`).searchParams;
 
 			let user: MiLocalUser | null = null;
@@ -133,9 +122,41 @@ export class StreamingApiServerService {
 				return;
 			}
 
-			const rateLimiter = () => {
-				const limitActor = user ?? getIpHash(requestIp);
+			// ServerServices sets `trustProxy: true`, which inside fastify/request.js ends up calling `proxyAddr` in this way, so we do the same.
+			const requestIp = proxyAddr(request, () => true );
+			const limitActor = user?.id ?? getIpHash(requestIp);
+			if (await this.rateLimitThis(limitActor, {
+				key: 'wsconnect',
+				duration: ms('5min'),
+				max: 32,
+			})) {
+				socket.write('HTTP/1.1 429 Rate Limit Exceeded\r\n\r\n');
+				socket.destroy();
+				return;
+			}
+
+			// For performance and code simplicity, obtain and hold this reference for the lifetime of the connection.
+			// This should be safe because the map entry should only be deleted after *all* connections close.
+			let connectionsForClient = this.#connectionsByClient.get(limitActor);
+			if (!connectionsForClient) {
+				connectionsForClient = new Set();
+				this.#connectionsByClient.set(limitActor, connectionsForClient);
+			}
 
+			// Close excess connections
+			while (connectionsForClient.size >= MAX_CONNECTIONS_PER_CLIENT) {
+				// Set maintains insertion order, so first entry is the oldest.
+				// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+				const oldestConnection = connectionsForClient.values().next().value!;
+
+				// Technically, the close() handler should remove this entry.
+				// But if that ever fails, then we could enter an infinite loop.
+				// We manually remove the connection here just in case.
+				oldestConnection.close(1008, 'Disconnected - too many simultaneous connections');
+				connectionsForClient.delete(oldestConnection);
+			}
+
+			const rateLimiter = () => {
 				// Rather high limit because when catching up at the top of a timeline, the frontend may render many many notes.
 				// Each of which causes a message via `useNoteCapture` to ask for realtime updates of that note.
 				return this.rateLimitThis(limitActor, {
@@ -159,6 +180,19 @@ export class StreamingApiServerService {
 			await stream.init();
 
 			this.#wss.handleUpgrade(request, socket, head, (ws) => {
+				connectionsForClient.add(ws);
+
+				// Call before emit() in case it throws an error.
+				// We don't want to leave dangling references!
+				ws.once('close', () => {
+					connectionsForClient.delete(ws);
+
+					// Make sure we don't leak the Set objects!
+					if (connectionsForClient.size < 1) {
+						this.#connectionsByClient.delete(limitActor);
+					}
+				});
+
 				this.#wss.emit('connection', ws, request, {
 					stream, user, app,
 				});
-- 
GitLab


From 86e34175d36ce07b1baec28b871ef0ef0692c326 Mon Sep 17 00:00:00 2001
From: Hazelnoot <acomputerdog@gmail.com>
Date: Fri, 28 Mar 2025 11:43:30 -0400
Subject: [PATCH 09/12] SkRateLimiterService revision 3: cache lockouts in
 memory to avoid redis calls

---
 .../src/server/SkRateLimiterService.md        |  9 ++-
 .../src/server/SkRateLimiterService.ts        | 57 +++++++++++++++++++
 2 files changed, 65 insertions(+), 1 deletion(-)

diff --git a/packages/backend/src/server/SkRateLimiterService.md b/packages/backend/src/server/SkRateLimiterService.md
index c8a2b4e85c..2c7de6715b 100644
--- a/packages/backend/src/server/SkRateLimiterService.md
+++ b/packages/backend/src/server/SkRateLimiterService.md
@@ -34,8 +34,9 @@ Header meanings and usage have been devised by adapting common patterns to work
 
 ## Performance
 
-SkRateLimiterService makes between 1 and 4 redis transactions per rate limit check.
+SkRateLimiterService makes between 0 and 4 redis transactions per rate limit check.
 The first call is read-only, while the others perform at least one write operation.
+No calls are made if a client has already been blocked at least once, as the block status is stored in a short-term memory cache.
 Two integer keys are stored per client/subject, and both expire together after the maximum duration of the limit.
 While performance has not been formally tested, it's expected that SkRateLimiterService has an impact roughly on par with the legacy RateLimiterService.
 Redis memory usage should be notably lower due to the reduced number of keys and avoidance of set / array constructions.
@@ -54,6 +55,12 @@ Any possible conflict would have to occur within a few-milliseconds window, whic
 This error does not compound, as all further operations are relative (Increment and Add).
 Thus, it's considered an acceptable tradeoff given the limitations imposed by Redis and ioredis.
 
+In-process memory caches are used sparingly to avoid consistency problems.
+Besides the role factor cache, there is one "important" cache which directly impacts limit calculations: the lockout cache.
+This cache stores response data for blocked limits, preventing repeated calls to redis if a client ignores the 429 errors and continues making requests.
+Consistency is guaranteed by only caching blocked limits (allowances are not cached), and by limiting cached data to the duration of the block.
+This ensures that stale limit info is never used.
+
 ## Algorithm Pseudocode
 
 The Atomic Leaky Bucket algorithm is described here, in pseudocode:
diff --git a/packages/backend/src/server/SkRateLimiterService.ts b/packages/backend/src/server/SkRateLimiterService.ts
index 30bf092e4f..ccc26c1b61 100644
--- a/packages/backend/src/server/SkRateLimiterService.ts
+++ b/packages/backend/src/server/SkRateLimiterService.ts
@@ -17,10 +17,17 @@ import type { RoleService } from '@/core/RoleService.js';
 // Required because MemoryKVCache doesn't support null keys.
 const defaultUserKey = '';
 
+interface Lockout {
+	at: number;
+	info: LimitInfo;
+}
+
 @Injectable()
 export class SkRateLimiterService {
 	// 1-minute cache interval
 	private readonly factorCache = new MemoryKVCache<number>(1000 * 60);
+	// 10-second cache interval
+	private readonly lockoutCache = new MemoryKVCache<Lockout>(1000 * 10);
 	private readonly disabled: boolean;
 
 	constructor(
@@ -58,6 +65,15 @@ export class SkRateLimiterService {
 		}
 
 		const actor = typeof(actorOrUser) === 'object' ? actorOrUser.id : actorOrUser;
+
+		// TODO add to docs
+		// Fast-path to avoid extra redis calls for blocked clients
+		const lockoutKey = `@${actor}#${limit.key}`;
+		const lockout = this.getLockout(lockoutKey);
+		if (lockout) {
+			return lockout;
+		}
+
 		const userCacheKey = typeof(actorOrUser) === 'object' ? actorOrUser.id : defaultUserKey;
 		const userRoleKey = typeof(actorOrUser) === 'object' ? actorOrUser.id : null;
 		const factor = this.factorCache.get(userCacheKey) ?? await this.factorCache.fetch(userCacheKey, async () => {
@@ -73,6 +89,47 @@ export class SkRateLimiterService {
 			throw new Error(`Rate limit factor is zero or negative: ${factor}`);
 		}
 
+		const info = await this.applyLimit(limit, actor, factor);
+
+		// Store blocked status to avoid hammering redis
+		if (info.blocked) {
+			this.lockoutCache.set(lockoutKey, {
+				at: this.timeService.now,
+				info,
+			});
+		}
+
+		return info;
+	}
+
+	private getLockout(lockoutKey: string): LimitInfo | null {
+		const lockout = this.lockoutCache.get(lockoutKey);
+		if (!lockout) {
+			// Not blocked, proceed with redis check
+			return null;
+		}
+
+		const now = this.timeService.now;
+		const elapsedMs = now - lockout.at;
+		if (elapsedMs >= lockout.info.resetMs) {
+			// Block expired, clear and proceed with redis check
+			this.lockoutCache.delete(lockoutKey);
+			return null;
+		}
+
+		// Limit is still active, update calculations
+		lockout.at = now;
+		lockout.info.resetMs -= elapsedMs;
+		lockout.info.resetSec = Math.ceil(lockout.info.resetMs / 1000);
+		lockout.info.fullResetMs -= elapsedMs;
+		lockout.info.fullResetSec = Math.ceil(lockout.info.fullResetMs / 1000);
+
+		// Re-cache the new object
+		this.lockoutCache.set(lockoutKey, lockout);
+		return lockout.info;
+	}
+
+	private async applyLimit(limit: Keyed<RateLimit>, actor: string, factor: number) {
 		if (isLegacyRateLimit(limit)) {
 			return await this.limitLegacy(limit, actor, factor);
 		} else {
-- 
GitLab


From fafb811333e1bb375517810c05f95d048edb106b Mon Sep 17 00:00:00 2001
From: Hazelnoot <acomputerdog@gmail.com>
Date: Fri, 28 Mar 2025 11:44:29 -0400
Subject: [PATCH 10/12] increase limits on WS note subscriptions and cached
 notes

---
 packages/backend/src/server/api/stream/Connection.ts | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts
index 4617fba9d1..a69c1b8b7f 100644
--- a/packages/backend/src/server/api/stream/Connection.ts
+++ b/packages/backend/src/server/api/stream/Connection.ts
@@ -23,8 +23,8 @@ import type { EventEmitter } from 'events';
 import type Channel from './channel.js';
 
 const MAX_CHANNELS_PER_CONNECTION = 32;
-const MAX_SUBSCRIPTIONS_PER_CONNECTION = 256;
-const MAX_CACHED_NOTES_PER_CONNECTION = 32;
+const MAX_SUBSCRIPTIONS_PER_CONNECTION = 512;
+const MAX_CACHED_NOTES_PER_CONNECTION = 64;
 
 /**
  * Main stream connection
-- 
GitLab


From 47ea8527fd175c55a6d0128b91aced13ea442135 Mon Sep 17 00:00:00 2001
From: Hazelnoot <acomputerdog@gmail.com>
Date: Sat, 29 Mar 2025 09:44:38 -0400
Subject: [PATCH 11/12] fix wsmessage rate limit definition

---
 packages/backend/src/server/api/StreamingApiServerService.ts | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts
index c7db4549d3..d86deef1d7 100644
--- a/packages/backend/src/server/api/StreamingApiServerService.ts
+++ b/packages/backend/src/server/api/StreamingApiServerService.ts
@@ -160,8 +160,9 @@ export class StreamingApiServerService {
 				// Rather high limit because when catching up at the top of a timeline, the frontend may render many many notes.
 				// Each of which causes a message via `useNoteCapture` to ask for realtime updates of that note.
 				return this.rateLimitThis(limitActor, {
+					type: 'bucket',
 					key: 'wsmessage',
-					max: 4096, // Allow spikes of up to 4096
+					size: 4096, // Allow spikes of up to 4096
 					dripRate: 50, // Then once every 50ms (20/second rate)
 				});
 			};
-- 
GitLab


From 922a7ba1d4102c82ffffc952b6cb330221622728 Mon Sep 17 00:00:00 2001
From: Hazelnoot <acomputerdog@gmail.com>
Date: Sat, 29 Mar 2025 09:47:05 -0400
Subject: [PATCH 12/12] track the number of concurrent requests to redis, and
 bypass if the request is guaranteed to reject

---
 .../src/server/SkRateLimiterService.md        |   4 +
 .../src/server/SkRateLimiterService.ts        | 178 ++++++++++++------
 2 files changed, 123 insertions(+), 59 deletions(-)

diff --git a/packages/backend/src/server/SkRateLimiterService.md b/packages/backend/src/server/SkRateLimiterService.md
index 2c7de6715b..55786664e1 100644
--- a/packages/backend/src/server/SkRateLimiterService.md
+++ b/packages/backend/src/server/SkRateLimiterService.md
@@ -42,6 +42,10 @@ While performance has not been formally tested, it's expected that SkRateLimiter
 Redis memory usage should be notably lower due to the reduced number of keys and avoidance of set / array constructions.
 If redis load does become a concern, then a dedicated node can be assigned via the `redisForRateLimit` config setting.
 
+To prevent Redis DoS, SkRateLimiterService internally tracks the number of concurrent requests for each unique client/endpoint combination.
+If the number of concurrent requests reaches the limit's maximum value, then any further requests are automatically rejected.
+The lockout automatically ends once the number of in-flight requests drops back below the limit's maximum value.
+
 ## Concurrency and Multi-Node Correctness
 
 To provide consistency across multi-node environments, leaky bucket is implemented with only atomic operations (`Increment`, `Decrement`, `Add`, and `Subtract`).
diff --git a/packages/backend/src/server/SkRateLimiterService.ts b/packages/backend/src/server/SkRateLimiterService.ts
index ccc26c1b61..8978318045 100644
--- a/packages/backend/src/server/SkRateLimiterService.ts
+++ b/packages/backend/src/server/SkRateLimiterService.ts
@@ -17,9 +17,14 @@ import type { RoleService } from '@/core/RoleService.js';
 // Required because MemoryKVCache doesn't support null keys.
 const defaultUserKey = '';
 
-interface Lockout {
-	at: number;
-	info: LimitInfo;
+interface ParsedLimit {
+	key: string;
+	now: number;
+	bucketSize: number;
+	dripRate: number;
+	dripSize: number;
+	fullResetMs: number;
+	fullResetSec: number;
 }
 
 @Injectable()
@@ -27,7 +32,8 @@ export class SkRateLimiterService {
 	// 1-minute cache interval
 	private readonly factorCache = new MemoryKVCache<number>(1000 * 60);
 	// 10-second cache interval
-	private readonly lockoutCache = new MemoryKVCache<Lockout>(1000 * 10);
+	private readonly lockoutCache = new MemoryKVCache<number>(1000 * 10);
+	private readonly requestCounts = new Map<string, number>();
 	private readonly disabled: boolean;
 
 	constructor(
@@ -65,14 +71,7 @@ export class SkRateLimiterService {
 		}
 
 		const actor = typeof(actorOrUser) === 'object' ? actorOrUser.id : actorOrUser;
-
-		// TODO add to docs
-		// Fast-path to avoid extra redis calls for blocked clients
-		const lockoutKey = `@${actor}#${limit.key}`;
-		const lockout = this.getLockout(lockoutKey);
-		if (lockout) {
-			return lockout;
-		}
+		const actorKey = `@${actor}#${limit.key}`;
 
 		const userCacheKey = typeof(actorOrUser) === 'object' ? actorOrUser.id : defaultUserKey;
 		const userRoleKey = typeof(actorOrUser) === 'object' ? actorOrUser.id : null;
@@ -89,66 +88,81 @@ export class SkRateLimiterService {
 			throw new Error(`Rate limit factor is zero or negative: ${factor}`);
 		}
 
-		const info = await this.applyLimit(limit, actor, factor);
+		const parsedLimit = this.parseLimit(limit, factor);
+		if (parsedLimit == null) {
+			return disabledLimitInfo;
+		}
+
+		// Fast-path to avoid extra redis calls for blocked clients
+		const lockout = this.getLockout(actorKey, parsedLimit);
+		if (lockout) {
+			return lockout;
+		}
 
-		// Store blocked status to avoid hammering redis
-		if (info.blocked) {
-			this.lockoutCache.set(lockoutKey, {
-				at: this.timeService.now,
-				info,
-			});
+		// Fast-path to avoid queuing requests that are guaranteed to fail
+		const overflow = this.incrementOverflow(actorKey, parsedLimit);
+		if (overflow) {
+			return overflow;
 		}
 
-		return info;
+		try {
+			const info = await this.limitBucket(parsedLimit, actor);
+
+			// Store blocked status to avoid hammering redis
+			if (info.blocked) {
+				this.lockoutCache.set(actorKey, info.resetMs);
+			}
+
+			return info;
+		} finally {
+			this.decrementOverflow(actorKey);
+		}
 	}
 
-	private getLockout(lockoutKey: string): LimitInfo | null {
-		const lockout = this.lockoutCache.get(lockoutKey);
-		if (!lockout) {
+	private getLockout(lockoutKey: string, limit: ParsedLimit): LimitInfo | null {
+		const lockoutReset = this.lockoutCache.get(lockoutKey);
+		if (!lockoutReset) {
 			// Not blocked, proceed with redis check
 			return null;
 		}
 
-		const now = this.timeService.now;
-		const elapsedMs = now - lockout.at;
-		if (elapsedMs >= lockout.info.resetMs) {
+		if (limit.now >= lockoutReset) {
 			// Block expired, clear and proceed with redis check
 			this.lockoutCache.delete(lockoutKey);
 			return null;
 		}
 
-		// Limit is still active, update calculations
-		lockout.at = now;
-		lockout.info.resetMs -= elapsedMs;
-		lockout.info.resetSec = Math.ceil(lockout.info.resetMs / 1000);
-		lockout.info.fullResetMs -= elapsedMs;
-		lockout.info.fullResetSec = Math.ceil(lockout.info.fullResetMs / 1000);
-
-		// Re-cache the new object
-		this.lockoutCache.set(lockoutKey, lockout);
-		return lockout.info;
+		// Lockout is still active, pre-emptively reject the request
+		return {
+			blocked: true,
+			remaining: 0,
+			resetMs: limit.fullResetMs,
+			resetSec: limit.fullResetSec,
+			fullResetMs: limit.fullResetMs,
+			fullResetSec: limit.fullResetSec,
+		};
 	}
 
-	private async applyLimit(limit: Keyed<RateLimit>, actor: string, factor: number) {
+	private parseLimit(limit: Keyed<RateLimit>, factor: number): ParsedLimit | null {
 		if (isLegacyRateLimit(limit)) {
-			return await this.limitLegacy(limit, actor, factor);
+			return this.parseLegacyLimit(limit, factor);
 		} else {
-			return await this.limitBucket(limit, actor, factor);
+			return this.parseBucketLimit(limit, factor);
 		}
 	}
 
-	private async limitLegacy(limit: Keyed<LegacyRateLimit>, actor: string, factor: number): Promise<LimitInfo> {
+	private parseLegacyLimit(limit: Keyed<LegacyRateLimit>, factor: number): ParsedLimit | null {
 		if (hasMaxLimit(limit)) {
-			return await this.limitLegacyMinMax(limit, actor, factor);
+			return this.parseLegacyMinMax(limit, factor);
 		} else if (hasMinLimit(limit)) {
-			return await this.limitLegacyMinOnly(limit, actor, factor);
+			return this.parseLegacyMinOnly(limit, factor);
 		} else {
-			return disabledLimitInfo;
+			return null;
 		}
 	}
 
-	private async limitLegacyMinMax(limit: Keyed<MaxLegacyLimit>, actor: string, factor: number): Promise<LimitInfo> {
-		if (limit.duration === 0) return disabledLimitInfo;
+	private parseLegacyMinMax(limit: Keyed<MaxLegacyLimit>, factor: number): ParsedLimit | null {
+		if (limit.duration === 0) return null;
 		if (limit.duration < 0) throw new Error(`Invalid rate limit ${limit.key}: duration is negative (${limit.duration})`);
 		if (limit.max < 1) throw new Error(`Invalid rate limit ${limit.key}: max is less than 1 (${limit.max})`);
 
@@ -161,35 +175,30 @@ export class SkRateLimiterService {
 		// Calculate final dripRate from dripSize and duration/max
 		const dripRate = Math.max(Math.round(limit.duration / (limit.max / dripSize)), 1);
 
-		const bucketLimit: Keyed<BucketRateLimit> = {
+		return this.parseBucketLimit({
 			type: 'bucket',
 			key: limit.key,
 			size: limit.max,
 			dripRate,
 			dripSize,
-		};
-		return await this.limitBucket(bucketLimit, actor, factor);
+		}, factor);
 	}
 
-	private async limitLegacyMinOnly(limit: Keyed<MinLegacyLimit>, actor: string, factor: number): Promise<LimitInfo> {
-		if (limit.minInterval === 0) return disabledLimitInfo;
+	private parseLegacyMinOnly(limit: Keyed<MinLegacyLimit>, factor: number): ParsedLimit | null {
+		if (limit.minInterval === 0) return null;
 		if (limit.minInterval < 0) throw new Error(`Invalid rate limit ${limit.key}: minInterval is negative (${limit.minInterval})`);
 
 		const dripRate = Math.max(Math.round(limit.minInterval), 1);
-		const bucketLimit: Keyed<BucketRateLimit> = {
+		return this.parseBucketLimit({
 			type: 'bucket',
 			key: limit.key,
 			size: 1,
 			dripRate,
 			dripSize: 1,
-		};
-		return await this.limitBucket(bucketLimit, actor, factor);
+		}, factor);
 	}
 
-	/**
-	 * Implementation of Leaky Bucket rate limiting - see SkRateLimiterService.md for details.
-	 */
-	private async limitBucket(limit: Keyed<BucketRateLimit>, actor: string, factor: number): Promise<LimitInfo> {
+	private parseBucketLimit(limit: Keyed<BucketRateLimit>, factor: number): ParsedLimit {
 		if (limit.size < 1) throw new Error(`Invalid rate limit ${limit.key}: size is less than 1 (${limit.size})`);
 		if (limit.dripRate != null && limit.dripRate < 1) throw new Error(`Invalid rate limit ${limit.key}: dripRate is less than 1 (${limit.dripRate})`);
 		if (limit.dripSize != null && limit.dripSize < 1) throw new Error(`Invalid rate limit ${limit.key}: dripSize is less than 1 (${limit.dripSize})`);
@@ -199,7 +208,27 @@ export class SkRateLimiterService {
 		const bucketSize = Math.max(Math.ceil(limit.size / factor), 1);
 		const dripRate = Math.ceil(limit.dripRate ?? 1000);
 		const dripSize = Math.ceil(limit.dripSize ?? 1);
-		const expirationSec = Math.max(Math.ceil((dripRate * Math.ceil(bucketSize / dripSize)) / 1000), 1);
+		const fullResetMs = dripRate * Math.ceil(bucketSize / dripSize);
+		const fullResetSec = Math.max(Math.ceil(fullResetMs / 1000), 1);
+
+		return {
+			key: limit.key,
+			now,
+			bucketSize,
+			dripRate,
+			dripSize,
+			fullResetMs,
+			fullResetSec,
+		};
+	}
+
+	/**
+	 * Implementation of Leaky Bucket rate limiting - see SkRateLimiterService.md for details.
+	 */
+	private async limitBucket(limit: ParsedLimit, actor: string): Promise<LimitInfo> {
+		// 0 - Calculate (extracted to other function)
+		const { now, bucketSize, dripRate, dripSize } = limit;
+		const expirationSec = limit.fullResetSec;
 
 		// 1 - Read
 		const counterKey = createLimitKey(limit, actor, 'c');
@@ -319,13 +348,44 @@ export class SkRateLimiterService {
 
 		return responses;
 	}
+
+	private incrementOverflow(actorKey: string, limit: ParsedLimit): LimitInfo | null {
+		const oldCount = this.requestCounts.get(actorKey) ?? 0;
+
+		if (oldCount >= limit.bucketSize) {
+			// Overflow, pre-emptively reject the request
+			return {
+				blocked: true,
+				remaining: 0,
+				resetMs: limit.fullResetMs,
+				resetSec: limit.fullResetSec,
+				fullResetMs: limit.fullResetMs,
+				fullResetSec: limit.fullResetSec,
+			};
+		}
+
+		// No overflow, increment and continue to redis
+		this.requestCounts.set(actorKey, oldCount + 1);
+		return null;
+	}
+
+	private decrementOverflow(actorKey: string): void {
+		const count = this.requestCounts.get(actorKey);
+		if (count) {
+			if (count > 1) {
+				this.requestCounts.set(actorKey, count - 1);
+			} else {
+				this.requestCounts.delete(actorKey);
+			}
+		}
+	}
 }
 
 // Not correct, but good enough for the basic commands we use.
 type RedisResult = string | null;
 type RedisCommand = [command: string, ...args: unknown[]];
 
-function createLimitKey(limit: Keyed<RateLimit>, actor: string, value: string): string {
+function createLimitKey(limit: ParsedLimit, actor: string, value: string): string {
 	return `rl_${actor}_${limit.key}_${value}`;
 }
 
-- 
GitLab