add reference counting for ReadableStream

This commit is contained in:
Pierre Tachoire
2026-03-03 18:39:44 +01:00
parent 8e8a1a7541
commit 812ad3f49e
3 changed files with 65 additions and 6 deletions

View File

@@ -47,13 +47,13 @@ _page: *Page,
_state: State, _state: State,
_reader: ?*ReadableStreamDefaultReader, _reader: ?*ReadableStreamDefaultReader,
_controller: *ReadableStreamDefaultController, _controller: *ReadableStreamDefaultController,
// The arena is used by the builtin controller.
_arena: std.mem.Allocator,
_stored_error: ?[]const u8, _stored_error: ?[]const u8,
_pull_fn: ?js.Function.Global = null, _pull_fn: ?js.Function.Global = null,
_pulling: bool = false, _pulling: bool = false,
_pull_again: bool = false, _pull_again: bool = false,
_cancel: ?Cancel = null, _cancel: ?Cancel = null,
_arena: std.mem.Allocator,
_rc: usize = 0,
const UnderlyingSource = struct { const UnderlyingSource = struct {
start: ?js.Function = null, start: ?js.Function = null,
@@ -70,17 +70,18 @@ const QueueingStrategy = struct {
pub fn init(src_: ?UnderlyingSource, strategy_: ?QueueingStrategy, page: *Page) !*ReadableStream { pub fn init(src_: ?UnderlyingSource, strategy_: ?QueueingStrategy, page: *Page) !*ReadableStream {
const strategy: QueueingStrategy = strategy_ orelse .{}; const strategy: QueueingStrategy = strategy_ orelse .{};
const arena = try page.getArena(.{ .debug = "Animation" }); const arena = try page.getArena(.{ .debug = "ReadableStream" });
errdefer page.releaseArena(arena); errdefer page.releaseArena(arena);
const self = try page._factory.create(ReadableStream{ const self = try arena.create(ReadableStream);
self.* = .{
._page = page, ._page = page,
._state = .readable, ._state = .readable,
._arena = arena, ._arena = arena,
._reader = null, ._reader = null,
._controller = undefined, ._controller = undefined,
._stored_error = null, ._stored_error = null,
}); };
self._controller = try ReadableStreamDefaultController.init(self, strategy.highWaterMark, page); self._controller = try ReadableStreamDefaultController.init(self, strategy.highWaterMark, page);
@@ -115,7 +116,20 @@ pub fn initWithData(data: []const u8, page: *Page) !*ReadableStream {
} }
pub fn deinit(self: *ReadableStream, _: bool, page: *Page) void { pub fn deinit(self: *ReadableStream, _: bool, page: *Page) void {
page.releaseArena(self._arena); const rc = self._rc;
if (comptime IS_DEBUG) {
std.debug.assert(rc != 0);
}
if (rc == 1) {
page.releaseArena(self._arena);
} else {
self._rc = rc - 1;
}
}
pub fn acquireRef(self: *ReadableStream) void {
self._rc += 1;
} }
pub fn getReader(self: *ReadableStream, page: *Page) !*ReadableStreamDefaultReader { pub fn getReader(self: *ReadableStream, page: *Page) !*ReadableStreamDefaultReader {
@@ -130,6 +144,12 @@ pub fn getReader(self: *ReadableStream, page: *Page) !*ReadableStreamDefaultRead
pub fn releaseReader(self: *ReadableStream) void { pub fn releaseReader(self: *ReadableStream) void {
self._reader = null; self._reader = null;
const rc = self._rc;
if (comptime IS_DEBUG) {
std.debug.assert(rc != 0);
}
self._rc = rc - 1;
} }
pub fn getAsyncIterator(self: *ReadableStream, page: *Page) !*AsyncIterator { pub fn getAsyncIterator(self: *ReadableStream, page: *Page) !*AsyncIterator {
@@ -402,6 +422,14 @@ pub const AsyncIterator = struct {
}); });
} }
pub fn acquireRef(self: *AsyncIterator) void {
self._stream.acquireRef();
}
pub fn deinit(self: *AsyncIterator, shutdown: bool, page: *Page) void {
self._stream.deinit(shutdown, page);
}
pub fn next(self: *AsyncIterator, page: *Page) !js.Promise { pub fn next(self: *AsyncIterator, page: *Page) !js.Promise {
return self._reader.read(page); return self._reader.read(page);
} }
@@ -418,6 +446,8 @@ pub const AsyncIterator = struct {
pub const name = "ReadableStreamAsyncIterator"; pub const name = "ReadableStreamAsyncIterator";
pub const prototype_chain = bridge.prototypeChain(); pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined; pub var class_id: bridge.ClassId = undefined;
pub const weak = true;
pub const finalizer = bridge.finalizer(AsyncIterator.deinit);
}; };
pub const next = bridge.function(ReadableStream.AsyncIterator.next, .{}); pub const next = bridge.function(ReadableStream.AsyncIterator.next, .{});

View File

@@ -62,6 +62,14 @@ pub fn init(stream: *ReadableStream, high_water_mark: u32, page: *Page) !*Readab
}); });
} }
pub fn acquireRef(self: *ReadableStreamDefaultController) void {
self._stream.acquireRef();
}
pub fn deinit(self: *ReadableStreamDefaultController, shutdown: bool, page: *Page) void {
self._stream.deinit(shutdown, page);
}
pub fn addPendingRead(self: *ReadableStreamDefaultController, page: *Page) !js.Promise { pub fn addPendingRead(self: *ReadableStreamDefaultController, page: *Page) !js.Promise {
const resolver = page.js.local.?.createPromiseResolver(); const resolver = page.js.local.?.createPromiseResolver();
try self._pending_reads.append(self._stream._arena, try resolver.persist()); try self._pending_reads.append(self._stream._arena, try resolver.persist());
@@ -210,6 +218,8 @@ pub const JsApi = struct {
pub const name = "ReadableStreamDefaultController"; pub const name = "ReadableStreamDefaultController";
pub const prototype_chain = bridge.prototypeChain(); pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined; pub var class_id: bridge.ClassId = undefined;
pub const weak = true;
pub const finalizer = bridge.finalizer(ReadableStreamDefaultController.deinit);
}; };
pub const enqueue = bridge.function(ReadableStreamDefaultController.enqueueValue, .{}); pub const enqueue = bridge.function(ReadableStreamDefaultController.enqueueValue, .{});

View File

@@ -19,6 +19,8 @@
const std = @import("std"); const std = @import("std");
const js = @import("../../js/js.zig"); const js = @import("../../js/js.zig");
const IS_DEBUG = @import("builtin").mode == .Debug;
const Page = @import("../../Page.zig"); const Page = @import("../../Page.zig");
const ReadableStream = @import("ReadableStream.zig"); const ReadableStream = @import("ReadableStream.zig");
const ReadableStreamDefaultController = @import("ReadableStreamDefaultController.zig"); const ReadableStreamDefaultController = @import("ReadableStreamDefaultController.zig");
@@ -35,6 +37,21 @@ pub fn init(stream: *ReadableStream, page: *Page) !*ReadableStreamDefaultReader
}); });
} }
pub fn acquireRef(self: *ReadableStreamDefaultReader) void {
const stream = self._stream orelse {
if (comptime IS_DEBUG) {
std.debug.assert(false);
}
return;
};
stream.acquireRef();
}
pub fn deinit(self: *ReadableStreamDefaultReader, shutdown: bool, page: *Page) void {
const stream = self._stream orelse return;
stream.deinit(shutdown, page);
}
pub const ReadResult = struct { pub const ReadResult = struct {
done: bool, done: bool,
value: Chunk, value: Chunk,
@@ -110,6 +127,8 @@ pub const JsApi = struct {
pub const name = "ReadableStreamDefaultReader"; pub const name = "ReadableStreamDefaultReader";
pub const prototype_chain = bridge.prototypeChain(); pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined; pub var class_id: bridge.ClassId = undefined;
pub const weak = true;
pub const finalizer = bridge.finalizer(ReadableStreamDefaultReader.deinit);
}; };
pub const read = bridge.function(ReadableStreamDefaultReader.read, .{}); pub const read = bridge.function(ReadableStreamDefaultReader.read, .{});