use Env.PersistentPromiseResolver

This commit is contained in:
Muki Kiboigo
2025-09-16 12:09:54 -07:00
parent 24330a7491
commit c553a2cd38
5 changed files with 97 additions and 118 deletions

View File

@@ -19,7 +19,6 @@
const std = @import("std"); const std = @import("std");
const log = @import("../../log.zig"); const log = @import("../../log.zig");
const v8 = @import("v8");
const Env = @import("../env.zig").Env; const Env = @import("../env.zig").Env;
const Page = @import("../page.zig").Page; const Page = @import("../page.zig").Page;
@@ -46,7 +45,7 @@ pub const Interfaces = .{
pub const FetchContext = struct { pub const FetchContext = struct {
arena: std.mem.Allocator, arena: std.mem.Allocator,
js_ctx: *Env.JsContext, js_ctx: *Env.JsContext,
promise_resolver: v8.Persistent(v8.PromiseResolver), promise_resolver: Env.PersistentPromiseResolver,
method: Http.Method, method: Http.Method,
url: []const u8, url: []const u8,
@@ -82,13 +81,9 @@ pub const FetchContext = struct {
pub fn destructor(self: *FetchContext) void { pub fn destructor(self: *FetchContext) void {
if (self.transfer) |_| { if (self.transfer) |_| {
const resolver = Env.PromiseResolver{ self.promise_resolver.reject("TypeError") catch unreachable;
.js_context = self.js_ctx,
.resolver = self.promise_resolver.castToPromiseResolver(),
};
resolver.reject("TypeError") catch unreachable;
self.promise_resolver.deinit(); self.promise_resolver.deinit();
self.transfer = null;
} }
} }
}; };
@@ -99,10 +94,7 @@ pub fn fetch(input: RequestInput, options: ?RequestInit, page: *Page) !Env.Promi
const req = try Request.constructor(input, options, page); const req = try Request.constructor(input, options, page);
const resolver = Env.PromiseResolver{ const resolver = page.main_context.createPersistentPromiseResolver();
.js_context = page.main_context,
.resolver = v8.PromiseResolver.init(page.main_context.v8_context),
};
var headers = try Http.Headers.init(); var headers = try Http.Headers.init();
@@ -125,10 +117,7 @@ pub fn fetch(input: RequestInput, options: ?RequestInit, page: *Page) !Env.Promi
fetch_ctx.* = .{ fetch_ctx.* = .{
.arena = arena, .arena = arena,
.js_ctx = page.main_context, .js_ctx = page.main_context,
.promise_resolver = v8.Persistent(v8.PromiseResolver).init( .promise_resolver = resolver,
page.main_context.isolate,
resolver.resolver,
),
.method = req.method, .method = req.method,
.url = req.url, .url = req.url,
}; };
@@ -205,24 +194,23 @@ pub fn fetch(input: RequestInput, options: ?RequestInit, page: *Page) !Env.Promi
}); });
const response = try self.toResponse(); const response = try self.toResponse();
const promise_resolver: Env.PromiseResolver = .{ try self.promise_resolver.resolve(response);
.js_context = self.js_ctx,
.resolver = self.promise_resolver.castToPromiseResolver(),
};
try promise_resolver.resolve(response);
} }
}.doneCallback, }.doneCallback,
.error_callback = struct { .error_callback = struct {
fn errorCallback(ctx: *anyopaque, err: anyerror) void { fn errorCallback(ctx: *anyopaque, err: anyerror) void {
const self: *FetchContext = @ptrCast(@alignCast(ctx)); const self: *FetchContext = @ptrCast(@alignCast(ctx));
self.transfer = null; if (self.transfer != null) {
self.transfer = null;
log.err(.http, "error", .{ log.err(.http, "error", .{
.url = self.url, .url = self.url,
.err = err, .err = err,
.source = "fetch error", .source = "fetch error",
}); });
self.promise_resolver.reject(@errorName(err)) catch unreachable;
}
} }
}.errorCallback, }.errorCallback,
}); });

View File

@@ -19,7 +19,6 @@
const std = @import("std"); const std = @import("std");
const log = @import("../../log.zig"); const log = @import("../../log.zig");
const v8 = @import("v8");
const Page = @import("../page.zig").Page; const Page = @import("../page.zig").Page;
const Env = @import("../env.zig").Env; const Env = @import("../env.zig").Env;
@@ -35,9 +34,9 @@ const State = union(enum) {
}; };
// This promise resolves when a stream is canceled. // This promise resolves when a stream is canceled.
cancel_resolver: v8.Persistent(v8.PromiseResolver), cancel_resolver: Env.PersistentPromiseResolver,
closed_resolver: v8.Persistent(v8.PromiseResolver), closed_resolver: Env.PersistentPromiseResolver,
reader_resolver: ?v8.Persistent(v8.PromiseResolver) = null, reader_resolver: ?Env.PersistentPromiseResolver = null,
locked: bool = false, locked: bool = false,
state: State = .readable, state: State = .readable,
@@ -79,15 +78,8 @@ const QueueingStrategy = struct {
pub fn constructor(underlying: ?UnderlyingSource, _strategy: ?QueueingStrategy, page: *Page) !*ReadableStream { pub fn constructor(underlying: ?UnderlyingSource, _strategy: ?QueueingStrategy, page: *Page) !*ReadableStream {
const strategy: QueueingStrategy = _strategy orelse .{}; const strategy: QueueingStrategy = _strategy orelse .{};
const cancel_resolver = v8.Persistent(v8.PromiseResolver).init( const cancel_resolver = page.main_context.createPersistentPromiseResolver();
page.main_context.isolate, const closed_resolver = page.main_context.createPersistentPromiseResolver();
v8.PromiseResolver.init(page.main_context.v8_context),
);
const closed_resolver = v8.Persistent(v8.PromiseResolver).init(
page.main_context.isolate,
v8.PromiseResolver.init(page.main_context.v8_context),
);
const stream = try page.arena.create(ReadableStream); const stream = try page.arena.create(ReadableStream);
stream.* = ReadableStream{ .cancel_resolver = cancel_resolver, .closed_resolver = closed_resolver, .strategy = strategy }; stream.* = ReadableStream{ .cancel_resolver = cancel_resolver, .closed_resolver = closed_resolver, .strategy = strategy };
@@ -131,11 +123,6 @@ pub fn _cancel(self: *ReadableStream, reason: ?[]const u8, page: *Page) !Env.Pro
return error.TypeError; return error.TypeError;
} }
const resolver = Env.PromiseResolver{
.js_context = page.main_context,
.resolver = self.cancel_resolver.castToPromiseResolver(),
};
self.state = .{ .cancelled = if (reason) |r| try page.arena.dupe(u8, r) else null }; self.state = .{ .cancelled = if (reason) |r| try page.arena.dupe(u8, r) else null };
// Call cancel callback. // Call cancel callback.
@@ -147,8 +134,8 @@ pub fn _cancel(self: *ReadableStream, reason: ?[]const u8, page: *Page) !Env.Pro
} }
} }
try resolver.resolve({}); try self.cancel_resolver.resolve({});
return resolver.promise(); return self.cancel_resolver.promise();
} }
pub fn pullIf(self: *ReadableStream) !void { pub fn pullIf(self: *ReadableStream) !void {

View File

@@ -21,7 +21,6 @@ const log = @import("../../log.zig");
const Page = @import("../page.zig").Page; const Page = @import("../page.zig").Page;
const Env = @import("../env.zig").Env; const Env = @import("../env.zig").Env;
const v8 = @import("v8");
const ReadableStream = @import("./ReadableStream.zig"); const ReadableStream = @import("./ReadableStream.zig");
const ReadableStreamReadResult = @import("./ReadableStream.zig").ReadableStreamReadResult; const ReadableStreamReadResult = @import("./ReadableStream.zig").ReadableStreamReadResult;
@@ -40,23 +39,14 @@ pub fn _close(self: *ReadableStreamDefaultController, _reason: ?[]const u8, page
self.stream.state = .{ .closed = reason }; self.stream.state = .{ .closed = reason };
// Resolve the Reader Promise // Resolve the Reader Promise
if (self.stream.reader_resolver) |rr| { if (self.stream.reader_resolver) |*rr| {
const resolver = Env.PromiseResolver{ defer rr.deinit();
.js_context = page.main_context, try rr.resolve(ReadableStreamReadResult{ .value = .empty, .done = true });
.resolver = rr.castToPromiseResolver(),
};
try resolver.resolve(ReadableStreamReadResult{ .value = .empty, .done = true });
self.stream.reader_resolver = null; self.stream.reader_resolver = null;
} }
// Resolve the Closed promise. // Resolve the Closed promise.
const closed_resolver = Env.PromiseResolver{ try self.stream.closed_resolver.resolve({});
.js_context = page.main_context,
.resolver = self.stream.closed_resolver.castToPromiseResolver(),
};
try closed_resolver.resolve({});
// close just sets as closed meaning it wont READ any more but anything in the queue is fine to read. // close just sets as closed meaning it wont READ any more but anything in the queue is fine to read.
// to discard, must use cancel. // to discard, must use cancel.
@@ -69,37 +59,22 @@ pub fn _enqueue(self: *ReadableStreamDefaultController, chunk: []const u8, page:
return error.TypeError; return error.TypeError;
} }
if (self.stream.reader_resolver) |rr| { if (self.stream.reader_resolver) |*rr| {
const resolver = Env.PromiseResolver{ defer rr.deinit();
.js_context = page.main_context, try rr.resolve(ReadableStreamReadResult{ .value = .{ .data = chunk }, .done = false });
.resolver = rr.castToPromiseResolver(),
};
try resolver.resolve(ReadableStreamReadResult{ .value = .{ .data = chunk }, .done = false });
self.stream.reader_resolver = null; self.stream.reader_resolver = null;
// rr.setWeakFinalizer(@ptrCast(self.stream), struct {
// fn callback(info: ?*v8.c.WeakCallbackInfo) void {
// const inner_stream: *ReadableStream = @ptrCast(@alignCast(v8.c.v8__WeakCallbackInfo__GetParameter(info).?));
// inner_stream.reader_resolver = null;
// }
// }.callback, .kParameter);
} }
try self.stream.queue.append(page.arena, chunk); try self.stream.queue.append(page.arena, chunk);
try self.stream.pullIf(); try self.stream.pullIf();
} }
pub fn _error(self: *ReadableStreamDefaultController, err: Env.JsObject, page: *Page) !void { pub fn _error(self: *ReadableStreamDefaultController, err: Env.JsObject) !void {
self.stream.state = .{ .errored = err }; self.stream.state = .{ .errored = err };
if (self.stream.reader_resolver) |rr| { if (self.stream.reader_resolver) |*rr| {
const resolver = Env.PromiseResolver{ defer rr.deinit();
.js_context = page.main_context, try rr.reject(err);
.resolver = rr.castToPromiseResolver(),
};
try resolver.reject(err);
self.stream.reader_resolver = null; self.stream.reader_resolver = null;
} }
} }

View File

@@ -18,8 +18,6 @@
const std = @import("std"); const std = @import("std");
const v8 = @import("v8");
const log = @import("../../log.zig"); const log = @import("../../log.zig");
const Env = @import("../env.zig").Env; const Env = @import("../env.zig").Env;
const Page = @import("../page.zig").Page; const Page = @import("../page.zig").Page;
@@ -34,13 +32,8 @@ pub fn constructor(stream: *ReadableStream) ReadableStreamDefaultReader {
return .{ .stream = stream }; return .{ .stream = stream };
} }
pub fn get_closed(self: *const ReadableStreamDefaultReader, page: *Page) Env.Promise { pub fn get_closed(self: *const ReadableStreamDefaultReader) Env.Promise {
const resolver = Env.PromiseResolver{ return self.stream.closed_resolver.promise();
.js_context = page.main_context,
.resolver = self.stream.closed_resolver.castToPromiseResolver(),
};
return resolver.promise();
} }
pub fn _cancel(self: *ReadableStreamDefaultReader, reason: ?[]const u8, page: *Page) !Env.Promise { pub fn _cancel(self: *ReadableStreamDefaultReader, reason: ?[]const u8, page: *Page) !Env.Promise {
@@ -50,61 +43,53 @@ pub fn _cancel(self: *ReadableStreamDefaultReader, reason: ?[]const u8, page: *P
pub fn _read(self: *const ReadableStreamDefaultReader, page: *Page) !Env.Promise { pub fn _read(self: *const ReadableStreamDefaultReader, page: *Page) !Env.Promise {
const stream = self.stream; const stream = self.stream;
const resolver = Env.PromiseResolver{
.js_context = page.main_context,
.resolver = v8.PromiseResolver.init(page.main_context.v8_context),
};
switch (stream.state) { switch (stream.state) {
.readable => { .readable => {
if (stream.queue.items.len > 0) { if (stream.queue.items.len > 0) {
const data = self.stream.queue.orderedRemove(0); const data = self.stream.queue.orderedRemove(0);
const resolver = page.main_context.createPromiseResolver();
try resolver.resolve(ReadableStreamReadResult{ .value = .{ .data = data }, .done = false }); try resolver.resolve(ReadableStreamReadResult{ .value = .{ .data = data }, .done = false });
try self.stream.pullIf();
return resolver.promise();
} else { } else {
if (self.stream.reader_resolver) |rr| { if (self.stream.reader_resolver) |rr| {
const r_resolver = Env.PromiseResolver{ return rr.promise();
.js_context = page.main_context,
.resolver = rr.castToPromiseResolver(),
};
return r_resolver.promise();
} else { } else {
const p_resolver = v8.Persistent(v8.PromiseResolver).init(page.main_context.isolate, resolver.resolver); const persistent_resolver = page.main_context.createPersistentPromiseResolver();
self.stream.reader_resolver = p_resolver; self.stream.reader_resolver = persistent_resolver;
return resolver.promise(); return persistent_resolver.promise();
} }
try self.stream.pullIf();
} }
}, },
.closed => |_| { .closed => |_| {
const resolver = page.main_context.createPromiseResolver();
if (stream.queue.items.len > 0) { if (stream.queue.items.len > 0) {
const data = self.stream.queue.orderedRemove(0); const data = self.stream.queue.orderedRemove(0);
try resolver.resolve(ReadableStreamReadResult{ .value = .{ .data = data }, .done = false }); try resolver.resolve(ReadableStreamReadResult{ .value = .{ .data = data }, .done = false });
} else { } else {
try resolver.resolve(ReadableStreamReadResult{ .value = .empty, .done = true }); try resolver.resolve(ReadableStreamReadResult{ .value = .empty, .done = true });
} }
return resolver.promise();
}, },
.cancelled => |_| { .cancelled => |_| {
const resolver = page.main_context.createPromiseResolver();
try resolver.resolve(ReadableStreamReadResult{ .value = .empty, .done = true }); try resolver.resolve(ReadableStreamReadResult{ .value = .empty, .done = true });
return resolver.promise();
}, },
.errored => |err| { .errored => |err| {
const resolver = page.main_context.createPromiseResolver();
try resolver.reject(err); try resolver.reject(err);
return resolver.promise();
}, },
} }
return resolver.promise();
} }
pub fn _releaseLock(self: *const ReadableStreamDefaultReader, page: *Page) !void { pub fn _releaseLock(self: *const ReadableStreamDefaultReader) !void {
self.stream.locked = false; self.stream.locked = false;
if (self.stream.reader_resolver) |rr| { if (self.stream.reader_resolver) |rr| {
const resolver = Env.PromiseResolver{ try rr.reject("TypeError");
.js_context = page.main_context,
.resolver = rr.castToPromiseResolver(),
};
try resolver.reject("TypeError");
} }
} }

View File

@@ -1261,6 +1261,13 @@ pub fn Env(comptime State: type, comptime WebApis: type) type {
}; };
} }
pub fn createPersistentPromiseResolver(self: *JsContext) PersistentPromiseResolver {
return .{
.js_context = self,
.resolver = v8.Persistent(v8.PromiseResolver).init(self.isolate, v8.PromiseResolver.init(self.v8_context)),
};
}
// Probing is part of trying to map a JS value to a Zig union. There's // Probing is part of trying to map a JS value to a Zig union. There's
// a lot of ambiguity in this process, in part because some JS values // a lot of ambiguity in this process, in part because some JS values
// can almost always be coerced. For example, anything can be coerced // can almost always be coerced. For example, anything can be coerced
@@ -2231,6 +2238,43 @@ pub fn Env(comptime State: type, comptime WebApis: type) type {
} }
}; };
pub const PersistentPromiseResolver = struct {
js_context: *JsContext,
resolver: v8.Persistent(v8.PromiseResolver),
pub fn deinit(self: *PersistentPromiseResolver) void {
self.resolver.deinit();
}
pub fn promise(self: PersistentPromiseResolver) Promise {
return .{
.promise = self.resolver.castToPromiseResolver().getPromise(),
};
}
pub fn resolve(self: PersistentPromiseResolver, value: anytype) !void {
const js_context = self.js_context;
const js_value = try js_context.zigValueToJs(value);
// resolver.resolve will return null if the promise isn't pending
const ok = self.resolver.castToPromiseResolver().resolve(js_context.v8_context, js_value) orelse return;
if (!ok) {
return error.FailedToResolvePromise;
}
}
pub fn reject(self: PersistentPromiseResolver, value: anytype) !void {
const js_context = self.js_context;
const js_value = try js_context.zigValueToJs(value);
// resolver.reject will return null if the promise isn't pending
const ok = self.resolver.castToPromiseResolver().reject(js_context.v8_context, js_value) orelse return;
if (!ok) {
return error.FailedToRejectPromise;
}
}
};
pub const Promise = struct { pub const Promise = struct {
promise: v8.Promise, promise: v8.Promise,
}; };