From 76dae43103064f145cd8312709bf21072eef06ca Mon Sep 17 00:00:00 2001 From: Muki Kiboigo Date: Tue, 9 Sep 2025 11:03:31 -0700 Subject: [PATCH] properly handle closed for ReadableStream --- src/browser/streams/ReadableStream.zig | 24 +++++++++++++++---- .../ReadableStreamDefaultController.zig | 9 +++++++ .../streams/ReadableStreamDefaultReader.zig | 23 +++++++----------- 3 files changed, 38 insertions(+), 18 deletions(-) diff --git a/src/browser/streams/ReadableStream.zig b/src/browser/streams/ReadableStream.zig index bc8fb41b..8660123b 100644 --- a/src/browser/streams/ReadableStream.zig +++ b/src/browser/streams/ReadableStream.zig @@ -36,6 +36,9 @@ const State = union(enum) { // This promise resolves when a stream is canceled. cancel_resolver: v8.Persistent(v8.PromiseResolver), +closed_resolver: v8.Persistent(v8.PromiseResolver), +reader_resolver: ?v8.Persistent(v8.PromiseResolver) = null, + locked: bool = false, state: State = .readable, @@ -43,7 +46,6 @@ cancel_fn: ?Env.Function = null, pull_fn: ?Env.Function = null, strategy: QueueingStrategy, -reader_resolver: ?v8.Persistent(v8.PromiseResolver) = null, queue: std.ArrayListUnmanaged([]const u8) = .empty, pub const ReadableStreamReadResult = struct { @@ -82,8 +84,13 @@ pub fn constructor(underlying: ?UnderlyingSource, _strategy: ?QueueingStrategy, v8.PromiseResolver.init(page.main_context.v8_context), ); + const closed_resolver = v8.Persistent(v8.PromiseResolver).init( + page.main_context.isolate, + v8.PromiseResolver.init(page.main_context.v8_context), + ); + const stream = try page.arena.create(ReadableStream); - stream.* = ReadableStream{ .cancel_resolver = cancel_resolver, .strategy = strategy }; + stream.* = ReadableStream{ .cancel_resolver = cancel_resolver, .closed_resolver = closed_resolver, .strategy = strategy }; const controller = ReadableStreamDefaultController{ .stream = stream }; @@ -106,6 +113,15 @@ pub fn constructor(underlying: ?UnderlyingSource, _strategy: ?QueueingStrategy, return stream; } +pub fn destructor(self: *ReadableStream) void { + self.cancel_resolver.deinit(); + self.closed_resolver.deinit(); + + if (self.reader_resolver) |*rr| { + rr.deinit(); + } +} + pub fn get_locked(self: *const ReadableStream) bool { return self.locked; } @@ -150,7 +166,7 @@ const GetReaderOptions = struct { mode: ?[]const u8 = null, }; -pub fn _getReader(self: *ReadableStream, _options: ?GetReaderOptions, page: *Page) !ReadableStreamDefaultReader { +pub fn _getReader(self: *ReadableStream, _options: ?GetReaderOptions) !ReadableStreamDefaultReader { if (self.locked) { return error.TypeError; } @@ -159,7 +175,7 @@ pub fn _getReader(self: *ReadableStream, _options: ?GetReaderOptions, page: *Pag const options = _options orelse GetReaderOptions{}; _ = options; - return ReadableStreamDefaultReader.constructor(self, page); + return ReadableStreamDefaultReader.constructor(self); } // TODO: pipeThrough (requires TransformStream) diff --git a/src/browser/streams/ReadableStreamDefaultController.zig b/src/browser/streams/ReadableStreamDefaultController.zig index df55c85e..af5057a7 100644 --- a/src/browser/streams/ReadableStreamDefaultController.zig +++ b/src/browser/streams/ReadableStreamDefaultController.zig @@ -39,6 +39,7 @@ pub fn _close(self: *ReadableStreamDefaultController, _reason: ?[]const u8, page const reason = if (_reason) |reason| try page.arena.dupe(u8, reason) else null; self.stream.state = .{ .closed = reason }; + // Resolve the Reader Promise if (self.stream.reader_resolver) |rr| { const resolver = Env.PromiseResolver{ .js_context = page.main_context, @@ -49,6 +50,14 @@ pub fn _close(self: *ReadableStreamDefaultController, _reason: ?[]const u8, page self.stream.reader_resolver = null; } + // Resolve the Closed promise. + const closed_resolver = Env.PromiseResolver{ + .js_context = page.main_context, + .resolver = self.stream.closed_resolver.castToPromiseResolver(), + }; + + try closed_resolver.resolve({}); + // close just sets as closed meaning it wont READ any more but anything in the queue is fine to read. // to discard, must use cancel. } diff --git a/src/browser/streams/ReadableStreamDefaultReader.zig b/src/browser/streams/ReadableStreamDefaultReader.zig index a339fb7a..ac7bcf68 100644 --- a/src/browser/streams/ReadableStreamDefaultReader.zig +++ b/src/browser/streams/ReadableStreamDefaultReader.zig @@ -29,23 +29,18 @@ const ReadableStreamReadResult = @import("./ReadableStream.zig").ReadableStreamR const ReadableStreamDefaultReader = @This(); stream: *ReadableStream, -// This promise resolves when the stream is closed. -closed_resolver: Env.PromiseResolver, -pub fn constructor(stream: *ReadableStream, page: *Page) ReadableStreamDefaultReader { - const closed_resolver = Env.PromiseResolver{ - .js_context = page.main_context, - .resolver = v8.PromiseResolver.init(page.main_context.v8_context), - }; - - return .{ - .stream = stream, - .closed_resolver = closed_resolver, - }; +pub fn constructor(stream: *ReadableStream) ReadableStreamDefaultReader { + return .{ .stream = stream }; } -pub fn get_closed(self: *const ReadableStreamDefaultReader) Env.Promise { - return self.closed_resolver.promise(); +pub fn get_closed(self: *const ReadableStreamDefaultReader, page: *Page) Env.Promise { + const resolver = Env.PromiseResolver{ + .js_context = page.main_context, + .resolver = self.stream.closed_resolver.castToPromiseResolver(), + }; + + return resolver.promise(); } pub fn _cancel(self: *ReadableStreamDefaultReader, reason: ?[]const u8, page: *Page) !Env.Promise {