diff --git a/src/browser/tests/streams/readable_stream.html b/src/browser/tests/streams/readable_stream.html index 3e0744bb..3d00d6cf 100644 --- a/src/browser/tests/streams/readable_stream.html +++ b/src/browser/tests/streams/readable_stream.html @@ -178,3 +178,126 @@ testing.expectEqual('object', typeof stream); })(); + + + + + + + + + + diff --git a/src/browser/webapi/net/Response.zig b/src/browser/webapi/net/Response.zig index fe464320..dffa4c60 100644 --- a/src/browser/webapi/net/Response.zig +++ b/src/browser/webapi/net/Response.zig @@ -78,7 +78,7 @@ pub fn getBody(self: *const Response, page: *Page) !?*ReadableStream { // Empty string should create a closed stream with no data if (body.len == 0) { - const stream = try ReadableStream.init(page); + const stream = try ReadableStream.init(null, null, page); try stream._controller.close(); return stream; } diff --git a/src/browser/webapi/streams/ReadableStream.zig b/src/browser/webapi/streams/ReadableStream.zig index eec8cb94..70c59ba0 100644 --- a/src/browser/webapi/streams/ReadableStream.zig +++ b/src/browser/webapi/streams/ReadableStream.zig @@ -43,9 +43,27 @@ _state: State, _reader: ?*ReadableStreamDefaultReader, _controller: *ReadableStreamDefaultController, _stored_error: ?[]const u8, +_pull_fn: ?js.Function = null, +_pulling: bool = false, +_pull_again: bool = false, +_cancel: ?Cancel = null, -pub fn init(page: *Page) !*ReadableStream { - const stream = try page._factory.create(ReadableStream{ +const UnderlyingSource = struct { + start: ?js.Function = null, + pull: ?js.Function = null, + cancel: ?js.Function = null, + type: ?[]const u8 = null, +}; + +const QueueingStrategy = struct { + size: ?js.Function = null, + highWaterMark: u32 = 1, +}; + +pub fn init(src_: ?UnderlyingSource, strategy_: ?QueueingStrategy, page: *Page) !*ReadableStream { + const strategy: QueueingStrategy = strategy_ orelse .{}; + + const self = try page._factory.create(ReadableStream{ ._page = page, ._state = .readable, ._reader = null, @@ -53,22 +71,40 @@ pub fn init(page: *Page) !*ReadableStream { ._stored_error = null, }); - stream._controller = try ReadableStreamDefaultController.init(stream, page); - return stream; + self._controller = try ReadableStreamDefaultController.init(self, strategy.highWaterMark, page); + + if (src_) |src| { + if (src.start) |start| { + try start.call(void, .{self._controller}); + } + + if (src.cancel) |callback| { + self._cancel = .{ + .callback = callback, + }; + } + + if (src.pull) |pull| { + self._pull_fn = pull; + try self.callPullIfNeeded(); + } + } + + return self; } pub fn initWithData(data: []const u8, page: *Page) !*ReadableStream { - const stream = try init(page); + const stream = try init(null, null, page); // For Phase 1: immediately enqueue all data and close - try stream._controller.enqueue(data); + try stream._controller.enqueue(.{ .uint8array = .{ .values = data } }); try stream._controller.close(); return stream; } pub fn getReader(self: *ReadableStream, page: *Page) !*ReadableStreamDefaultReader { - if (self._reader != null) { + if (self.getLocked()) { return error.ReaderLocked; } @@ -85,6 +121,101 @@ pub fn getAsyncIterator(self: *ReadableStream, page: *Page) !*AsyncIterator { return AsyncIterator.init(self, page); } +pub fn getLocked(self: *const ReadableStream) bool { + return self._reader != null; +} + +pub fn callPullIfNeeded(self: *ReadableStream) !void { + if (!self.shouldCallPull()) { + return; + } + + if (self._pulling) { + self._pull_again = true; + return; + } + + self._pulling = true; + + const pull_fn = self._pull_fn orelse return; + + // Call the pull function + // Note: In a complete implementation, we'd handle the promise returned by pull + // and set _pulling = false when it resolves + try pull_fn.call(void, .{self._controller}); + + self._pulling = false; + + // If pull was requested again while we were pulling, pull again + if (self._pull_again) { + self._pull_again = false; + try self.callPullIfNeeded(); + } +} + +fn shouldCallPull(self: *const ReadableStream) bool { + if (self._state != .readable) { + return false; + } + + if (self._pull_fn == null) { + return false; + } + + const desired_size = self._controller.getDesiredSize() orelse return false; + return desired_size > 0; +} + +pub fn cancel(self: *ReadableStream, reason: ?[]const u8, page: *Page) !js.Promise { + if (self._state != .readable) { + if (self._cancel) |c| { + if (c.resolver) |r| { + return r.promise(); + } + } + return page.js.resolvePromise(.{}); + } + + if (self._cancel == null) { + self._cancel = Cancel{}; + } + + var c = &self._cancel.?; + if (c.resolver == null) { + c.resolver = try page.js.createPromiseResolver(.self); + } + + // Execute the cancel callback if provided + if (c.callback) |cb| { + if (reason) |r| { + try cb.call(void, .{r}); + } else { + try cb.call(void, .{}); + } + } + + self._state = .closed; + self._controller._queue.clearRetainingCapacity(); + + const result = ReadableStreamDefaultReader.ReadResult{ + .done = true, + .value = .empty, + }; + for (self._controller._pending_reads.items) |resolver| { + resolver.resolve("stream cancelled", result); + } + self._controller._pending_reads.clearRetainingCapacity(); + + c.resolver.?.resolve("ReadableStream.cancel", {}); + return c.resolver.?.promise(); +} + +const Cancel = struct { + callback: ?js.Function = null, + reason: ?[]const u8 = null, + resolver: ?js.PersistentPromiseResolver = null, +}; + pub const JsApi = struct { pub const bridge = js.Bridge(ReadableStream); @@ -95,7 +226,9 @@ pub const JsApi = struct { }; pub const constructor = bridge.constructor(ReadableStream.init, .{}); + pub const cancel = bridge.function(ReadableStream.cancel, .{}); pub const getReader = bridge.function(ReadableStream.getReader, .{}); + pub const locked = bridge.accessor(ReadableStream.getLocked, null, .{}); pub const symbol_async_iterator = bridge.iterator(ReadableStream.getAsyncIterator, .{ .async = true }); }; diff --git a/src/browser/webapi/streams/ReadableStreamDefaultController.zig b/src/browser/webapi/streams/ReadableStreamDefaultController.zig index 0f4c26c6..6de3505a 100644 --- a/src/browser/webapi/streams/ReadableStreamDefaultController.zig +++ b/src/browser/webapi/streams/ReadableStreamDefaultController.zig @@ -25,19 +25,34 @@ const ReadableStreamDefaultReader = @import("ReadableStreamDefaultReader.zig"); const ReadableStreamDefaultController = @This(); +pub const Chunk = union(enum) { + // the order matters, sorry. + uint8array: js.TypedArray(u8), + string: []const u8, + + pub fn dupe(self: Chunk, allocator: std.mem.Allocator) !Chunk { + return switch (self) { + .string => |str| .{ .string = try allocator.dupe(u8, str) }, + .uint8array => |arr| .{ .uint8array = try arr.dupe(allocator) }, + }; + } +}; + _page: *Page, _stream: *ReadableStream, _arena: std.mem.Allocator, -_queue: std.ArrayList([]const u8), +_queue: std.ArrayList(Chunk), _pending_reads: std.ArrayList(js.PersistentPromiseResolver), +_high_water_mark: u32, -pub fn init(stream: *ReadableStream, page: *Page) !*ReadableStreamDefaultController { +pub fn init(stream: *ReadableStream, high_water_mark: u32, page: *Page) !*ReadableStreamDefaultController { return page._factory.create(ReadableStreamDefaultController{ ._page = page, ._queue = .empty, ._stream = stream, ._arena = page.arena, ._pending_reads = .empty, + ._high_water_mark = high_water_mark, }); } @@ -47,13 +62,13 @@ pub fn addPendingRead(self: *ReadableStreamDefaultController, page: *Page) !js.P return resolver.promise(); } -pub fn enqueue(self: *ReadableStreamDefaultController, chunk: []const u8) !void { +pub fn enqueue(self: *ReadableStreamDefaultController, chunk: Chunk) !void { if (self._stream._state != .readable) { return error.StreamNotReadable; } if (self._pending_reads.items.len == 0) { - const chunk_copy = try self._page.arena.dupe(u8, chunk); + const chunk_copy = try chunk.dupe(self._page.arena); return self._queue.append(self._arena, chunk_copy); } @@ -62,7 +77,7 @@ pub fn enqueue(self: *ReadableStreamDefaultController, chunk: []const u8) !void const resolver = self._pending_reads.orderedRemove(0); const result = ReadableStreamDefaultReader.ReadResult{ .done = false, - .value = .{ .values = chunk }, + .value = .fromChunk(chunk), }; resolver.resolve("stream enqueue", result); } @@ -77,7 +92,7 @@ pub fn close(self: *ReadableStreamDefaultController) !void { // Resolve all pending reads with done=true const result = ReadableStreamDefaultReader.ReadResult{ .done = true, - .value = null, + .value = .empty, }; for (self._pending_reads.items) |resolver| { resolver.resolve("stream close", result); @@ -100,11 +115,16 @@ pub fn doError(self: *ReadableStreamDefaultController, err: []const u8) !void { self._pending_reads.clearRetainingCapacity(); } -pub fn dequeue(self: *ReadableStreamDefaultController) ?[]const u8 { +pub fn dequeue(self: *ReadableStreamDefaultController) ?Chunk { if (self._queue.items.len == 0) { return null; } - return self._queue.orderedRemove(0); + const chunk = self._queue.orderedRemove(0); + + // After dequeueing, we may need to pull more data + self._stream.callPullIfNeeded() catch {}; + + return chunk; } pub fn getDesiredSize(self: *const ReadableStreamDefaultController) ?i32 { @@ -112,9 +132,9 @@ pub fn getDesiredSize(self: *const ReadableStreamDefaultController) ?i32 { .errored => return null, .closed => return 0, .readable => { - // For now, just report based on queue size - // In a real implementation, this would use highWaterMark - return @as(i32, 1) - @as(i32, @intCast(self._queue.items.len)); + const queue_size: i32 = @intCast(self._queue.items.len); + const hwm: i32 = @intCast(self._high_water_mark); + return hwm - queue_size; }, } } diff --git a/src/browser/webapi/streams/ReadableStreamDefaultReader.zig b/src/browser/webapi/streams/ReadableStreamDefaultReader.zig index d24562cd..16dd0ad4 100644 --- a/src/browser/webapi/streams/ReadableStreamDefaultReader.zig +++ b/src/browser/webapi/streams/ReadableStreamDefaultReader.zig @@ -21,6 +21,7 @@ const js = @import("../../js/js.zig"); const Page = @import("../../Page.zig"); const ReadableStream = @import("ReadableStream.zig"); +const ReadableStreamDefaultController = @import("ReadableStreamDefaultController.zig"); const ReadableStreamDefaultReader = @This(); @@ -36,7 +37,21 @@ pub fn init(stream: *ReadableStream, page: *Page) !*ReadableStreamDefaultReader pub const ReadResult = struct { done: bool, - value: ?js.TypedArray(u8), + value: Chunk, + + // Done like this so that we can properly return undefined in some cases + const Chunk = union(enum) { + empty, + string: []const u8, + uint8array: js.TypedArray(u8), + + pub fn fromChunk(chunk: ReadableStreamDefaultController.Chunk) Chunk { + return switch (chunk) { + .string => |s| .{ .string = s }, + .uint8array => |arr| .{ .uint8array = arr }, + }; + } + }; }; pub fn read(self: *ReadableStreamDefaultReader, page: *Page) !js.Promise { @@ -52,7 +67,7 @@ pub fn read(self: *ReadableStreamDefaultReader, page: *Page) !js.Promise { if (stream._controller.dequeue()) |chunk| { const result = ReadResult{ .done = false, - .value = js.TypedArray(u8){ .values = chunk }, + .value = .fromChunk(chunk), }; return page.js.resolvePromise(result); } @@ -60,7 +75,7 @@ pub fn read(self: *ReadableStreamDefaultReader, page: *Page) !js.Promise { if (stream._state == .closed) { const result = ReadResult{ .done = true, - .value = null, + .value = .empty, }; return page.js.resolvePromise(result); } @@ -81,12 +96,9 @@ pub fn cancel(self: *ReadableStreamDefaultReader, reason_: ?[]const u8, page: *P return page.js.rejectPromise("Reader has been released"); }; - const reason = reason_ orelse "canceled"; - - try stream._controller.doError(reason); self.releaseLock(); - return page.js.resolvePromise(.{}); + return stream.cancel(reason_, page); } pub const JsApi = struct {