This commit is contained in:
浪子
2026-06-19 07:51:54 +08:00
parent ad6a8b0dcf
commit 554ac1e33a
7 changed files with 359 additions and 297 deletions
+4
View File
@@ -0,0 +1,4 @@
-- Keep federation delivery and home timeline lookups indexed as caches grow.
CREATE INDEX IF NOT EXISTS idx_actor_cache_inbox ON actor_cache(inbox);
CREATE INDEX IF NOT EXISTS idx_outgoing_follows_target_accepted ON outgoing_follows(target_actor, accepted);
+210 -257
View File
File diff suppressed because it is too large Load Diff
+2 -2
View File
@@ -11,8 +11,8 @@
"db:remote": "wrangler d1 migrations apply toot_db --remote"
},
"devDependencies": {
"@cloudflare/workers-types": "^4.20260507.0",
"@cloudflare/workers-types": "^4.20260617.1",
"typescript": "^5.9.3",
"wrangler": "^4.37.0"
"wrangler": "^4.102.0"
}
}
+15
View File
@@ -13,6 +13,7 @@ import type {
Marker,
OutgoingDelivery,
OutgoingFollow,
OAuthToken,
Reblog,
RemoteActor,
Status,
@@ -21,7 +22,17 @@ import type {
import { ACTOR_CACHE_TTL_MS } from "./types";
import { id } from "./util";
let adminUserReady: Promise<void> | null = null;
export async function ensureAdminUser(env: Env): Promise<void> {
adminUserReady ??= ensureAdminUserOnce(env).catch((error) => {
adminUserReady = null;
throw error;
});
await adminUserReady;
}
async function ensureAdminUserOnce(env: Env): Promise<void> {
const existing = await env.DB.prepare("SELECT id FROM users WHERE username = ?").bind(env.ADMIN_USERNAME).first<{ id: string }>();
if (existing) return;
const adminPassword = env.ADMIN_PASSWORD;
@@ -306,6 +317,10 @@ export async function insertOAuthToken(env: Env, token: string, userId: string,
.bind(token, userId, appId, scopes, new Date().toISOString()).run();
}
export async function getOAuthToken(env: Env, token: string): Promise<OAuthToken | null> {
return env.DB.prepare("SELECT * FROM oauth_tokens WHERE token = ?").bind(token).first<OAuthToken>();
}
export async function deleteOAuthToken(env: Env, token: string): Promise<void> {
await env.DB.prepare("DELETE FROM oauth_tokens WHERE token = ?").bind(token).run();
}
+22 -14
View File
@@ -25,9 +25,11 @@ import { actorUrl, base64Decode, encoder, hostFromBaseUrl, parseAcctFromActor }
const ACTIVITY_HEADERS = "application/activity+json, application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"";
const DELIVERY_BATCH_SIZE = 20;
const DELIVERY_CONCURRENCY = 4;
const DELIVERY_MAX_ATTEMPTS = 8;
const DELIVERY_LEASE_MS = 60_000;
const DELIVERY_MAX_BACKOFF_SECONDS = 60 * 60;
const REMOTE_FETCH_TIMEOUT_MS = 10_000;
export async function resolveRemoteActor(env: Env, actorId: string, opts: { force?: boolean } = {}): Promise<ActorCache | null> {
if (!actorId) return null;
@@ -42,6 +44,7 @@ export async function fetchRemoteActor(actorId: string): Promise<RemoteActor | n
try {
const response = await fetch(actorId, {
headers: { accept: ACTIVITY_HEADERS },
signal: AbortSignal.timeout(REMOTE_FETCH_TIMEOUT_MS),
cf: { cacheTtl: 60 }
});
if (!response.ok) return null;
@@ -151,7 +154,8 @@ export async function sendSignedActivity(env: Env, user: User, inboxUrl: string,
signature: headerValue,
"user-agent": `toot-worker (+https://${hostFromBaseUrl(env)})`
},
body
body,
signal: AbortSignal.timeout(REMOTE_FETCH_TIMEOUT_MS)
});
const text = response.ok ? "" : await response.text().catch(() => "");
if (!response.ok) console.warn("signed-delivery", inboxUrl, response.status, text.slice(0, 200));
@@ -169,9 +173,18 @@ export async function deliverToInboxes(env: Env, user: User, inboxes: Iterable<s
export async function processOutgoingDeliveries(env: Env): Promise<void> {
const now = new Date().toISOString();
const deliveries = await listDueOutgoingDeliveries(env, now, DELIVERY_BATCH_SIZE);
for (const delivery of deliveries) {
await processOutgoingDelivery(env, delivery);
}
await runWithConcurrency(deliveries, DELIVERY_CONCURRENCY, (delivery) => processOutgoingDelivery(env, delivery));
}
async function runWithConcurrency<T>(items: T[], concurrency: number, worker: (item: T) => Promise<void>): Promise<void> {
let cursor = 0;
const runners = Array.from({ length: Math.min(concurrency, items.length) }, async () => {
while (cursor < items.length) {
const item = items[cursor++];
await worker(item);
}
});
await Promise.all(runners);
}
async function processOutgoingDelivery(env: Env, delivery: OutgoingDelivery): Promise<void> {
@@ -218,17 +231,12 @@ function nextDeliveryAttemptAt(attempts: number): string {
export async function gatherFollowerInboxes(env: Env, userId: string): Promise<string[]> {
const rows = await env.DB.prepare(
"SELECT inbox FROM follows WHERE local_user_id = ? AND accepted = 1"
`SELECT COALESCE(ac.shared_inbox, f.inbox) AS inbox
FROM follows f
LEFT JOIN actor_cache ac ON ac.inbox = f.inbox
WHERE f.local_user_id = ? AND f.accepted = 1`
).bind(userId).all<{ inbox: string }>();
const inboxes = new Set<string>();
for (const row of rows.results) {
if (row.inbox) {
const actor = await getActorFromCache(env, row.inbox);
const shared = (await getActorByKeyId(env, row.inbox))?.shared_inbox;
inboxes.add(actor?.shared_inbox ?? shared ?? row.inbox);
}
}
return [...inboxes];
return [...new Set(rows.results.map((row) => row.inbox).filter(Boolean))];
}
export async function resolveDeliveryInboxes(env: Env, actorIds: Iterable<string>): Promise<string[]> {
+8 -1
View File
@@ -97,7 +97,9 @@ export default {
try {
await ensureAdminUser(env);
const response = await route(request, env);
ctx.waitUntil(processOutgoingDeliveries(env));
if (shouldDrainOutgoingDeliveries(request)) {
ctx.waitUntil(processOutgoingDeliveries(env));
}
return response;
} catch (error) {
if (error instanceof HttpError) return json({ error: error.message }, error.status);
@@ -252,3 +254,8 @@ function wantsHtmlDocument(request: Request): boolean {
const accept = request.headers.get("accept") ?? "";
return /\btext\/html\b/i.test(accept) && !/(application\/activity\+json|application\/ld\+json)/i.test(accept);
}
function shouldDrainOutgoingDeliveries(request: Request): boolean {
const method = request.method.toUpperCase();
return method === "POST" || method === "PUT" || method === "PATCH" || method === "DELETE";
}
+98 -23
View File
@@ -27,6 +27,7 @@ import {
getActorFromCache,
getAdminUser,
getAppByClientId,
getOAuthToken,
getCachedStatusByObjectId,
getStatus,
getUserById,
@@ -43,7 +44,8 @@ import {
saveMarker,
setUserAvatarKey,
setUserHeaderKey,
takeOAuthCode
takeOAuthCode,
touchOAuthToken
} from "./db";
import {
deliverToInboxes,
@@ -103,7 +105,7 @@ import {
const TOKEN_TTL_SECONDS = 60 * 60 * 24 * 90;
const MAX_STATUS_CHARS = 5000;
const REPORTED_MEDIA_ATTACHMENTS_LIMIT = 9999;
const MAX_MEDIA_ATTACHMENTS = 20;
const MAX_MEDIA_BYTES = 10 * 1024 * 1024;
const SUPPORTED_MIME = ["image/jpeg", "image/png", "image/gif", "image/webp"];
@@ -175,7 +177,7 @@ export async function instance(env: Env): Promise<Response> {
approval_required: false,
invites_enabled: false,
configuration: {
statuses: { max_characters: MAX_STATUS_CHARS, max_media_attachments: REPORTED_MEDIA_ATTACHMENTS_LIMIT, characters_reserved_per_url: 23 },
statuses: { max_characters: MAX_STATUS_CHARS, max_media_attachments: MAX_MEDIA_ATTACHMENTS, characters_reserved_per_url: 23 },
media_attachments: { supported_mime_types: SUPPORTED_MIME, image_size_limit: MAX_MEDIA_BYTES, image_matrix_limit: 16777216 },
polls: { max_options: MAX_POLL_OPTIONS, max_characters_per_option: MAX_POLL_OPTION_CHARS, min_expiration: MIN_POLL_EXPIRATION_SECONDS, max_expiration: MAX_POLL_EXPIRATION_SECONDS }
},
@@ -198,7 +200,7 @@ export async function instanceV2(env: Env): Promise<Response> {
configuration: {
urls: { streaming: `wss://${hostFromBaseUrl(env)}` },
accounts: { max_featured_tags: 0 },
statuses: { max_characters: MAX_STATUS_CHARS, max_media_attachments: REPORTED_MEDIA_ATTACHMENTS_LIMIT, characters_reserved_per_url: 23 },
statuses: { max_characters: MAX_STATUS_CHARS, max_media_attachments: MAX_MEDIA_ATTACHMENTS, characters_reserved_per_url: 23 },
media_attachments: { supported_mime_types: SUPPORTED_MIME, image_size_limit: MAX_MEDIA_BYTES, image_matrix_limit: 16777216 },
polls: { max_options: MAX_POLL_OPTIONS, max_characters_per_option: MAX_POLL_OPTION_CHARS, min_expiration: MIN_POLL_EXPIRATION_SECONDS, max_expiration: MAX_POLL_EXPIRATION_SECONDS }
},
@@ -242,8 +244,9 @@ export async function verifyAppCredentials(request: Request, env: Env): Promise<
const auth = request.headers.get("authorization") ?? "";
const token = auth.match(/^Bearer\s+(.+)$/i)?.[1];
if (!token) throw new HttpError(401, "The access token is invalid");
const session = await env.KV.get<Session>(`token:${token}`, "json");
const session = await loadSession(env, token);
if (!session) throw new HttpError(401, "The access token is invalid");
requireScopes(session, ["read"]);
const app = await env.DB.prepare("SELECT * FROM oauth_apps WHERE id = ?").bind(session.appId).first<{ name: string; website: string | null }>();
return json({ name: app?.name ?? "Mastodon App", website: app?.website ?? null, vapid_key: "" });
}
@@ -285,7 +288,7 @@ export async function authorize(request: Request, env: Env): Promise<Response> {
}
const code = tokenString(32);
const scope = bodyString(body, "scope", app.scopes);
const scope = requestedScopesWithinApp(bodyString(body, "scope", app.scopes), app.scopes);
await env.DB.prepare("INSERT INTO oauth_codes (code, app_id, user_id, redirect_uri, scopes, expires_at) VALUES (?, ?, ?, ?, ?, ?)")
.bind(code, app.id, user.id, redirectUri, scope, Math.floor(Date.now() / 1000) + 600)
.run();
@@ -311,9 +314,9 @@ export async function token(request: Request, env: Env): Promise<Response> {
const user = await getUserByUsername(env, bodyString(body, "username"));
if (!user || !(await verifyPassword(bodyString(body, "password"), user.password_hash))) return json({ error: "invalid_grant" }, 400);
userId = user.id;
scopes = bodyString(body, "scope", app.scopes);
scopes = requestedScopesWithinApp(bodyString(body, "scope", app.scopes), app.scopes);
} else if (grantType === "client_credentials") {
scopes = bodyString(body, "scope", "read");
scopes = requestedScopesWithinApp(bodyString(body, "scope", "read"), app.scopes);
} else {
const row = await takeOAuthCode(env, bodyString(body, "code"));
if (!row || row.app_id !== app.id) return json({ error: "invalid_grant" }, 400);
@@ -407,12 +410,17 @@ export async function updateCredentials(request: Request, env: Env): Promise<Res
}
async function storeProfileAsset(env: Env, userId: string, kind: "avatar" | "header", file: File): Promise<string> {
if (!isSupportedImageMime(file.type)) throw new HttpError(415, "unsupported media type");
const ext = mimeExtension(file.type) ?? safeFileName(file.name).split(".").pop() ?? "bin";
const key = `${userId}/${kind}-${id()}.${ext}`;
await env.MEDIA.put(key, file.stream(), { httpMetadata: { contentType: file.type || "application/octet-stream" } });
return key;
}
function isSupportedImageMime(mime: string): boolean {
return SUPPORTED_MIME.includes(mime) || mime === "image/jpg";
}
function mimeExtension(mime: string): string | null {
switch (mime) {
case "image/jpeg": case "image/jpg": return "jpg";
@@ -740,6 +748,8 @@ function parseStatusCreateInput(body: ParsedBody): StatusCreateInput {
if (pollOptions.some((option) => option.length > MAX_POLL_OPTION_CHARS)) throw new HttpError(422, "poll_option_too_long");
const pollExpiresIn = pollOptions.length > 0 ? parsePollExpiresIn(bodyString(body, "poll[expires_in]", String(MIN_POLL_EXPIRATION_SECONDS))) : null;
const mediaIds = bodyArray(body, "media_ids");
if (mediaIds.length > MAX_MEDIA_ATTACHMENTS) throw new HttpError(422, "too_many_media_attachments");
return {
statusText,
@@ -748,7 +758,7 @@ function parseStatusCreateInput(body: ParsedBody): StatusCreateInput {
visibility,
inReplyTo: bodyString(body, "in_reply_to_id"),
language: bodyString(body, "language", "en"),
mediaIds: bodyArray(body, "media_ids"),
mediaIds,
pollOptions,
pollExpiresIn,
pollMultiple: bodyString(body, "poll[multiple]") === "true",
@@ -766,6 +776,8 @@ function parseStatusEditInput(body: ParsedBody, existing: Status): StatusEditInp
const visibility = bodyString(body, "visibility", existing.visibility);
if (!isStatusVisibility(visibility)) throw new HttpError(422, "invalid_visibility");
const mediaIds = Object.prototype.hasOwnProperty.call(body, "media_ids") ? bodyArray(body, "media_ids") : null;
if (mediaIds && mediaIds.length > MAX_MEDIA_ATTACHMENTS) throw new HttpError(422, "too_many_media_attachments");
return {
statusText,
@@ -775,7 +787,7 @@ function parseStatusEditInput(body: ParsedBody, existing: Status): StatusEditInp
: Boolean(existing.sensitive),
visibility,
language: bodyString(body, "language", existing.language || "en"),
mediaIds: Object.prototype.hasOwnProperty.call(body, "media_ids") ? bodyArray(body, "media_ids") : null
mediaIds
};
}
@@ -1381,6 +1393,7 @@ export async function uploadMedia(request: Request, env: Env): Promise<Response>
const file = form.get("file");
if (!(file instanceof File)) return json({ error: "file is required" }, 422);
if (file.size > MAX_MEDIA_BYTES) return json({ error: "file too large" }, 413);
if (!isSupportedImageMime(file.type)) return json({ error: "unsupported media type" }, 415);
const mediaId = id();
const key = `${user.id}/${mediaId}/${safeFileName(file.name || "upload")}`;
@@ -1606,7 +1619,7 @@ export async function listTimeline(request: Request, env: Env, listId: string):
}
export async function followAccount(request: Request, env: Env, accountId: string): Promise<Response> {
const user = await requireUser(request, env);
const user = await requireUser(request, env, ["follow"]);
const target = await resolveAccountTarget(env, accountId);
if (!target) return json({ error: "Record not found" }, 404);
@@ -1630,7 +1643,7 @@ export async function followAccount(request: Request, env: Env, accountId: strin
}
export async function unfollowAccount(request: Request, env: Env, accountId: string): Promise<Response> {
const user = await requireUser(request, env);
const user = await requireUser(request, env, ["follow"]);
const target = await resolveAccountTarget(env, accountId);
if (!target) return json({ error: "Record not found" }, 404);
@@ -1649,23 +1662,23 @@ export async function unfollowAccount(request: Request, env: Env, accountId: str
}
export async function followRequestsList(request: Request, env: Env): Promise<Response> {
await requireUser(request, env);
await requireUser(request, env, ["follow"]);
return json([]);
}
export async function authorizeFollowRequest(request: Request, env: Env, _accountId: string): Promise<Response> {
await requireUser(request, env);
await requireUser(request, env, ["follow"]);
return json({ id: _accountId, following: true, requested: false });
}
export async function rejectFollowRequest(request: Request, env: Env, _accountId: string): Promise<Response> {
await requireUser(request, env);
await requireUser(request, env, ["follow"]);
return json({ id: _accountId, following: false, requested: false });
}
export async function search(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
const q = (url.searchParams.get("q") ?? "").trim();
const q = (url.searchParams.get("q") ?? "").trim().slice(0, 128);
const type = url.searchParams.get("type");
const accounts: unknown[] = [];
const statuses: unknown[] = [];
@@ -1686,7 +1699,7 @@ export async function search(request: Request, env: Env): Promise<Response> {
}
}
} else {
const rows = await env.DB.prepare("SELECT * FROM users WHERE username LIKE ? LIMIT 20").bind(`%${q}%`).all<User>();
const rows = await env.DB.prepare("SELECT * FROM users WHERE username LIKE ? ESCAPE '\\' LIMIT 20").bind(likeContains(q)).all<User>();
for (const row of rows.results) accounts.push(await accountJson(env, row));
}
}
@@ -1697,20 +1710,24 @@ export async function search(request: Request, env: Env): Promise<Response> {
if (remoteStatus && await canViewerViewCachedStatus(env, remoteStatus, viewer)) {
statuses.push(await cachedStatusToMastodon(env, remoteStatus));
}
const rows = await env.DB.prepare("SELECT * FROM statuses WHERE content LIKE ? ORDER BY created_at DESC LIMIT 100").bind(`%${escapeHtml(q)}%`).all<Status>();
const rows = await env.DB.prepare("SELECT * FROM statuses WHERE content LIKE ? ESCAPE '\\' ORDER BY created_at DESC LIMIT 100").bind(likeContains(q)).all<Status>();
const visibleRows = await filterStatusesForViewer(env, rows.results, viewer);
statuses.push(...await serializeStatuses(env, visibleRows.slice(0, 20), request));
}
if (!type || type === "hashtags") {
const tag = q.replace(/^#/, "");
const rows = await env.DB.prepare("SELECT tag, COUNT(*) AS count FROM hashtags WHERE tag LIKE ? GROUP BY tag LIMIT 20").bind(`%${tag}%`).all<{ tag: string; count: number }>();
const rows = await env.DB.prepare("SELECT tag, COUNT(*) AS count FROM hashtags WHERE tag LIKE ? ESCAPE '\\' GROUP BY tag LIMIT 20").bind(likeContains(tag)).all<{ tag: string; count: number }>();
for (const row of rows.results) hashtags.push({ name: row.tag, url: `${baseUrl(env)}/tags/${encodeURIComponent(row.tag)}`, history: [] });
}
return json({ accounts, statuses, hashtags });
}
function likeContains(value: string): string {
return `%${value.replace(/[\\%_]/g, (char) => `\\${char}`)}%`;
}
async function resolveRemoteStatusSearch(env: Env, q: string): Promise<CachedStatus | null> {
const url = parseSearchUrl(q);
if (!url || url.host.toLowerCase() === hostFromBaseUrl(env).toLowerCase()) return null;
@@ -2369,7 +2386,8 @@ async function resolveAcct(env: Env, acct: string): Promise<{ acct: string; acto
}
try {
const wf = await fetch(`https://${targetHost}/.well-known/webfinger?resource=acct:${name}@${targetHost}`, {
headers: { accept: "application/jrd+json, application/json" }
headers: { accept: "application/jrd+json, application/json" },
signal: AbortSignal.timeout(10_000)
});
if (!wf.ok) return null;
const doc = await wf.json() as { links?: { rel: string; type?: string; href: string }[] };
@@ -2750,8 +2768,9 @@ async function viewerUser(request: Request, env: Env): Promise<User | null> {
const auth = request.headers.get("authorization") ?? "";
const token = auth.match(/^Bearer\s+(.+)$/i)?.[1];
if (!token) return null;
const session = await env.KV.get<Session>(`token:${token}`, "json");
const session = await loadSession(env, token);
if (!session) return null;
if (!hasAnyScope(session, ["read"])) return null;
return getUserById(env, session.userId);
}
@@ -2773,15 +2792,71 @@ async function loadPinnedStatusIds(env: Env, userId: string, statusIds: string[]
return new Set(rows.results.map((row) => row.status_id));
}
async function requireUser(request: Request, env: Env): Promise<User> {
async function requireUser(request: Request, env: Env, scopes = defaultRequiredScopes(request)): Promise<User> {
const auth = request.headers.get("authorization") ?? "";
const token = auth.match(/^Bearer\s+(.+)$/i)?.[1];
if (!token) throw new HttpError(401, "The access token is invalid");
const session = await env.KV.get<Session>(`token:${token}`, "json");
const session = await loadSession(env, token);
if (!session) throw new HttpError(401, "The access token is invalid");
requireScopes(session, scopes);
const user = await getUserById(env, session.userId);
if (!user) throw new HttpError(401, "The access token is invalid");
return user;
}
async function loadSession(env: Env, token: string): Promise<Session | null> {
const session = await env.KV.get<Session>(`token:${token}`, "json");
if (session) return session;
const row = await getOAuthToken(env, token);
if (!row) return null;
const createdAt = Date.parse(row.created_at);
if (!Number.isFinite(createdAt) || Date.now() - createdAt > TOKEN_TTL_SECONDS * 1000) {
await deleteOAuthToken(env, token).catch(() => undefined);
return null;
}
const restored = { userId: row.user_id, appId: row.app_id, scopes: row.scopes } satisfies Session;
await Promise.allSettled([
env.KV.put(`token:${token}`, JSON.stringify(restored), { expirationTtl: TOKEN_TTL_SECONDS }),
touchOAuthToken(env, token)
]);
return restored;
}
function defaultRequiredScopes(request: Request): string[] {
return request.method.toUpperCase() === "GET" ? ["read"] : ["write"];
}
function requestedScopesWithinApp(requested: string, appScopes: string): string {
const requestedScopes = normalizeScopes(requested || appScopes);
const allowed = normalizeScopes(appScopes);
if (requestedScopes.length === 0) return appScopes;
if (requestedScopes.every((scope) => scopeAllowed(scope, allowed))) return requestedScopes.join(" ");
throw new HttpError(400, "invalid_scope");
}
function requireScopes(session: Session, required: string[]): void {
if (!hasAllScopes(session, required)) throw new HttpError(403, "insufficient_scope");
}
function hasAllScopes(session: Session, required: string[]): boolean {
return required.every((scope) => hasAnyScope(session, [scope]));
}
function hasAnyScope(session: Session, required: string[]): boolean {
const granted = normalizeScopes(session.scopes);
return required.some((scope) => scopeAllowed(scope, granted));
}
function scopeAllowed(required: string, granted: string[]): boolean {
if (granted.includes(required)) return true;
if (!required.includes(":") && granted.some((scope) => scope.startsWith(`${required}:`))) return true;
const root = required.split(":")[0];
return granted.includes(root);
}
function normalizeScopes(scopes: string): string[] {
return [...new Set(scopes.split(/\s+/).map((scope) => scope.trim()).filter(Boolean))];
}
export { requireUser };