Files
gerbeur/src/contexts/WSProvider.tsx
2026-03-30 14:55:30 +00:00

507 lines
15 KiB
TypeScript

import {
type ReactNode,
useCallback,
useEffect,
useLayoutEffect,
useMemo,
useRef,
useState,
} from "react";
import {
type CommentEvent,
type PlaylistEvent,
type UserEvent,
type VoteEvent,
WSContext,
type WSContextValue,
} from "./WSContext.ts";
import { WS_URL } from "../config/api.ts";
import type {
Dump,
IncomingWSMessage,
Notification,
OnlineUser,
OutgoingWSMessage,
} from "../model.ts";
import {
deserializeComment,
deserializeDump,
deserializeNotification,
deserializePlaylist,
deserializePublicUser,
} from "../model.ts";
/** Props accepted by `WSProvider`. */
interface WSProviderProps {
/** Subtree rendered inside `WSContext.Provider`. */
children: ReactNode;
/**
 * Auth token appended (URL-encoded) as the `token` query parameter of the
 * WebSocket URL. When `null`, the socket is opened without the parameter.
 * Changing this value tears down and re-opens the connection.
 */
token: string | null;
/**
 * Id of the current user. Compared against `voterId` on `votes_update`
 * messages to decide whether the event should also update `myVotes`.
 */
userId: string | null;
/** Invoked when the server sends a `force_logout` message. */
onForceLogout?: () => void;
}
// All values below are in milliseconds (they are passed to `setTimeout`).
// Upper bound for the reconnect delay, which doubles after each reconnect attempt.
const MAX_BACKOFF = 30_000;
// How long an optimistic vote waits for confirmation before it is rolled back.
const ACK_TIMEOUT = 5_000;
// How long a socket may stay in the CONNECTING state before it is closed.
const CONNECT_TIMEOUT = 2_500;
/** Bookkeeping for one optimistic vote that has not been confirmed yet. */
interface PendingVote {
/** Timer handle that fires the rollback once `ACK_TIMEOUT` elapses. */
timeout: ReturnType<typeof setTimeout>;
/** Reverts the optimistic state change made for this vote. */
rollback: () => void;
}
/**
 * Decodes a raw WebSocket frame into an incoming message.
 *
 * This is a minimal runtime check only: the payload must be valid JSON, must
 * decode to a non-null object, and must carry a string `type` field. Nothing
 * else about the payload is validated; the result is cast to the
 * discriminated union so TypeScript can narrow on `type` from there.
 *
 * @param data Raw text received from the socket.
 * @returns The decoded message, or `null` when the frame is not valid JSON or
 *   lacks a string `type` discriminator.
 */
function parseWSMessage(data: string): IncomingWSMessage | null {
  let decoded: unknown;
  try {
    decoded = JSON.parse(data);
  } catch {
    // Not JSON at all — treat as an unrecognised frame.
    return null;
  }

  const isTaggedObject = typeof decoded === "object" &&
    decoded !== null &&
    typeof (decoded as { type?: unknown }).type === "string";

  return isTaggedObject ? (decoded as IncomingWSMessage) : null;
}
/**
 * Owns the live-updates WebSocket and exposes its state through `WSContext`.
 *
 * Responsibilities:
 * - Opens `${WS_URL}/ws` (with the token as a query parameter when present)
 *   and reconnects with exponential backoff (500 ms doubling up to
 *   `MAX_BACKOFF`) whenever the socket closes. The connection is re-created
 *   when `token` changes.
 * - Translates every incoming message into React state (online users, vote
 *   counts, recent/deleted dumps, last playlist/comment/user/notification
 *   events, unread notification count).
 * - Provides `castVote` / `removeVote`, which update state optimistically and
 *   roll back after `ACK_TIMEOUT` unless a `vote_ack`, a `votes_update` for
 *   the current user, or a `welcome` message arrives first.
 *
 * @param props.children Subtree rendered inside the provider.
 * @param props.token Auth token for the socket URL, or `null`.
 * @param props.userId Id of the current user, used to recognise own votes.
 * @param props.onForceLogout Called when the server sends `force_logout`.
 */
export function WSProvider(
  { children, token, userId, onForceLogout }: WSProviderProps,
) {
  const [wsStatus, setWSStatus] = useState<
    "connecting" | "connected" | "disconnected"
  >("connecting");
  const [wsErrorMessage, setWSErrorMessage] = useState<string | null>(null);
  const [onlineUsers, setOnlineUsers] = useState<OnlineUser[]>([]);
  const [voteCounts, setVoteCounts] = useState<Record<string, number>>({});
  const [myVotes, setMyVotes] = useState<Set<string>>(new Set());
  const [recentDumps, setRecentDumps] = useState<Dump[]>([]);
  const [deletedDumpIds, setDeletedDumpIds] = useState<Set<string>>(new Set());
  const [lastVoteEvent, setLastVoteEvent] = useState<VoteEvent | null>(null);
  const [lastDumpEvent, setLastDumpEvent] = useState<Dump | null>(null);
  const [lastPlaylistEvent, setLastPlaylistEvent] = useState<
    PlaylistEvent | null
  >(null);
  const [deletedPlaylistIds, setDeletedPlaylistIds] = useState<Set<string>>(
    new Set(),
  );
  const [lastCommentEvent, setLastCommentEvent] = useState<CommentEvent | null>(
    null,
  );
  const [lastUserEvent, setLastUserEvent] = useState<UserEvent | null>(null);
  const [unreadNotificationCount, setUnreadNotificationCount] = useState(0);
  const [lastNotification, setLastNotification] = useState<Notification | null>(
    null,
  );

  // Refs to avoid stale closures in event handlers. The socket effect below
  // only re-runs when the token changes, so anything it reads that can change
  // between renders must be read through one of these.
  const voteCountsRef = useRef(voteCounts);
  const myVotesRef = useRef(myVotes);
  const userIdRef = useRef(userId);
  const onForceLogoutRef = useRef(onForceLogout);
  useLayoutEffect(() => {
    voteCountsRef.current = voteCounts;
    myVotesRef.current = myVotes;
    userIdRef.current = userId;
    onForceLogoutRef.current = onForceLogout;
  });

  const socketRef = useRef<WebSocket | null>(null);

  // Tracks pending optimistic votes: dumpId → pending rollback handler
  const pendingRef = useRef<Map<string, PendingVote>>(
    new Map(),
  );

  // Cancels the rollback timer for one dump (no-op when none is pending).
  const clearPendingVote = useCallback((dumpId: string) => {
    const pending = pendingRef.current.get(dumpId);
    if (!pending) return;
    clearTimeout(pending.timeout);
    pendingRef.current.delete(dumpId);
  }, []);

  // Cancels every outstanding rollback timer without running the rollbacks.
  const clearAllPendingVotes = useCallback(() => {
    for (const pending of pendingRef.current.values()) {
      clearTimeout(pending.timeout);
    }
    pendingRef.current.clear();
  }, []);

  // Registers `rollback` to run after ACK_TIMEOUT, replacing any rollback
  // already pending for the same dump.
  const schedulePendingVote = useCallback((
    dumpId: string,
    rollback: () => void,
  ) => {
    clearPendingVote(dumpId);
    const timeout = setTimeout(() => {
      pendingRef.current.delete(dumpId);
      rollback();
    }, ACK_TIMEOUT);
    pendingRef.current.set(dumpId, { timeout, rollback });
  }, [clearPendingVote]);

  useEffect(() => {
    // Set by the cleanup function; stops reconnect attempts and state updates
    // caused by the socket closing during teardown.
    let closed = false;
    let backoff = 500;
    let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
    let connectTimeout: ReturnType<typeof setTimeout> | null = null;
    // Selects which error message is shown when the socket closes.
    let everConnected = false;

    setWSStatus("connecting");
    setWSErrorMessage(null);

    function connect() {
      if (closed) return;
      const url = `${WS_URL}/ws${
        token ? `?token=${encodeURIComponent(token)}` : ""
      }`;
      const ws = new WebSocket(url);
      socketRef.current = ws;

      // Give up on a socket that is still CONNECTING after CONNECT_TIMEOUT;
      // closing it triggers `onclose`, which schedules the reconnect.
      connectTimeout = setTimeout(() => {
        if (ws.readyState !== WebSocket.CONNECTING) return;
        setWSStatus("disconnected");
        setWSErrorMessage(
          "Can't connect to the live updates server. Upvotes and notifications may not sync until it reconnects.",
        );
        ws.close();
      }, CONNECT_TIMEOUT);

      ws.onopen = () => {
        if (connectTimeout) {
          clearTimeout(connectTimeout);
          connectTimeout = null;
        }
        everConnected = true;
        setWSStatus("connected");
        setWSErrorMessage(null);
      };

      ws.onmessage = (event) => {
        const msg = parseWSMessage(event.data);
        if (!msg) return;
        switch (msg.type) {
          case "ping":
            ws.send(
              JSON.stringify({ type: "pong" } satisfies OutgoingWSMessage),
            );
            break;
          case "welcome":
            backoff = 500; // reset backoff on successful connect
            setOnlineUsers(msg.users);
            setMyVotes(new Set(msg.myVotes));
            setUnreadNotificationCount(msg.unreadNotificationCount);
            // welcome provides authoritative server state — cancel any
            // in-flight revert timers, they are now superseded.
            clearAllPendingVotes();
            break;
          case "presence_update":
            setOnlineUsers(msg.users);
            break;
          case "votes_update": {
            const { dumpId, voteCount, voterId, action } = msg;
            setVoteCounts((prev) => ({ ...prev, [dumpId]: voteCount }));
            setLastVoteEvent({ dumpId, voterId, action });
            // Keep myVotes in sync across tabs: if this vote event belongs to
            // the current user (from another tab), update myVotes accordingly.
            if (voterId === userIdRef.current) {
              clearPendingVote(dumpId);
              setMyVotes((prev) => {
                const next = new Set(prev);
                if (action === "cast") next.add(dumpId);
                else next.delete(dumpId);
                return next;
              });
            }
            break;
          }
          case "dump_created": {
            const dump = deserializeDump(msg.dump);
            // Skip when the dump is already in the live feed (for example it
            // was added locally through `injectDump`), so it is never listed
            // twice. Mirrors the guard used by `dump_updated`/`injectDump`.
            setRecentDumps((prev) =>
              prev.some((d) => d.id === dump.id) ? prev : [dump, ...prev]
            );
            break;
          }
          case "dump_updated": {
            const dump = deserializeDump(msg.dump);
            setLastDumpEvent(dump);
            // Un-delete if this dump was previously removed from the feed
            // (e.g. it was made private, and is now public again).
            setDeletedDumpIds((prev) => {
              if (!prev.has(dump.id)) return prev;
              const next = new Set(prev);
              next.delete(dump.id);
              return next;
            });
            // Add to live feed if not already present (private→public).
            setRecentDumps((prev) =>
              prev.some((d) => d.id === dump.id) ? prev : [dump, ...prev]
            );
            break;
          }
          case "dump_deleted": {
            const { dumpId } = msg;
            setDeletedDumpIds((prev) => new Set([...prev, dumpId]));
            setRecentDumps((prev) => prev.filter((d) => d.id !== dumpId));
            break;
          }
          case "vote_ack": {
            const { dumpId, action, voteCount } = msg;
            clearPendingVote(dumpId);
            // Reconcile with authoritative count
            setVoteCounts((prev) => ({ ...prev, [dumpId]: voteCount }));
            // Confirm vote state
            setMyVotes((prev) => {
              const next = new Set(prev);
              if (action === "cast") next.add(dumpId);
              else next.delete(dumpId);
              return next;
            });
            break;
          }
          case "playlist_created":
          case "playlist_updated": {
            const playlist = deserializePlaylist(msg.playlist);
            setLastPlaylistEvent({
              type: msg.type === "playlist_created" ? "created" : "updated",
              playlistId: playlist.id,
              playlist,
            });
            break;
          }
          case "playlist_deleted": {
            const { playlistId, userId } = msg;
            setDeletedPlaylistIds((prev) => new Set([...prev, playlistId]));
            setLastPlaylistEvent({ type: "deleted", playlistId, userId });
            break;
          }
          case "playlist_dumps_updated": {
            const { playlistId, dumpIds } = msg;
            setLastPlaylistEvent({
              type: "dumps_updated",
              playlistId,
              dumpIds,
            });
            break;
          }
          case "user_updated": {
            const user = deserializePublicUser(msg.user);
            setLastUserEvent({ user });
            break;
          }
          case "comment_created": {
            const comment = deserializeComment(msg.comment);
            setLastCommentEvent({
              type: "created",
              dumpId: comment.dumpId,
              comment,
            });
            break;
          }
          case "comment_deleted": {
            const { commentId, dumpId } = msg;
            setLastCommentEvent({ type: "deleted", dumpId, commentId });
            break;
          }
          case "comment_updated": {
            const comment = deserializeComment(msg.comment);
            setLastCommentEvent({
              type: "updated",
              dumpId: comment.dumpId,
              comment,
            });
            break;
          }
          case "notification_created": {
            const notification = deserializeNotification(msg.notification);
            setLastNotification(notification);
            setUnreadNotificationCount((prev) => prev + 1);
            break;
          }
          case "force_logout":
            // Read through the ref: this effect does not re-run when the
            // parent passes a new callback, so the prop captured by the
            // closure could be out of date.
            onForceLogoutRef.current?.();
            break;
          case "error":
            // Vote errors currently don't identify which dump/action failed, so
            // fall back to the per-dump timeout rollback instead of guessing.
            break;
        }
      };

      ws.onclose = () => {
        if (connectTimeout) {
          clearTimeout(connectTimeout);
          connectTimeout = null;
        }
        if (closed) return;
        setWSStatus("disconnected");
        setWSErrorMessage(
          everConnected
            ? "Live updates are temporarily disconnected. Trying to reconnect..."
            : "Can't connect to the live updates server. Upvotes and notifications may not sync until it reconnects.",
        );
        // Wait for the current delay, then double it (capped) for next time.
        reconnectTimer = setTimeout(() => {
          backoff = Math.min(backoff * 2, MAX_BACKOFF);
          connect();
        }, backoff);
      };

      ws.onerror = () => {
        // onclose will fire after onerror
      };
    }

    connect();

    // Capture the map now so the cleanup uses the same instance.
    const pending = pendingRef.current;
    return () => {
      closed = true;
      if (reconnectTimer) clearTimeout(reconnectTimer);
      if (connectTimeout) clearTimeout(connectTimeout);
      socketRef.current?.close();
      socketRef.current = null;
      for (const pendingVote of pending.values()) {
        clearTimeout(pendingVote.timeout);
      }
      pending.clear();
    };
  }, [clearAllPendingVotes, clearPendingVote, token]);

  // Optimistically records the current user's vote on `dumpId` and sends
  // `vote_cast` when the socket is open. No-op if the dump is already voted.
  const castVote = useCallback((dumpId: string) => {
    // Optimistic update
    const prevCount = voteCountsRef.current[dumpId] ?? 0;
    const prevVoted = myVotesRef.current.has(dumpId);
    if (prevVoted) return; // already voted
    setMyVotes((prev) => {
      const n = new Set(prev);
      n.add(dumpId);
      return n;
    });
    setVoteCounts((prev) => ({ ...prev, [dumpId]: prevCount + 1 }));
    // Schedule revert if no authoritative confirmation arrives.
    schedulePendingVote(dumpId, () => {
      setMyVotes((prev) => {
        const n = new Set(prev);
        n.delete(dumpId);
        return n;
      });
      setVoteCounts((prev) => ({ ...prev, [dumpId]: prevCount }));
    });
    if (socketRef.current?.readyState === WebSocket.OPEN) {
      socketRef.current.send(
        JSON.stringify(
          { type: "vote_cast", dumpId } satisfies OutgoingWSMessage,
        ),
      );
    }
    // If socket is not OPEN, the revert timer will handle cleanup after ACK_TIMEOUT
  }, [schedulePendingVote]);

  // Optimistically removes the current user's vote on `dumpId` and sends
  // `vote_remove` when the socket is open. No-op if the dump is not voted.
  const removeVote = useCallback((dumpId: string) => {
    // Optimistic update
    const prevCount = voteCountsRef.current[dumpId] ?? 0;
    const prevVoted = myVotesRef.current.has(dumpId);
    if (!prevVoted) return; // not voted
    setMyVotes((prev) => {
      const n = new Set(prev);
      n.delete(dumpId);
      return n;
    });
    setVoteCounts((prev) => ({
      ...prev,
      [dumpId]: Math.max(0, prevCount - 1),
    }));
    // Schedule revert if no authoritative confirmation arrives.
    schedulePendingVote(dumpId, () => {
      setMyVotes((prev) => {
        const n = new Set(prev);
        n.add(dumpId);
        return n;
      });
      setVoteCounts((prev) => ({ ...prev, [dumpId]: prevCount }));
    });
    if (socketRef.current?.readyState === WebSocket.OPEN) {
      socketRef.current.send(
        JSON.stringify(
          { type: "vote_remove", dumpId } satisfies OutgoingWSMessage,
        ),
      );
    }
    // If socket is not OPEN, the revert timer will handle cleanup after ACK_TIMEOUT
  }, [schedulePendingVote]);

  // Prepends a dump to the live feed unless one with the same id is present.
  const injectDump = useCallback((dump: Dump) => {
    setRecentDumps((prev) => {
      if (prev.some((d) => d.id === dump.id)) return prev;
      return [dump, ...prev];
    });
  }, []);

  // Resets the local unread-notification counter to zero.
  const clearUnreadNotifications = useCallback(() => {
    setUnreadNotificationCount(0);
  }, []);

  // Memoised so consumers only see a new context value when a field changes.
  const value: WSContextValue = useMemo(() => ({
    wsStatus,
    wsErrorMessage,
    onlineUsers,
    voteCounts,
    myVotes,
    recentDumps,
    deletedDumpIds,
    lastVoteEvent,
    lastDumpEvent,
    lastPlaylistEvent,
    deletedPlaylistIds,
    lastCommentEvent,
    lastUserEvent,
    unreadNotificationCount,
    lastNotification,
    castVote,
    removeVote,
    injectDump,
    clearUnreadNotifications,
  }), [
    wsStatus,
    wsErrorMessage,
    onlineUsers,
    voteCounts,
    myVotes,
    recentDumps,
    deletedDumpIds,
    lastVoteEvent,
    lastDumpEvent,
    lastPlaylistEvent,
    deletedPlaylistIds,
    lastCommentEvent,
    lastUserEvent,
    unreadNotificationCount,
    lastNotification,
    castVote,
    removeVote,
    injectDump,
    clearUnreadNotifications,
  ]);

  return (
    <WSContext.Provider value={value}>
      {children}
    </WSContext.Provider>
  );
}