Skip to content
Snippets Groups Projects
Unverified Commit 9835945e authored by syuilo's avatar syuilo
Browse files

Improve queue option

parent 4f2d5269
No related branches found
No related tags found
No related merge requests found
......@@ -5,6 +5,7 @@ program
.version(pkg.version)
.option('--no-daemons', 'Disable daemon processes (for debbuging)')
.option('--disable-clustering', 'Disable clustering')
.option('--disable-ap-queue', 'Disable creating job queue related to ap')
.option('--disable-queue', 'Disable job queue processing')
.option('--only-queue', 'Pocessing job queue only')
.option('--quiet', 'Suppress all logs')
......@@ -13,6 +14,7 @@ program
.option('--color', 'This option is a dummy for some external program\'s (e.g. forever) issue.')
.parse(process.argv);
if (process.env.MK_DISABLE_AP_QUEUE) program.disableApQueue = true;
if (process.env.MK_DISABLE_QUEUE) program.disableQueue = true;
if (process.env.MK_ONLY_QUEUE) program.onlyQueue = true;
......
import * as Queue from 'bee-queue';
import config from '../config';
import * as httpSignature from 'http-signature';
import config from '../config';
import { ILocalUser } from '../models/user';
import { program } from '../argv';
import handler from './processors';
......@@ -31,10 +32,19 @@ function initializeQueue() {
}
}
export function createHttpJob(data: any) {
if (queueAvailable) {
export function deliver(user: ILocalUser, content: any, to: any) {
if (content == null) return;
const data = {
type: 'deliver',
user,
content,
to
};
if (queueAvailable && !program.disableApQueue) {
return queue.createJob(data)
.retries(3)
.retries(8)
.backoff('exponential', 1000)
.save();
} else {
......@@ -42,15 +52,21 @@ export function createHttpJob(data: any) {
}
}
export function deliver(user: ILocalUser, content: any, to: any) {
if (content == null) return;
export function processInbox(activity: any, signature: httpSignature.IParsedSignature) {
const data = {
type: 'processInbox',
activity: activity,
signature
};
createHttpJob({
type: 'deliver',
user,
content,
to
});
if (queueAvailable && !program.disableApQueue) {
return queue.createJob(data)
.retries(3)
.backoff('exponential', 500)
.save();
} else {
return handler({ data }, () => {});
}
}
export function createExportNotesJob(user: ILocalUser) {
......
......@@ -3,7 +3,6 @@ import * as Router from 'koa-router';
import * as json from 'koa-json-body';
import * as httpSignature from 'http-signature';
import { createHttpJob } from '../queue';
import { renderActivity } from '../remote/activitypub/renderer';
import Note from '../models/note';
import User, { isLocalUser, ILocalUser, IUser } from '../models/user';
......@@ -17,6 +16,7 @@ import Followers from './activitypub/followers';
import Following from './activitypub/following';
import Featured from './activitypub/featured';
import renderQuestion from '../remote/activitypub/renderer/question';
import { processInbox } from '../queue';
// Init router
const router = new Router();
......@@ -35,11 +35,7 @@ function inbox(ctx: Router.IRouterContext) {
return;
}
createHttpJob({
type: 'processInbox',
activity: ctx.request.body,
signature
});
processInbox(ctx.request.body, signature);
ctx.status = 202;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment