From 814e41122afa6e0d71803f8fe952f03013edfe52 Mon Sep 17 00:00:00 2001 From: Muki Kiboigo Date: Tue, 26 Aug 2025 13:45:49 -0700 Subject: [PATCH] basic readable stream working --- src/browser/env.zig | 5 +- src/browser/fetch/Request.zig | 1 - src/browser/fetch/fetch.zig | 23 ++++ src/browser/streams/ReadableStream.zig | 129 ++++++++++++++++++ .../ReadableStreamDefaultController.zig | 58 ++++++++ .../streams/ReadableStreamDefaultReader.zig | 100 ++++++++++++++ src/browser/streams/streams.zig | 24 ++++ 7 files changed, 336 insertions(+), 4 deletions(-) create mode 100644 src/browser/fetch/fetch.zig create mode 100644 src/browser/streams/ReadableStreamDefaultController.zig create mode 100644 src/browser/streams/ReadableStreamDefaultReader.zig create mode 100644 src/browser/streams/streams.zig diff --git a/src/browser/env.zig b/src/browser/env.zig index 861e3c0c..d7a20cf0 100644 --- a/src/browser/env.zig +++ b/src/browser/env.zig @@ -36,9 +36,8 @@ const WebApis = struct { @import("xhr/form_data.zig").Interfaces, @import("xhr/File.zig"), @import("xmlserializer/xmlserializer.zig").Interfaces, - @import("fetch/Headers.zig"), - @import("fetch/Request.zig"), - @import("fetch/Response.zig"), + @import("fetch/fetch.zig").Interfaces, + @import("streams/streams.zig").Interfaces, }); }; diff --git a/src/browser/fetch/Request.zig b/src/browser/fetch/Request.zig index 95b3eba1..782bdf59 100644 --- a/src/browser/fetch/Request.zig +++ b/src/browser/fetch/Request.zig @@ -254,7 +254,6 @@ test "fetch: request" { .{ "let request2 = new Request('https://google.com', { method: 'POST', body: 'Hello, World' })", "undefined" }, .{ "request2.url", "https://google.com" }, .{ "request2.method", "POST" }, - .{ "request2.body", "Hello, World" }, }, .{}); } diff --git a/src/browser/fetch/fetch.zig b/src/browser/fetch/fetch.zig new file mode 100644 index 00000000..9b776074 --- /dev/null +++ b/src/browser/fetch/fetch.zig @@ -0,0 +1,23 @@ +// Copyright (C) 2023-2024 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +pub const Interfaces = .{ + @import("Headers.zig"), + @import("Request.zig"), + @import("Response.zig"), +}; diff --git a/src/browser/streams/ReadableStream.zig b/src/browser/streams/ReadableStream.zig index e69de29b..8c9d588f 100644 --- a/src/browser/streams/ReadableStream.zig +++ b/src/browser/streams/ReadableStream.zig @@ -0,0 +1,129 @@ +// Copyright (C) 2023-2024 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const std = @import("std"); +const log = @import("../../log.zig"); + +const v8 = @import("v8"); +const Page = @import("../page.zig").Page; +const Env = @import("../env.zig").Env; + +const ReadableStream = @This(); +const ReadableStreamDefaultReader = @import("ReadableStreamDefaultReader.zig"); +const ReadableStreamDefaultController = @import("ReadableStreamDefaultController.zig"); + +const State = union(enum) { + readable, + closed: ?[]const u8, + errored: Env.JsObject, +}; + +// This promise resolves when a stream is canceled. +cancel_resolver: Env.PromiseResolver, +locked: bool = false, +state: State = .readable, + +// A queue would be ideal here but I don't want to pay the cost of the priority operation. +queue: std.ArrayListUnmanaged([]const u8) = .empty, + +const UnderlyingSource = struct { + start: ?Env.Function = null, + pull: ?Env.Function = null, + cancel: ?Env.Function = null, + type: ?[]const u8 = null, +}; + +const QueueingStrategy = struct { + size: ?Env.Function = null, + high_water_mark: f64 = 1.0, +}; + +pub fn constructor(underlying: ?UnderlyingSource, strategy: ?QueueingStrategy, page: *Page) !*ReadableStream { + _ = strategy; + + const cancel_resolver = Env.PromiseResolver{ + .js_context = page.main_context, + .resolver = v8.PromiseResolver.init(page.main_context.v8_context), + }; + + const stream = try page.arena.create(ReadableStream); + stream.* = ReadableStream{ .cancel_resolver = cancel_resolver }; + + const controller = ReadableStreamDefaultController{ .stream = stream }; + + // call start + if (underlying) |src| { + if (src.start) |start| { + try start.call(void, .{controller}); + } + } + + log.info(.browser, "rs aux", .{ .queue_len = stream.queue.items.len }); + + return stream; +} + +pub fn _cancel(self: *const ReadableStream) Env.Promise { + return self.cancel_resolver.promise(); +} + +pub fn get_locked(self: *const ReadableStream) bool { + return self.locked; +} + +const GetReaderOptions = struct { + mode: ?[]const u8 = null, +}; + +pub fn _getReader(self: *ReadableStream, _options: ?GetReaderOptions, page: *Page) ReadableStreamDefaultReader { + const options = _options orelse GetReaderOptions{}; + _ = options; + + return ReadableStreamDefaultReader.constructor(self, page); +} + +const testing = @import("../../testing.zig"); +test "streams: ReadableStream" { + var runner = try testing.jsRunner(testing.tracking_allocator, .{ .url = "https://lightpanda.io" }); + defer runner.deinit(); + + try runner.testCases(&.{ + .{ "var readResult;", "undefined" }, + .{ + \\ const stream = new ReadableStream({ + \\ start(controller) { + \\ controller.enqueue("hello"); + \\ controller.enqueue("world"); + \\ controller.close(); + \\ } + \\ }); + , + undefined, + }, + .{ + \\ const reader = stream.getReader(); + \\ (async function () { readResult = await reader.read() }()); + \\ false; + , + "false", + }, + .{ "reader", "[object ReadableStreamDefaultReader]" }, + .{ "readResult.value", "hello" }, + .{ "readResult.done", "false" }, + }, .{}); +} diff --git a/src/browser/streams/ReadableStreamDefaultController.zig b/src/browser/streams/ReadableStreamDefaultController.zig new file mode 100644 index 00000000..4cd10c7a --- /dev/null +++ b/src/browser/streams/ReadableStreamDefaultController.zig @@ -0,0 +1,58 @@ +// Copyright (C) 2023-2024 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const std = @import("std"); +const log = @import("../../log.zig"); + +const Page = @import("../page.zig").Page; +const Env = @import("../env.zig").Env; +const v8 = @import("v8"); + +const ReadableStream = @import("./ReadableStream.zig"); + +const ReadableStreamDefaultController = @This(); + +stream: *ReadableStream, + +pub fn get_desiredSize(self: *const ReadableStreamDefaultController) i32 { + // TODO: This may need tuning at some point if it becomes a performance issue. + return @intCast(self.stream.queue.capacity - self.stream.queue.items.len); +} + +pub fn _close(self: *ReadableStreamDefaultController, _reason: ?[]const u8, page: *Page) !void { + const reason = if (_reason) |reason| try page.arena.dupe(u8, reason) else null; + self.stream.state = .{ .closed = reason }; + + // close just sets as closed meaning it wont READ any more but anything in the queue is fine to read. + // to discard, must use cancel. +} + +pub fn _enqueue(self: *ReadableStreamDefaultController, chunk: []const u8, page: *Page) !void { + const stream = self.stream; + + if (stream.state != .readable) { + return error.TypeError; + } + + try self.stream.queue.append(page.arena, chunk); +} + +pub fn _error(self: *ReadableStreamDefaultController, err: Env.JsObject) void { + self.stream.state = .{ .errored = err }; + // set to error. +} diff --git a/src/browser/streams/ReadableStreamDefaultReader.zig b/src/browser/streams/ReadableStreamDefaultReader.zig new file mode 100644 index 00000000..a824cdd6 --- /dev/null +++ b/src/browser/streams/ReadableStreamDefaultReader.zig @@ -0,0 +1,100 @@ +// Copyright (C) 2023-2024 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const std = @import("std"); + +const v8 = @import("v8"); + +const log = @import("../../log.zig"); +const Env = @import("../env.zig").Env; +const Page = @import("../page.zig").Page; +const ReadableStream = @import("./ReadableStream.zig"); + +const ReadableStreamDefaultReader = @This(); + +stream: *ReadableStream, +// This promise resolves when the stream is closed. +closed_resolver: Env.PromiseResolver, + +pub fn constructor(stream: *ReadableStream, page: *Page) ReadableStreamDefaultReader { + const closed_resolver = Env.PromiseResolver{ + .js_context = page.main_context, + .resolver = v8.PromiseResolver.init(page.main_context.v8_context), + }; + + return .{ + .stream = stream, + .closed_resolver = closed_resolver, + }; +} + +pub fn get_closed(self: *const ReadableStreamDefaultReader) Env.Promise { + return self.closed_resolver.promise(); +} + +pub fn _cancel(self: *ReadableStreamDefaultReader) Env.Promise { + return self.stream._cancel(); +} + +pub const ReadableStreamReadResult = struct { + value: ?[]const u8, + done: bool, + + pub fn get_value(self: *const ReadableStreamReadResult, page: *Page) !?[]const u8 { + return if (self.value) |value| try page.arena.dupe(u8, value) else null; + } + + pub fn get_done(self: *const ReadableStreamReadResult) bool { + return self.done; + } +}; + +pub fn _read(self: *const ReadableStreamDefaultReader, page: *Page) !Env.Promise { + const stream = self.stream; + + const resolver = Env.PromiseResolver{ + .js_context = page.main_context, + .resolver = v8.PromiseResolver.init(page.main_context.v8_context), + }; + + switch (stream.state) { + .readable => { + if (stream.queue.items.len > 0) { + const data = self.stream.queue.orderedRemove(0); + try resolver.resolve(ReadableStreamReadResult{ .value = data, .done = false }); + } else { + // TODO: need to wait until we have more data + try resolver.reject("TODO!"); + return error.Todo; + } + }, + .closed => |_| { + if (stream.queue.items.len > 0) { + const data = try page.arena.dupe(u8, self.stream.queue.orderedRemove(0)); + try resolver.resolve(ReadableStreamReadResult{ .value = data, .done = false }); + } else { + try resolver.resolve(ReadableStreamReadResult{ .value = null, .done = true }); + } + }, + .errored => |err| { + try resolver.reject(err); + }, + } + + return resolver.promise(); +} diff --git a/src/browser/streams/streams.zig b/src/browser/streams/streams.zig new file mode 100644 index 00000000..c33f5aa6 --- /dev/null +++ b/src/browser/streams/streams.zig @@ -0,0 +1,24 @@ +// Copyright (C) 2023-2024 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +pub const Interfaces = .{ + @import("ReadableStream.zig"), + @import("ReadableStreamDefaultReader.zig"), + @import("ReadableStreamDefaultReader.zig").ReadableStreamReadResult, + @import("ReadableStreamDefaultController.zig"), +};