Server-Sent Events & ReadableStream not calling cancel method

I have have an Astro endpoint, using the Vercel adapter, that I am using to listen for changes to a MongoDB Atlas collection using SSE. If I navigate away from the page, using astro dev the cancel method is called and the function is killed. However, using vercel dev or deploying it to Vercel, cancel is never called and the function is left open until it times out which can lead to several idle functions running for up to 13 minutes. Is this a bug? Is there a better approach to killing these inactive functions once the client disconnects?

Endpoint:

import { connectToDatabase } from "@lib/mongo-client";
import { ObjectId } from "mongodb";

import type { APIContext } from "astro";
import type { ChangeStream } from "mongodb";

export async function GET({ params, locals }: APIContext) {
  const { id } = params;
  const { user } = locals;
  const { db } = await connectToDatabase();
  const encoder = new TextEncoder();

  let changeStream: ChangeStream | null = null;
  let streamController: ReadableStreamDefaultController | null = null;

  function destroy() {
    changeStream?.close();
    changeStream = null;
    streamController?.close();
    streamController = null;
  }

  const stream = new ReadableStream({
    start(controller: ReadableStreamDefaultController) {
      streamController = controller;
      streamController.enqueue(
        encoder.encode(
          `data:${JSON.stringify({ user: user?.email, doc: id })}\n\n`,
        ),
      );

      const collection = db.collection("lease");
      changeStream = collection.watch([
        { $match: { "documentKey._id": new ObjectId(id) } },
      ]);

      changeStream
        .on("change", (next) => {
          // biome-ignore lint/suspicious/noConsoleLog: testing
          console.log(next);

          streamController?.enqueue(
            encoder.encode(`data:${JSON.stringify(next)}\n\n`),
          );
        })
        .once("error", (error) => {
          console.error(error);
          destroy();
        });
    },

    cancel() {
      // biome-ignore lint/suspicious/noConsoleLog: testing
      console.log("Stream cancelled");
      destroy();
    },
  });

  const headers = new Headers({
    "Cache-Control": "no-cache, no-store, must-revalidate",
    Connection: "keep-alive",
    "Content-Type": "text/event-stream",
    "X-Accel-Buffering": "no",
  });

  return new Response(stream, { headers });
}

Client:

---
import { actions } from "astro:actions";
import AppFooter from "@components/AppFooter.astro";
import AppHeader from "@components/AppHeader.astro";
import Layout from "@layouts/Layout.astro";

const { id } = Astro.params;
if (!id) throw new Error("ID is required");

const { data, error } = await Astro.callAction(actions.doc.findOneById, id);
if (error) throw error;
---

<Layout>
  <AppHeader />
  <main class="flex-grow p-8 text-sm">
    <h1 class="mb-8 font-bold text-4xl text-center">Lease {id}</h1>
    <pre
      class="mb-8 w-full overflow-auto font-mono">{JSON.stringify(data, null, 2)}</pre>
    <ul
      class="bg-base-100 *:px-2 border border-base-300 h-96 overflow-auto font-mono *:whitespace-pre"
      data-id={id}
      id="messages"
    >
    </ul>
  </main>
  <AppFooter />
</Layout>

<script>
  const $messages = document.querySelector("ul") as HTMLUListElement;
  const url = `/api/doc/watch/${$messages.dataset.id}`;

  let eventSource: EventSource | null = null;

  function appendMessage(text: string): void {
    const $el = document.createElement("li");
    $el.textContent = text;
    $messages.append($el);
    $messages.scrollTop = $messages.scrollHeight;
  }

  const connectToSSE = () => {
    if (eventSource) eventSource.close();

    eventSource = new EventSource(url);

    appendMessage("Connecting…");

    eventSource.addEventListener("open", (event: Event) => {
      appendMessage("Connection established!");
    });

    eventSource.addEventListener("message", (event: MessageEvent) => {
      appendMessage(
        `Received: ${JSON.stringify(JSON.parse(event.data), null, 2)}`,
      );
    });

    eventSource.addEventListener("update", (event: MessageEvent) => {
      appendMessage(`Update event: ${event.data}`);
    });

    eventSource.addEventListener("error", (event: Event): void => {
      appendMessage("Error occurred, connection closed");
      if (eventSource) eventSource.close();
    });
  };

  document.addEventListener("DOMContentLoaded", connectToSSE);
</script>

I’m not too familiar with Astro, but generally the way this needs to work is that the browser fires an abort event to all pending connections when you close the tab or navigate

Your server can usually access this by listening to events on request.signal.addEventListener('abort', () => {})

But based on this GitHub thread Astro might not support that

Thanks for finding that issue, Jacob! So, I’m guessing Vercel doesn’t support calling the cancel method and since Astro doesn’t support AbortSignals neither of those get called. I’ll keep my fingers crossed that the PR from that issue gets merged in soon and I’ll test it out again.

Does the cancel method get called when hosted anywhere else? If astro doesn’t read the abort signal, I don’t think any provider can know when to call cancel(), and I’d suspect it’s just handled differently in the dev server during local dev

But if there’s another way other providers are doing it then it might be possible to add that to the Vercel adapter too