fix
This commit is contained in:
@@ -0,0 +1,22 @@
|
||||
-- Queue outbound ActivityPub deliveries so API responses do not wait on
|
||||
-- remote inbox latency and failed deliveries can be retried.
|
||||
|
||||
CREATE TABLE IF NOT EXISTS outgoing_deliveries (
|
||||
id TEXT PRIMARY KEY,
|
||||
user_id TEXT NOT NULL,
|
||||
inbox TEXT NOT NULL,
|
||||
activity_id TEXT NOT NULL,
|
||||
activity_json TEXT NOT NULL,
|
||||
attempts INTEGER NOT NULL DEFAULT 0,
|
||||
next_attempt_at TEXT,
|
||||
locked_until TEXT,
|
||||
delivered_at TEXT,
|
||||
failed_at TEXT,
|
||||
last_error TEXT,
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL,
|
||||
UNIQUE(user_id, inbox, activity_id)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_outgoing_deliveries_due ON outgoing_deliveries(next_attempt_at, locked_until);
|
||||
CREATE INDEX IF NOT EXISTS idx_outgoing_deliveries_user_time ON outgoing_deliveries(user_id, created_at DESC);
|
||||
@@ -3,7 +3,7 @@
|
||||
一个运行在 Cloudflare Workers 上的单用户联邦宇宙发布软件,使用:
|
||||
|
||||
- Workers: HTTP API、ActivityPub 路由、Mastodon API 兼容层
|
||||
- D1: 用户、OAuth 应用/Token、嘟文、媒体索引、关注、收藏、转发、通知、提及、话题标签、收藏夹、置顶、远端 actor 与远端嘟文缓存
|
||||
- D1: 用户、OAuth 应用/Token、嘟文、媒体索引、关注、收藏、转发、通知、提及、话题标签、收藏夹、置顶、远端 actor 与远端嘟文缓存、出站联邦投递队列
|
||||
- R2: 媒体文件
|
||||
- KV: OAuth access token 会话(D1 同步保留,便于管理)
|
||||
|
||||
@@ -17,17 +17,20 @@ npm run db:local
|
||||
npm run dev
|
||||
```
|
||||
|
||||
默认管理员账号来自 `wrangler.jsonc`:
|
||||
默认管理员用户名来自 `wrangler.jsonc` 的 `ADMIN_USERNAME`,当前示例值为 `sun`。密码必须通过 Secret 或本地开发变量提供:
|
||||
|
||||
- 用户名:`admin`
|
||||
- 密码:`change-me-before-deploy`
|
||||
|
||||
部署前必须改掉 `ADMIN_PASSWORD`,更推荐用 Cloudflare secret 管理密码:
|
||||
生产部署:
|
||||
|
||||
```bash
|
||||
wrangler secret put ADMIN_PASSWORD
|
||||
```
|
||||
|
||||
本地开发可以创建 `.dev.vars`:
|
||||
|
||||
```dotenv
|
||||
ADMIN_PASSWORD=change-me-before-deploy
|
||||
```
|
||||
|
||||
`PUBLIC_BASE_URL` 必须在首次正式部署前改成你的稳定实例域名,例如:
|
||||
|
||||
```json
|
||||
@@ -126,6 +129,8 @@ npm run deploy
|
||||
|
||||
入站 inbox 处理类型:`Follow` / `Undo(Follow)` / `Accept(Follow)` / `Reject(Follow)` / `Like` / `Undo(Like)` / `Announce` / `Undo(Announce)` / `Delete(Note)`(同时清远端嘟文缓存)/ `Delete(Person)`(同时清缓存与关注关系)/ `Update(Person)` / `Update(Note)` / `Create(Note)`(被关注的远端账号公开 / followers-only Note,或投递给本地用户的 Note 会写入 `cached_statuses` 给 home timeline 和通知使用,同时若 mention/回复本地用户会触发通知)。
|
||||
|
||||
出站 ActivityPub 投递会先写入 D1 `outgoing_deliveries` 队列,再由请求后的 `waitUntil` 和每分钟 Cron 触发器后台签名发送。失败投递会指数退避重试,最多 8 次后标记为失败并保留错误摘要。
|
||||
|
||||
## 安全
|
||||
|
||||
- 密码哈希使用 **PBKDF2-SHA256 / 100000 iterations**,带每用户随机 16 字节 salt,旧版 `salt.hash` 哈希也能继续验证(便于无缝升级)。
|
||||
@@ -147,6 +152,7 @@ npm run deploy
|
||||
- `migrations/0003_bookmarks_cache.sql` — 收藏夹(bookmarks)/ 置顶(pinned_statuses)/ 远端嘟文缓存(cached_statuses)/ OAuth Token 持久表
|
||||
- `migrations/0006_cached_status_metadata.sql` — 远端缓存嘟文的可见性 / mentions / tags / 本地收件人元数据
|
||||
- `migrations/0007_polls_lists_push_scheduled.sql` — poll / list / push subscription / scheduled statuses
|
||||
- `migrations/0008_outgoing_deliveries.sql` — 出站 ActivityPub 投递队列 / 重试状态
|
||||
|
||||
## 重要限制
|
||||
|
||||
|
||||
@@ -14,7 +14,6 @@ import {
|
||||
upsertCachedStatus
|
||||
} from "./db";
|
||||
import {
|
||||
deliverToInboxes,
|
||||
isDuplicateActivity,
|
||||
isFollowedByAnyLocalUser,
|
||||
notifyForLocalStatus,
|
||||
@@ -22,7 +21,6 @@ import {
|
||||
objectIdString,
|
||||
parseActivity,
|
||||
recordRemoteActivity,
|
||||
resolveDeliveryInboxes,
|
||||
resolveRemoteActor,
|
||||
sendSignedActivity,
|
||||
verifyInboundSignature
|
||||
|
||||
@@ -5,10 +5,12 @@ import type {
|
||||
CachedStatusAttachment,
|
||||
Favourite,
|
||||
Follow,
|
||||
Json,
|
||||
Media,
|
||||
Notification,
|
||||
OAuthApp,
|
||||
OAuthCode,
|
||||
OutgoingDelivery,
|
||||
OutgoingFollow,
|
||||
Reblog,
|
||||
RemoteActor,
|
||||
@@ -337,3 +339,71 @@ export async function setUserAvatarKey(env: Env, userId: string, key: string | n
|
||||
export async function setUserHeaderKey(env: Env, userId: string, key: string | null): Promise<void> {
|
||||
await env.DB.prepare("UPDATE users SET header_r2_key = ? WHERE id = ?").bind(key, userId).run();
|
||||
}
|
||||
|
||||
export async function enqueueOutgoingDeliveries(env: Env, userId: string, inboxes: Iterable<string>, activity: Json): Promise<void> {
|
||||
const uniqueInboxes = [...new Set([...inboxes].map((inbox) => inbox.trim()).filter(Boolean))];
|
||||
if (uniqueInboxes.length === 0) return;
|
||||
const now = new Date().toISOString();
|
||||
const activityId = typeof activity.id === "string" && activity.id ? activity.id : id();
|
||||
const activityJson = JSON.stringify(activity);
|
||||
const statements = uniqueInboxes.map((inbox) => env.DB.prepare(
|
||||
`INSERT OR IGNORE INTO outgoing_deliveries
|
||||
(id, user_id, inbox, activity_id, activity_json, attempts, next_attempt_at, locked_until, delivered_at, failed_at, last_error, created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, 0, ?, NULL, NULL, NULL, NULL, ?, ?)`
|
||||
).bind(id(), userId, inbox, activityId, activityJson, now, now, now));
|
||||
|
||||
for (let offset = 0; offset < statements.length; offset += 50) {
|
||||
await env.DB.batch(statements.slice(offset, offset + 50));
|
||||
}
|
||||
}
|
||||
|
||||
export async function listDueOutgoingDeliveries(env: Env, now: string, limit: number): Promise<OutgoingDelivery[]> {
|
||||
const rows = await env.DB.prepare(
|
||||
`SELECT * FROM outgoing_deliveries
|
||||
WHERE delivered_at IS NULL
|
||||
AND failed_at IS NULL
|
||||
AND next_attempt_at IS NOT NULL
|
||||
AND next_attempt_at <= ?
|
||||
AND (locked_until IS NULL OR locked_until <= ?)
|
||||
ORDER BY next_attempt_at ASC, created_at ASC
|
||||
LIMIT ?`
|
||||
).bind(now, now, limit).all<OutgoingDelivery>();
|
||||
return rows.results;
|
||||
}
|
||||
|
||||
export async function claimOutgoingDelivery(env: Env, deliveryId: string, now: string, lockedUntil: string): Promise<boolean> {
|
||||
const result = await env.DB.prepare(
|
||||
`UPDATE outgoing_deliveries
|
||||
SET locked_until = ?, updated_at = ?
|
||||
WHERE id = ?
|
||||
AND delivered_at IS NULL
|
||||
AND failed_at IS NULL
|
||||
AND next_attempt_at IS NOT NULL
|
||||
AND next_attempt_at <= ?
|
||||
AND (locked_until IS NULL OR locked_until <= ?)`
|
||||
).bind(lockedUntil, now, deliveryId, now, now).run();
|
||||
return (result.meta.changes ?? 0) > 0;
|
||||
}
|
||||
|
||||
export async function markOutgoingDeliveryDelivered(env: Env, deliveryId: string): Promise<void> {
|
||||
const now = new Date().toISOString();
|
||||
await env.DB.prepare(
|
||||
"UPDATE outgoing_deliveries SET delivered_at = ?, locked_until = NULL, updated_at = ? WHERE id = ?"
|
||||
).bind(now, now, deliveryId).run();
|
||||
}
|
||||
|
||||
export async function markOutgoingDeliveryFailed(
|
||||
env: Env,
|
||||
deliveryId: string,
|
||||
attempts: number,
|
||||
nextAttemptAt: string | null,
|
||||
error: string
|
||||
): Promise<void> {
|
||||
const now = new Date().toISOString();
|
||||
const failedAt = nextAttemptAt ? null : now;
|
||||
await env.DB.prepare(
|
||||
`UPDATE outgoing_deliveries
|
||||
SET attempts = ?, next_attempt_at = ?, locked_until = NULL, failed_at = ?, last_error = ?, updated_at = ?
|
||||
WHERE id = ?`
|
||||
).bind(attempts, nextAttemptAt, failedAt, error.slice(0, 500), now, deliveryId).run();
|
||||
}
|
||||
|
||||
+61
-5
@@ -6,18 +6,28 @@ import {
|
||||
} from "./crypto";
|
||||
import {
|
||||
actorCacheStale,
|
||||
claimOutgoingDelivery,
|
||||
deleteActorFromCache,
|
||||
enqueueOutgoingDeliveries,
|
||||
ensureActorLocalId,
|
||||
getActorByKeyId,
|
||||
getActorFromCache,
|
||||
getUserById,
|
||||
listDueOutgoingDeliveries,
|
||||
markOutgoingDeliveryDelivered,
|
||||
markOutgoingDeliveryFailed,
|
||||
recordNotification,
|
||||
upsertActorCache
|
||||
} from "./db";
|
||||
import type { ActorCache, Json, RemoteActor, Status, User } from "./types";
|
||||
import type { ActorCache, Json, OutgoingDelivery, RemoteActor, Status, User } from "./types";
|
||||
import { SIGNATURE_MAX_SKEW_MS } from "./types";
|
||||
import { actorUrl, base64Decode, encoder, hostFromBaseUrl, parseAcctFromActor } from "./util";
|
||||
|
||||
const ACTIVITY_HEADERS = "application/activity+json, application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"";
|
||||
const DELIVERY_BATCH_SIZE = 20;
|
||||
const DELIVERY_MAX_ATTEMPTS = 8;
|
||||
const DELIVERY_LEASE_MS = 60_000;
|
||||
const DELIVERY_MAX_BACKOFF_SECONDS = 60 * 60;
|
||||
|
||||
export async function resolveRemoteActor(env: Env, actorId: string, opts: { force?: boolean } = {}): Promise<ActorCache | null> {
|
||||
if (!actorId) return null;
|
||||
@@ -153,11 +163,57 @@ export async function sendSignedActivity(env: Env, user: User, inboxUrl: string,
|
||||
}
|
||||
|
||||
export async function deliverToInboxes(env: Env, user: User, inboxes: Iterable<string>, activity: Json): Promise<void> {
|
||||
const unique = new Set<string>();
|
||||
for (const inbox of inboxes) {
|
||||
if (inbox) unique.add(inbox);
|
||||
await enqueueOutgoingDeliveries(env, user.id, inboxes, activity);
|
||||
}
|
||||
|
||||
export async function processOutgoingDeliveries(env: Env): Promise<void> {
|
||||
const now = new Date().toISOString();
|
||||
const deliveries = await listDueOutgoingDeliveries(env, now, DELIVERY_BATCH_SIZE);
|
||||
for (const delivery of deliveries) {
|
||||
await processOutgoingDelivery(env, delivery);
|
||||
}
|
||||
await Promise.allSettled([...unique].map((inbox) => sendSignedActivity(env, user, inbox, activity)));
|
||||
}
|
||||
|
||||
async function processOutgoingDelivery(env: Env, delivery: OutgoingDelivery): Promise<void> {
|
||||
const now = new Date();
|
||||
const nowIso = now.toISOString();
|
||||
const lockedUntil = new Date(now.getTime() + DELIVERY_LEASE_MS).toISOString();
|
||||
const claimed = await claimOutgoingDelivery(env, delivery.id, nowIso, lockedUntil);
|
||||
if (!claimed) return;
|
||||
|
||||
let activity: Json;
|
||||
try {
|
||||
activity = JSON.parse(delivery.activity_json) as Json;
|
||||
} catch {
|
||||
await markOutgoingDeliveryFailed(env, delivery.id, DELIVERY_MAX_ATTEMPTS, null, "invalid_activity_json");
|
||||
return;
|
||||
}
|
||||
|
||||
const user = await getUserById(env, delivery.user_id);
|
||||
if (!user) {
|
||||
await markOutgoingDeliveryFailed(env, delivery.id, DELIVERY_MAX_ATTEMPTS, null, "delivery_user_missing");
|
||||
return;
|
||||
}
|
||||
|
||||
const result = await sendSignedActivity(env, user, delivery.inbox, activity).catch((error) => ({
|
||||
ok: false,
|
||||
status: 0,
|
||||
text: String(error)
|
||||
}));
|
||||
if (result.ok) {
|
||||
await markOutgoingDeliveryDelivered(env, delivery.id);
|
||||
return;
|
||||
}
|
||||
|
||||
const attempts = delivery.attempts + 1;
|
||||
const nextAttemptAt = attempts >= DELIVERY_MAX_ATTEMPTS ? null : nextDeliveryAttemptAt(attempts);
|
||||
const error = result.status ? `${result.status} ${result.text}` : result.text;
|
||||
await markOutgoingDeliveryFailed(env, delivery.id, attempts, nextAttemptAt, error);
|
||||
}
|
||||
|
||||
function nextDeliveryAttemptAt(attempts: number): string {
|
||||
const delaySeconds = Math.min(DELIVERY_MAX_BACKOFF_SECONDS, 60 * (2 ** Math.max(0, attempts - 1)));
|
||||
return new Date(Date.now() + delaySeconds * 1000).toISOString();
|
||||
}
|
||||
|
||||
export async function gatherFollowerInboxes(env: Env, userId: string): Promise<string[]> {
|
||||
|
||||
+6
-3
@@ -86,13 +86,15 @@ import {
|
||||
verifyAppCredentials,
|
||||
verifyCredentials
|
||||
} from "./mastodon";
|
||||
import { processOutgoingDeliveries } from "./federation";
|
||||
|
||||
export default {
|
||||
async fetch(request: Request, env: Env): Promise<Response> {
|
||||
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
|
||||
try {
|
||||
await ensureAdminUser(env);
|
||||
await publishDueScheduledStatuses(env);
|
||||
return await route(request, env);
|
||||
const response = await route(request, env);
|
||||
ctx.waitUntil(processOutgoingDeliveries(env));
|
||||
return response;
|
||||
} catch (error) {
|
||||
if (error instanceof HttpError) return json({ error: error.message }, error.status);
|
||||
console.error("unhandled", error);
|
||||
@@ -102,6 +104,7 @@ export default {
|
||||
async scheduled(_event: ScheduledEvent, env: Env): Promise<void> {
|
||||
await ensureAdminUser(env);
|
||||
await publishDueScheduledStatuses(env);
|
||||
await processOutgoingDeliveries(env);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -227,6 +227,22 @@ export type ScheduledStatus = {
|
||||
created_at: string;
|
||||
};
|
||||
|
||||
export type OutgoingDelivery = {
|
||||
id: string;
|
||||
user_id: string;
|
||||
inbox: string;
|
||||
activity_id: string;
|
||||
activity_json: string;
|
||||
attempts: number;
|
||||
next_attempt_at: string | null;
|
||||
locked_until: string | null;
|
||||
delivered_at: string | null;
|
||||
failed_at: string | null;
|
||||
last_error: string | null;
|
||||
created_at: string;
|
||||
updated_at: string;
|
||||
};
|
||||
|
||||
export type OAuthToken = {
|
||||
token: string;
|
||||
user_id: string;
|
||||
|
||||
@@ -12,6 +12,9 @@
|
||||
"secrets": {
|
||||
"required": ["ADMIN_PASSWORD"]
|
||||
},
|
||||
"triggers": {
|
||||
"crons": ["* * * * *"]
|
||||
},
|
||||
"d1_databases": [
|
||||
{
|
||||
"binding": "DB",
|
||||
|
||||
Reference in New Issue
Block a user