diff --git a/migrations/0009_markers.sql b/migrations/0009_markers.sql new file mode 100644 index 0000000..096a6cd --- /dev/null +++ b/migrations/0009_markers.sql @@ -0,0 +1,10 @@ +-- Persist Mastodon read markers for home timeline and notifications. + +CREATE TABLE IF NOT EXISTS markers ( + user_id TEXT NOT NULL, + timeline TEXT NOT NULL, + last_read_id TEXT NOT NULL, + version INTEGER NOT NULL DEFAULT 1, + updated_at TEXT NOT NULL, + PRIMARY KEY(user_id, timeline) +); diff --git a/readme.md b/readme.md index 5bd1f1a..30a86ac 100644 --- a/readme.md +++ b/readme.md @@ -112,8 +112,8 @@ npm run deploy - `GET / POST /api/v1/lists`、`GET / PUT / DELETE /api/v1/lists/:id`、`GET / POST / DELETE /api/v1/lists/:id/accounts` - `GET /api/v1/notifications`、`POST /api/v1/notifications/clear`、`POST /api/v1/notifications/:id/dismiss` - `POST /api/v1/media`、`POST /api/v2/media`、`PUT /api/v1/media/:id` -- `GET /api/v2/search`、`GET /api/v1/search`(本地账号 / 嘟文 / 话题标签 + 跨站 WebFinger 解析 `acct:` 查询) -- `GET /api/v1/custom_emojis`、`GET /api/v1/filters`、`GET /api/v1/trends/tags`、`GET /api/v1/markers`(stub) +- `GET /api/v2/search`、`GET /api/v1/search`(本地账号 / 嘟文 / 话题标签 + 跨站 WebFinger 解析 `acct:` 查询 + 粘贴远端嘟文 URL 按需抓取缓存) +- `GET /api/v1/custom_emojis`、`GET /api/v1/filters`、`GET /api/v1/trends/tags`、`GET /api/v1/markers`、`POST /api/v1/markers` - `GET / POST / PUT / DELETE /api/v1/push/subscription`(存储 Web Push 订阅参数;实际推送投递仍需 VAPID/加密发送实现) ### ActivityPub / 发现 @@ -153,6 +153,7 @@ npm run deploy - `migrations/0006_cached_status_metadata.sql` — 远端缓存嘟文的可见性 / mentions / tags / 本地收件人元数据 - `migrations/0007_polls_lists_push_scheduled.sql` — poll / list / push subscription / scheduled statuses - `migrations/0008_outgoing_deliveries.sql` — 出站 ActivityPub 投递队列 / 重试状态 +- `migrations/0009_markers.sql` — Mastodon 读位 markers(home / notifications) ## 重要限制 @@ -163,7 +164,7 @@ npm run deploy - `public` / `unlisted` 可公开读取; ActivityPub outbox 只暴露这两类状态 - `private` 会按 followers-only 投递,本地读取限作者和本地关注者 - `direct` 仍没有完整受众表,本地读取保守限制为作者可见,不应当作为完整私信系统使用 -- 远端嘟文缓存只从入站 `Create(Note)` 和已缓存嘟文的 `Update(Note)` 写入,不抓取历史 outbox +- 远端嘟文缓存从入站 `Create(Note)`、已缓存嘟文的 `Update(Note)` 和搜索远端嘟文 URL 时按需写入,不抓取历史 outbox - 远端缓存嘟文会保留正文、CW、语言、可见性、mentions、tags、本地收件人和附件; 互动计数、poll、card 等扩展信息不会完整恢复 - Web Push 目前实现订阅存储和 API 兼容,尚未实现 VAPID 加密投递通知 - Poll 当前只在本地 Mastodon API 中序列化和投票,不会联邦成 ActivityPub Question diff --git a/src/activitypub.ts b/src/activitypub.ts index cb0bcd0..31adbc1 100644 --- a/src/activitypub.ts +++ b/src/activitypub.ts @@ -451,8 +451,8 @@ async function handleCreate(ctx: InboxContext): Promise { return new Response(null, { status: 202 }); } -async function cacheRemoteNote(env: Env, actorId: string, note: Json, activity: Json = {}, fallback?: CachedStatus): Promise { - if (typeof note.id !== "string") return; +export async function cacheRemoteNote(env: Env, actorId: string, note: Json, activity: Json = {}, fallback?: CachedStatus): Promise { + if (typeof note.id !== "string") return null; const cachedId = note.id; const recipients = collectRecipients(activity, note); const mentions = note.tag === undefined ? parseJsonArray(fallback?.mentions_json) : extractRemoteMentions(note); @@ -474,7 +474,7 @@ async function cacheRemoteNote(env: Env, actorId: string, note: Json, activity: tags_json: JSON.stringify(tags), local_recipients_json: JSON.stringify(localRecipients) }); - if (!stored) return; + if (!stored) return null; await env.DB.prepare("DELETE FROM cached_status_attachments WHERE cached_status_id = ?").bind(stored.id).run(); const attachments = Array.isArray(note.attachment) ? note.attachment : note.attachment ? [note.attachment] : []; let position = 0; @@ -493,6 +493,7 @@ async function cacheRemoteNote(env: Env, actorId: string, note: Json, activity: ).bind(stored.id, position, url, null, mime, description).run(); position++; } + return stored; } function inferRemoteVisibility(actorId: string, activity: Json, object: Json, fallback: string): string { diff --git a/src/db.ts b/src/db.ts index 75de3a9..9b99f81 100644 --- a/src/db.ts +++ b/src/db.ts @@ -10,6 +10,7 @@ import type { Notification, OAuthApp, OAuthCode, + Marker, OutgoingDelivery, OutgoingFollow, Reblog, @@ -340,6 +341,64 @@ export async function setUserHeaderKey(env: Env, userId: string, key: string | n await env.DB.prepare("UPDATE users SET header_r2_key = ? WHERE id = ?").bind(key, userId).run(); } +export async function listMarkers(env: Env, userId: string, timelines: string[]): Promise { + const uniqueTimelines = [...new Set(timelines.map((timeline) => timeline.trim()).filter(Boolean))]; + if (uniqueTimelines.length === 0) return []; + const placeholders = uniqueTimelines.map(() => "?").join(","); + const rows = await env.DB.prepare( + `SELECT * FROM markers WHERE user_id = ? AND timeline IN (${placeholders})` + ).bind(userId, ...uniqueTimelines).all(); + return rows.results; +} + +export async function saveMarker( + env: Env, + userId: string, + timeline: string, + lastReadId: string +): Promise<{ conflict: boolean; marker: Marker | null }> { + const now = new Date().toISOString(); + const existing = await env.DB.prepare("SELECT * FROM markers WHERE user_id = ? AND timeline = ?") + .bind(userId, timeline).first(); + + if (!existing) { + try { + await env.DB.prepare( + "INSERT INTO markers (user_id, timeline, last_read_id, version, updated_at) VALUES (?, ?, ?, ?, ?)" + ).bind(userId, timeline, lastReadId, 1, now).run(); + } catch (error) { + const message = String(error); + if (!/UNIQUE|constraint/i.test(message)) throw error; + const raced = await env.DB.prepare("SELECT * FROM markers WHERE user_id = ? AND timeline = ?") + .bind(userId, timeline).first(); + if (!raced) return { conflict: true, marker: null }; + return saveMarkerWithCurrent(env, userId, timeline, lastReadId, raced, now); + } + const marker = await env.DB.prepare("SELECT * FROM markers WHERE user_id = ? AND timeline = ?") + .bind(userId, timeline).first(); + return { conflict: false, marker }; + } + + return saveMarkerWithCurrent(env, userId, timeline, lastReadId, existing, now); +} + +async function saveMarkerWithCurrent( + env: Env, + userId: string, + timeline: string, + lastReadId: string, + current: Marker, + now: string +): Promise<{ conflict: boolean; marker: Marker | null }> { + const result = await env.DB.prepare( + "UPDATE markers SET last_read_id = ?, version = ?, updated_at = ? WHERE user_id = ? AND timeline = ? AND version = ?" + ).bind(lastReadId, current.version + 1, now, userId, timeline, current.version).run(); + if ((result.meta.changes ?? 0) === 0) return { conflict: true, marker: null }; + const marker = await env.DB.prepare("SELECT * FROM markers WHERE user_id = ? AND timeline = ?") + .bind(userId, timeline).first(); + return { conflict: false, marker }; +} + export async function enqueueOutgoingDeliveries(env: Env, userId: string, inboxes: Iterable, activity: Json): Promise { const uniqueInboxes = [...new Set([...inboxes].map((inbox) => inbox.trim()).filter(Boolean))]; if (uniqueInboxes.length === 0) return; diff --git a/src/index.ts b/src/index.ts index befb12b..0a8bcb5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -71,6 +71,7 @@ import { statusContext, token, trendsTags, + updateMarkers, unbookmarkStatus, unfavouriteStatus, unfollowAccount, @@ -203,6 +204,7 @@ async function route(request: Request, env: Env): Promise { if (method === "GET" && path === "/api/v1/filters") return filtersV1(request, env); if (method === "GET" && path === "/api/v1/trends/tags") return trendsTags(env); if (method === "GET" && path === "/api/v1/markers") return markersList(request, env); + if (method === "POST" && path === "/api/v1/markers") return updateMarkers(request, env); if (method === "GET" && path === "/api/v1/push/subscription") return getPushSubscription(request, env); if (method === "POST" && path === "/api/v1/push/subscription") return createPushSubscription(request, env); if (method === "PUT" && path === "/api/v1/push/subscription") return updatePushSubscription(request, env); diff --git a/src/mastodon.ts b/src/mastodon.ts index cffd3f9..6017c50 100644 --- a/src/mastodon.ts +++ b/src/mastodon.ts @@ -1,6 +1,7 @@ import { actorDocument, announceActivity, + cacheRemoteNote, createActivity, deleteActivity, followActivity, @@ -25,17 +26,20 @@ import { getActorFromCache, getAdminUser, getAppByClientId, + getCachedStatusByObjectId, getStatus, getUserById, getUserByIdOrUsername, getUserByUsername, insertOAuthToken, listCachedStatusAttachments, + listMarkers, listProfileFields, recordNotification, removeBookmark, removePin, replaceProfileFields, + saveMarker, setUserAvatarKey, setUserHeaderKey, takeOAuthCode @@ -43,6 +47,7 @@ import { import { deliverToInboxes, gatherFollowerInboxes, + objectAsJson, resolveDeliveryInboxes, resolveRemoteActor } from "./federation"; @@ -66,6 +71,7 @@ import type { Json, Media, Mention, + Marker, Notification, Poll, PollOption, @@ -104,6 +110,7 @@ const MAX_POLL_OPTION_CHARS = 50; const MIN_POLL_EXPIRATION_SECONDS = 300; const MAX_POLL_EXPIRATION_SECONDS = 2629746; const SCHEDULED_STATUS_MIN_DELAY_SECONDS = 300; +const ACTIVITY_JSON_ACCEPT = "application/activity+json, application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\", application/json"; type StatusVisibility = "public" | "unlisted" | "private" | "direct"; type StatusViewer = { @@ -1549,8 +1556,13 @@ export async function search(request: Request, env: Env): Promise { } if (!type || type === "statuses") { + const viewer = await loadStatusViewer(request, env); + const remoteStatus = await resolveRemoteStatusSearch(env, q); + if (remoteStatus && await canViewerViewCachedStatus(env, remoteStatus, viewer)) { + statuses.push(await cachedStatusToMastodon(env, remoteStatus)); + } const rows = await env.DB.prepare("SELECT * FROM statuses WHERE content LIKE ? ORDER BY created_at DESC LIMIT 100").bind(`%${escapeHtml(q)}%`).all(); - const visibleRows = await filterStatusesForViewer(env, rows.results, await loadStatusViewer(request, env)); + const visibleRows = await filterStatusesForViewer(env, rows.results, viewer); statuses.push(...await serializeStatuses(env, visibleRows.slice(0, 20), request)); } @@ -1563,6 +1575,74 @@ export async function search(request: Request, env: Env): Promise { return json({ accounts, statuses, hashtags }); } +async function resolveRemoteStatusSearch(env: Env, q: string): Promise { + const url = parseSearchUrl(q); + if (!url || url.host.toLowerCase() === hostFromBaseUrl(env).toLowerCase()) return null; + + const cached = await env.DB.prepare("SELECT * FROM cached_statuses WHERE object_id = ? OR url = ? LIMIT 1") + .bind(url.toString(), url.toString()).first(); + if (cached) return cached; + + let data: Json; + try { + const response = await fetch(url.toString(), { + headers: { accept: ACTIVITY_JSON_ACCEPT }, + signal: AbortSignal.timeout(10_000), + cf: { cacheTtl: 60 } + }); + if (!response.ok) return null; + data = await response.json() as Json; + } catch { + return null; + } + + const resolved = remoteNoteFromFetchedObject(data); + if (!resolved) return null; + await resolveRemoteActor(env, resolved.actorId); + const stored = await cacheRemoteNote(env, resolved.actorId, resolved.note, resolved.activity); + return stored ?? getCachedStatusByObjectId(env, String(resolved.note.id)); +} + +function parseSearchUrl(value: string): URL | null { + try { + const url = new URL(value); + return url.protocol === "https:" || url.protocol === "http:" ? url : null; + } catch { + return null; + } +} + +function remoteNoteFromFetchedObject(value: Json): { actorId: string; note: Json; activity: Json } | null { + const type = String(value.type ?? ""); + if (type === "Note") { + const actorId = actorIdFromField(value.attributedTo); + return actorId && typeof value.id === "string" ? { actorId, note: value, activity: {} } : null; + } + + const note = objectAsJson(value.object); + if (type === "Create" && note && String(note.type ?? "") === "Note" && typeof note.id === "string") { + const actorId = actorIdFromField(value.actor) ?? actorIdFromField(note.attributedTo); + return actorId ? { actorId, note, activity: value } : null; + } + if (type === "Announce" && note && String(note.type ?? "") === "Note" && typeof note.id === "string") { + const actorId = actorIdFromField(note.attributedTo) ?? actorIdFromField(value.actor); + return actorId ? { actorId, note, activity: value } : null; + } + return null; +} + +function actorIdFromField(value: unknown): string | null { + if (typeof value === "string" && value) return value; + if (Array.isArray(value)) { + for (const item of value) { + const actorId = actorIdFromField(item); + if (actorId) return actorId; + } + } + const obj = objectAsJson(value); + return typeof obj?.id === "string" && obj.id ? obj.id : null; +} + export async function customEmojis(env: Env): Promise { void env; return json([]); @@ -1632,8 +1712,43 @@ export async function deletePushSubscription(request: Request, env: Env): Promis } export async function markersList(request: Request, env: Env): Promise { - void request; void env; - return json({}); + const user = await requireUser(request, env); + const url = new URL(request.url); + const timelines = uniqueStrings(url.searchParams.getAll("timeline[]").concat(url.searchParams.getAll("timeline"))); + if (timelines.length === 0) return json({}); + + const rows = await listMarkers(env, user.id, timelines); + return json(markersJson(rows)); +} + +export async function updateMarkers(request: Request, env: Env): Promise { + const user = await requireUser(request, env); + const body = await readBody(request); + const out: Record = {}; + + for (const timeline of ["home", "notifications"]) { + const lastReadId = bodyString(body, `${timeline}[last_read_id]`).trim(); + if (!lastReadId) continue; + const result = await saveMarker(env, user.id, timeline, lastReadId); + if (result.conflict) throw new HttpError(409, "Conflict during update, please try again"); + if (result.marker) out[timeline] = markerJson(result.marker); + } + + return json(out); +} + +function markersJson(rows: Marker[]): Record { + const out: Record = {}; + for (const row of rows) out[row.timeline] = markerJson(row); + return out; +} + +function markerJson(row: Marker): Record { + return { + last_read_id: row.last_read_id, + version: row.version, + updated_at: row.updated_at + }; } function pushSubscriptionJson(row: PushSubscription): Record { diff --git a/src/types.ts b/src/types.ts index 0bc3a8a..e9dd3b5 100644 --- a/src/types.ts +++ b/src/types.ts @@ -227,6 +227,14 @@ export type ScheduledStatus = { created_at: string; }; +export type Marker = { + user_id: string; + timeline: string; + last_read_id: string; + version: number; + updated_at: string; +}; + export type OutgoingDelivery = { id: string; user_id: string;