properly handle closed for ReadableStream

This commit is contained in:
Muki Kiboigo
2025-09-09 11:03:31 -07:00
parent af75ce79ac
commit 76dae43103
3 changed files with 38 additions and 18 deletions

View File

@@ -36,6 +36,9 @@ const State = union(enum) {
// This promise resolves when a stream is canceled.
cancel_resolver: v8.Persistent(v8.PromiseResolver),
closed_resolver: v8.Persistent(v8.PromiseResolver),
reader_resolver: ?v8.Persistent(v8.PromiseResolver) = null,
locked: bool = false,
state: State = .readable,
@@ -43,7 +46,6 @@ cancel_fn: ?Env.Function = null,
pull_fn: ?Env.Function = null,
strategy: QueueingStrategy,
reader_resolver: ?v8.Persistent(v8.PromiseResolver) = null,
queue: std.ArrayListUnmanaged([]const u8) = .empty,
pub const ReadableStreamReadResult = struct {
@@ -82,8 +84,13 @@ pub fn constructor(underlying: ?UnderlyingSource, _strategy: ?QueueingStrategy,
v8.PromiseResolver.init(page.main_context.v8_context),
);
const closed_resolver = v8.Persistent(v8.PromiseResolver).init(
page.main_context.isolate,
v8.PromiseResolver.init(page.main_context.v8_context),
);
const stream = try page.arena.create(ReadableStream);
stream.* = ReadableStream{ .cancel_resolver = cancel_resolver, .strategy = strategy };
stream.* = ReadableStream{ .cancel_resolver = cancel_resolver, .closed_resolver = closed_resolver, .strategy = strategy };
const controller = ReadableStreamDefaultController{ .stream = stream };
@@ -106,6 +113,15 @@ pub fn constructor(underlying: ?UnderlyingSource, _strategy: ?QueueingStrategy,
return stream;
}
pub fn destructor(self: *ReadableStream) void {
self.cancel_resolver.deinit();
self.closed_resolver.deinit();
if (self.reader_resolver) |*rr| {
rr.deinit();
}
}
pub fn get_locked(self: *const ReadableStream) bool {
return self.locked;
}
@@ -150,7 +166,7 @@ const GetReaderOptions = struct {
mode: ?[]const u8 = null,
};
pub fn _getReader(self: *ReadableStream, _options: ?GetReaderOptions, page: *Page) !ReadableStreamDefaultReader {
pub fn _getReader(self: *ReadableStream, _options: ?GetReaderOptions) !ReadableStreamDefaultReader {
if (self.locked) {
return error.TypeError;
}
@@ -159,7 +175,7 @@ pub fn _getReader(self: *ReadableStream, _options: ?GetReaderOptions, page: *Pag
const options = _options orelse GetReaderOptions{};
_ = options;
return ReadableStreamDefaultReader.constructor(self, page);
return ReadableStreamDefaultReader.constructor(self);
}
// TODO: pipeThrough (requires TransformStream)

View File

@@ -39,6 +39,7 @@ pub fn _close(self: *ReadableStreamDefaultController, _reason: ?[]const u8, page
const reason = if (_reason) |reason| try page.arena.dupe(u8, reason) else null;
self.stream.state = .{ .closed = reason };
// Resolve the Reader Promise
if (self.stream.reader_resolver) |rr| {
const resolver = Env.PromiseResolver{
.js_context = page.main_context,
@@ -49,6 +50,14 @@ pub fn _close(self: *ReadableStreamDefaultController, _reason: ?[]const u8, page
self.stream.reader_resolver = null;
}
// Resolve the Closed promise.
const closed_resolver = Env.PromiseResolver{
.js_context = page.main_context,
.resolver = self.stream.closed_resolver.castToPromiseResolver(),
};
try closed_resolver.resolve({});
// close just sets as closed meaning it wont READ any more but anything in the queue is fine to read.
// to discard, must use cancel.
}

View File

@@ -29,23 +29,18 @@ const ReadableStreamReadResult = @import("./ReadableStream.zig").ReadableStreamR
const ReadableStreamDefaultReader = @This();
stream: *ReadableStream,
// This promise resolves when the stream is closed.
closed_resolver: Env.PromiseResolver,
pub fn constructor(stream: *ReadableStream, page: *Page) ReadableStreamDefaultReader {
const closed_resolver = Env.PromiseResolver{
.js_context = page.main_context,
.resolver = v8.PromiseResolver.init(page.main_context.v8_context),
};
return .{
.stream = stream,
.closed_resolver = closed_resolver,
};
pub fn constructor(stream: *ReadableStream) ReadableStreamDefaultReader {
return .{ .stream = stream };
}
pub fn get_closed(self: *const ReadableStreamDefaultReader) Env.Promise {
return self.closed_resolver.promise();
pub fn get_closed(self: *const ReadableStreamDefaultReader, page: *Page) Env.Promise {
const resolver = Env.PromiseResolver{
.js_context = page.main_context,
.resolver = self.stream.closed_resolver.castToPromiseResolver(),
};
return resolver.promise();
}
pub fn _cancel(self: *ReadableStreamDefaultReader, reason: ?[]const u8, page: *Page) !Env.Promise {