diff --git a/migrations/0008_outgoing_deliveries.sql b/migrations/0008_outgoing_deliveries.sql new file mode 100644 index 0000000..31c96e9 --- /dev/null +++ b/migrations/0008_outgoing_deliveries.sql @@ -0,0 +1,22 @@ +-- Queue outbound ActivityPub deliveries so API responses do not wait on +-- remote inbox latency and failed deliveries can be retried. + +CREATE TABLE IF NOT EXISTS outgoing_deliveries ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + inbox TEXT NOT NULL, + activity_id TEXT NOT NULL, + activity_json TEXT NOT NULL, + attempts INTEGER NOT NULL DEFAULT 0, + next_attempt_at TEXT, + locked_until TEXT, + delivered_at TEXT, + failed_at TEXT, + last_error TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + UNIQUE(user_id, inbox, activity_id) +); + +CREATE INDEX IF NOT EXISTS idx_outgoing_deliveries_due ON outgoing_deliveries(next_attempt_at, locked_until); +CREATE INDEX IF NOT EXISTS idx_outgoing_deliveries_user_time ON outgoing_deliveries(user_id, created_at DESC); diff --git a/readme.md b/readme.md index cd6afe7..5bd1f1a 100644 --- a/readme.md +++ b/readme.md @@ -3,7 +3,7 @@ 一个运行在 Cloudflare Workers 上的单用户联邦宇宙发布软件,使用: - Workers: HTTP API、ActivityPub 路由、Mastodon API 兼容层 -- D1: 用户、OAuth 应用/Token、嘟文、媒体索引、关注、收藏、转发、通知、提及、话题标签、收藏夹、置顶、远端 actor 与远端嘟文缓存 +- D1: 用户、OAuth 应用/Token、嘟文、媒体索引、关注、收藏、转发、通知、提及、话题标签、收藏夹、置顶、远端 actor 与远端嘟文缓存、出站联邦投递队列 - R2: 媒体文件 - KV: OAuth access token 会话(D1 同步保留,便于管理) @@ -17,17 +17,20 @@ npm run db:local npm run dev ``` -默认管理员账号来自 `wrangler.jsonc`: +默认管理员用户名来自 `wrangler.jsonc` 的 `ADMIN_USERNAME`,当前示例值为 `sun`。密码必须通过 Secret 或本地开发变量提供: -- 用户名:`admin` -- 密码:`change-me-before-deploy` - -部署前必须改掉 `ADMIN_PASSWORD`,更推荐用 Cloudflare secret 管理密码: +生产部署: ```bash wrangler secret put ADMIN_PASSWORD ``` +本地开发可以创建 `.dev.vars`: + +```dotenv +ADMIN_PASSWORD=change-me-before-deploy +``` + `PUBLIC_BASE_URL` 必须在首次正式部署前改成你的稳定实例域名,例如: ```json @@ -126,6 +129,8 @@ npm run deploy 入站 inbox 处理类型:`Follow` / `Undo(Follow)` / `Accept(Follow)` / `Reject(Follow)` / `Like` / `Undo(Like)` / `Announce` / `Undo(Announce)` / `Delete(Note)`(同时清远端嘟文缓存)/ `Delete(Person)`(同时清缓存与关注关系)/ `Update(Person)` / `Update(Note)` / `Create(Note)`(被关注的远端账号公开 / followers-only Note,或投递给本地用户的 Note 会写入 `cached_statuses` 给 home timeline 和通知使用,同时若 mention/回复本地用户会触发通知)。 +出站 ActivityPub 投递会先写入 D1 `outgoing_deliveries` 队列,再由请求后的 `waitUntil` 和每分钟 Cron 触发器后台签名发送。失败投递会指数退避重试,最多 8 次后标记为失败并保留错误摘要。 + ## 安全 - 密码哈希使用 **PBKDF2-SHA256 / 100000 iterations**,带每用户随机 16 字节 salt,旧版 `salt.hash` 哈希也能继续验证(便于无缝升级)。 @@ -147,6 +152,7 @@ npm run deploy - `migrations/0003_bookmarks_cache.sql` — 收藏夹(bookmarks)/ 置顶(pinned_statuses)/ 远端嘟文缓存(cached_statuses)/ OAuth Token 持久表 - `migrations/0006_cached_status_metadata.sql` — 远端缓存嘟文的可见性 / mentions / tags / 本地收件人元数据 - `migrations/0007_polls_lists_push_scheduled.sql` — poll / list / push subscription / scheduled statuses +- `migrations/0008_outgoing_deliveries.sql` — 出站 ActivityPub 投递队列 / 重试状态 ## 重要限制 diff --git a/src/activitypub.ts b/src/activitypub.ts index 7752f0e..cb0bcd0 100644 --- a/src/activitypub.ts +++ b/src/activitypub.ts @@ -14,7 +14,6 @@ import { upsertCachedStatus } from "./db"; import { - deliverToInboxes, isDuplicateActivity, isFollowedByAnyLocalUser, notifyForLocalStatus, @@ -22,7 +21,6 @@ import { objectIdString, parseActivity, recordRemoteActivity, - resolveDeliveryInboxes, resolveRemoteActor, sendSignedActivity, verifyInboundSignature diff --git a/src/db.ts b/src/db.ts index 92fd9bc..75de3a9 100644 --- a/src/db.ts +++ b/src/db.ts @@ -5,10 +5,12 @@ import type { CachedStatusAttachment, Favourite, Follow, + Json, Media, Notification, OAuthApp, OAuthCode, + OutgoingDelivery, OutgoingFollow, Reblog, RemoteActor, @@ -337,3 +339,71 @@ export async function setUserAvatarKey(env: Env, userId: string, key: string | n export async function setUserHeaderKey(env: Env, userId: string, key: string | null): Promise { await env.DB.prepare("UPDATE users SET header_r2_key = ? WHERE id = ?").bind(key, userId).run(); } + +export async function enqueueOutgoingDeliveries(env: Env, userId: string, inboxes: Iterable, activity: Json): Promise { + const uniqueInboxes = [...new Set([...inboxes].map((inbox) => inbox.trim()).filter(Boolean))]; + if (uniqueInboxes.length === 0) return; + const now = new Date().toISOString(); + const activityId = typeof activity.id === "string" && activity.id ? activity.id : id(); + const activityJson = JSON.stringify(activity); + const statements = uniqueInboxes.map((inbox) => env.DB.prepare( + `INSERT OR IGNORE INTO outgoing_deliveries + (id, user_id, inbox, activity_id, activity_json, attempts, next_attempt_at, locked_until, delivered_at, failed_at, last_error, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, 0, ?, NULL, NULL, NULL, NULL, ?, ?)` + ).bind(id(), userId, inbox, activityId, activityJson, now, now, now)); + + for (let offset = 0; offset < statements.length; offset += 50) { + await env.DB.batch(statements.slice(offset, offset + 50)); + } +} + +export async function listDueOutgoingDeliveries(env: Env, now: string, limit: number): Promise { + const rows = await env.DB.prepare( + `SELECT * FROM outgoing_deliveries + WHERE delivered_at IS NULL + AND failed_at IS NULL + AND next_attempt_at IS NOT NULL + AND next_attempt_at <= ? + AND (locked_until IS NULL OR locked_until <= ?) + ORDER BY next_attempt_at ASC, created_at ASC + LIMIT ?` + ).bind(now, now, limit).all(); + return rows.results; +} + +export async function claimOutgoingDelivery(env: Env, deliveryId: string, now: string, lockedUntil: string): Promise { + const result = await env.DB.prepare( + `UPDATE outgoing_deliveries + SET locked_until = ?, updated_at = ? + WHERE id = ? + AND delivered_at IS NULL + AND failed_at IS NULL + AND next_attempt_at IS NOT NULL + AND next_attempt_at <= ? + AND (locked_until IS NULL OR locked_until <= ?)` + ).bind(lockedUntil, now, deliveryId, now, now).run(); + return (result.meta.changes ?? 0) > 0; +} + +export async function markOutgoingDeliveryDelivered(env: Env, deliveryId: string): Promise { + const now = new Date().toISOString(); + await env.DB.prepare( + "UPDATE outgoing_deliveries SET delivered_at = ?, locked_until = NULL, updated_at = ? WHERE id = ?" + ).bind(now, now, deliveryId).run(); +} + +export async function markOutgoingDeliveryFailed( + env: Env, + deliveryId: string, + attempts: number, + nextAttemptAt: string | null, + error: string +): Promise { + const now = new Date().toISOString(); + const failedAt = nextAttemptAt ? null : now; + await env.DB.prepare( + `UPDATE outgoing_deliveries + SET attempts = ?, next_attempt_at = ?, locked_until = NULL, failed_at = ?, last_error = ?, updated_at = ? + WHERE id = ?` + ).bind(attempts, nextAttemptAt, failedAt, error.slice(0, 500), now, deliveryId).run(); +} diff --git a/src/federation.ts b/src/federation.ts index 6fd1265..d8bb0c3 100644 --- a/src/federation.ts +++ b/src/federation.ts @@ -6,18 +6,28 @@ import { } from "./crypto"; import { actorCacheStale, + claimOutgoingDelivery, deleteActorFromCache, + enqueueOutgoingDeliveries, ensureActorLocalId, getActorByKeyId, getActorFromCache, + getUserById, + listDueOutgoingDeliveries, + markOutgoingDeliveryDelivered, + markOutgoingDeliveryFailed, recordNotification, upsertActorCache } from "./db"; -import type { ActorCache, Json, RemoteActor, Status, User } from "./types"; +import type { ActorCache, Json, OutgoingDelivery, RemoteActor, Status, User } from "./types"; import { SIGNATURE_MAX_SKEW_MS } from "./types"; import { actorUrl, base64Decode, encoder, hostFromBaseUrl, parseAcctFromActor } from "./util"; const ACTIVITY_HEADERS = "application/activity+json, application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\""; +const DELIVERY_BATCH_SIZE = 20; +const DELIVERY_MAX_ATTEMPTS = 8; +const DELIVERY_LEASE_MS = 60_000; +const DELIVERY_MAX_BACKOFF_SECONDS = 60 * 60; export async function resolveRemoteActor(env: Env, actorId: string, opts: { force?: boolean } = {}): Promise { if (!actorId) return null; @@ -153,11 +163,57 @@ export async function sendSignedActivity(env: Env, user: User, inboxUrl: string, } export async function deliverToInboxes(env: Env, user: User, inboxes: Iterable, activity: Json): Promise { - const unique = new Set(); - for (const inbox of inboxes) { - if (inbox) unique.add(inbox); + await enqueueOutgoingDeliveries(env, user.id, inboxes, activity); +} + +export async function processOutgoingDeliveries(env: Env): Promise { + const now = new Date().toISOString(); + const deliveries = await listDueOutgoingDeliveries(env, now, DELIVERY_BATCH_SIZE); + for (const delivery of deliveries) { + await processOutgoingDelivery(env, delivery); } - await Promise.allSettled([...unique].map((inbox) => sendSignedActivity(env, user, inbox, activity))); +} + +async function processOutgoingDelivery(env: Env, delivery: OutgoingDelivery): Promise { + const now = new Date(); + const nowIso = now.toISOString(); + const lockedUntil = new Date(now.getTime() + DELIVERY_LEASE_MS).toISOString(); + const claimed = await claimOutgoingDelivery(env, delivery.id, nowIso, lockedUntil); + if (!claimed) return; + + let activity: Json; + try { + activity = JSON.parse(delivery.activity_json) as Json; + } catch { + await markOutgoingDeliveryFailed(env, delivery.id, DELIVERY_MAX_ATTEMPTS, null, "invalid_activity_json"); + return; + } + + const user = await getUserById(env, delivery.user_id); + if (!user) { + await markOutgoingDeliveryFailed(env, delivery.id, DELIVERY_MAX_ATTEMPTS, null, "delivery_user_missing"); + return; + } + + const result = await sendSignedActivity(env, user, delivery.inbox, activity).catch((error) => ({ + ok: false, + status: 0, + text: String(error) + })); + if (result.ok) { + await markOutgoingDeliveryDelivered(env, delivery.id); + return; + } + + const attempts = delivery.attempts + 1; + const nextAttemptAt = attempts >= DELIVERY_MAX_ATTEMPTS ? null : nextDeliveryAttemptAt(attempts); + const error = result.status ? `${result.status} ${result.text}` : result.text; + await markOutgoingDeliveryFailed(env, delivery.id, attempts, nextAttemptAt, error); +} + +function nextDeliveryAttemptAt(attempts: number): string { + const delaySeconds = Math.min(DELIVERY_MAX_BACKOFF_SECONDS, 60 * (2 ** Math.max(0, attempts - 1))); + return new Date(Date.now() + delaySeconds * 1000).toISOString(); } export async function gatherFollowerInboxes(env: Env, userId: string): Promise { diff --git a/src/index.ts b/src/index.ts index 5a78e6c..befb12b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -86,13 +86,15 @@ import { verifyAppCredentials, verifyCredentials } from "./mastodon"; +import { processOutgoingDeliveries } from "./federation"; export default { - async fetch(request: Request, env: Env): Promise { + async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise { try { await ensureAdminUser(env); - await publishDueScheduledStatuses(env); - return await route(request, env); + const response = await route(request, env); + ctx.waitUntil(processOutgoingDeliveries(env)); + return response; } catch (error) { if (error instanceof HttpError) return json({ error: error.message }, error.status); console.error("unhandled", error); @@ -102,6 +104,7 @@ export default { async scheduled(_event: ScheduledEvent, env: Env): Promise { await ensureAdminUser(env); await publishDueScheduledStatuses(env); + await processOutgoingDeliveries(env); } }; diff --git a/src/types.ts b/src/types.ts index 219cdb3..0bc3a8a 100644 --- a/src/types.ts +++ b/src/types.ts @@ -227,6 +227,22 @@ export type ScheduledStatus = { created_at: string; }; +export type OutgoingDelivery = { + id: string; + user_id: string; + inbox: string; + activity_id: string; + activity_json: string; + attempts: number; + next_attempt_at: string | null; + locked_until: string | null; + delivered_at: string | null; + failed_at: string | null; + last_error: string | null; + created_at: string; + updated_at: string; +}; + export type OAuthToken = { token: string; user_id: string; diff --git a/wrangler.jsonc b/wrangler.jsonc index dbe7428..01943f4 100644 --- a/wrangler.jsonc +++ b/wrangler.jsonc @@ -12,6 +12,9 @@ "secrets": { "required": ["ADMIN_PASSWORD"] }, + "triggers": { + "crons": ["* * * * *"] + }, "d1_databases": [ { "binding": "DB",