add
This commit is contained in:
@@ -0,0 +1,10 @@
|
||||
-- Persist Mastodon read markers for home timeline and notifications.
|
||||
|
||||
CREATE TABLE IF NOT EXISTS markers (
|
||||
user_id TEXT NOT NULL,
|
||||
timeline TEXT NOT NULL,
|
||||
last_read_id TEXT NOT NULL,
|
||||
version INTEGER NOT NULL DEFAULT 1,
|
||||
updated_at TEXT NOT NULL,
|
||||
PRIMARY KEY(user_id, timeline)
|
||||
);
|
||||
@@ -112,8 +112,8 @@ npm run deploy
|
||||
- `GET / POST /api/v1/lists`、`GET / PUT / DELETE /api/v1/lists/:id`、`GET / POST / DELETE /api/v1/lists/:id/accounts`
|
||||
- `GET /api/v1/notifications`、`POST /api/v1/notifications/clear`、`POST /api/v1/notifications/:id/dismiss`
|
||||
- `POST /api/v1/media`、`POST /api/v2/media`、`PUT /api/v1/media/:id`
|
||||
- `GET /api/v2/search`、`GET /api/v1/search`(本地账号 / 嘟文 / 话题标签 + 跨站 WebFinger 解析 `acct:` 查询)
|
||||
- `GET /api/v1/custom_emojis`、`GET /api/v1/filters`、`GET /api/v1/trends/tags`、`GET /api/v1/markers`(stub)
|
||||
- `GET /api/v2/search`、`GET /api/v1/search`(本地账号 / 嘟文 / 话题标签 + 跨站 WebFinger 解析 `acct:` 查询 + 粘贴远端嘟文 URL 按需抓取缓存)
|
||||
- `GET /api/v1/custom_emojis`、`GET /api/v1/filters`、`GET /api/v1/trends/tags`、`GET /api/v1/markers`、`POST /api/v1/markers`
|
||||
- `GET / POST / PUT / DELETE /api/v1/push/subscription`(存储 Web Push 订阅参数;实际推送投递仍需 VAPID/加密发送实现)
|
||||
|
||||
### ActivityPub / 发现
|
||||
@@ -153,6 +153,7 @@ npm run deploy
|
||||
- `migrations/0006_cached_status_metadata.sql` — 远端缓存嘟文的可见性 / mentions / tags / 本地收件人元数据
|
||||
- `migrations/0007_polls_lists_push_scheduled.sql` — poll / list / push subscription / scheduled statuses
|
||||
- `migrations/0008_outgoing_deliveries.sql` — 出站 ActivityPub 投递队列 / 重试状态
|
||||
- `migrations/0009_markers.sql` — Mastodon 读位 markers(home / notifications)
|
||||
|
||||
## 重要限制
|
||||
|
||||
@@ -163,7 +164,7 @@ npm run deploy
|
||||
- `public` / `unlisted` 可公开读取; ActivityPub outbox 只暴露这两类状态
|
||||
- `private` 会按 followers-only 投递,本地读取限作者和本地关注者
|
||||
- `direct` 仍没有完整受众表,本地读取保守限制为作者可见,不应当作为完整私信系统使用
|
||||
- 远端嘟文缓存只从入站 `Create(Note)` 和已缓存嘟文的 `Update(Note)` 写入,不抓取历史 outbox
|
||||
- 远端嘟文缓存从入站 `Create(Note)`、已缓存嘟文的 `Update(Note)` 和搜索远端嘟文 URL 时按需写入,不抓取历史 outbox
|
||||
- 远端缓存嘟文会保留正文、CW、语言、可见性、mentions、tags、本地收件人和附件; 互动计数、poll、card 等扩展信息不会完整恢复
|
||||
- Web Push 目前实现订阅存储和 API 兼容,尚未实现 VAPID 加密投递通知
|
||||
- Poll 当前只在本地 Mastodon API 中序列化和投票,不会联邦成 ActivityPub Question
|
||||
|
||||
+4
-3
@@ -451,8 +451,8 @@ async function handleCreate(ctx: InboxContext): Promise<Response> {
|
||||
return new Response(null, { status: 202 });
|
||||
}
|
||||
|
||||
async function cacheRemoteNote(env: Env, actorId: string, note: Json, activity: Json = {}, fallback?: CachedStatus): Promise<void> {
|
||||
if (typeof note.id !== "string") return;
|
||||
export async function cacheRemoteNote(env: Env, actorId: string, note: Json, activity: Json = {}, fallback?: CachedStatus): Promise<CachedStatus | null> {
|
||||
if (typeof note.id !== "string") return null;
|
||||
const cachedId = note.id;
|
||||
const recipients = collectRecipients(activity, note);
|
||||
const mentions = note.tag === undefined ? parseJsonArray<CachedStatusMention>(fallback?.mentions_json) : extractRemoteMentions(note);
|
||||
@@ -474,7 +474,7 @@ async function cacheRemoteNote(env: Env, actorId: string, note: Json, activity:
|
||||
tags_json: JSON.stringify(tags),
|
||||
local_recipients_json: JSON.stringify(localRecipients)
|
||||
});
|
||||
if (!stored) return;
|
||||
if (!stored) return null;
|
||||
await env.DB.prepare("DELETE FROM cached_status_attachments WHERE cached_status_id = ?").bind(stored.id).run();
|
||||
const attachments = Array.isArray(note.attachment) ? note.attachment : note.attachment ? [note.attachment] : [];
|
||||
let position = 0;
|
||||
@@ -493,6 +493,7 @@ async function cacheRemoteNote(env: Env, actorId: string, note: Json, activity:
|
||||
).bind(stored.id, position, url, null, mime, description).run();
|
||||
position++;
|
||||
}
|
||||
return stored;
|
||||
}
|
||||
|
||||
function inferRemoteVisibility(actorId: string, activity: Json, object: Json, fallback: string): string {
|
||||
|
||||
@@ -10,6 +10,7 @@ import type {
|
||||
Notification,
|
||||
OAuthApp,
|
||||
OAuthCode,
|
||||
Marker,
|
||||
OutgoingDelivery,
|
||||
OutgoingFollow,
|
||||
Reblog,
|
||||
@@ -340,6 +341,64 @@ export async function setUserHeaderKey(env: Env, userId: string, key: string | n
|
||||
await env.DB.prepare("UPDATE users SET header_r2_key = ? WHERE id = ?").bind(key, userId).run();
|
||||
}
|
||||
|
||||
export async function listMarkers(env: Env, userId: string, timelines: string[]): Promise<Marker[]> {
|
||||
const uniqueTimelines = [...new Set(timelines.map((timeline) => timeline.trim()).filter(Boolean))];
|
||||
if (uniqueTimelines.length === 0) return [];
|
||||
const placeholders = uniqueTimelines.map(() => "?").join(",");
|
||||
const rows = await env.DB.prepare(
|
||||
`SELECT * FROM markers WHERE user_id = ? AND timeline IN (${placeholders})`
|
||||
).bind(userId, ...uniqueTimelines).all<Marker>();
|
||||
return rows.results;
|
||||
}
|
||||
|
||||
export async function saveMarker(
|
||||
env: Env,
|
||||
userId: string,
|
||||
timeline: string,
|
||||
lastReadId: string
|
||||
): Promise<{ conflict: boolean; marker: Marker | null }> {
|
||||
const now = new Date().toISOString();
|
||||
const existing = await env.DB.prepare("SELECT * FROM markers WHERE user_id = ? AND timeline = ?")
|
||||
.bind(userId, timeline).first<Marker>();
|
||||
|
||||
if (!existing) {
|
||||
try {
|
||||
await env.DB.prepare(
|
||||
"INSERT INTO markers (user_id, timeline, last_read_id, version, updated_at) VALUES (?, ?, ?, ?, ?)"
|
||||
).bind(userId, timeline, lastReadId, 1, now).run();
|
||||
} catch (error) {
|
||||
const message = String(error);
|
||||
if (!/UNIQUE|constraint/i.test(message)) throw error;
|
||||
const raced = await env.DB.prepare("SELECT * FROM markers WHERE user_id = ? AND timeline = ?")
|
||||
.bind(userId, timeline).first<Marker>();
|
||||
if (!raced) return { conflict: true, marker: null };
|
||||
return saveMarkerWithCurrent(env, userId, timeline, lastReadId, raced, now);
|
||||
}
|
||||
const marker = await env.DB.prepare("SELECT * FROM markers WHERE user_id = ? AND timeline = ?")
|
||||
.bind(userId, timeline).first<Marker>();
|
||||
return { conflict: false, marker };
|
||||
}
|
||||
|
||||
return saveMarkerWithCurrent(env, userId, timeline, lastReadId, existing, now);
|
||||
}
|
||||
|
||||
async function saveMarkerWithCurrent(
|
||||
env: Env,
|
||||
userId: string,
|
||||
timeline: string,
|
||||
lastReadId: string,
|
||||
current: Marker,
|
||||
now: string
|
||||
): Promise<{ conflict: boolean; marker: Marker | null }> {
|
||||
const result = await env.DB.prepare(
|
||||
"UPDATE markers SET last_read_id = ?, version = ?, updated_at = ? WHERE user_id = ? AND timeline = ? AND version = ?"
|
||||
).bind(lastReadId, current.version + 1, now, userId, timeline, current.version).run();
|
||||
if ((result.meta.changes ?? 0) === 0) return { conflict: true, marker: null };
|
||||
const marker = await env.DB.prepare("SELECT * FROM markers WHERE user_id = ? AND timeline = ?")
|
||||
.bind(userId, timeline).first<Marker>();
|
||||
return { conflict: false, marker };
|
||||
}
|
||||
|
||||
export async function enqueueOutgoingDeliveries(env: Env, userId: string, inboxes: Iterable<string>, activity: Json): Promise<void> {
|
||||
const uniqueInboxes = [...new Set([...inboxes].map((inbox) => inbox.trim()).filter(Boolean))];
|
||||
if (uniqueInboxes.length === 0) return;
|
||||
|
||||
@@ -71,6 +71,7 @@ import {
|
||||
statusContext,
|
||||
token,
|
||||
trendsTags,
|
||||
updateMarkers,
|
||||
unbookmarkStatus,
|
||||
unfavouriteStatus,
|
||||
unfollowAccount,
|
||||
@@ -203,6 +204,7 @@ async function route(request: Request, env: Env): Promise<Response> {
|
||||
if (method === "GET" && path === "/api/v1/filters") return filtersV1(request, env);
|
||||
if (method === "GET" && path === "/api/v1/trends/tags") return trendsTags(env);
|
||||
if (method === "GET" && path === "/api/v1/markers") return markersList(request, env);
|
||||
if (method === "POST" && path === "/api/v1/markers") return updateMarkers(request, env);
|
||||
if (method === "GET" && path === "/api/v1/push/subscription") return getPushSubscription(request, env);
|
||||
if (method === "POST" && path === "/api/v1/push/subscription") return createPushSubscription(request, env);
|
||||
if (method === "PUT" && path === "/api/v1/push/subscription") return updatePushSubscription(request, env);
|
||||
|
||||
+118
-3
@@ -1,6 +1,7 @@
|
||||
import {
|
||||
actorDocument,
|
||||
announceActivity,
|
||||
cacheRemoteNote,
|
||||
createActivity,
|
||||
deleteActivity,
|
||||
followActivity,
|
||||
@@ -25,17 +26,20 @@ import {
|
||||
getActorFromCache,
|
||||
getAdminUser,
|
||||
getAppByClientId,
|
||||
getCachedStatusByObjectId,
|
||||
getStatus,
|
||||
getUserById,
|
||||
getUserByIdOrUsername,
|
||||
getUserByUsername,
|
||||
insertOAuthToken,
|
||||
listCachedStatusAttachments,
|
||||
listMarkers,
|
||||
listProfileFields,
|
||||
recordNotification,
|
||||
removeBookmark,
|
||||
removePin,
|
||||
replaceProfileFields,
|
||||
saveMarker,
|
||||
setUserAvatarKey,
|
||||
setUserHeaderKey,
|
||||
takeOAuthCode
|
||||
@@ -43,6 +47,7 @@ import {
|
||||
import {
|
||||
deliverToInboxes,
|
||||
gatherFollowerInboxes,
|
||||
objectAsJson,
|
||||
resolveDeliveryInboxes,
|
||||
resolveRemoteActor
|
||||
} from "./federation";
|
||||
@@ -66,6 +71,7 @@ import type {
|
||||
Json,
|
||||
Media,
|
||||
Mention,
|
||||
Marker,
|
||||
Notification,
|
||||
Poll,
|
||||
PollOption,
|
||||
@@ -104,6 +110,7 @@ const MAX_POLL_OPTION_CHARS = 50;
|
||||
const MIN_POLL_EXPIRATION_SECONDS = 300;
|
||||
const MAX_POLL_EXPIRATION_SECONDS = 2629746;
|
||||
const SCHEDULED_STATUS_MIN_DELAY_SECONDS = 300;
|
||||
const ACTIVITY_JSON_ACCEPT = "application/activity+json, application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\", application/json";
|
||||
|
||||
type StatusVisibility = "public" | "unlisted" | "private" | "direct";
|
||||
type StatusViewer = {
|
||||
@@ -1549,8 +1556,13 @@ export async function search(request: Request, env: Env): Promise<Response> {
|
||||
}
|
||||
|
||||
if (!type || type === "statuses") {
|
||||
const viewer = await loadStatusViewer(request, env);
|
||||
const remoteStatus = await resolveRemoteStatusSearch(env, q);
|
||||
if (remoteStatus && await canViewerViewCachedStatus(env, remoteStatus, viewer)) {
|
||||
statuses.push(await cachedStatusToMastodon(env, remoteStatus));
|
||||
}
|
||||
const rows = await env.DB.prepare("SELECT * FROM statuses WHERE content LIKE ? ORDER BY created_at DESC LIMIT 100").bind(`%${escapeHtml(q)}%`).all<Status>();
|
||||
const visibleRows = await filterStatusesForViewer(env, rows.results, await loadStatusViewer(request, env));
|
||||
const visibleRows = await filterStatusesForViewer(env, rows.results, viewer);
|
||||
statuses.push(...await serializeStatuses(env, visibleRows.slice(0, 20), request));
|
||||
}
|
||||
|
||||
@@ -1563,6 +1575,74 @@ export async function search(request: Request, env: Env): Promise<Response> {
|
||||
return json({ accounts, statuses, hashtags });
|
||||
}
|
||||
|
||||
async function resolveRemoteStatusSearch(env: Env, q: string): Promise<CachedStatus | null> {
|
||||
const url = parseSearchUrl(q);
|
||||
if (!url || url.host.toLowerCase() === hostFromBaseUrl(env).toLowerCase()) return null;
|
||||
|
||||
const cached = await env.DB.prepare("SELECT * FROM cached_statuses WHERE object_id = ? OR url = ? LIMIT 1")
|
||||
.bind(url.toString(), url.toString()).first<CachedStatus>();
|
||||
if (cached) return cached;
|
||||
|
||||
let data: Json;
|
||||
try {
|
||||
const response = await fetch(url.toString(), {
|
||||
headers: { accept: ACTIVITY_JSON_ACCEPT },
|
||||
signal: AbortSignal.timeout(10_000),
|
||||
cf: { cacheTtl: 60 }
|
||||
});
|
||||
if (!response.ok) return null;
|
||||
data = await response.json() as Json;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
|
||||
const resolved = remoteNoteFromFetchedObject(data);
|
||||
if (!resolved) return null;
|
||||
await resolveRemoteActor(env, resolved.actorId);
|
||||
const stored = await cacheRemoteNote(env, resolved.actorId, resolved.note, resolved.activity);
|
||||
return stored ?? getCachedStatusByObjectId(env, String(resolved.note.id));
|
||||
}
|
||||
|
||||
function parseSearchUrl(value: string): URL | null {
|
||||
try {
|
||||
const url = new URL(value);
|
||||
return url.protocol === "https:" || url.protocol === "http:" ? url : null;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function remoteNoteFromFetchedObject(value: Json): { actorId: string; note: Json; activity: Json } | null {
|
||||
const type = String(value.type ?? "");
|
||||
if (type === "Note") {
|
||||
const actorId = actorIdFromField(value.attributedTo);
|
||||
return actorId && typeof value.id === "string" ? { actorId, note: value, activity: {} } : null;
|
||||
}
|
||||
|
||||
const note = objectAsJson(value.object);
|
||||
if (type === "Create" && note && String(note.type ?? "") === "Note" && typeof note.id === "string") {
|
||||
const actorId = actorIdFromField(value.actor) ?? actorIdFromField(note.attributedTo);
|
||||
return actorId ? { actorId, note, activity: value } : null;
|
||||
}
|
||||
if (type === "Announce" && note && String(note.type ?? "") === "Note" && typeof note.id === "string") {
|
||||
const actorId = actorIdFromField(note.attributedTo) ?? actorIdFromField(value.actor);
|
||||
return actorId ? { actorId, note, activity: value } : null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function actorIdFromField(value: unknown): string | null {
|
||||
if (typeof value === "string" && value) return value;
|
||||
if (Array.isArray(value)) {
|
||||
for (const item of value) {
|
||||
const actorId = actorIdFromField(item);
|
||||
if (actorId) return actorId;
|
||||
}
|
||||
}
|
||||
const obj = objectAsJson(value);
|
||||
return typeof obj?.id === "string" && obj.id ? obj.id : null;
|
||||
}
|
||||
|
||||
export async function customEmojis(env: Env): Promise<Response> {
|
||||
void env;
|
||||
return json([]);
|
||||
@@ -1632,8 +1712,43 @@ export async function deletePushSubscription(request: Request, env: Env): Promis
|
||||
}
|
||||
|
||||
export async function markersList(request: Request, env: Env): Promise<Response> {
|
||||
void request; void env;
|
||||
return json({});
|
||||
const user = await requireUser(request, env);
|
||||
const url = new URL(request.url);
|
||||
const timelines = uniqueStrings(url.searchParams.getAll("timeline[]").concat(url.searchParams.getAll("timeline")));
|
||||
if (timelines.length === 0) return json({});
|
||||
|
||||
const rows = await listMarkers(env, user.id, timelines);
|
||||
return json(markersJson(rows));
|
||||
}
|
||||
|
||||
export async function updateMarkers(request: Request, env: Env): Promise<Response> {
|
||||
const user = await requireUser(request, env);
|
||||
const body = await readBody(request);
|
||||
const out: Record<string, unknown> = {};
|
||||
|
||||
for (const timeline of ["home", "notifications"]) {
|
||||
const lastReadId = bodyString(body, `${timeline}[last_read_id]`).trim();
|
||||
if (!lastReadId) continue;
|
||||
const result = await saveMarker(env, user.id, timeline, lastReadId);
|
||||
if (result.conflict) throw new HttpError(409, "Conflict during update, please try again");
|
||||
if (result.marker) out[timeline] = markerJson(result.marker);
|
||||
}
|
||||
|
||||
return json(out);
|
||||
}
|
||||
|
||||
function markersJson(rows: Marker[]): Record<string, unknown> {
|
||||
const out: Record<string, unknown> = {};
|
||||
for (const row of rows) out[row.timeline] = markerJson(row);
|
||||
return out;
|
||||
}
|
||||
|
||||
function markerJson(row: Marker): Record<string, unknown> {
|
||||
return {
|
||||
last_read_id: row.last_read_id,
|
||||
version: row.version,
|
||||
updated_at: row.updated_at
|
||||
};
|
||||
}
|
||||
|
||||
function pushSubscriptionJson(row: PushSubscription): Record<string, unknown> {
|
||||
|
||||
@@ -227,6 +227,14 @@ export type ScheduledStatus = {
|
||||
created_at: string;
|
||||
};
|
||||
|
||||
export type Marker = {
|
||||
user_id: string;
|
||||
timeline: string;
|
||||
last_read_id: string;
|
||||
version: number;
|
||||
updated_at: string;
|
||||
};
|
||||
|
||||
export type OutgoingDelivery = {
|
||||
id: string;
|
||||
user_id: string;
|
||||
|
||||
Reference in New Issue
Block a user