ReadableStream

This commit is contained in:
Karl Seguin
2025-12-01 15:16:24 +08:00
parent 613428c54c
commit 92ae2c46b6
13 changed files with 632 additions and 59 deletions

View File

@@ -752,11 +752,7 @@ const Script = struct {
};
if (comptime IS_DEBUG) {
log.debug(.browser, "executed script", .{
.src = url,
.success = success,
.on_load = script_element._on_load != null
});
log.debug(.browser, "executed script", .{ .src = url, .success = success, .on_load = script_element._on_load != null });
}
defer page.tick();

View File

@@ -665,21 +665,21 @@ pub fn mapZigInstanceToJs(self: *Context, js_obj_: ?v8.Object, value: anytype) !
pub fn jsValueToZig(self: *Context, comptime T: type, js_value: v8.Value) !T {
switch (@typeInfo(T)) {
.optional => |o| {
// If type type is a ?js.Value or a ?js.Object, then we want to pass
// a js.Object, not null. Consider a function,
// _doSomething(arg: ?Env.JsObjet) void { ... }
//
// And then these two calls:
// doSomething();
// doSomething(null);
//
// In the first case, we'll pass `null`. But in the
// second, we'll pass a js.Object which represents
// null.
// If we don't have this code, both cases will
// pass in `null` and the the doSomething won't
// be able to tell if `null` was explicitly passed
// or whether no parameter was passed.
// If type type is a ?js.Value or a ?js.Object, then we want to pass
// a js.Object, not null. Consider a function,
// _doSomething(arg: ?Env.JsObjet) void { ... }
//
// And then these two calls:
// doSomething();
// doSomething(null);
//
// In the first case, we'll pass `null`. But in the
// second, we'll pass a js.Object which represents
// null.
// If we don't have this code, both cases will
// pass in `null` and the the doSomething won't
// be able to tell if `null` was explicitly passed
// or whether no parameter was passed.
if (comptime o.child == js.Value) {
return js.Value{
.context = self,
@@ -838,7 +838,6 @@ fn jsValueToStruct(self: *Context, comptime T: type, js_value: v8.Value) !?T {
return .{ .string = try self.valueToString(js_value, .{ .allocator = self.arena }) };
}
if (comptime T == js.Value) {
// Caller wants an opaque js.Object. Probably a parameter
// that it needs to pass back into a callback
@@ -1157,15 +1156,14 @@ pub fn stackTrace(self: *const Context) !?[]const u8 {
}
// == Promise Helpers ==
pub fn rejectPromise(self: *Context, value: anytype) js.Promise {
pub fn rejectPromise(self: *Context, value: anytype) !js.Promise {
const ctx = self.v8_context;
var resolver = v8.PromiseResolver.init(ctx);
if (self.zigValueToJs(value, .{})) |js_value| {
_ = resolver.reject(ctx, js_value);
} else |err| {
const str = self.isolate.initStringUtf8(@errorName(err));
_ = resolver.reject(ctx, str.toValue());
const js_value = try self.zigValueToJs(value, .{});
if (resolver.reject(ctx, js_value) == null) {
return error.FailedToResolvePromise;
}
self.runMicrotasks();
return resolver.getPromise();
}
@@ -1174,7 +1172,10 @@ pub fn resolvePromise(self: *Context, value: anytype) !js.Promise {
const js_value = try self.zigValueToJs(value, .{});
var resolver = v8.PromiseResolver.init(ctx);
_ = resolver.resolve(ctx, js_value);
if (resolver.resolve(ctx, js_value) == null) {
return error.FailedToResolvePromise;
}
self.runMicrotasks();
return resolver.getPromise();
}
@@ -1257,12 +1258,12 @@ pub fn dynamicModuleCallback(
const resource = self.jsStringToZigZ(.{ .handle = resource_name.? }, .{}) catch |err| {
log.err(.app, "OOM", .{ .err = err, .src = "dynamicModuleCallback1" });
return @constCast(self.rejectPromise("Out of memory").handle);
return @constCast((self.rejectPromise("Out of memory") catch return null).handle);
};
const specifier = self.jsStringToZigZ(.{ .handle = v8_specifier.? }, .{}) catch |err| {
log.err(.app, "OOM", .{ .err = err, .src = "dynamicModuleCallback2" });
return @constCast(self.rejectPromise("Out of memory").handle);
return @constCast((self.rejectPromise("Out of memory") catch return null).handle);
};
const normalized_specifier = self.script_manager.?.resolveSpecifier(
@@ -1271,14 +1272,14 @@ pub fn dynamicModuleCallback(
specifier,
) catch |err| {
log.err(.app, "OOM", .{ .err = err, .src = "dynamicModuleCallback3" });
return @constCast(self.rejectPromise("Out of memory").handle);
return @constCast((self.rejectPromise("Out of memory") catch return null).handle);
};
const promise = self._dynamicModuleCallback(normalized_specifier, resource) catch |err| blk: {
log.err(.js, "dynamic module callback", .{
.err = err,
});
break :blk self.rejectPromise("Failed to load module");
break :blk self.rejectPromise("Failed to load module") catch return null;
};
return @constCast(promise.handle);
}

View File

@@ -270,7 +270,10 @@ pub fn attachClass(comptime JsApi: type, isolate: v8.Isolate, template: v8.Funct
bridge.Iterator => {
// Same as a function, but with a specific name
const function_template = v8.FunctionTemplate.initCallback(isolate, value.func);
const js_name = v8.Symbol.getIterator(isolate).toName();
const js_name = if (value.async)
v8.Symbol.getAsyncIterator(isolate).toName()
else
v8.Symbol.getIterator(isolate).toName();
template_proto.set(js_name, function_template, v8.PropertyAttribute.None);
},
bridge.Property => {

View File

@@ -284,28 +284,36 @@ pub const NamedIndexed = struct {
pub const Iterator = struct {
func: *const fn (?*const v8.C_FunctionCallbackInfo) callconv(.c) void,
async: bool,
const Opts = struct {};
const Opts = struct {
async: bool = false,
};
fn init(comptime T: type, comptime struct_or_func: anytype, comptime opts: Opts) Iterator {
_ = opts;
if (@typeInfo(@TypeOf(struct_or_func)) == .type) {
return .{ .func = struct {
fn wrap(raw_info: ?*const v8.C_FunctionCallbackInfo) callconv(.c) void {
const info = v8.FunctionCallbackInfo.initFromV8(raw_info);
info.getReturnValue().set(info.getThis());
}
}.wrap };
return .{
.async = opts.async,
.func = struct {
fn wrap(raw_info: ?*const v8.C_FunctionCallbackInfo) callconv(.c) void {
const info = v8.FunctionCallbackInfo.initFromV8(raw_info);
info.getReturnValue().set(info.getThis());
}
}.wrap,
};
}
return .{ .func = struct {
fn wrap(raw_info: ?*const v8.C_FunctionCallbackInfo) callconv(.c) void {
const info = v8.FunctionCallbackInfo.initFromV8(raw_info);
var caller = Caller.init(info);
defer caller.deinit();
caller.method(T, struct_or_func, info, .{});
}
}.wrap };
return .{
.async = opts.async,
.func = struct {
fn wrap(raw_info: ?*const v8.C_FunctionCallbackInfo) callconv(.c) void {
const info = v8.FunctionCallbackInfo.initFromV8(raw_info);
var caller = Caller.init(info);
defer caller.deinit();
caller.method(T, struct_or_func, info, .{});
}
}.wrap,
};
}
};
@@ -564,6 +572,9 @@ pub const JsApis = flattenTypes(&.{
@import("../webapi/net/URLSearchParams.zig"),
@import("../webapi/net/XMLHttpRequest.zig"),
@import("../webapi/net/XMLHttpRequestEventTarget.zig"),
@import("../webapi/streams/ReadableStream.zig"),
@import("../webapi/streams/ReadableStreamDefaultReader.zig"),
@import("../webapi/streams/ReadableStreamDefaultController.zig"),
@import("../webapi/Node.zig"),
@import("../webapi/storage/storage.zig"),
@import("../webapi/URL.zig"),

View File

@@ -0,0 +1,192 @@
<!DOCTYPE html>
<script src="../testing.js"></script>
<script id=readable_stream_basic>
{
// Test basic stream creation
const stream = new ReadableStream();
testing.expectEqual('object', typeof stream);
testing.expectEqual('function', typeof stream.getReader);
}
</script>
<script id=readable_stream_reader>
{
// Test getting a reader
const stream = new ReadableStream();
const reader = stream.getReader();
testing.expectEqual('object', typeof reader);
testing.expectEqual('function', typeof reader.read);
testing.expectEqual('function', typeof reader.releaseLock);
testing.expectEqual('function', typeof reader.cancel);
}
</script>
<script id=readable_stream_read_empty>
(async function() {
const stream = new ReadableStream();
const reader = stream.getReader();
// Reading from empty stream should return done:true
const result = await reader.read();
testing.expectEqual('object', typeof result);
testing.expectEqual(true, result.done);
})();
</script>
<script id=response_body>
(async function() {
const response = new Response('hello world');
testing.expectEqual('object', typeof response.body);
const reader = response.body.getReader();
const result = await reader.read();
testing.expectEqual(false, result.done);
testing.expectEqual(true, result.value instanceof Uint8Array);
// Convert Uint8Array to string
const decoder = new TextDecoder();
const text = decoder.decode(result.value);
testing.expectEqual('hello world', text);
// Next read should be done
const result2 = await reader.read();
testing.expectEqual(true, result2.done);
})();
</script>
<script id=response_text>
(async function() {
const response = new Response('hello from text()');
const text = await response.text();
testing.expectEqual('hello from text()', text);
})();
</script>
<script id=response_json>
(async function() {
const response = new Response('{"foo":"bar","num":42}');
const json = await response.json();
testing.expectEqual('object', typeof json);
testing.expectEqual('bar', json.foo);
testing.expectEqual(42, json.num);
})();
</script>
<script id=response_null_body>
{
// Response with no body should have null body
const response = new Response();
testing.expectEqual(null, response.body);
}
</script>
<script id=response_empty_string_body>
(async function() {
// Response with empty string should have a stream that's immediately closed
const response = new Response('');
testing.expectEqual('object', typeof response.body);
const reader = response.body.getReader();
const result = await reader.read();
// Stream should be closed immediately (done: true, no value)
testing.expectEqual(true, result.done);
})();
</script>
<script id=response_status>
(async function() {
const response = new Response('test', { status: 404 });
testing.expectEqual(404, response.status);
testing.expectEqual(false, response.ok);
const text = await response.text();
testing.expectEqual('test', text);
})();
</script>
<script id=async_iterator_exists>
(async function() {
const stream = new ReadableStream();
testing.expectEqual('function', typeof stream[Symbol.asyncIterator]);
})();
</script>
<script id=async_iterator_basic>
(async function() {
const stream = new ReadableStream();
const iterator = stream[Symbol.asyncIterator]();
testing.expectEqual('object', typeof iterator);
testing.expectEqual('function', typeof iterator.next);
})();
</script>
<script id=async_iterator_for_await>
(async function() {
const response = new Response('test data');
const stream = response.body;
const chunks = [];
for await (const chunk of stream) {
chunks.push(chunk);
}
testing.expectEqual(1, chunks.length);
testing.expectEqual(true, chunks[0] instanceof Uint8Array);
const decoder = new TextDecoder();
const text = decoder.decode(chunks[0]);
testing.expectEqual('test data', text);
})();
</script>
<script id=async_iterator_locks_stream>
(async function() {
const response = new Response('test');
const stream = response.body;
// Get async iterator (locks stream)
const iterator = stream[Symbol.asyncIterator]();
// Try to get reader - should fail
let errorThrown = false;
try {
stream.getReader();
} catch (e) {
errorThrown = true;
}
testing.expectEqual(true, errorThrown);
})();
</script>
<script id=async_iterator_manual_next>
(async function() {
const response = new Response('hello');
const stream = response.body;
const iterator = stream[Symbol.asyncIterator]();
const result = await iterator.next();
testing.expectEqual('object', typeof result);
testing.expectEqual(false, result.done);
testing.expectEqual(true, result.value instanceof Uint8Array);
// Second call should be done
const result2 = await iterator.next();
testing.expectEqual(true, result2.done);
})();
</script>
<script id=async_iterator_early_break>
(async function() {
const response = new Response('test data');
const stream = response.body;
for await (const chunk of stream) {
break; // Early exit
}
// Should not throw errors
testing.expectEqual('object', typeof stream);
})();
</script>

View File

@@ -37,7 +37,6 @@ pub fn init(page: *Page) !*MessageChannel {
});
}
pub fn getPort1(self: *const MessageChannel) *MessagePort {
return self._port1;
}

View File

@@ -134,7 +134,7 @@ const PostMessageCallback = struct {
.origin = "",
.source = null,
}, self.page) catch |err| {
log.err(.dom, "MessagePort.postMessage", .{.err = err});
log.err(.dom, "MessagePort.postMessage", .{ .err = err });
return null;
};
@@ -144,7 +144,7 @@ const PostMessageCallback = struct {
self.port._on_message,
.{ .context = "MessagePort message" },
) catch |err| {
log.err(.dom, "MessagePort.postMessage", .{.err = err});
log.err(.dom, "MessagePort.postMessage", .{ .err = err });
};
return null;

View File

@@ -265,13 +265,12 @@ pub fn postMessage(self: *Window, message: js.Object, target_origin: ?[]const u8
const origin = try self._location.getOrigin(page);
const callback = try page._factory.create(PostMessageCallback{
.window = self,
.message = try message.persist() ,
.message = try message.persist(),
.origin = try page.arena.dupe(u8, origin),
.page = page,
});
errdefer page._factory.destroy(callback);
try page.scheduler.add(callback, PostMessageCallback.run, 0, .{
.name = "postMessage",
.low_priority = false,

View File

@@ -21,6 +21,7 @@ const js = @import("../../js/js.zig");
const Page = @import("../../Page.zig");
const Headers = @import("Headers.zig");
const ReadableStream = @import("../streams/ReadableStream.zig");
const Allocator = std.mem.Allocator;
const Response = @This();
@@ -28,7 +29,7 @@ const Response = @This();
_status: u16,
_arena: Allocator,
_headers: *Headers,
_body: []const u8,
_body: ?[]const u8,
const InitOpts = struct {
status: u16 = 200,
@@ -39,10 +40,13 @@ const InitOpts = struct {
pub fn init(body_: ?[]const u8, opts_: ?InitOpts, page: *Page) !*Response {
const opts = opts_ orelse InitOpts{};
// Store empty string as empty string, not null
const body = if (body_) |b| try page.arena.dupe(u8, b) else null;
return page._factory.create(Response{
._arena = page.arena,
._status = opts.status,
._body = if (body_) |b| try page.arena.dupe(u8, b) else "",
._body = body,
._headers = opts.headers orelse try Headers.init(page),
});
}
@@ -55,15 +59,34 @@ pub fn getHeaders(self: *const Response) *Headers {
return self._headers;
}
pub fn getBody(self: *const Response, page: *Page) !?*ReadableStream {
const body = self._body orelse return null;
// Empty string should create a closed stream with no data
if (body.len == 0) {
const stream = try ReadableStream.init(page);
try stream._controller.close();
return stream;
}
return ReadableStream.initWithData(body, page);
}
pub fn isOK(self: *const Response) bool {
return self._status >= 200 and self._status <= 299;
}
pub fn getText(self: *const Response, page: *Page) !js.Promise {
const body = self._body orelse "";
return page.js.resolvePromise(body);
}
pub fn getJson(self: *Response, page: *Page) !js.Promise {
const body = self._body orelse "";
const value = std.json.parseFromSliceLeaky(
std.json.Value,
page.call_arena,
self._body,
body,
.{},
) catch |err| {
return page.js.rejectPromise(.{@errorName(err)});
@@ -83,6 +106,8 @@ pub const JsApi = struct {
pub const constructor = bridge.constructor(Response.init, .{});
pub const ok = bridge.accessor(Response.isOK, null, .{});
pub const status = bridge.accessor(Response.getStatus, null, .{});
pub const text = bridge.function(Response.getText, .{});
pub const json = bridge.function(Response.getJson, .{});
pub const headers = bridge.accessor(Response.getHeaders, null, .{});
pub const body = bridge.accessor(Response.getBody, null, .{});
};

View File

@@ -45,13 +45,13 @@ pub fn init(opts_: ?InitOpts, page: *Page) !*URLSearchParams {
.query_string => |qs| break :blk try paramsFromString(arena, qs, &page.buf),
.value => |js_val| {
if (js_val.isObject()) {
break :blk try paramsFromObject(arena, js_val.toObject());
break :blk try paramsFromObject(arena, js_val.toObject());
}
if (js_val.isString()) {
break :blk try paramsFromString(arena, try js_val.toString(arena), &page.buf);
}
return error.InvalidArgument;
}
},
}
};

View File

@@ -0,0 +1,140 @@
// Copyright (C) 2023-2025 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const js = @import("../../js/js.zig");
const Page = @import("../../Page.zig");
const ReadableStreamDefaultReader = @import("ReadableStreamDefaultReader.zig");
const ReadableStreamDefaultController = @import("ReadableStreamDefaultController.zig");
pub fn registerTypes() []const type {
return &.{
ReadableStream,
AsyncIterator,
};
}
const ReadableStream = @This();
pub const State = enum {
readable,
closed,
errored,
};
_page: *Page,
_state: State,
_reader: ?*ReadableStreamDefaultReader,
_controller: *ReadableStreamDefaultController,
_stored_error: ?[]const u8,
pub fn init(page: *Page) !*ReadableStream {
const stream = try page._factory.create(ReadableStream{
._page = page,
._state = .readable,
._reader = null,
._controller = undefined,
._stored_error = null,
});
stream._controller = try ReadableStreamDefaultController.init(stream, page);
return stream;
}
pub fn initWithData(data: []const u8, page: *Page) !*ReadableStream {
const stream = try init(page);
// For Phase 1: immediately enqueue all data and close
try stream._controller.enqueue(data);
try stream._controller.close();
return stream;
}
pub fn getReader(self: *ReadableStream, page: *Page) !*ReadableStreamDefaultReader {
if (self._reader != null) {
return error.ReaderLocked;
}
const reader = try ReadableStreamDefaultReader.init(self, page);
self._reader = reader;
return reader;
}
pub fn releaseReader(self: *ReadableStream) void {
self._reader = null;
}
pub fn getAsyncIterator(self: *ReadableStream, page: *Page) !*AsyncIterator {
return AsyncIterator.init(self, page);
}
pub const JsApi = struct {
pub const bridge = js.Bridge(ReadableStream);
pub const Meta = struct {
pub const name = "ReadableStream";
pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined;
};
pub const constructor = bridge.constructor(ReadableStream.init, .{});
pub const getReader = bridge.function(ReadableStream.getReader, .{});
pub const symbol_async_iterator = bridge.iterator(ReadableStream.getAsyncIterator, .{ .async = true });
};
pub const AsyncIterator = struct {
_stream: *ReadableStream,
_reader: *ReadableStreamDefaultReader,
pub fn init(stream: *ReadableStream, page: *Page) !*AsyncIterator {
const reader = try stream.getReader(page);
return page._factory.create(AsyncIterator{
._reader = reader,
._stream = stream,
});
}
pub fn next(self: *AsyncIterator, page: *Page) !js.Promise {
return self._reader.read(page);
}
pub fn @"return"(self: *AsyncIterator, page: *Page) !js.Promise {
self._reader.releaseLock();
return page.js.resolvePromise(.{ .done = true, .value = null });
}
pub const JsApi = struct {
pub const bridge = js.Bridge(ReadableStream.AsyncIterator);
pub const Meta = struct {
pub const name = "ReadableStreamAsyncIterator";
pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined;
};
pub const next = bridge.function(ReadableStream.AsyncIterator.next, .{});
pub const @"return" = bridge.function(ReadableStream.AsyncIterator.@"return", .{});
};
};
const testing = @import("../../../testing.zig");
test "WebApi: ReadableStream" {
try testing.htmlRunner("streams/readable_stream.html", .{});
}

View File

@@ -0,0 +1,100 @@
// Copyright (C) 2023-2025 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const js = @import("../../js/js.zig");
const Page = @import("../../Page.zig");
const ReadableStream = @import("ReadableStream.zig");
const ReadableStreamDefaultController = @This();
_page: *Page,
_stream: *ReadableStream,
_arena: std.mem.Allocator,
_queue: std.ArrayList([]const u8),
pub fn init(stream: *ReadableStream, page: *Page) !*ReadableStreamDefaultController {
return page._factory.create(ReadableStreamDefaultController{
._page = page,
._stream = stream,
._arena = page.arena,
._queue = std.ArrayList([]const u8){},
});
}
pub fn enqueue(self: *ReadableStreamDefaultController, chunk: []const u8) !void {
if (self._stream._state != .readable) {
return error.StreamNotReadable;
}
// Store a copy of the chunk in the page arena
const chunk_copy = try self._page.arena.dupe(u8, chunk);
try self._queue.append(self._arena, chunk_copy);
}
pub fn close(self: *ReadableStreamDefaultController) !void {
if (self._stream._state != .readable) {
return error.StreamNotReadable;
}
self._stream._state = .closed;
}
pub fn doError(self: *ReadableStreamDefaultController, err: []const u8) !void {
if (self._stream._state != .readable) {
return;
}
self._stream._state = .errored;
self._stream._stored_error = try self._page.arena.dupe(u8, err);
}
pub fn dequeue(self: *ReadableStreamDefaultController) ?[]const u8 {
if (self._queue.items.len == 0) {
return null;
}
return self._queue.orderedRemove(0);
}
pub fn getDesiredSize(self: *const ReadableStreamDefaultController) ?i32 {
switch (self._stream._state) {
.errored => return null,
.closed => return 0,
.readable => {
// For now, just report based on queue size
// In a real implementation, this would use highWaterMark
return @as(i32, 1) - @as(i32, @intCast(self._queue.items.len));
},
}
}
pub const JsApi = struct {
pub const bridge = js.Bridge(ReadableStreamDefaultController);
pub const Meta = struct {
pub const name = "ReadableStreamDefaultController";
pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined;
};
pub const enqueue = bridge.function(ReadableStreamDefaultController.enqueue, .{});
pub const close = bridge.function(ReadableStreamDefaultController.close, .{});
pub const @"error" = bridge.function(ReadableStreamDefaultController.doError, .{});
pub const desiredSize = bridge.accessor(ReadableStreamDefaultController.getDesiredSize, null, .{});
};

View File

@@ -0,0 +1,107 @@
// Copyright (C) 2023-2025 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const js = @import("../../js/js.zig");
const Page = @import("../../Page.zig");
const ReadableStream = @import("ReadableStream.zig");
const ReadableStreamDefaultReader = @This();
_page: *Page,
_stream: ?*ReadableStream,
pub fn init(stream: *ReadableStream, page: *Page) !*ReadableStreamDefaultReader {
return page._factory.create(ReadableStreamDefaultReader{
._stream = stream,
._page = page,
});
}
pub const ReadResult = struct {
done: bool,
value: ?js.TypedArray(u8),
};
pub fn read(self: *ReadableStreamDefaultReader, page: *Page) !js.Promise {
const stream = self._stream orelse {
return page.js.rejectPromise("Reader has been released");
};
if (stream._state == .errored) {
const err = stream._stored_error orelse "Stream errored";
return page.js.rejectPromise(err);
}
if (stream._controller.dequeue()) |chunk| {
const result = ReadResult{
.done = false,
.value = js.TypedArray(u8){ .values = chunk },
};
return page.js.resolvePromise(result);
}
if (stream._state == .closed) {
const result = ReadResult{
.value = null,
.done = true,
};
return page.js.resolvePromise(result);
}
const result = ReadResult{
.done = true,
.value = null,
};
return page.js.resolvePromise(result);
}
pub fn releaseLock(self: *ReadableStreamDefaultReader) void {
if (self._stream) |stream| {
stream.releaseReader();
self._stream = null;
}
}
pub fn cancel(self: *ReadableStreamDefaultReader, reason_: ?[]const u8, page: *Page) !js.Promise {
const stream = self._stream orelse {
return page.js.rejectPromise("Reader has been released");
};
const reason = reason_ orelse "canceled";
try stream._controller.doError(reason);
self.releaseLock();
return page.js.resolvePromise(.{});
}
pub const JsApi = struct {
pub const bridge = js.Bridge(ReadableStreamDefaultReader);
pub const Meta = struct {
pub const name = "ReadableStreamDefaultReader";
pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined;
};
pub const read = bridge.function(ReadableStreamDefaultReader.read, .{});
pub const cancel = bridge.function(ReadableStreamDefaultReader.cancel, .{});
pub const releaseLock = bridge.function(ReadableStreamDefaultReader.releaseLock, .{});
};