mirror of
				https://github.com/lightpanda-io/browser.git
				synced 2025-10-30 15:41:48 +00:00 
			
		
		
		
	implement remaining ReadableStream functionality
This commit is contained in:
		| @@ -30,6 +30,7 @@ const ReadableStreamDefaultController = @import("ReadableStreamDefaultController | ||||
| const State = union(enum) { | ||||
|     readable, | ||||
|     closed: ?[]const u8, | ||||
|     cancelled: ?[]const u8, | ||||
|     errored: Env.JsObject, | ||||
| }; | ||||
|  | ||||
| @@ -38,8 +39,29 @@ cancel_resolver: v8.Persistent(v8.PromiseResolver), | ||||
| locked: bool = false, | ||||
| state: State = .readable, | ||||
|  | ||||
| cancel_fn: ?Env.Function = null, | ||||
| pull_fn: ?Env.Function = null, | ||||
|  | ||||
| strategy: QueueingStrategy, | ||||
| reader_resolver: ?v8.Persistent(v8.PromiseResolver) = null, | ||||
| queue: std.ArrayListUnmanaged([]const u8) = .empty, | ||||
|  | ||||
| pub const ReadableStreamReadResult = struct { | ||||
|     const ValueUnion = | ||||
|         union(enum) { data: []const u8, empty: void }; | ||||
|  | ||||
|     value: ValueUnion, | ||||
|     done: bool, | ||||
|  | ||||
|     pub fn get_value(self: *const ReadableStreamReadResult) ValueUnion { | ||||
|         return self.value; | ||||
|     } | ||||
|  | ||||
|     pub fn get_done(self: *const ReadableStreamReadResult) bool { | ||||
|         return self.done; | ||||
|     } | ||||
| }; | ||||
|  | ||||
| const UnderlyingSource = struct { | ||||
|     start: ?Env.Function = null, | ||||
|     pull: ?Env.Function = null, | ||||
| @@ -49,11 +71,11 @@ const UnderlyingSource = struct { | ||||
|  | ||||
| const QueueingStrategy = struct { | ||||
|     size: ?Env.Function = null, | ||||
|     high_water_mark: f64 = 1.0, | ||||
|     high_water_mark: u32 = 1, | ||||
| }; | ||||
|  | ||||
| pub fn constructor(underlying: ?UnderlyingSource, strategy: ?QueueingStrategy, page: *Page) !*ReadableStream { | ||||
|     _ = strategy; | ||||
| pub fn constructor(underlying: ?UnderlyingSource, _strategy: ?QueueingStrategy, page: *Page) !*ReadableStream { | ||||
|     const strategy: QueueingStrategy = _strategy orelse .{}; | ||||
|  | ||||
|     const cancel_resolver = v8.Persistent(v8.PromiseResolver).init( | ||||
|         page.main_context.isolate, | ||||
| @@ -61,7 +83,7 @@ pub fn constructor(underlying: ?UnderlyingSource, strategy: ?QueueingStrategy, p | ||||
|     ); | ||||
|  | ||||
|     const stream = try page.arena.create(ReadableStream); | ||||
|     stream.* = ReadableStream{ .cancel_resolver = cancel_resolver }; | ||||
|     stream.* = ReadableStream{ .cancel_resolver = cancel_resolver, .strategy = strategy }; | ||||
|  | ||||
|     const controller = ReadableStreamDefaultController{ .stream = stream }; | ||||
|  | ||||
| @@ -70,6 +92,15 @@ pub fn constructor(underlying: ?UnderlyingSource, strategy: ?QueueingStrategy, p | ||||
|         if (src.start) |start| { | ||||
|             try start.call(void, .{controller}); | ||||
|         } | ||||
|  | ||||
|         if (src.cancel) |cancel| { | ||||
|             stream.cancel_fn = cancel; | ||||
|         } | ||||
|  | ||||
|         if (src.pull) |pull| { | ||||
|             stream.pull_fn = pull; | ||||
|             try stream.pullIf(); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     return stream; | ||||
| @@ -79,7 +110,7 @@ pub fn get_locked(self: *const ReadableStream) bool { | ||||
|     return self.locked; | ||||
| } | ||||
|  | ||||
| pub fn _cancel(self: *const ReadableStream, page: *Page) !Env.Promise { | ||||
| pub fn _cancel(self: *ReadableStream, reason: ?[]const u8, page: *Page) !Env.Promise { | ||||
|     if (self.locked) { | ||||
|         return error.TypeError; | ||||
|     } | ||||
| @@ -89,9 +120,31 @@ pub fn _cancel(self: *const ReadableStream, page: *Page) !Env.Promise { | ||||
|         .resolver = self.cancel_resolver.castToPromiseResolver(), | ||||
|     }; | ||||
|  | ||||
|     self.state = .{ .cancelled = if (reason) |r| try page.arena.dupe(u8, r) else null }; | ||||
|  | ||||
|     // Call cancel callback. | ||||
|     if (self.cancel_fn) |cancel| { | ||||
|         if (reason) |r| { | ||||
|             try cancel.call(void, .{r}); | ||||
|         } else { | ||||
|             try cancel.call(void, .{}); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     try resolver.resolve({}); | ||||
|     return resolver.promise(); | ||||
| } | ||||
|  | ||||
| pub fn pullIf(self: *ReadableStream) !void { | ||||
|     if (self.pull_fn) |pull_fn| { | ||||
|         // Must be under the high water mark AND readable. | ||||
|         if ((self.queue.items.len < self.strategy.high_water_mark) and self.state == .readable) { | ||||
|             const controller = ReadableStreamDefaultController{ .stream = self }; | ||||
|             try pull_fn.call(void, .{controller}); | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| const GetReaderOptions = struct { | ||||
|     // Mode must equal 'byob' or be undefined. RangeError otherwise. | ||||
|     mode: ?[]const u8 = null, | ||||
| @@ -102,6 +155,7 @@ pub fn _getReader(self: *ReadableStream, _options: ?GetReaderOptions, page: *Pag | ||||
|         return error.TypeError; | ||||
|     } | ||||
|  | ||||
|     // TODO: Determine if we need the ReadableStreamBYOBReader | ||||
|     const options = _options orelse GetReaderOptions{}; | ||||
|     _ = options; | ||||
|  | ||||
| @@ -144,3 +198,138 @@ test "streams: ReadableStream" { | ||||
|         .{ "readResult.done", "false" }, | ||||
|     }, .{}); | ||||
| } | ||||
|  | ||||
| test "streams: ReadableStream cancel and close" { | ||||
|     var runner = try testing.jsRunner(testing.tracking_allocator, .{ .url = "https://lightpanda.io" }); | ||||
|     defer runner.deinit(); | ||||
|     try runner.testCases(&.{ | ||||
|         .{ "var readResult; var cancelResult; var closeResult;", "undefined" }, | ||||
|  | ||||
|         // Test 1: Stream with controller.close() | ||||
|         .{ | ||||
|             \\  const stream1 = new ReadableStream({ | ||||
|             \\    start(controller) { | ||||
|             \\      controller.enqueue("first"); | ||||
|             \\      controller.enqueue("second"); | ||||
|             \\      controller.close(); | ||||
|             \\    } | ||||
|             \\  }); | ||||
|             , | ||||
|             undefined, | ||||
|         }, | ||||
|         .{ "const reader1 = stream1.getReader();", undefined }, | ||||
|         .{ | ||||
|             \\ (async function () {  | ||||
|             \\   readResult = await reader1.read(); | ||||
|             \\ }()); | ||||
|             \\ false; | ||||
|             , | ||||
|             "false", | ||||
|         }, | ||||
|         .{ "readResult.value", "first" }, | ||||
|         .{ "readResult.done", "false" }, | ||||
|  | ||||
|         // Read second chunk | ||||
|         .{ | ||||
|             \\ (async function () {  | ||||
|             \\   readResult = await reader1.read(); | ||||
|             \\ }()); | ||||
|             \\ false; | ||||
|             , | ||||
|             "false", | ||||
|         }, | ||||
|         .{ "readResult.value", "second" }, | ||||
|         .{ "readResult.done", "false" }, | ||||
|  | ||||
|         // Read after close - should get done: true | ||||
|         .{ | ||||
|             \\ (async function () {  | ||||
|             \\   readResult = await reader1.read(); | ||||
|             \\ }()); | ||||
|             \\ false; | ||||
|             , | ||||
|             "false", | ||||
|         }, | ||||
|         .{ "readResult.value", "undefined" }, | ||||
|         .{ "readResult.done", "true" }, | ||||
|  | ||||
|         // Test 2: Stream with reader.cancel() | ||||
|         .{ | ||||
|             \\  const stream2 = new ReadableStream({ | ||||
|             \\    start(controller) { | ||||
|             \\      controller.enqueue("data1"); | ||||
|             \\      controller.enqueue("data2"); | ||||
|             \\      controller.enqueue("data3"); | ||||
|             \\    }, | ||||
|             \\    cancel(reason) { | ||||
|             \\      closeResult = `Stream cancelled: ${reason}`; | ||||
|             \\    } | ||||
|             \\  }); | ||||
|             , | ||||
|             undefined, | ||||
|         }, | ||||
|         .{ "const reader2 = stream2.getReader();", undefined }, | ||||
|  | ||||
|         // Read one chunk before canceling | ||||
|         .{ | ||||
|             \\ (async function () {  | ||||
|             \\   readResult = await reader2.read(); | ||||
|             \\ }()); | ||||
|             \\ false; | ||||
|             , | ||||
|             "false", | ||||
|         }, | ||||
|         .{ "readResult.value", "data1" }, | ||||
|         .{ "readResult.done", "false" }, | ||||
|  | ||||
|         // Cancel the stream | ||||
|         .{ | ||||
|             \\ (async function () {  | ||||
|             \\   cancelResult = await reader2.cancel("user requested"); | ||||
|             \\ }()); | ||||
|             \\ false; | ||||
|             , | ||||
|             "false", | ||||
|         }, | ||||
|         .{ "cancelResult", "undefined" }, | ||||
|         .{ "closeResult", "Stream cancelled: user requested" }, | ||||
|  | ||||
|         // Try to read after cancel - should throw or return done | ||||
|         .{ | ||||
|             \\ try { | ||||
|             \\   (async function () {  | ||||
|             \\     readResult = await reader2.read(); | ||||
|             \\   }()); | ||||
|             \\ } catch(e) { | ||||
|             \\   readResult = { error: e.name }; | ||||
|             \\ } | ||||
|             \\ false; | ||||
|             , | ||||
|             "false", | ||||
|         }, | ||||
|  | ||||
|         // Test 3: Cancel without reason | ||||
|         .{ | ||||
|             \\  const stream3 = new ReadableStream({ | ||||
|             \\    start(controller) { | ||||
|             \\      controller.enqueue("test"); | ||||
|             \\    }, | ||||
|             \\    cancel(reason) { | ||||
|             \\      closeResult = reason === undefined ? "no reason" : reason; | ||||
|             \\    } | ||||
|             \\  }); | ||||
|             , | ||||
|             undefined, | ||||
|         }, | ||||
|         .{ "const reader3 = stream3.getReader();", undefined }, | ||||
|         .{ | ||||
|             \\ (async function () {  | ||||
|             \\   await reader3.cancel(); | ||||
|             \\ }()); | ||||
|             \\ false; | ||||
|             , | ||||
|             "false", | ||||
|         }, | ||||
|         .{ "closeResult", "no reason" }, | ||||
|     }, .{}); | ||||
| } | ||||
|   | ||||
| @@ -24,6 +24,7 @@ const Env = @import("../env.zig").Env; | ||||
| const v8 = @import("v8"); | ||||
|  | ||||
| const ReadableStream = @import("./ReadableStream.zig"); | ||||
| const ReadableStreamReadResult = @import("./ReadableStream.zig").ReadableStreamReadResult; | ||||
|  | ||||
| const ReadableStreamDefaultController = @This(); | ||||
|  | ||||
| @@ -38,6 +39,16 @@ pub fn _close(self: *ReadableStreamDefaultController, _reason: ?[]const u8, page | ||||
|     const reason = if (_reason) |reason| try page.arena.dupe(u8, reason) else null; | ||||
|     self.stream.state = .{ .closed = reason }; | ||||
|  | ||||
|     if (self.stream.reader_resolver) |rr| { | ||||
|         const resolver = Env.PromiseResolver{ | ||||
|             .js_context = page.main_context, | ||||
|             .resolver = rr.castToPromiseResolver(), | ||||
|         }; | ||||
|  | ||||
|         try resolver.resolve(ReadableStreamReadResult{ .value = .empty, .done = true }); | ||||
|         self.stream.reader_resolver = null; | ||||
|     } | ||||
|  | ||||
|     // close just sets as closed meaning it wont READ any more but anything in the queue is fine to read. | ||||
|     // to discard, must use cancel. | ||||
| } | ||||
| @@ -49,9 +60,37 @@ pub fn _enqueue(self: *ReadableStreamDefaultController, chunk: []const u8, page: | ||||
|         return error.TypeError; | ||||
|     } | ||||
|  | ||||
|     if (self.stream.reader_resolver) |rr| { | ||||
|         const resolver = Env.PromiseResolver{ | ||||
|             .js_context = page.main_context, | ||||
|             .resolver = rr.castToPromiseResolver(), | ||||
|         }; | ||||
|  | ||||
|         try resolver.resolve(ReadableStreamReadResult{ .value = .{ .data = chunk }, .done = false }); | ||||
|         self.stream.reader_resolver = null; | ||||
|  | ||||
|         // rr.setWeakFinalizer(@ptrCast(self.stream), struct { | ||||
|         //     fn callback(info: ?*v8.c.WeakCallbackInfo) void { | ||||
|         //         const inner_stream: *ReadableStream = @ptrCast(@alignCast(v8.c.v8__WeakCallbackInfo__GetParameter(info).?)); | ||||
|         //         inner_stream.reader_resolver = null; | ||||
|         //     } | ||||
|         // }.callback, .kParameter); | ||||
|     } | ||||
|  | ||||
|     try self.stream.queue.append(page.arena, chunk); | ||||
|     try self.stream.pullIf(); | ||||
| } | ||||
|  | ||||
| pub fn _error(self: *ReadableStreamDefaultController, err: Env.JsObject) void { | ||||
| pub fn _error(self: *ReadableStreamDefaultController, err: Env.JsObject, page: *Page) !void { | ||||
|     self.stream.state = .{ .errored = err }; | ||||
|  | ||||
|     if (self.stream.reader_resolver) |rr| { | ||||
|         const resolver = Env.PromiseResolver{ | ||||
|             .js_context = page.main_context, | ||||
|             .resolver = rr.castToPromiseResolver(), | ||||
|         }; | ||||
|  | ||||
|         try resolver.reject(err); | ||||
|         self.stream.reader_resolver = null; | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -24,6 +24,7 @@ const log = @import("../../log.zig"); | ||||
| const Env = @import("../env.zig").Env; | ||||
| const Page = @import("../page.zig").Page; | ||||
| const ReadableStream = @import("./ReadableStream.zig"); | ||||
| const ReadableStreamReadResult = @import("./ReadableStream.zig").ReadableStreamReadResult; | ||||
|  | ||||
| const ReadableStreamDefaultReader = @This(); | ||||
|  | ||||
| @@ -47,23 +48,10 @@ pub fn get_closed(self: *const ReadableStreamDefaultReader) Env.Promise { | ||||
|     return self.closed_resolver.promise(); | ||||
| } | ||||
|  | ||||
| pub fn _cancel(self: *ReadableStreamDefaultReader, page: *Page) !Env.Promise { | ||||
|     return try self.stream._cancel(page); | ||||
| pub fn _cancel(self: *ReadableStreamDefaultReader, reason: ?[]const u8, page: *Page) !Env.Promise { | ||||
|     return try self.stream._cancel(reason, page); | ||||
| } | ||||
|  | ||||
| pub const ReadableStreamReadResult = struct { | ||||
|     value: ?[]const u8, | ||||
|     done: bool, | ||||
|  | ||||
|     pub fn get_value(self: *const ReadableStreamReadResult) !?[]const u8 { | ||||
|         return self.value; | ||||
|     } | ||||
|  | ||||
|     pub fn get_done(self: *const ReadableStreamReadResult) bool { | ||||
|         return self.done; | ||||
|     } | ||||
| }; | ||||
|  | ||||
| pub fn _read(self: *const ReadableStreamDefaultReader, page: *Page) !Env.Promise { | ||||
|     const stream = self.stream; | ||||
|  | ||||
| @@ -76,21 +64,35 @@ pub fn _read(self: *const ReadableStreamDefaultReader, page: *Page) !Env.Promise | ||||
|         .readable => { | ||||
|             if (stream.queue.items.len > 0) { | ||||
|                 const data = self.stream.queue.orderedRemove(0); | ||||
|                 try resolver.resolve(ReadableStreamReadResult{ .value = data, .done = false }); | ||||
|                 try resolver.resolve(ReadableStreamReadResult{ .value = .{ .data = data }, .done = false }); | ||||
|             } else { | ||||
|                 // TODO: need to wait until we have more data | ||||
|                 try resolver.reject("TODO!"); | ||||
|                 return error.Todo; | ||||
|                 if (self.stream.reader_resolver) |rr| { | ||||
|                     const r_resolver = Env.PromiseResolver{ | ||||
|                         .js_context = page.main_context, | ||||
|                         .resolver = rr.castToPromiseResolver(), | ||||
|                     }; | ||||
|  | ||||
|                     return r_resolver.promise(); | ||||
|                 } else { | ||||
|                     const p_resolver = v8.Persistent(v8.PromiseResolver).init(page.main_context.isolate, resolver.resolver); | ||||
|                     self.stream.reader_resolver = p_resolver; | ||||
|                     return resolver.promise(); | ||||
|                 } | ||||
|  | ||||
|                 try self.stream.pullIf(); | ||||
|             } | ||||
|         }, | ||||
|         .closed => |_| { | ||||
|             if (stream.queue.items.len > 0) { | ||||
|                 const data = self.stream.queue.orderedRemove(0); | ||||
|                 try resolver.resolve(ReadableStreamReadResult{ .value = data, .done = false }); | ||||
|                 try resolver.resolve(ReadableStreamReadResult{ .value = .{ .data = data }, .done = false }); | ||||
|             } else { | ||||
|                 try resolver.resolve(ReadableStreamReadResult{ .value = null, .done = true }); | ||||
|                 try resolver.resolve(ReadableStreamReadResult{ .value = .empty, .done = true }); | ||||
|             } | ||||
|         }, | ||||
|         .cancelled => |_| { | ||||
|             try resolver.resolve(ReadableStreamReadResult{ .value = .empty, .done = true }); | ||||
|         }, | ||||
|         .errored => |err| { | ||||
|             try resolver.reject(err); | ||||
|         }, | ||||
| @@ -98,3 +100,16 @@ pub fn _read(self: *const ReadableStreamDefaultReader, page: *Page) !Env.Promise | ||||
|  | ||||
|     return resolver.promise(); | ||||
| } | ||||
|  | ||||
| pub fn _releaseLock(self: *const ReadableStreamDefaultReader, page: *Page) !void { | ||||
|     self.stream.locked = false; | ||||
|  | ||||
|     if (self.stream.reader_resolver) |rr| { | ||||
|         const resolver = Env.PromiseResolver{ | ||||
|             .js_context = page.main_context, | ||||
|             .resolver = rr.castToPromiseResolver(), | ||||
|         }; | ||||
|  | ||||
|         try resolver.reject("TypeError"); | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -18,7 +18,7 @@ | ||||
|  | ||||
| pub const Interfaces = .{ | ||||
|     @import("ReadableStream.zig"), | ||||
|     @import("ReadableStream.zig").ReadableStreamReadResult, | ||||
|     @import("ReadableStreamDefaultReader.zig"), | ||||
|     @import("ReadableStreamDefaultReader.zig").ReadableStreamReadResult, | ||||
|     @import("ReadableStreamDefaultController.zig"), | ||||
| }; | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Muki Kiboigo
					Muki Kiboigo