Add start/cancel/fill/strategy support for ReadableStream

This commit is contained in:
Karl Seguin
2025-12-08 18:23:50 +08:00
parent 09328aeb5a
commit fbb37633f0
5 changed files with 314 additions and 26 deletions

View File

@@ -178,3 +178,126 @@
testing.expectEqual('object', typeof stream); testing.expectEqual('object', typeof stream);
})(); })();
</script> </script>
<script id=readable_stream_locked>
const stream1 = new ReadableStream({
start(controller) {
controller.enqueue("data");
controller.close();
}
});
testing.expectEqual(false, stream1.locked);
const reader1 = stream1.getReader();
testing.expectEqual(true, stream1.locked);
reader1.releaseLock();
testing.expectEqual(false, stream1.locked);
</script>
<script id=readable_stream_cancel>
(async function() {
var cancelCalled = false;
var cancelReason = null;
const stream2 = new ReadableStream({
start(controller) {
controller.enqueue("data1");
controller.enqueue("data2");
},
cancel(reason) {
cancelCalled = true;
cancelReason = reason;
}
});
const result = await stream2.cancel("user requested");
testing.expectEqual(undefined, result);
testing.expectEqual(true, cancelCalled);
testing.expectEqual("user requested", cancelReason);
})();
</script>
<script id=readable_stream_pull>
(async function() {
var pullCount = 0;
const stream3 = new ReadableStream({
start(controller) {
controller.enqueue("initial");
},
pull(controller) {
pullCount++;
if (pullCount <= 2) {
controller.enqueue("pulled" + pullCount);
}
if (pullCount === 2) {
controller.close();
}
}
}, { highWaterMark: 1 });
const reader3 = stream3.getReader();
const data1 = await reader3.read();
testing.expectEqual("initial", data1.value);
testing.expectEqual(false, data1.done);
const data2 = await reader3.read();
testing.expectEqual("pulled1", data2.value);
testing.expectEqual(false, data2.done);
})();
</script>
<script id=readable_stream_desired_size>
var desiredSizes = [];
const stream4 = new ReadableStream({
start(controller) {
desiredSizes.push(controller.desiredSize);
controller.enqueue("a");
desiredSizes.push(controller.desiredSize);
controller.enqueue("b");
desiredSizes.push(controller.desiredSize);
}
}, { highWaterMark: 2 });
testing.expectEqual(2, desiredSizes[0]);
testing.expectEqual(1, desiredSizes[1]);
testing.expectEqual(0, desiredSizes[2]);
</script>
<script id=readable_stream_start_with_pull>
(async function() {
var pullCount = 0;
const stream5 = new ReadableStream({
start(controller) {
controller.enqueue("start1");
controller.enqueue("start2");
},
pull(controller) {
pullCount++;
if (pullCount === 1) {
controller.enqueue("pull1");
} else if (pullCount === 2) {
controller.enqueue("pull2");
controller.close();
}
}
}, { highWaterMark: 2 });
const reader5 = stream5.getReader();
const data1 = await reader5.read();
testing.expectEqual("start1", data1.value);
testing.expectEqual(false, data1.done);
const data2 = await reader5.read();
testing.expectEqual("start2", data2.value);
testing.expectEqual(false, data2.done);
const data3 = await reader5.read();
testing.expectEqual("pull1", data3.value);
testing.expectEqual(false, data3.done);
})();
</script>

View File

@@ -78,7 +78,7 @@ pub fn getBody(self: *const Response, page: *Page) !?*ReadableStream {
// Empty string should create a closed stream with no data // Empty string should create a closed stream with no data
if (body.len == 0) { if (body.len == 0) {
const stream = try ReadableStream.init(page); const stream = try ReadableStream.init(null, null, page);
try stream._controller.close(); try stream._controller.close();
return stream; return stream;
} }

View File

@@ -43,9 +43,27 @@ _state: State,
_reader: ?*ReadableStreamDefaultReader, _reader: ?*ReadableStreamDefaultReader,
_controller: *ReadableStreamDefaultController, _controller: *ReadableStreamDefaultController,
_stored_error: ?[]const u8, _stored_error: ?[]const u8,
_pull_fn: ?js.Function = null,
_pulling: bool = false,
_pull_again: bool = false,
_cancel: ?Cancel = null,
pub fn init(page: *Page) !*ReadableStream { const UnderlyingSource = struct {
const stream = try page._factory.create(ReadableStream{ start: ?js.Function = null,
pull: ?js.Function = null,
cancel: ?js.Function = null,
type: ?[]const u8 = null,
};
const QueueingStrategy = struct {
size: ?js.Function = null,
highWaterMark: u32 = 1,
};
pub fn init(src_: ?UnderlyingSource, strategy_: ?QueueingStrategy, page: *Page) !*ReadableStream {
const strategy: QueueingStrategy = strategy_ orelse .{};
const self = try page._factory.create(ReadableStream{
._page = page, ._page = page,
._state = .readable, ._state = .readable,
._reader = null, ._reader = null,
@@ -53,22 +71,40 @@ pub fn init(page: *Page) !*ReadableStream {
._stored_error = null, ._stored_error = null,
}); });
stream._controller = try ReadableStreamDefaultController.init(stream, page); self._controller = try ReadableStreamDefaultController.init(self, strategy.highWaterMark, page);
return stream;
if (src_) |src| {
if (src.start) |start| {
try start.call(void, .{self._controller});
}
if (src.cancel) |callback| {
self._cancel = .{
.callback = callback,
};
}
if (src.pull) |pull| {
self._pull_fn = pull;
try self.callPullIfNeeded();
}
}
return self;
} }
pub fn initWithData(data: []const u8, page: *Page) !*ReadableStream { pub fn initWithData(data: []const u8, page: *Page) !*ReadableStream {
const stream = try init(page); const stream = try init(null, null, page);
// For Phase 1: immediately enqueue all data and close // For Phase 1: immediately enqueue all data and close
try stream._controller.enqueue(data); try stream._controller.enqueue(.{ .uint8array = .{ .values = data } });
try stream._controller.close(); try stream._controller.close();
return stream; return stream;
} }
pub fn getReader(self: *ReadableStream, page: *Page) !*ReadableStreamDefaultReader { pub fn getReader(self: *ReadableStream, page: *Page) !*ReadableStreamDefaultReader {
if (self._reader != null) { if (self.getLocked()) {
return error.ReaderLocked; return error.ReaderLocked;
} }
@@ -85,6 +121,101 @@ pub fn getAsyncIterator(self: *ReadableStream, page: *Page) !*AsyncIterator {
return AsyncIterator.init(self, page); return AsyncIterator.init(self, page);
} }
pub fn getLocked(self: *const ReadableStream) bool {
return self._reader != null;
}
pub fn callPullIfNeeded(self: *ReadableStream) !void {
if (!self.shouldCallPull()) {
return;
}
if (self._pulling) {
self._pull_again = true;
return;
}
self._pulling = true;
const pull_fn = self._pull_fn orelse return;
// Call the pull function
// Note: In a complete implementation, we'd handle the promise returned by pull
// and set _pulling = false when it resolves
try pull_fn.call(void, .{self._controller});
self._pulling = false;
// If pull was requested again while we were pulling, pull again
if (self._pull_again) {
self._pull_again = false;
try self.callPullIfNeeded();
}
}
fn shouldCallPull(self: *const ReadableStream) bool {
if (self._state != .readable) {
return false;
}
if (self._pull_fn == null) {
return false;
}
const desired_size = self._controller.getDesiredSize() orelse return false;
return desired_size > 0;
}
pub fn cancel(self: *ReadableStream, reason: ?[]const u8, page: *Page) !js.Promise {
if (self._state != .readable) {
if (self._cancel) |c| {
if (c.resolver) |r| {
return r.promise();
}
}
return page.js.resolvePromise(.{});
}
if (self._cancel == null) {
self._cancel = Cancel{};
}
var c = &self._cancel.?;
if (c.resolver == null) {
c.resolver = try page.js.createPromiseResolver(.self);
}
// Execute the cancel callback if provided
if (c.callback) |cb| {
if (reason) |r| {
try cb.call(void, .{r});
} else {
try cb.call(void, .{});
}
}
self._state = .closed;
self._controller._queue.clearRetainingCapacity();
const result = ReadableStreamDefaultReader.ReadResult{
.done = true,
.value = .empty,
};
for (self._controller._pending_reads.items) |resolver| {
resolver.resolve("stream cancelled", result);
}
self._controller._pending_reads.clearRetainingCapacity();
c.resolver.?.resolve("ReadableStream.cancel", {});
return c.resolver.?.promise();
}
const Cancel = struct {
callback: ?js.Function = null,
reason: ?[]const u8 = null,
resolver: ?js.PersistentPromiseResolver = null,
};
pub const JsApi = struct { pub const JsApi = struct {
pub const bridge = js.Bridge(ReadableStream); pub const bridge = js.Bridge(ReadableStream);
@@ -95,7 +226,9 @@ pub const JsApi = struct {
}; };
pub const constructor = bridge.constructor(ReadableStream.init, .{}); pub const constructor = bridge.constructor(ReadableStream.init, .{});
pub const cancel = bridge.function(ReadableStream.cancel, .{});
pub const getReader = bridge.function(ReadableStream.getReader, .{}); pub const getReader = bridge.function(ReadableStream.getReader, .{});
pub const locked = bridge.accessor(ReadableStream.getLocked, null, .{});
pub const symbol_async_iterator = bridge.iterator(ReadableStream.getAsyncIterator, .{ .async = true }); pub const symbol_async_iterator = bridge.iterator(ReadableStream.getAsyncIterator, .{ .async = true });
}; };

View File

@@ -25,19 +25,34 @@ const ReadableStreamDefaultReader = @import("ReadableStreamDefaultReader.zig");
const ReadableStreamDefaultController = @This(); const ReadableStreamDefaultController = @This();
pub const Chunk = union(enum) {
// the order matters, sorry.
uint8array: js.TypedArray(u8),
string: []const u8,
pub fn dupe(self: Chunk, allocator: std.mem.Allocator) !Chunk {
return switch (self) {
.string => |str| .{ .string = try allocator.dupe(u8, str) },
.uint8array => |arr| .{ .uint8array = try arr.dupe(allocator) },
};
}
};
_page: *Page, _page: *Page,
_stream: *ReadableStream, _stream: *ReadableStream,
_arena: std.mem.Allocator, _arena: std.mem.Allocator,
_queue: std.ArrayList([]const u8), _queue: std.ArrayList(Chunk),
_pending_reads: std.ArrayList(js.PersistentPromiseResolver), _pending_reads: std.ArrayList(js.PersistentPromiseResolver),
_high_water_mark: u32,
pub fn init(stream: *ReadableStream, page: *Page) !*ReadableStreamDefaultController { pub fn init(stream: *ReadableStream, high_water_mark: u32, page: *Page) !*ReadableStreamDefaultController {
return page._factory.create(ReadableStreamDefaultController{ return page._factory.create(ReadableStreamDefaultController{
._page = page, ._page = page,
._queue = .empty, ._queue = .empty,
._stream = stream, ._stream = stream,
._arena = page.arena, ._arena = page.arena,
._pending_reads = .empty, ._pending_reads = .empty,
._high_water_mark = high_water_mark,
}); });
} }
@@ -47,13 +62,13 @@ pub fn addPendingRead(self: *ReadableStreamDefaultController, page: *Page) !js.P
return resolver.promise(); return resolver.promise();
} }
pub fn enqueue(self: *ReadableStreamDefaultController, chunk: []const u8) !void { pub fn enqueue(self: *ReadableStreamDefaultController, chunk: Chunk) !void {
if (self._stream._state != .readable) { if (self._stream._state != .readable) {
return error.StreamNotReadable; return error.StreamNotReadable;
} }
if (self._pending_reads.items.len == 0) { if (self._pending_reads.items.len == 0) {
const chunk_copy = try self._page.arena.dupe(u8, chunk); const chunk_copy = try chunk.dupe(self._page.arena);
return self._queue.append(self._arena, chunk_copy); return self._queue.append(self._arena, chunk_copy);
} }
@@ -62,7 +77,7 @@ pub fn enqueue(self: *ReadableStreamDefaultController, chunk: []const u8) !void
const resolver = self._pending_reads.orderedRemove(0); const resolver = self._pending_reads.orderedRemove(0);
const result = ReadableStreamDefaultReader.ReadResult{ const result = ReadableStreamDefaultReader.ReadResult{
.done = false, .done = false,
.value = .{ .values = chunk }, .value = .fromChunk(chunk),
}; };
resolver.resolve("stream enqueue", result); resolver.resolve("stream enqueue", result);
} }
@@ -77,7 +92,7 @@ pub fn close(self: *ReadableStreamDefaultController) !void {
// Resolve all pending reads with done=true // Resolve all pending reads with done=true
const result = ReadableStreamDefaultReader.ReadResult{ const result = ReadableStreamDefaultReader.ReadResult{
.done = true, .done = true,
.value = null, .value = .empty,
}; };
for (self._pending_reads.items) |resolver| { for (self._pending_reads.items) |resolver| {
resolver.resolve("stream close", result); resolver.resolve("stream close", result);
@@ -100,11 +115,16 @@ pub fn doError(self: *ReadableStreamDefaultController, err: []const u8) !void {
self._pending_reads.clearRetainingCapacity(); self._pending_reads.clearRetainingCapacity();
} }
pub fn dequeue(self: *ReadableStreamDefaultController) ?[]const u8 { pub fn dequeue(self: *ReadableStreamDefaultController) ?Chunk {
if (self._queue.items.len == 0) { if (self._queue.items.len == 0) {
return null; return null;
} }
return self._queue.orderedRemove(0); const chunk = self._queue.orderedRemove(0);
// After dequeueing, we may need to pull more data
self._stream.callPullIfNeeded() catch {};
return chunk;
} }
pub fn getDesiredSize(self: *const ReadableStreamDefaultController) ?i32 { pub fn getDesiredSize(self: *const ReadableStreamDefaultController) ?i32 {
@@ -112,9 +132,9 @@ pub fn getDesiredSize(self: *const ReadableStreamDefaultController) ?i32 {
.errored => return null, .errored => return null,
.closed => return 0, .closed => return 0,
.readable => { .readable => {
// For now, just report based on queue size const queue_size: i32 = @intCast(self._queue.items.len);
// In a real implementation, this would use highWaterMark const hwm: i32 = @intCast(self._high_water_mark);
return @as(i32, 1) - @as(i32, @intCast(self._queue.items.len)); return hwm - queue_size;
}, },
} }
} }

View File

@@ -21,6 +21,7 @@ const js = @import("../../js/js.zig");
const Page = @import("../../Page.zig"); const Page = @import("../../Page.zig");
const ReadableStream = @import("ReadableStream.zig"); const ReadableStream = @import("ReadableStream.zig");
const ReadableStreamDefaultController = @import("ReadableStreamDefaultController.zig");
const ReadableStreamDefaultReader = @This(); const ReadableStreamDefaultReader = @This();
@@ -36,7 +37,21 @@ pub fn init(stream: *ReadableStream, page: *Page) !*ReadableStreamDefaultReader
pub const ReadResult = struct { pub const ReadResult = struct {
done: bool, done: bool,
value: ?js.TypedArray(u8), value: Chunk,
// Done like this so that we can properly return undefined in some cases
const Chunk = union(enum) {
empty,
string: []const u8,
uint8array: js.TypedArray(u8),
pub fn fromChunk(chunk: ReadableStreamDefaultController.Chunk) Chunk {
return switch (chunk) {
.string => |s| .{ .string = s },
.uint8array => |arr| .{ .uint8array = arr },
};
}
};
}; };
pub fn read(self: *ReadableStreamDefaultReader, page: *Page) !js.Promise { pub fn read(self: *ReadableStreamDefaultReader, page: *Page) !js.Promise {
@@ -52,7 +67,7 @@ pub fn read(self: *ReadableStreamDefaultReader, page: *Page) !js.Promise {
if (stream._controller.dequeue()) |chunk| { if (stream._controller.dequeue()) |chunk| {
const result = ReadResult{ const result = ReadResult{
.done = false, .done = false,
.value = js.TypedArray(u8){ .values = chunk }, .value = .fromChunk(chunk),
}; };
return page.js.resolvePromise(result); return page.js.resolvePromise(result);
} }
@@ -60,7 +75,7 @@ pub fn read(self: *ReadableStreamDefaultReader, page: *Page) !js.Promise {
if (stream._state == .closed) { if (stream._state == .closed) {
const result = ReadResult{ const result = ReadResult{
.done = true, .done = true,
.value = null, .value = .empty,
}; };
return page.js.resolvePromise(result); return page.js.resolvePromise(result);
} }
@@ -81,12 +96,9 @@ pub fn cancel(self: *ReadableStreamDefaultReader, reason_: ?[]const u8, page: *P
return page.js.rejectPromise("Reader has been released"); return page.js.rejectPromise("Reader has been released");
}; };
const reason = reason_ orelse "canceled";
try stream._controller.doError(reason);
self.releaseLock(); self.releaseLock();
return page.js.resolvePromise(.{}); return stream.cancel(reason_, page);
} }
pub const JsApi = struct { pub const JsApi = struct {