I have have an Astro endpoint, using the Vercel adapter, that I am using to listen for changes to a MongoDB Atlas collection using SSE. If I navigate away from the page, using astro dev
the cancel method is called and the function is killed. However, using vercel dev
or deploying it to Vercel, cancel is never called and the function is left open until it times out which can lead to several idle functions running for up to 13 minutes. Is this a bug? Is there a better approach to killing these inactive functions once the client disconnects?
Endpoint:
import { connectToDatabase } from "@lib/mongo-client";
import { ObjectId } from "mongodb";
import type { APIContext } from "astro";
import type { ChangeStream } from "mongodb";
export async function GET({ params, locals }: APIContext) {
const { id } = params;
const { user } = locals;
const { db } = await connectToDatabase();
const encoder = new TextEncoder();
let changeStream: ChangeStream | null = null;
let streamController: ReadableStreamDefaultController | null = null;
function destroy() {
changeStream?.close();
changeStream = null;
streamController?.close();
streamController = null;
}
const stream = new ReadableStream({
start(controller: ReadableStreamDefaultController) {
streamController = controller;
streamController.enqueue(
encoder.encode(
`data:${JSON.stringify({ user: user?.email, doc: id })}\n\n`,
),
);
const collection = db.collection("lease");
changeStream = collection.watch([
{ $match: { "documentKey._id": new ObjectId(id) } },
]);
changeStream
.on("change", (next) => {
// biome-ignore lint/suspicious/noConsoleLog: testing
console.log(next);
streamController?.enqueue(
encoder.encode(`data:${JSON.stringify(next)}\n\n`),
);
})
.once("error", (error) => {
console.error(error);
destroy();
});
},
cancel() {
// biome-ignore lint/suspicious/noConsoleLog: testing
console.log("Stream cancelled");
destroy();
},
});
const headers = new Headers({
"Cache-Control": "no-cache, no-store, must-revalidate",
Connection: "keep-alive",
"Content-Type": "text/event-stream",
"X-Accel-Buffering": "no",
});
return new Response(stream, { headers });
}
Client:
---
import { actions } from "astro:actions";
import AppFooter from "@components/AppFooter.astro";
import AppHeader from "@components/AppHeader.astro";
import Layout from "@layouts/Layout.astro";
const { id } = Astro.params;
if (!id) throw new Error("ID is required");
const { data, error } = await Astro.callAction(actions.doc.findOneById, id);
if (error) throw error;
---
<Layout>
<AppHeader />
<main class="flex-grow p-8 text-sm">
<h1 class="mb-8 font-bold text-4xl text-center">Lease {id}</h1>
<pre
class="mb-8 w-full overflow-auto font-mono">{JSON.stringify(data, null, 2)}</pre>
<ul
class="bg-base-100 *:px-2 border border-base-300 h-96 overflow-auto font-mono *:whitespace-pre"
data-id={id}
id="messages"
>
</ul>
</main>
<AppFooter />
</Layout>
<script>
const $messages = document.querySelector("ul") as HTMLUListElement;
const url = `/api/doc/watch/${$messages.dataset.id}`;
let eventSource: EventSource | null = null;
function appendMessage(text: string): void {
const $el = document.createElement("li");
$el.textContent = text;
$messages.append($el);
$messages.scrollTop = $messages.scrollHeight;
}
const connectToSSE = () => {
if (eventSource) eventSource.close();
eventSource = new EventSource(url);
appendMessage("Connecting…");
eventSource.addEventListener("open", (event: Event) => {
appendMessage("Connection established!");
});
eventSource.addEventListener("message", (event: MessageEvent) => {
appendMessage(
`Received: ${JSON.stringify(JSON.parse(event.data), null, 2)}`,
);
});
eventSource.addEventListener("update", (event: MessageEvent) => {
appendMessage(`Update event: ${event.data}`);
});
eventSource.addEventListener("error", (event: Event): void => {
appendMessage("Error occurred, connection closed");
if (eventSource) eventSource.close();
});
};
document.addEventListener("DOMContentLoaded", connectToSSE);
</script>