Initial toot-worker implementation
This commit is contained in:
@@ -0,0 +1,260 @@
|
||||
import {
|
||||
digestBase64,
|
||||
parseSignatureHeader,
|
||||
signString,
|
||||
verifyWithPem
|
||||
} from "./crypto";
|
||||
import {
|
||||
actorCacheStale,
|
||||
deleteActorFromCache,
|
||||
getActorByKeyId,
|
||||
getActorFromCache,
|
||||
recordNotification,
|
||||
upsertActorCache
|
||||
} from "./db";
|
||||
import type { ActorCache, Json, RemoteActor, Status, User } from "./types";
|
||||
import { SIGNATURE_MAX_SKEW_MS } from "./types";
|
||||
import { actorUrl, base64Decode, encoder, hostFromBaseUrl, parseAcctFromActor } from "./util";
|
||||
|
||||
const ACTIVITY_HEADERS = "application/activity+json, application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"";
|
||||
|
||||
export async function resolveRemoteActor(env: Env, actorId: string, opts: { force?: boolean } = {}): Promise<ActorCache | null> {
|
||||
if (!actorId) return null;
|
||||
const cached = await getActorFromCache(env, actorId);
|
||||
if (cached && !opts.force && !actorCacheStale(cached)) return cached;
|
||||
const fetched = await fetchRemoteActor(actorId);
|
||||
if (!fetched) return cached;
|
||||
return upsertActorCache(env, fetched);
|
||||
}
|
||||
|
||||
export async function fetchRemoteActor(actorId: string): Promise<RemoteActor | null> {
|
||||
try {
|
||||
const response = await fetch(actorId, {
|
||||
headers: { accept: ACTIVITY_HEADERS },
|
||||
cf: { cacheTtl: 60 }
|
||||
});
|
||||
if (!response.ok) return null;
|
||||
const data = await response.json() as RemoteActor;
|
||||
if (!data || !data.id) return null;
|
||||
return data;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
export async function discoverActorByKeyId(env: Env, keyId: string): Promise<ActorCache | null> {
|
||||
const cached = await getActorByKeyId(env, keyId);
|
||||
if (cached && !actorCacheStale(cached)) return cached;
|
||||
const actorIdGuess = keyId.split("#")[0];
|
||||
const refreshed = await resolveRemoteActor(env, actorIdGuess, { force: true });
|
||||
if (refreshed?.public_key_id === keyId) return refreshed;
|
||||
return cached;
|
||||
}
|
||||
|
||||
export type VerifiedSignature = {
|
||||
actor: ActorCache;
|
||||
keyId: string;
|
||||
};
|
||||
|
||||
export async function verifyInboundSignature(request: Request, body: string, env: Env): Promise<VerifiedSignature | null> {
|
||||
const sigHeader = request.headers.get("signature");
|
||||
if (!sigHeader) return null;
|
||||
const parsed = parseSignatureHeader(sigHeader);
|
||||
if (!parsed || !parsed.headers.includes("(request-target)")) return null;
|
||||
|
||||
const dateHeader = request.headers.get("date");
|
||||
if (dateHeader) {
|
||||
const stamp = Date.parse(dateHeader);
|
||||
if (!Number.isFinite(stamp)) return null;
|
||||
if (Math.abs(Date.now() - stamp) > SIGNATURE_MAX_SKEW_MS) return null;
|
||||
} else if (parsed.headers.includes("date")) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const digestHeader = request.headers.get("digest");
|
||||
if (digestHeader) {
|
||||
const expected = `SHA-256=${await digestBase64(body)}`;
|
||||
if (digestHeader !== expected) return null;
|
||||
} else if (request.method.toUpperCase() === "POST" && body) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const url = new URL(request.url);
|
||||
const hostHeader = request.headers.get("host");
|
||||
if (parsed.headers.includes("host") && hostHeader && hostHeader.toLowerCase() !== url.host.toLowerCase()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const lines: string[] = [];
|
||||
for (const headerName of parsed.headers) {
|
||||
if (headerName === "(request-target)") {
|
||||
lines.push(`(request-target): ${request.method.toLowerCase()} ${url.pathname}${url.search}`);
|
||||
continue;
|
||||
}
|
||||
const value = request.headers.get(headerName);
|
||||
if (value === null) return null;
|
||||
lines.push(`${headerName}: ${value}`);
|
||||
}
|
||||
|
||||
const actor = await discoverActorByKeyId(env, parsed.keyId);
|
||||
if (!actor || !actor.public_key_pem) return null;
|
||||
|
||||
try {
|
||||
const ok = await verifyWithPem(actor.public_key_pem, lines.join("\n"), parsed.signature);
|
||||
if (!ok) return null;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
return { actor, keyId: parsed.keyId };
|
||||
}
|
||||
|
||||
export async function sendSignedActivity(env: Env, user: User, inboxUrl: string, activity: Json): Promise<{ ok: boolean; status: number; text: string }> {
|
||||
const target = new URL(inboxUrl);
|
||||
const body = JSON.stringify(activity);
|
||||
const digest = `SHA-256=${await digestBase64(body)}`;
|
||||
const date = new Date().toUTCString();
|
||||
const host = target.host;
|
||||
const path = `${target.pathname}${target.search}`;
|
||||
const signingString = [
|
||||
`(request-target): post ${path}`,
|
||||
`host: ${host}`,
|
||||
`date: ${date}`,
|
||||
`digest: ${digest}`
|
||||
].join("\n");
|
||||
const signature = await signString(signingString, JSON.parse(user.private_key_jwk) as JsonWebKey);
|
||||
const headerValue = [
|
||||
`keyId="${actorUrl(env, user)}#main-key"`,
|
||||
`algorithm="rsa-sha256"`,
|
||||
`headers="(request-target) host date digest"`,
|
||||
`signature="${signature}"`
|
||||
].join(",");
|
||||
|
||||
try {
|
||||
const response = await fetch(inboxUrl, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
accept: "application/activity+json",
|
||||
"content-type": "application/activity+json",
|
||||
date,
|
||||
digest,
|
||||
signature: headerValue,
|
||||
"user-agent": `toot-worker (+https://${hostFromBaseUrl(env)})`
|
||||
},
|
||||
body
|
||||
});
|
||||
const text = response.ok ? "" : await response.text().catch(() => "");
|
||||
if (!response.ok) console.warn("signed-delivery", inboxUrl, response.status, text.slice(0, 200));
|
||||
return { ok: response.ok, status: response.status, text };
|
||||
} catch (error) {
|
||||
console.warn("signed-delivery-error", inboxUrl, String(error));
|
||||
return { ok: false, status: 0, text: String(error) };
|
||||
}
|
||||
}
|
||||
|
||||
export async function deliverToInboxes(env: Env, user: User, inboxes: Iterable<string>, activity: Json): Promise<void> {
|
||||
const unique = new Set<string>();
|
||||
for (const inbox of inboxes) {
|
||||
if (inbox) unique.add(inbox);
|
||||
}
|
||||
await Promise.allSettled([...unique].map((inbox) => sendSignedActivity(env, user, inbox, activity)));
|
||||
}
|
||||
|
||||
export async function gatherFollowerInboxes(env: Env, userId: string): Promise<string[]> {
|
||||
const rows = await env.DB.prepare(
|
||||
"SELECT inbox FROM follows WHERE local_user_id = ? AND accepted = 1"
|
||||
).bind(userId).all<{ inbox: string }>();
|
||||
const inboxes = new Set<string>();
|
||||
for (const row of rows.results) {
|
||||
if (row.inbox) {
|
||||
const actor = await getActorFromCache(env, row.inbox);
|
||||
const shared = (await getActorByKeyId(env, row.inbox))?.shared_inbox;
|
||||
inboxes.add(actor?.shared_inbox ?? shared ?? row.inbox);
|
||||
}
|
||||
}
|
||||
return [...inboxes];
|
||||
}
|
||||
|
||||
export async function resolveDeliveryInboxes(env: Env, actorIds: Iterable<string>): Promise<string[]> {
|
||||
const inboxes = new Set<string>();
|
||||
for (const actorId of actorIds) {
|
||||
const actor = await resolveRemoteActor(env, actorId);
|
||||
if (!actor) continue;
|
||||
inboxes.add(actor.shared_inbox ?? actor.inbox);
|
||||
}
|
||||
return [...inboxes];
|
||||
}
|
||||
|
||||
export type InboundActivity = {
|
||||
body: Json;
|
||||
bodyText: string;
|
||||
activityId: string;
|
||||
type: string;
|
||||
actor: string;
|
||||
};
|
||||
|
||||
export function parseActivity(bodyText: string): InboundActivity | null {
|
||||
try {
|
||||
const body = JSON.parse(bodyText) as Json;
|
||||
return {
|
||||
body,
|
||||
bodyText,
|
||||
activityId: String(body.id ?? ""),
|
||||
type: String(body.type ?? ""),
|
||||
actor: typeof body.actor === "string" ? body.actor : String((body.actor as Json | undefined)?.id ?? "")
|
||||
};
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
export function activityObjectField(activity: InboundActivity, field: string): unknown {
|
||||
const object = activity.body[field];
|
||||
return object;
|
||||
}
|
||||
|
||||
export function objectAsJson(value: unknown): Json | null {
|
||||
return value && typeof value === "object" ? value as Json : null;
|
||||
}
|
||||
|
||||
export function objectIdString(value: unknown): string | null {
|
||||
if (typeof value === "string") return value;
|
||||
const obj = objectAsJson(value);
|
||||
if (obj && typeof obj.id === "string") return obj.id;
|
||||
return null;
|
||||
}
|
||||
|
||||
export async function recordRemoteActivity(env: Env, activity: InboundActivity, verified: boolean): Promise<void> {
|
||||
await env.DB.prepare(
|
||||
"INSERT OR IGNORE INTO remote_activities (id, actor, type, payload, received_at) VALUES (?, ?, ?, ?, ?)"
|
||||
)
|
||||
.bind(
|
||||
activity.activityId || crypto.randomUUID(),
|
||||
activity.actor,
|
||||
activity.type,
|
||||
JSON.stringify({ ...activity.body, signature_verified: verified }),
|
||||
new Date().toISOString()
|
||||
)
|
||||
.run();
|
||||
}
|
||||
|
||||
export async function isDuplicateActivity(env: Env, activityId: string): Promise<boolean> {
|
||||
if (!activityId) return false;
|
||||
const row = await env.DB.prepare("SELECT id FROM remote_activities WHERE id = ?").bind(activityId).first<{ id: string }>();
|
||||
return Boolean(row);
|
||||
}
|
||||
|
||||
export async function notifyForLocalStatus(env: Env, status: Status, type: string, actor: string): Promise<void> {
|
||||
await recordNotification(env, status.user_id, type, actor, status.id);
|
||||
}
|
||||
|
||||
export function mentionAcct(env: Env, actorId: string): string {
|
||||
return parseAcctFromActor(env, actorId);
|
||||
}
|
||||
|
||||
export async function decodeRemoteSignatureBase64(value: string): Promise<Uint8Array> {
|
||||
return base64Decode(value);
|
||||
}
|
||||
|
||||
export function activitySignableData(input: string): Uint8Array {
|
||||
return encoder.encode(input);
|
||||
}
|
||||
Reference in New Issue
Block a user