basic readable stream working

This commit is contained in:
Muki Kiboigo
2025-08-26 13:45:49 -07:00
parent ab60f64452
commit 1d7e731034
7 changed files with 336 additions and 4 deletions

View File

@@ -36,9 +36,8 @@ const WebApis = struct {
@import("xhr/form_data.zig").Interfaces,
@import("xhr/File.zig"),
@import("xmlserializer/xmlserializer.zig").Interfaces,
@import("fetch/Headers.zig"),
@import("fetch/Request.zig"),
@import("fetch/Response.zig"),
@import("fetch/fetch.zig").Interfaces,
@import("streams/streams.zig").Interfaces,
});
};

View File

@@ -254,7 +254,6 @@ test "fetch: request" {
.{ "let request2 = new Request('https://google.com', { method: 'POST', body: 'Hello, World' })", "undefined" },
.{ "request2.url", "https://google.com" },
.{ "request2.method", "POST" },
.{ "request2.body", "Hello, World" },
}, .{});
}

View File

@@ -0,0 +1,23 @@
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
pub const Interfaces = .{
@import("Headers.zig"),
@import("Request.zig"),
@import("Response.zig"),
};

View File

@@ -0,0 +1,129 @@
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const log = @import("../../log.zig");
const v8 = @import("v8");
const Page = @import("../page.zig").Page;
const Env = @import("../env.zig").Env;
const ReadableStream = @This();
const ReadableStreamDefaultReader = @import("ReadableStreamDefaultReader.zig");
const ReadableStreamDefaultController = @import("ReadableStreamDefaultController.zig");
const State = union(enum) {
readable,
closed: ?[]const u8,
errored: Env.JsObject,
};
// This promise resolves when a stream is canceled.
cancel_resolver: Env.PromiseResolver,
locked: bool = false,
state: State = .readable,
// A queue would be ideal here but I don't want to pay the cost of the priority operation.
queue: std.ArrayListUnmanaged([]const u8) = .empty,
const UnderlyingSource = struct {
start: ?Env.Function = null,
pull: ?Env.Function = null,
cancel: ?Env.Function = null,
type: ?[]const u8 = null,
};
const QueueingStrategy = struct {
size: ?Env.Function = null,
high_water_mark: f64 = 1.0,
};
pub fn constructor(underlying: ?UnderlyingSource, strategy: ?QueueingStrategy, page: *Page) !*ReadableStream {
_ = strategy;
const cancel_resolver = Env.PromiseResolver{
.js_context = page.main_context,
.resolver = v8.PromiseResolver.init(page.main_context.v8_context),
};
const stream = try page.arena.create(ReadableStream);
stream.* = ReadableStream{ .cancel_resolver = cancel_resolver };
const controller = ReadableStreamDefaultController{ .stream = stream };
// call start
if (underlying) |src| {
if (src.start) |start| {
try start.call(void, .{controller});
}
}
log.info(.browser, "rs aux", .{ .queue_len = stream.queue.items.len });
return stream;
}
pub fn _cancel(self: *const ReadableStream) Env.Promise {
return self.cancel_resolver.promise();
}
pub fn get_locked(self: *const ReadableStream) bool {
return self.locked;
}
const GetReaderOptions = struct {
mode: ?[]const u8 = null,
};
pub fn _getReader(self: *ReadableStream, _options: ?GetReaderOptions, page: *Page) ReadableStreamDefaultReader {
const options = _options orelse GetReaderOptions{};
_ = options;
return ReadableStreamDefaultReader.constructor(self, page);
}
const testing = @import("../../testing.zig");
test "streams: ReadableStream" {
var runner = try testing.jsRunner(testing.tracking_allocator, .{ .url = "https://lightpanda.io" });
defer runner.deinit();
try runner.testCases(&.{
.{ "var readResult;", "undefined" },
.{
\\ const stream = new ReadableStream({
\\ start(controller) {
\\ controller.enqueue("hello");
\\ controller.enqueue("world");
\\ controller.close();
\\ }
\\ });
,
undefined,
},
.{
\\ const reader = stream.getReader();
\\ (async function () { readResult = await reader.read() }());
\\ false;
,
"false",
},
.{ "reader", "[object ReadableStreamDefaultReader]" },
.{ "readResult.value", "hello" },
.{ "readResult.done", "false" },
}, .{});
}

View File

@@ -0,0 +1,58 @@
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const log = @import("../../log.zig");
const Page = @import("../page.zig").Page;
const Env = @import("../env.zig").Env;
const v8 = @import("v8");
const ReadableStream = @import("./ReadableStream.zig");
const ReadableStreamDefaultController = @This();
stream: *ReadableStream,
pub fn get_desiredSize(self: *const ReadableStreamDefaultController) i32 {
// TODO: This may need tuning at some point if it becomes a performance issue.
return @intCast(self.stream.queue.capacity - self.stream.queue.items.len);
}
pub fn _close(self: *ReadableStreamDefaultController, _reason: ?[]const u8, page: *Page) !void {
const reason = if (_reason) |reason| try page.arena.dupe(u8, reason) else null;
self.stream.state = .{ .closed = reason };
// close just sets as closed meaning it wont READ any more but anything in the queue is fine to read.
// to discard, must use cancel.
}
pub fn _enqueue(self: *ReadableStreamDefaultController, chunk: []const u8, page: *Page) !void {
const stream = self.stream;
if (stream.state != .readable) {
return error.TypeError;
}
try self.stream.queue.append(page.arena, chunk);
}
pub fn _error(self: *ReadableStreamDefaultController, err: Env.JsObject) void {
self.stream.state = .{ .errored = err };
// set to error.
}

View File

@@ -0,0 +1,100 @@
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const v8 = @import("v8");
const log = @import("../../log.zig");
const Env = @import("../env.zig").Env;
const Page = @import("../page.zig").Page;
const ReadableStream = @import("./ReadableStream.zig");
const ReadableStreamDefaultReader = @This();
stream: *ReadableStream,
// This promise resolves when the stream is closed.
closed_resolver: Env.PromiseResolver,
pub fn constructor(stream: *ReadableStream, page: *Page) ReadableStreamDefaultReader {
const closed_resolver = Env.PromiseResolver{
.js_context = page.main_context,
.resolver = v8.PromiseResolver.init(page.main_context.v8_context),
};
return .{
.stream = stream,
.closed_resolver = closed_resolver,
};
}
pub fn get_closed(self: *const ReadableStreamDefaultReader) Env.Promise {
return self.closed_resolver.promise();
}
pub fn _cancel(self: *ReadableStreamDefaultReader) Env.Promise {
return self.stream._cancel();
}
pub const ReadableStreamReadResult = struct {
value: ?[]const u8,
done: bool,
pub fn get_value(self: *const ReadableStreamReadResult, page: *Page) !?[]const u8 {
return if (self.value) |value| try page.arena.dupe(u8, value) else null;
}
pub fn get_done(self: *const ReadableStreamReadResult) bool {
return self.done;
}
};
pub fn _read(self: *const ReadableStreamDefaultReader, page: *Page) !Env.Promise {
const stream = self.stream;
const resolver = Env.PromiseResolver{
.js_context = page.main_context,
.resolver = v8.PromiseResolver.init(page.main_context.v8_context),
};
switch (stream.state) {
.readable => {
if (stream.queue.items.len > 0) {
const data = self.stream.queue.orderedRemove(0);
try resolver.resolve(ReadableStreamReadResult{ .value = data, .done = false });
} else {
// TODO: need to wait until we have more data
try resolver.reject("TODO!");
return error.Todo;
}
},
.closed => |_| {
if (stream.queue.items.len > 0) {
const data = try page.arena.dupe(u8, self.stream.queue.orderedRemove(0));
try resolver.resolve(ReadableStreamReadResult{ .value = data, .done = false });
} else {
try resolver.resolve(ReadableStreamReadResult{ .value = null, .done = true });
}
},
.errored => |err| {
try resolver.reject(err);
},
}
return resolver.promise();
}

View File

@@ -0,0 +1,24 @@
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
pub const Interfaces = .{
@import("ReadableStream.zig"),
@import("ReadableStreamDefaultReader.zig"),
@import("ReadableStreamDefaultReader.zig").ReadableStreamReadResult,
@import("ReadableStreamDefaultController.zig"),
};