修复远程缓存

This commit is contained in:
浪子
2026-05-14 15:36:25 +08:00
parent 5365a3569f
commit a2badc2d4f
6 changed files with 225 additions and 33 deletions
@@ -0,0 +1,9 @@
-- Preserve enough ActivityPub Note metadata for remote cached statuses to
-- render accurately and avoid exposing non-public deliveries.
ALTER TABLE cached_statuses ADD COLUMN visibility TEXT NOT NULL DEFAULT 'public';
ALTER TABLE cached_statuses ADD COLUMN mentions_json TEXT NOT NULL DEFAULT '[]';
ALTER TABLE cached_statuses ADD COLUMN tags_json TEXT NOT NULL DEFAULT '[]';
ALTER TABLE cached_statuses ADD COLUMN local_recipients_json TEXT NOT NULL DEFAULT '[]';
CREATE INDEX IF NOT EXISTS idx_cached_statuses_visibility_time ON cached_statuses(visibility, published DESC);
+4 -3
View File
@@ -120,7 +120,7 @@ npm run deploy
- `POST /users/:username/inbox``POST /inbox`(共享 inbox)
- `GET /objects/:id`(嘟文已删除时返回 `Tombstone`,HTTP 410)
入站 inbox 处理类型:`Follow` / `Undo(Follow)` / `Accept(Follow)` / `Reject(Follow)` / `Like` / `Undo(Like)` / `Announce` / `Undo(Announce)` / `Delete(Note)`(同时清远端嘟文缓存)/ `Delete(Person)`(同时清缓存与关注关系)/ `Update(Person)` / `Update(Note)` / `Create(Note)`(被关注的远端账号公开嘟文会写入 `cached_statuses` 给 home timeline 使用,同时若 mention/回复本地用户会触发通知)。
入站 inbox 处理类型:`Follow` / `Undo(Follow)` / `Accept(Follow)` / `Reject(Follow)` / `Like` / `Undo(Like)` / `Announce` / `Undo(Announce)` / `Delete(Note)`(同时清远端嘟文缓存)/ `Delete(Person)`(同时清缓存与关注关系)/ `Update(Person)` / `Update(Note)` / `Create(Note)`(被关注的远端账号公开 / followers-only Note,或投递给本地用户的 Note 会写入 `cached_statuses` 给 home timeline 和通知使用,同时若 mention/回复本地用户会触发通知)。
## 安全
@@ -141,6 +141,7 @@ npm run deploy
- `migrations/0001_initial.sql` — 基础表(users / oauth_apps / oauth_codes / statuses / media / follows / remote_activities)
- `migrations/0002_features.sql` — 通知 / 收藏 / 转发 / 提及 / 话题标签 / actor 缓存 / 出站关注 / 删除墓碑 / 嘟文扩展字段(summary / sensitive / language)
- `migrations/0003_bookmarks_cache.sql` — 收藏夹(bookmarks)/ 置顶(pinned_statuses)/ 远端嘟文缓存(cached_statuses)/ OAuth Token 持久表
- `migrations/0006_cached_status_metadata.sql` — 远端缓存嘟文的可见性 / mentions / tags / 本地收件人元数据
## 重要限制
@@ -151,8 +152,8 @@ npm run deploy
- `public` / `unlisted` 可公开读取; ActivityPub outbox 只暴露这两类状态
- `private` 会按 followers-only 投递,本地读取限作者和本地关注者
- `direct` 仍没有完整受众表,本地读取保守限制为作者可见,不应当作为完整私信系统使用
- 远端嘟文缓存只在被本地账号关注的 actor 发出的入站 `Create(Note)` 写入,不抓取历史 outbox
- 远端缓存嘟文保留正文、CW、语言和附件; `mentions``tags`、互动计数等不会完整恢复,在 home timeline 中统一按 `visibility: public` 返回
- 远端嘟文缓存只从入站 `Create(Note)` 和已缓存嘟文的 `Update(Note)` 写入,不抓取历史 outbox
- 远端缓存嘟文保留正文、CW、语言、可见性、mentions、tags、本地收件人和附件; 互动计数、poll、card 等扩展信息不会完整恢复
- 媒体上传只支持 `image/jpeg``image/png``image/gif``image/webp`,单文件 10MB,单条状态的附件数量不做服务端限制; 头像和封面同样只按图片路径处理
- 没有实现接口级限流、反滥用或审核流; `follow_requests` 相关接口仍是 stub
- 没有实现轮询(poll)、列表(list)、推送(push)、未来嘟文(scheduled)等
+95 -15
View File
@@ -35,7 +35,7 @@ import {
PUBLIC_COLLECTION,
SECURITY_CONTEXT
} from "./types";
import type { ActorCache, Json, RemoteActor, Status, User } from "./types";
import type { ActorCache, CachedStatus, CachedStatusMention, CachedStatusTag, Json, RemoteActor, Status, User } from "./types";
import {
actorUrl,
activityUrl,
@@ -419,7 +419,7 @@ async function handleUpdate(ctx: InboxContext): Promise<Response> {
if (String(obj.type ?? "") === "Note" && typeof obj.id === "string") {
const existing = await getCachedStatusByObjectId(env, obj.id);
if (existing && existing.actor === actorId) {
await cacheRemoteNote(env, actorId, obj);
await cacheRemoteNote(env, actorId, obj, activity.body, existing);
}
}
return new Response(null, { status: 202 });
@@ -439,28 +439,27 @@ async function handleCreate(ctx: InboxContext): Promise<Response> {
}
const isPublic = recipients.includes(PUBLIC_COLLECTION);
const isFollowersOnly = recipients.some((recipient) => isFollowersCollection(actorId, recipient));
const followed = await isFollowedByAnyLocalUser(env, actorId);
if (followed && (isPublic || localActorIds.size > 0)) {
await cacheRemoteNote(env, actorId, obj);
if ((followed && (isPublic || isFollowersOnly)) || localActorIds.size > 0) {
await cacheRemoteNote(env, actorId, obj, activity.body);
}
for (const username of localActorIds) {
const localUser = await getUserByUsername(env, username);
if (!localUser) continue;
const inReplyTo = typeof obj.inReplyTo === "string" ? obj.inReplyTo : null;
let statusId: string | null = null;
if (inReplyTo) {
const parent = await getStatusByObjectId(env, inReplyTo);
if (parent) statusId = parent.id;
}
await recordNotification(env, localUser.id, "mention", actorId, statusId);
await recordNotification(env, localUser.id, "mention", actorId, typeof obj.id === "string" ? obj.id : null);
}
return new Response(null, { status: 202 });
}
async function cacheRemoteNote(env: Env, actorId: string, note: Json): Promise<void> {
async function cacheRemoteNote(env: Env, actorId: string, note: Json, activity: Json = {}, fallback?: CachedStatus): Promise<void> {
if (typeof note.id !== "string") return;
const cachedId = note.id;
const recipients = collectRecipients(activity, note);
const mentions = note.tag === undefined ? parseJsonArray<CachedStatusMention>(fallback?.mentions_json) : extractRemoteMentions(note);
const tags = note.tag === undefined ? parseJsonArray<CachedStatusTag>(fallback?.tags_json) : extractRemoteHashtags(note);
const localRecipients = recipients.length === 0 ? parseJsonArray<string>(fallback?.local_recipients_json) : collectLocalRecipients(env, recipients);
const stored = await upsertCachedStatus(env, {
id: cachedId,
object_id: cachedId,
@@ -469,9 +468,13 @@ async function cacheRemoteNote(env: Env, actorId: string, note: Json): Promise<v
summary: typeof note.summary === "string" ? note.summary : "",
sensitive: note.sensitive ? 1 : 0,
language: typeof note.contentMap === "object" && note.contentMap ? Object.keys(note.contentMap as Json)[0] ?? "en" : "en",
visibility: inferRemoteVisibility(actorId, activity, note, fallback?.visibility ?? "public"),
in_reply_to: typeof note.inReplyTo === "string" ? note.inReplyTo : null,
url: typeof note.url === "string" ? note.url : cachedId,
published: typeof note.published === "string" ? note.published : new Date().toISOString()
published: typeof note.published === "string" ? note.published : new Date().toISOString(),
mentions_json: JSON.stringify(mentions),
tags_json: JSON.stringify(tags),
local_recipients_json: JSON.stringify(localRecipients)
});
if (!stored) return;
await env.DB.prepare("DELETE FROM cached_status_attachments WHERE cached_status_id = ?").bind(stored.id).run();
@@ -494,8 +497,85 @@ async function cacheRemoteNote(env: Env, actorId: string, note: Json): Promise<v
}
}
function inferRemoteVisibility(actorId: string, activity: Json, object: Json, fallback: string): string {
const to = collectRecipientFields(activity.to, activity.bto, object.to, object.bto);
const cc = collectRecipientFields(activity.cc, activity.bcc, object.cc, object.bcc);
const recipients = new Set([...to, ...cc]);
if (recipients.size === 0) return fallback;
if (to.has(PUBLIC_COLLECTION)) return "public";
if (cc.has(PUBLIC_COLLECTION)) return "unlisted";
if ([...recipients].some((recipient) => isFollowersCollection(actorId, recipient))) return "private";
return "direct";
}
function isFollowersCollection(actorId: string, recipient: string): boolean {
return recipient === `${actorId}/followers` || recipient.endsWith("/followers");
}
function collectLocalRecipients(env: Env, recipients: string[]): string[] {
return recipients.filter((recipient) => recipient.startsWith(baseUrl(env)) && /\/users\/[^/?#]+$/.test(recipient));
}
function extractRemoteMentions(note: Json): CachedStatusMention[] {
const mentions: CachedStatusMention[] = [];
for (const tag of noteTagObjects(note)) {
if (String(tag.type ?? "") !== "Mention") continue;
const url = stringValue(tag.href) ?? stringValue(tag.id);
if (!url) continue;
const acct = mentionAcct(tag, url);
mentions.push({ actor: url, acct, url });
}
return mentions;
}
function extractRemoteHashtags(note: Json): CachedStatusTag[] {
const tags: CachedStatusTag[] = [];
for (const tag of noteTagObjects(note)) {
const name = stringValue(tag.name);
if (String(tag.type ?? "") !== "Hashtag" && !name?.startsWith("#")) continue;
if (!name) continue;
tags.push({ name: name.replace(/^#/, "").toLowerCase(), url: stringValue(tag.href) ?? stringValue(tag.id) });
}
return tags;
}
function noteTagObjects(note: Json): Json[] {
const tags = Array.isArray(note.tag) ? note.tag : note.tag ? [note.tag] : [];
return tags.filter((tag): tag is Json => Boolean(tag) && typeof tag === "object");
}
function mentionAcct(tag: Json, url: string): string {
const name = stringValue(tag.name)?.replace(/^@/, "");
if (name) return name;
try {
const parsed = new URL(url);
const username = parsed.pathname.split("/").filter(Boolean).pop() ?? parsed.host;
return `${username}@${parsed.host}`;
} catch {
return url;
}
}
function stringValue(value: unknown): string | null {
return typeof value === "string" && value ? value : null;
}
function parseJsonArray<T>(value: string | null | undefined): T[] {
if (!value) return [];
try {
const parsed = JSON.parse(value) as unknown;
return Array.isArray(parsed) ? parsed as T[] : [];
} catch {
return [];
}
}
function collectRecipients(activity: Json, object: Json): string[] {
const fields: unknown[] = [activity.to, activity.cc, activity.bto, activity.bcc, object.to, object.cc];
const fields: unknown[] = [activity.to, activity.cc, activity.bto, activity.bcc, object.to, object.cc, object.bto, object.bcc];
return [...collectRecipientFields(...fields)];
}
function collectRecipientFields(...fields: unknown[]): Set<string> {
const out = new Set<string>();
for (const field of fields) {
if (Array.isArray(field)) {
@@ -506,7 +586,7 @@ function collectRecipients(activity: Json, object: Json): string[] {
out.add(field);
}
}
return [...out];
return out;
}
async function localUserFromTarget(env: Env, actorId: string | null): Promise<User | null> {
+23 -3
View File
@@ -247,19 +247,39 @@ export async function getCachedStatusByObjectId(env: Env, objectId: string): Pro
export async function upsertCachedStatus(env: Env, status: Omit<CachedStatus, "cached_at"> & { cached_at?: string }): Promise<CachedStatus | null> {
const now = status.cached_at ?? new Date().toISOString();
await env.DB.prepare(
`INSERT INTO cached_statuses (id, object_id, actor, content, summary, sensitive, language, in_reply_to, url, published, cached_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`INSERT INTO cached_statuses (id, object_id, actor, content, summary, sensitive, language, visibility, in_reply_to, url, published, mentions_json, tags_json, local_recipients_json, cached_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(object_id) DO UPDATE SET
content = excluded.content,
summary = excluded.summary,
sensitive = excluded.sensitive,
language = excluded.language,
visibility = excluded.visibility,
in_reply_to = excluded.in_reply_to,
url = excluded.url,
published = excluded.published,
mentions_json = excluded.mentions_json,
tags_json = excluded.tags_json,
local_recipients_json = excluded.local_recipients_json,
cached_at = excluded.cached_at`
)
.bind(status.id, status.object_id, status.actor, status.content, status.summary, status.sensitive, status.language, status.in_reply_to, status.url, status.published, now)
.bind(
status.id,
status.object_id,
status.actor,
status.content,
status.summary,
status.sensitive,
status.language,
status.visibility,
status.in_reply_to,
status.url,
status.published,
status.mentions_json,
status.tags_json,
status.local_recipients_json,
now
)
.run();
return getCachedStatusByObjectId(env, status.object_id);
}
+79 -12
View File
@@ -59,6 +59,8 @@ import type { ParsedBody } from "./http";
import type {
ActorCache,
CachedStatus,
CachedStatusMention,
CachedStatusTag,
Follow,
Media,
Mention,
@@ -97,6 +99,7 @@ type StatusViewer = {
user: User | null;
actor: string | null;
followsByOwnerId: Map<string, boolean>;
remoteFollowsByActorId: Map<string, boolean>;
};
function parseRedirectUris(value: string): string[] {
@@ -477,10 +480,13 @@ export async function accountStatuses(request: Request, env: Env, accountId: str
const remote = await getActorByLocalId(env, accountId)
?? (accountId.startsWith("http://") || accountId.startsWith("https://") ? await resolveRemoteActor(env, accountId) : null);
if (remote) {
const viewer = await loadStatusViewer(request, env);
const fetchLimit = Math.min(limit * 4, 160);
const rows = await env.DB.prepare(
"SELECT * FROM cached_statuses WHERE actor = ? ORDER BY published DESC LIMIT ?"
).bind(remote.id, limit).all<CachedStatus>();
const items = await Promise.all(rows.results.map((row) => cachedStatusToMastodon(env, row)));
).bind(remote.id, fetchLimit).all<CachedStatus>();
const visibleRows = await filterCachedStatusesForViewer(env, rows.results, viewer);
const items = await Promise.all(visibleRows.slice(0, limit).map((row) => cachedStatusToMastodon(env, row)));
return json(items);
}
@@ -920,6 +926,8 @@ export async function homeTimeline(request: Request, env: Env): Promise<Response
const user = await requireUser(request, env);
const url = new URL(request.url);
const limit = clampLimit(url.searchParams.get("limit"), 20, 40);
const viewer = statusViewerForUser(env, user);
const cachedFetchLimit = Math.min(limit * 4, 160);
const localRows = await env.DB.prepare(
"SELECT * FROM statuses WHERE user_id = ? ORDER BY created_at DESC LIMIT ?"
@@ -930,10 +938,11 @@ export async function homeTimeline(request: Request, env: Env): Promise<Response
INNER JOIN outgoing_follows of ON of.target_actor = cs.actor
WHERE of.local_user_id = ? AND of.accepted = 1
ORDER BY cs.published DESC LIMIT ?`
).bind(user.id, limit).all<CachedStatus>();
).bind(user.id, cachedFetchLimit).all<CachedStatus>();
const localItems = await serializeStatuses(env, localRows.results, request);
const cachedItems = await Promise.all(cachedRows.results.map((row) => cachedStatusToMastodon(env, row)));
const visibleCachedRows = await filterCachedStatusesForViewer(env, cachedRows.results, viewer);
const cachedItems = await Promise.all(visibleCachedRows.slice(0, limit).map((row) => cachedStatusToMastodon(env, row)));
const merged = [...localItems, ...cachedItems].sort((a, b) => {
const at = String(a.created_at ?? "");
@@ -1219,18 +1228,20 @@ async function cachedStatusToMastodon(env: Env, row: CachedStatus): Promise<Reco
const cache = await resolveRemoteActor(env, row.actor);
const account = cache ? remoteAccountJson(cache) : { id: row.actor, acct: row.actor, username: row.actor };
const attachments = await listCachedStatusAttachments(env, row.id);
const mentions = parseCachedJson<CachedStatusMention>(row.mentions_json);
const tags = parseCachedJson<CachedStatusTag>(row.tags_json);
return {
id: row.object_id,
uri: row.object_id,
url: row.url,
account,
in_reply_to_id: null,
in_reply_to_id: row.in_reply_to,
in_reply_to_account_id: null,
content: row.content,
text: row.content,
created_at: row.published,
edited_at: null,
visibility: "public",
visibility: row.visibility,
language: row.language,
sensitive: Boolean(row.sensitive),
spoiler_text: row.summary,
@@ -1247,8 +1258,13 @@ async function cachedStatusToMastodon(env: Env, row: CachedStatus): Promise<Reco
description: att.description,
blurhash: null
})),
mentions: [],
tags: [],
mentions: mentions.map((mention) => ({
id: mention.actor,
username: mention.acct.replace(/^@/, "").split("@")[0],
acct: mention.acct.replace(/^@/, ""),
url: mention.url
})),
tags: tags.map((tag) => ({ name: tag.name, url: tag.url })),
emojis: [],
reblogs_count: 0,
favourites_count: 0,
@@ -1265,6 +1281,15 @@ async function cachedStatusToMastodon(env: Env, row: CachedStatus): Promise<Reco
};
}
function parseCachedJson<T>(value: string): T[] {
try {
const parsed = JSON.parse(value) as unknown;
return Array.isArray(parsed) ? parsed as T[] : [];
} catch {
return [];
}
}
async function statusJson(
env: Env,
status: Status,
@@ -1589,6 +1614,12 @@ async function loadStatusesByIds(env: Env, statusIds: string[]): Promise<Status[
return rows.results;
}
async function loadCachedStatusesByObjectIds(env: Env, objectIds: string[]): Promise<CachedStatus[]> {
if (objectIds.length === 0) return [];
const rows = await env.DB.prepare(`SELECT * FROM cached_statuses WHERE object_id IN (${placeholders(objectIds.length)})`).bind(...objectIds).all<CachedStatus>();
return rows.results;
}
async function loadMediaByStatusIds(env: Env, statusIds: string[]): Promise<Map<string, Media[]>> {
const grouped = new Map<string, Media[]>();
if (statusIds.length === 0) return grouped;
@@ -1666,10 +1697,18 @@ async function loadReplyCountByStatusIds(env: Env, statusIds: string[]): Promise
async function serializeNotifications(env: Env, notifications: Notification[], request: Request): Promise<Record<string, unknown>[]> {
if (notifications.length === 0) return [];
const statuses = await loadStatusesByIds(env, uniqueStrings(notifications.map((notification) => notification.status_id)));
const visibleStatuses = await filterStatusesForViewer(env, statuses, await loadStatusViewer(request, env));
const notificationStatusIds = uniqueStrings(notifications.map((notification) => notification.status_id));
const viewer = await loadStatusViewer(request, env);
const statuses = await loadStatusesByIds(env, notificationStatusIds);
const localStatusIds = new Set(statuses.map((status) => status.id));
const cachedStatuses = await loadCachedStatusesByObjectIds(env, notificationStatusIds.filter((statusId) => !localStatusIds.has(statusId)));
const visibleStatuses = await filterStatusesForViewer(env, statuses, viewer);
const visibleCachedStatuses = await filterCachedStatusesForViewer(env, cachedStatuses, viewer);
const serializedStatuses = await serializeStatuses(env, visibleStatuses, request);
const serializedStatusById = new Map(serializedStatuses.map((item) => [String(item.id), item]));
const serializedCachedStatuses = await Promise.all(visibleCachedStatuses.map((row) => cachedStatusToMastodon(env, row)));
const serializedStatusById = new Map(
[...serializedStatuses, ...serializedCachedStatuses].map((item) => [String(item.id), item])
);
const remoteActorIds = uniqueStrings(
notifications.map((notification) => notification.actor).filter((actorId) => !actorId.startsWith(baseUrl(env)))
@@ -1762,7 +1801,8 @@ function statusViewerForUser(env: Env, user: User | null): StatusViewer {
return {
user,
actor: user ? actorUrl(env, user) : null,
followsByOwnerId: new Map()
followsByOwnerId: new Map(),
remoteFollowsByActorId: new Map()
};
}
@@ -1786,6 +1826,14 @@ async function filterStatusesForViewer(env: Env, statuses: Status[], viewer: Sta
return visible;
}
async function filterCachedStatusesForViewer(env: Env, statuses: CachedStatus[], viewer: StatusViewer): Promise<CachedStatus[]> {
const visible: CachedStatus[] = [];
for (const status of statuses) {
if (await canViewerViewCachedStatus(env, status, viewer)) visible.push(status);
}
return visible;
}
async function canViewerViewStatus(env: Env, status: Status, viewer: StatusViewer): Promise<boolean> {
if (status.visibility === "public" || status.visibility === "unlisted") return true;
if (viewer.user?.id === status.user_id) return true;
@@ -1793,6 +1841,13 @@ async function canViewerViewStatus(env: Env, status: Status, viewer: StatusViewe
return false;
}
async function canViewerViewCachedStatus(env: Env, status: CachedStatus, viewer: StatusViewer): Promise<boolean> {
if (status.visibility === "public" || status.visibility === "unlisted") return true;
if (!viewer.actor) return false;
if (status.visibility === "private") return viewerFollowsRemoteActor(env, viewer, status.actor);
return parseCachedJson<string>(status.local_recipients_json).includes(viewer.actor);
}
async function viewerFollowsOwner(env: Env, viewer: StatusViewer, ownerUserId: string): Promise<boolean> {
if (!viewer.actor) return false;
const cached = viewer.followsByOwnerId.get(ownerUserId);
@@ -1805,6 +1860,18 @@ async function viewerFollowsOwner(env: Env, viewer: StatusViewer, ownerUserId: s
return follows;
}
async function viewerFollowsRemoteActor(env: Env, viewer: StatusViewer, actorId: string): Promise<boolean> {
if (!viewer.user) return false;
const cached = viewer.remoteFollowsByActorId.get(actorId);
if (cached !== undefined) return cached;
const row = await env.DB.prepare(
"SELECT 1 AS hit FROM outgoing_follows WHERE local_user_id = ? AND target_actor = ? AND accepted = 1 LIMIT 1"
).bind(viewer.user.id, actorId).first<{ hit: number }>();
const follows = Boolean(row?.hit);
viewer.remoteFollowsByActorId.set(actorId, follows);
return follows;
}
async function viewerUser(request: Request, env: Env): Promise<User | null> {
const auth = request.headers.get("authorization") ?? "";
const token = auth.match(/^Bearer\s+(.+)$/i)?.[1];
+15
View File
@@ -144,12 +144,27 @@ export type CachedStatus = {
summary: string;
sensitive: number;
language: string;
visibility: string;
in_reply_to: string | null;
url: string;
published: string;
mentions_json: string;
tags_json: string;
local_recipients_json: string;
cached_at: string;
};
export type CachedStatusMention = {
actor: string;
acct: string;
url: string;
};
export type CachedStatusTag = {
name: string;
url: string | null;
};
export type CachedStatusAttachment = {
cached_status_id: string;
position: number;