Merge pull request #970 from lightpanda-io/remove_loop

Remove the loop
Pierre Tachoire
2025-08-26 18:17:32 +02:00
committed by GitHub
15 changed files with 263 additions and 828 deletions

View File

@@ -153,8 +153,6 @@ fn addDependencies(b: *Build, mod: *Build.Module, opts: *Build.Step.Options) !vo
        .optimize = mod.optimize.?,
    };
-   mod.addImport("tigerbeetle-io", b.dependency("tigerbeetle_io", .{}).module("tigerbeetle_io"));
    mod.addIncludePath(b.path("vendor/lightpanda"));
    {

View File

@@ -4,15 +4,10 @@
    .version = "0.0.0",
    .fingerprint = 0xda130f3af836cea0,
    .dependencies = .{
-       .tigerbeetle_io = .{
-           .url = "https://github.com/lightpanda-io/tigerbeetle-io/archive/19ae89eb3814d48c202ac9e0495fc5cadb29dfe7.tar.gz",
-           .hash = "tigerbeetle_io-0.0.0-ViLgxjqSBADhuHO_RZm4yNzuoKDXWP39hDn60Kht40OC",
-       },
        .v8 = .{
            .url = "https://github.com/lightpanda-io/zig-v8-fork/archive/cf412d5b3d9d608582571d821e0d552337ef690d.tar.gz",
            .hash = "v8-0.0.0-xddH69zDAwA4fp1dBo_jEDjS5bhXycPwRlZHp6_X890t",
        },
        //.v8 = .{ .path = "../zig-v8-fork" },
-       //.tigerbeetle_io = .{ .path = "../tigerbeetle-io" },
    },
}

View File

@@ -4,7 +4,6 @@ const Allocator = std.mem.Allocator;
const log = @import("log.zig");
const Http = @import("http/Http.zig");
-const Loop = @import("runtime/loop.zig").Loop;
const Platform = @import("runtime/js.zig").Platform;
const Telemetry = @import("telemetry/telemetry.zig").Telemetry;
@@ -14,7 +13,6 @@ const Notification = @import("notification.zig").Notification;
// might need.
pub const App = struct {
    http: Http,
-   loop: *Loop,
    config: Config,
    platform: ?*const Platform,
    allocator: Allocator,
@@ -45,12 +43,6 @@ pub const App = struct {
    const app = try allocator.create(App);
    errdefer allocator.destroy(app);
-   const loop = try allocator.create(Loop);
-   errdefer allocator.destroy(loop);
-   loop.* = try Loop.init(allocator);
-   errdefer loop.deinit();
    const notification = try Notification.init(allocator, null);
    errdefer notification.deinit();
@@ -68,7 +60,6 @@ pub const App = struct {
    const app_dir_path = getAndMakeAppDir(allocator);
    app.* = .{
-       .loop = loop,
        .http = http,
        .allocator = allocator,
        .telemetry = undefined,
@@ -92,8 +83,6 @@ pub const App = struct {
        allocator.free(app_dir_path);
    }
    self.telemetry.deinit();
-   self.loop.deinit();
-   allocator.destroy(self.loop);
    self.notification.deinit();
    self.http.deinit();
    allocator.destroy(self);

View File

@@ -59,15 +59,15 @@ pub fn add(self: *Scheduler, ctx: *anyopaque, func: Task.Func, ms: u32, opts: Ad
    });
}
-pub fn runHighPriority(self: *Scheduler) !?u32 {
+pub fn runHighPriority(self: *Scheduler) !?i32 {
    return self.runQueue(&self.primary);
}
-pub fn runLowPriority(self: *Scheduler) !?u32 {
+pub fn runLowPriority(self: *Scheduler) !?i32 {
    return self.runQueue(&self.secondary);
}
-fn runQueue(self: *Scheduler, queue: *Queue) !?u32 {
+fn runQueue(self: *Scheduler, queue: *Queue) !?i32 {
    // this is O(1)
    if (queue.count() == 0) {
        return null;
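The only change here is the return type: the scheduler's "milliseconds until the next task" value goes from ?u32 to ?i32, presumably so it can flow straight into the signed timeout_ms the reworked HTTP client takes (see TickOpts in http/Client.zig below). A sketch of the assumed call site, modeled on the page wait loop later in this commit (ms_remaining and http_client come from page.zig):

    // Run the tasks that are due; the result is the delay, in milliseconds,
    // until the next scheduled task (null when the queue is empty).
    const ms_to_next_task: ?i32 = try scheduler.runHighPriority();
    // The signed value feeds the HTTP poll timeout without an @intCast.
    const ms_to_wait: i32 = @min(ms_remaining, ms_to_next_task orelse 1000);
    _ = try http_client.tick(.{ .timeout_ms = ms_to_wait });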

View File

@@ -313,7 +313,7 @@ pub fn blockingGet(self: *ScriptManager, url: [:0]const u8) !BlockingResult {
    // rely on http's timeout settings to avoid an endless/long loop.
    while (true) {
-       try client.tick(200);
+       _ = try client.tick(.{ .timeout_ms = 200 });
        switch (blocking.state) {
            .running => {},
            .done => |result| return result,

View File

@@ -51,10 +51,6 @@ const polyfill = @import("polyfill/polyfill.zig");
pub const Page = struct {
    cookie_jar: *storage.CookieJar,
-   // Pre-configured http/cilent.zig used to make HTTP requests.
-   // @newhttp
-   // request_factory: RequestFactory,
    session: *Session,

    // An arena with a lifetime for the entire duration of the page
@@ -146,12 +142,9 @@ pub const Page = struct {
        .scheduler = Scheduler.init(arena),
        .keydown_event_node = .{ .func = keydownCallback },
        .window_clicked_event_node = .{ .func = windowClicked },
-       // @newhttp
-       // .request_factory = browser.http_client.requestFactory(.{
-       //     .notification = browser.notification,
-       // }),
        .main_context = undefined,
    };
    self.main_context = try session.executor.createJsContext(&self.window, self, self, true, Env.GlobalMissingCallback.init(&self.polyfill_loader));
    try polyfill.preload(self.arena, self.main_context);
@@ -269,7 +262,7 @@ pub const Page = struct {
        return self.script_manager.blockingGet(src);
    }

-   pub fn wait(self: *Page, wait_sec: usize) void {
+   pub fn wait(self: *Page, wait_sec: u16) void {
        self._wait(wait_sec) catch |err| switch (err) {
            error.JsError => {}, // already logged (with hopefully more context)
            else => {
@@ -283,9 +276,9 @@ pub const Page = struct {
        };
    }

-   fn _wait(self: *Page, wait_sec: usize) !void {
-       var ms_remaining = wait_sec * 1000;
+   fn _wait(self: *Page, wait_sec: u16) !void {
        var timer = try std.time.Timer.start();
+       var ms_remaining: i32 = @intCast(wait_sec * 1000);

        var try_catch: Env.TryCatch = undefined;
        try_catch.init(self.main_context);
@@ -320,7 +313,7 @@ pub const Page = struct {
            }
            // There should only be 1 active http transfer, the main page
-           try http_client.tick(ms_remaining);
+           _ = try http_client.tick(.{ .timeout_ms = ms_remaining });
        },
        .html, .parsed => {
            // The HTML page was parsed. We now either have JS scripts to
@@ -381,7 +374,7 @@ pub const Page = struct {
                // inflight requests
                else @min(ms_remaining, ms_to_next_task orelse 1000);

-           try http_client.tick(ms_to_wait);
+           _ = try http_client.tick(.{ .timeout_ms = ms_to_wait });

            if (request_intercepted) {
                // Again, proritizing intercepted requests. Exit this
@@ -401,7 +394,7 @@ pub const Page = struct {
            if (ms_elapsed >= ms_remaining) {
                return;
            }
-           ms_remaining -= ms_elapsed;
+           ms_remaining -= @intCast(ms_elapsed);
        }
    }
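The wait budget is now tracked as a signed millisecond counter instead of an unscaled usize, which is why @intCast appears around the timer math. A standalone sketch of that arithmetic, assuming ms_elapsed is derived from std.time.Timer as the surrounding lines suggest (the real loop also runs the scheduler and blocks in http_client.tick):

    fn waitSketch(wait_sec: u16) !void {
        var timer = try std.time.Timer.start();
        // u16 seconds always fit in an i32 millisecond budget.
        var ms_remaining: i32 = @intCast(@as(u32, wait_sec) * 1000);
        while (ms_remaining > 0) {
            // http_client.tick(.{ .timeout_ms = ms_remaining }) would block here
            const ms_elapsed: i32 = @intCast(timer.lap() / std.time.ns_per_ms);
            if (ms_elapsed >= ms_remaining) return;
            ms_remaining -= ms_elapsed;
        }
    }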

View File

@@ -137,7 +137,7 @@ pub const Session = struct {
    return &(self.page orelse return null);
}

-pub fn wait(self: *Session, wait_sec: usize) void {
+pub fn wait(self: *Session, wait_sec: u16) void {
    if (self.queued_navigation) |qn| {
        // This was already aborted on the page, but it would be pretty
        // bad if old requests went to the new page, so let's make double sure

View File

@@ -114,12 +114,9 @@ pub fn CDPT(comptime TypeProvider: type) type {
}

// @newhttp
-// A bit hacky right now. The main server loop blocks only for CDP
-// messages. It no longer blocks for page timeouts of page HTTP
-// transfers. So we need to call this more ourselves.
-// This is called after every message and [very hackily] from the server
-// loop.
-// This is hopefully temporary.
+// A bit hacky right now. The main server loop doesn't unblock for
+// scheduled task. So we run this directly in order to process any
+// timeouts (or http events) which are ready to be processed.
pub fn pageWait(self: *Self) void {
    const session = &(self.browser.session orelse return);
    // exits early if there's nothing to do, so a large value like
@@ -592,8 +589,7 @@ pub fn BrowserContext(comptime CDP_T: type) type {
    };

    const cdp = self.cdp;
-   var arena = std.heap.ArenaAllocator.init(cdp.allocator);
-   errdefer arena.deinit();
+   const allocator = cdp.client.send_arena.allocator();

    const field = ",\"sessionId\":\"";
@@ -602,7 +598,7 @@ pub fn BrowserContext(comptime CDP_T: type) type {
    const message_len = msg.len + session_id.len + 1 + field.len + 10;

    var buf: std.ArrayListUnmanaged(u8) = .{};
-   buf.ensureTotalCapacity(arena.allocator(), message_len) catch |err| {
+   buf.ensureTotalCapacity(allocator, message_len) catch |err| {
        log.err(.cdp, "inspector buffer", .{ .err = err });
        return;
    };
@@ -617,7 +613,7 @@ pub fn BrowserContext(comptime CDP_T: type) type {
    buf.appendSliceAssumeCapacity("\"}");
    std.debug.assert(buf.items.len == message_len);

-   try cdp.client.sendJSONRaw(arena, buf);
+   try cdp.client.sendJSONRaw(buf);
}
};
}
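Inspector messages are now built in the client's long-lived send_arena instead of a per-message arena that had to be handed to sendJSONRaw. The + 10 in message_len is the space reserved for the websocket frame header that sendJSONRaw backfills before writing. Roughly (the zero-byte placeholder append is an assumption; the real code may reserve those bytes differently):

    const allocator = cdp.client.send_arena.allocator();
    var buf: std.ArrayListUnmanaged(u8) = .{};
    try buf.ensureTotalCapacity(allocator, message_len);
    buf.appendNTimesAssumeCapacity(0, 10); // room for the largest websocket header
    // ... the JSON payload is appended here ...
    try cdp.client.sendJSONRaw(buf); // backfills the header, then writes the frame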

View File

@@ -39,12 +39,14 @@ pub const Document = @import("../testing.zig").Document;
const Client = struct {
    allocator: Allocator,
+   send_arena: ArenaAllocator,
    sent: std.ArrayListUnmanaged(json.Value) = .{},
    serialized: std.ArrayListUnmanaged([]const u8) = .{},

    fn init(alloc: Allocator) Client {
        return .{
            .allocator = alloc,
+           .send_arena = ArenaAllocator.init(alloc),
        };
    }
@@ -58,7 +60,7 @@ const Client = struct {
        try self.sent.append(self.allocator, value);
    }

-   pub fn sendJSONRaw(self: *Client, _: ArenaAllocator, buf: std.ArrayListUnmanaged(u8)) !void {
+   pub fn sendJSONRaw(self: *Client, buf: std.ArrayListUnmanaged(u8)) !void {
        const value = try json.parseFromSliceLeaky(json.Value, self.allocator, buf.items, .{});
        try self.sent.append(self.allocator, value);
    }

View File

@@ -27,6 +27,7 @@ const CookieJar = @import("../browser/storage/storage.zig").CookieJar;
const urlStitch = @import("../url.zig").stitch;
const c = Http.c;
+const posix = std.posix;

const Allocator = std.mem.Allocator;
const ArenaAllocator = std.heap.ArenaAllocator;
@@ -167,10 +168,13 @@ pub fn abort(self: *Client) void {
    }
}

-pub fn tick(self: *Client, timeout_ms: usize) !void {
-   var handles = &self.handles;
+const TickOpts = struct {
+   timeout_ms: i32 = 0,
+   poll_socket: ?posix.socket_t = null,
+};
+pub fn tick(self: *Client, opts: TickOpts) !bool {
    while (true) {
-       if (handles.hasAvailable() == false) {
+       if (self.handles.hasAvailable() == false) {
            break;
        }
        const queue_node = self.queue.popFirst() orelse break;
@@ -178,11 +182,10 @@ pub fn tick(self: *Client, timeout_ms: usize) !void {
        self.queue_node_pool.destroy(queue_node);
        // we know this exists, because we checked isEmpty() above
-       const handle = handles.getFreeHandle().?;
+       const handle = self.handles.getFreeHandle().?;
        try self.makeRequest(handle, req);
    }
-   try self.perform(@intCast(timeout_ms));
+   return self.perform(opts.timeout_ms, opts.poll_socket);
}

pub fn request(self: *Client, req: Request) !void {
@@ -343,16 +346,26 @@ fn makeRequest(self: *Client, handle: *Handle, transfer: *Transfer) !void {
    }
    self.active += 1;
-   return self.perform(0);
+   _ = try self.perform(0, null);
}

-fn perform(self: *Client, timeout_ms: c_int) !void {
+fn perform(self: *Client, timeout_ms: c_int, socket: ?posix.socket_t) !bool {
    const multi = self.multi;

    var running: c_int = undefined;
    try errorMCheck(c.curl_multi_perform(multi, &running));

-   if (running > 0 and timeout_ms > 0) {
+   if (socket) |s| {
+       var wait_fd = c.curl_waitfd{
+           .fd = s,
+           .events = c.CURL_WAIT_POLLIN,
+           .revents = 0,
+       };
+       try errorMCheck(c.curl_multi_poll(multi, &wait_fd, 1, timeout_ms, null));
+       if (wait_fd.revents != 0) {
+           // the extra socket we passed in is ready, let's signal our caller
+           return true;
+       }
+   } else if (running > 0 and timeout_ms > 0) {
        try errorMCheck(c.curl_multi_poll(multi, null, 0, timeout_ms, null));
    }
@@ -397,6 +410,8 @@ fn perform(self: *Client, timeout_ms: c_int) !void {
            self.requestFailed(transfer, err);
        }
    }
+   return false;
}

fn endTransfer(self: *Client, transfer: *Transfer) void {
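The notable addition is the optional poll_socket: when it is set, perform registers it as an extra file descriptor with curl_multi_poll, so a single call can wait on both libcurl's transfers and an unrelated socket (the CDP connection). A true return means "your socket is readable", not "a transfer completed". A sketch of the intended caller side, with cdp_socket standing in for a descriptor owned by the caller:

    const socket_ready = try client.tick(.{
        .timeout_ms = 20,
        .poll_socket = cdp_socket,
    });
    if (socket_ready) {
        // drain the readable cdp_socket before polling again
    }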

View File

@@ -17,6 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

const std = @import("std");
+const posix = std.posix;

pub const c = @cImport({
    @cInclude("curl/curl.h");
@@ -26,6 +27,7 @@ pub const ENABLE_DEBUG = false;
pub const Client = @import("Client.zig");
pub const Transfer = Client.Transfer;
+const log = @import("../log.zig");
const errors = @import("errors.zig");

const Allocator = std.mem.Allocator;
@@ -85,6 +87,16 @@ pub fn deinit(self: *Http) void {
    self.arena.deinit();
}

+pub fn poll(self: *Http, timeout_ms: i32, socket: posix.socket_t) bool {
+   return self.client.tick(.{
+       .timeout_ms = timeout_ms,
+       .poll_socket = socket,
+   }) catch |err| {
+       log.err(.app, "http poll", .{ .err = err });
+       return false;
+   };
+}
+
pub fn newConnection(self: *Http) !Connection {
    return Connection.init(self.ca_blob, &self.opts);
}
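poll is the thin wrapper the server uses in place of the old IO loop: errors are logged and reported as false, so the caller treats them like a timeout. Assumed usage, mirroring Server.readLoop later in this commit (client_socket stands in for the accepted CDP socket):

    if (app.http.poll(20, client_socket)) {
        // the CDP socket has data: read and process a message
    } else {
        // nothing readable within 20ms (or the poll failed): check the idle timeout
    }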

View File

@@ -21,8 +21,8 @@ const builtin = @import("builtin");
const Allocator = std.mem.Allocator;

const log = @import("log.zig");
-const server = @import("server.zig");
const App = @import("app.zig").App;
+const Server = @import("server.zig").Server;
const Http = @import("http/Http.zig");
const Platform = @import("runtime/js.zig").Platform;
const Browser = @import("browser/browser.zig").Browser;
@@ -103,8 +103,10 @@ fn run(alloc: Allocator) !void {
        return args.printUsageAndExit(false);
    };

-   const timeout = std.time.ns_per_s * @as(u64, opts.timeout);
-   server.run(app, address, timeout) catch |err| {
+   var server = try Server.init(app, address);
+   defer server.deinit();
+   server.run(address, opts.timeout) catch |err| {
        log.fatal(.app, "server run error", .{ .err = err });
        return err;
    };
@@ -773,7 +775,9 @@ fn serveCDP(address: std.net.Address, platform: *const Platform) !void {
    defer app.deinit();
    test_wg.finish();

-   server.run(app, address, std.time.ns_per_s * 2) catch |err| {
+   var server = try Server.init(app, address);
+   defer server.deinit();
+   server.run(address, 5) catch |err| {
        std.debug.print("CDP server error: {}", .{err});
        return err;
    };

View File

@@ -167,7 +167,7 @@ fn run(
    var try_catch: Env.TryCatch = undefined;
    try_catch.init(runner.page.main_context);
    defer try_catch.deinit();

-   runner.page.wait(std.time.ns_per_ms * 200);
+   runner.page.wait(2);

    if (try_catch.hasCaught()) {
        err_out.* = (try try_catch.err(arena)) orelse "unknwon error";

View File

@@ -1,331 +0,0 @@
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const builtin = @import("builtin");
const MemoryPool = std.heap.MemoryPool;
const log = @import("../log.zig");
pub const IO = @import("tigerbeetle-io").IO;
const RUN_DURATION = 10 * std.time.ns_per_ms;
// SingleThreaded I/O Loop based on Tigerbeetle io_uring loop.
// On Linux it's using io_uring.
// On MacOS and Windows it's using kqueue/IOCP with a ring design.
// This is a thread-unsafe version without any lock on shared resources,
// use it only on a single thread.
// The loop provides I/O APIs based on callbacks.
// I/O APIs based on async/await might be added in the future.
pub const Loop = struct {
alloc: std.mem.Allocator, // TODO: unmanaged version ?
io: IO,
// number of pending network events we have
pending_network_count: usize,
// number of pending timeout events we have
pending_timeout_count: usize,
// Used to stop repeating timeouts when loop.run is called.
stopping: bool,
// ctx_id is incremented each time the loop is reset.
// All callbacks store an initial ctx_id and compare before execution.
// If a ctx is outdated, the callback is ignored.
// This is a weak way to cancel all future callbacks.
ctx_id: u32 = 0,
// We use this to track cancellation ids and, on the timeout callback,
// we can can check here to see if it's been cancelled.
cancelled: std.AutoHashMapUnmanaged(usize, void),
timeout_pool: MemoryPool(ContextTimeout),
event_callback_pool: MemoryPool(EventCallbackContext),
const Self = @This();
pub const Completion = IO.Completion;
pub const RecvError = IO.RecvError;
pub const SendError = IO.SendError;
pub const ConnectError = IO.ConnectError;
pub fn init(alloc: std.mem.Allocator) !Self {
return .{
.alloc = alloc,
.cancelled = .{},
.io = try IO.init(32, 0),
.stopping = false,
.pending_network_count = 0,
.pending_timeout_count = 0,
.timeout_pool = MemoryPool(ContextTimeout).init(alloc),
.event_callback_pool = MemoryPool(EventCallbackContext).init(alloc),
};
}
pub fn deinit(self: *Self) void {
self.reset();
// run tail events. We do run the tail events to ensure all the
// contexts are correcly free.
while (self.pending_network_count != 0 or self.pending_timeout_count != 0) {
self.io.run_for_ns(RUN_DURATION) catch |err| {
log.err(.loop, "deinit", .{ .err = err });
break;
};
}
if (comptime CANCEL_SUPPORTED) {
self.io.cancel_all();
}
self.io.deinit();
self.timeout_pool.deinit();
self.event_callback_pool.deinit();
self.cancelled.deinit(self.alloc);
}
// Retrieve all registred I/O events completed by OS kernel,
// and execute sequentially their callbacks.
// Stops when there is no more I/O events registered on the loop.
// Note that I/O events callbacks might register more I/O events
// on the go when they are executed (ie. nested I/O events).
pub fn run(self: *Self, wait_ns: usize) !void {
// stop repeating / interval timeouts from re-registering
self.stopping = true;
defer self.stopping = false;
const max_iterations = wait_ns / (RUN_DURATION);
for (0..max_iterations) |_| {
if (self.pending_network_count == 0 and self.pending_timeout_count == 0) {
break;
}
self.io.run_for_ns(std.time.ns_per_ms * 10) catch |err| {
log.err(.loop, "deinit", .{ .err = err });
break;
};
}
}
pub fn hasPendingTimeout(self: *Self) bool {
return self.pending_timeout_count > 0;
}
// JS callbacks APIs
// -----------------
// Timeout
// The state that we add to a timeout. This is what we get back from a
// timeoutCallback. It contains the function to execute. The user is expected
// to be able to turn a reference to this into whatever state it needs,
// probably by inserting this node into its own stae and using @fieldParentPtr
pub const CallbackNode = struct {
func: *const fn (node: *CallbackNode, repeat: *?u63) void,
};
const ContextTimeout = struct {
loop: *Self,
ctx_id: u32,
initial: bool = true,
callback_node: ?*CallbackNode,
};
fn timeoutCallback(
ctx: *ContextTimeout,
completion: *IO.Completion,
result: IO.TimeoutError!void,
) void {
var repeating = false;
const loop = ctx.loop;
if (ctx.initial) {
loop.pending_timeout_count -= 1;
}
defer {
if (repeating == false) {
loop.timeout_pool.destroy(ctx);
loop.alloc.destroy(completion);
}
}
if (loop.cancelled.remove(@intFromPtr(completion))) {
return;
}
// Abort if this completion was created for a different version of the loop.
if (ctx.ctx_id != loop.ctx_id) {
return;
}
// TODO: return the error to the callback
result catch |err| {
switch (err) {
error.Canceled => {},
else => log.err(.loop, "timeout callback error", .{ .err = err }),
}
return;
};
if (ctx.callback_node) |cn| {
var repeat_in: ?u63 = null;
cn.func(cn, &repeat_in);
if (loop.stopping == false) {
if (repeat_in) |r| {
// prevents our context and completion from being cleaned up
repeating = true;
ctx.initial = false;
loop.scheduleTimeout(r, ctx, completion);
}
}
}
}
pub fn timeout(self: *Self, nanoseconds: u63, callback_node: ?*CallbackNode) !usize {
if (self.stopping and nanoseconds > std.time.ns_per_ms * 500) {
// we're trying to shutdown, we probably don't want to wait for a new
// long timeout
return 0;
}
const completion = try self.alloc.create(Completion);
errdefer self.alloc.destroy(completion);
completion.* = undefined;
const ctx = try self.timeout_pool.create();
errdefer self.timeout_pool.destroy(ctx);
ctx.* = .{
.loop = self,
.ctx_id = self.ctx_id,
.callback_node = callback_node,
};
self.pending_timeout_count += 1;
self.scheduleTimeout(nanoseconds, ctx, completion);
return @intFromPtr(completion);
}
fn scheduleTimeout(self: *Self, nanoseconds: u63, ctx: *ContextTimeout, completion: *Completion) void {
self.io.timeout(*ContextTimeout, ctx, timeoutCallback, completion, nanoseconds);
}
pub fn cancel(self: *Self, id: usize) !void {
try self.cancelled.put(self.alloc, id, {});
}
// Reset all existing callbacks.
// The existing events will happen and their memory will be cleanup but the
// corresponding callbacks will not be called.
pub fn reset(self: *Self) void {
self.ctx_id += 1;
self.cancelled.clearRetainingCapacity();
}
// IO callbacks APIs
// -----------------
// Connect
pub fn connect(
self: *Self,
comptime Ctx: type,
ctx: *Ctx,
completion: *Completion,
comptime cbk: fn (ctx: *Ctx, _: *Completion, res: ConnectError!void) void,
socket: std.posix.socket_t,
address: std.net.Address,
) !void {
const onConnect = struct {
fn onConnect(callback: *EventCallbackContext, completion_: *Completion, res: ConnectError!void) void {
callback.loop.pending_network_count -= 1;
defer callback.loop.event_callback_pool.destroy(callback);
cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res);
}
}.onConnect;
const callback = try self.event_callback_pool.create();
errdefer self.event_callback_pool.destroy(callback);
callback.* = .{ .loop = self, .ctx = ctx };
self.pending_network_count += 1;
self.io.connect(*EventCallbackContext, callback, onConnect, completion, socket, address);
}
// Send
pub fn send(
self: *Self,
comptime Ctx: type,
ctx: *Ctx,
completion: *Completion,
comptime cbk: fn (ctx: *Ctx, completion: *Completion, res: SendError!usize) void,
socket: std.posix.socket_t,
buf: []const u8,
) !void {
const onSend = struct {
fn onSend(callback: *EventCallbackContext, completion_: *Completion, res: SendError!usize) void {
callback.loop.pending_network_count -= 1;
defer callback.loop.event_callback_pool.destroy(callback);
cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res);
}
}.onSend;
const callback = try self.event_callback_pool.create();
errdefer self.event_callback_pool.destroy(callback);
callback.* = .{ .loop = self, .ctx = ctx };
self.pending_network_count += 1;
self.io.send(*EventCallbackContext, callback, onSend, completion, socket, buf);
}
// Recv
pub fn recv(
self: *Self,
comptime Ctx: type,
ctx: *Ctx,
completion: *Completion,
comptime cbk: fn (ctx: *Ctx, completion: *Completion, res: RecvError!usize) void,
socket: std.posix.socket_t,
buf: []u8,
) !void {
const onRecv = struct {
fn onRecv(callback: *EventCallbackContext, completion_: *Completion, res: RecvError!usize) void {
callback.loop.pending_network_count -= 1;
defer callback.loop.event_callback_pool.destroy(callback);
cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res);
}
}.onRecv;
const callback = try self.event_callback_pool.create();
errdefer self.event_callback_pool.destroy(callback);
callback.* = .{ .loop = self, .ctx = ctx };
self.pending_network_count += 1;
self.io.recv(*EventCallbackContext, callback, onRecv, completion, socket, buf);
}
};
const EventCallbackContext = struct {
ctx: *anyopaque,
loop: *Loop,
};
const CANCEL_SUPPORTED = switch (builtin.target.os.tag) {
.linux => true,
.macos, .tvos, .watchos, .ios => false,
else => @compileError("IO is not supported for platform"),
};

View File

@@ -26,14 +26,6 @@ const Allocator = std.mem.Allocator;
const ArenaAllocator = std.heap.ArenaAllocator;

const log = @import("log.zig");
-const IO = @import("runtime/loop.zig").IO;
-const Completion = IO.Completion;
-const AcceptError = IO.AcceptError;
-const RecvError = IO.RecvError;
-const SendError = IO.SendError;
-const TimeoutError = IO.TimeoutError;
-const Loop = @import("runtime/loop.zig").Loop;

const App = @import("app.zig").App;
const CDP = @import("cdp/cdp.zig").CDP;
@@ -46,114 +38,140 @@ const MAX_HTTP_REQUEST_SIZE = 4096;
// +140 for the max control packet that might be interleaved in a message
const MAX_MESSAGE_SIZE = 512 * 1024 + 14;

-const Server = struct {
+pub const Server = struct {
    app: *App,
-   loop: *Loop,
    allocator: Allocator,
-   client: ?*Client = null,
+   client: ?posix.socket_t,
+   listener: ?posix.socket_t,
-   // internal fields
-   listener: posix.socket_t,
-   timeout: u64,
-   // I/O fields
-   accept_completion: Completion,

    // The response to send on a GET /json/version request
    json_version_response: []const u8,

-   fn deinit(self: *Server) void {
-       _ = self;
-   }
-
-   fn queueAccept(self: *Server) void {
-       log.debug(.app, "accepting connection", .{});
-       self.loop.io.accept(
-           *Server,
-           self,
-           callbackAccept,
-           &self.accept_completion,
-           self.listener,
-       );
-   }
-
-   fn callbackAccept(
-       self: *Server,
-       completion: *Completion,
-       result: AcceptError!posix.socket_t,
-   ) void {
-       std.debug.assert(completion == &self.accept_completion);
-       self.doCallbackAccept(result) catch |err| {
-           log.err(.app, "server accept error", .{ .err = err });
-           self.queueAccept();
-       };
-   }
-
-   fn doCallbackAccept(
-       self: *Server,
-       result: AcceptError!posix.socket_t,
-   ) !void {
-       const socket = try result;
-       const client = try self.allocator.create(Client);
-       client.* = Client.init(socket, self);
-       client.start();
-       self.client = client;
-
-       if (log.enabled(.app, .info)) {
-           var address: std.net.Address = undefined;
-           var socklen: posix.socklen_t = @sizeOf(net.Address);
-           try std.posix.getsockname(socket, &address.any, &socklen);
-           log.info(.app, "client connected", .{ .ip = address });
-       }
-   }
-
-   fn releaseClient(self: *Server, client: *Client) void {
-       self.allocator.destroy(client);
-       self.client = null;
-   }
+   pub fn init(app: *App, address: net.Address) !Server {
+       const allocator = app.allocator;
+       const json_version_response = try buildJSONVersionResponse(allocator, address);
+       errdefer allocator.free(json_version_response);
+
+       return .{
+           .app = app,
+           .client = null,
+           .listener = null,
+           .allocator = allocator,
+           .json_version_response = json_version_response,
+       };
+   }
+
+   pub fn deinit(self: *Server) void {
+       self.allocator.free(self.json_version_response);
+       if (self.listener) |listener| {
+           posix.close(listener);
+       }
+   }
+
+   pub fn run(self: *Server, address: net.Address, timeout_ms: i32) !void {
+       const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC;
+       const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP);
+       self.listener = listener;
+
+       try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
+       if (@hasDecl(posix.TCP, "NODELAY")) {
+           try posix.setsockopt(listener, posix.IPPROTO.TCP, posix.TCP.NODELAY, &std.mem.toBytes(@as(c_int, 1)));
+       }
+       try posix.bind(listener, &address.any, address.getOsSockLen());
+       try posix.listen(listener, 1);
+       log.info(.app, "server running", .{ .address = address });
+
+       while (true) {
+           const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| {
+               log.err(.app, "CDP accept", .{ .err = err });
+               std.time.sleep(std.time.ns_per_s);
+               continue;
+           };
+           self.client = socket;
+           defer if (self.client) |s| {
+               posix.close(s);
+               self.client = null;
+           };
+
+           if (log.enabled(.app, .info)) {
+               var client_address: std.net.Address = undefined;
+               var socklen: posix.socklen_t = @sizeOf(net.Address);
+               try std.posix.getsockname(socket, &client_address.any, &socklen);
+               log.info(.app, "client connected", .{ .ip = client_address });
+           }
+
+           self.readLoop(socket, timeout_ms) catch |err| {
+               log.err(.app, "CDP client loop", .{ .err = err });
+           };
+       }
+   }
+
+   fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: i32) !void {
+       // This shouldn't be necessary, but the Client is HUGE (> 512KB) because
+       // it has a large read buffer. I don't know why, but v8 crashes if this
+       // is on the stack (and I assume it's related to its size).
+       const client = try self.allocator.create(Client);
+       defer self.allocator.destroy(client);
+       client.* = try Client.init(socket, self);
+       defer client.deinit();
+
+       var last_message = timestamp();
+       var http = &self.app.http;
+       while (true) {
+           if (http.poll(20, socket)) {
+               const n = posix.read(socket, client.readBuf()) catch |err| {
+                   log.warn(.app, "CDP read", .{ .err = err });
+                   return;
+               };
+               if (n == 0) {
+                   log.info(.app, "CDP disconnect", .{});
+                   return;
+               }
+               const more = client.processData(n) catch false;
+               if (!more) {
+                   return;
+               }
+               last_message = timestamp();
+           } else if (timestamp() - last_message > timeout_ms) {
+               log.info(.app, "CDP timeout", .{});
+               return;
+           }
+
+           // We have 3 types of "events":
+           // - Incoming CDP messages
+           // - Network events from the browser
+           // - Timeouts from the browser
+           // The call to http.poll above handles the first two (which is why
+           // we pass the client socket to it). But browser timeouts aren't
+           // hooked into that. So we need to go to the browser page (if there
+           // is one), and ask it to process any pending events. That action
+           // doesn't starve #2 (Network events from the browser), because
+           // page.wait() handles that too. But it does starve #1 (Incoming CDP
+           // messages). The good news is that, while the Page is mostly
+           // unaware of CDP, it will only block if it actually has something to
+           // do AND it knows if we're waiting on an intercept request, and will
+           // eagerly return control here in those cases.
+           if (client.mode == .cdp) {
+               client.mode.cdp.pageWait();
+           }
+       }
+   }
};

-// Client
-// --------
pub const Client = struct {
    // The client is initially serving HTTP requests but, under normal circumstances
    // should eventually be upgraded to a websocket connections
-   mode: Mode,
+   mode: union(enum) {
+       http: void,
+       cdp: CDP,
+   },
-   // The CDP instance that processes messages from this client
-   // (a generic so we can test with a mock
-   // null until mode == .websocket
-   cdp: ?CDP,
    // Our Server (a generic so we can test with a mock)
    server: *Server,
    reader: Reader(true),
    socket: posix.socket_t,
-   last_active: std.time.Instant,
+   socket_flags: usize,
+   send_arena: ArenaAllocator,
-   // queue of messages to send
-   send_queue: SendQueue,
-   send_queue_node_pool: std.heap.MemoryPool(SendQueue.Node),
-   read_pending: bool,
-   read_completion: Completion,
-   write_pending: bool,
-   write_completion: Completion,
-   timeout_pending: bool,
-   timeout_completion: Completion,
-   // Used along with xyx_pending to figure out the lifetime of
-   // the client. When connected == false and we have no more pending
-   // completions, we can kill the client
-   connected: bool,
-
-   const Mode = enum {
-       http,
-       websocket,
-   };

    const EMPTY_PONG = [_]u8{ 138, 0 };
@@ -164,118 +182,29 @@ pub const Client = struct {
// "private-use" close codes must be from 4000-49999 // "private-use" close codes must be from 4000-49999
const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 }; // code: 4000 const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 }; // code: 4000
const SendQueue = std.DoublyLinkedList(Outgoing); fn init(socket: posix.socket_t, server: *Server) !Client {
const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0);
const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true }));
// we expect the socket to come to us as nonblocking
std.debug.assert(socket_flags & nonblocking == nonblocking);
fn init(socket: posix.socket_t, server: *Server) Client {
return .{ return .{
.cdp = null,
.mode = .http,
.socket = socket, .socket = socket,
.server = server, .server = server,
.last_active = now(), .mode = .{ .http = {} },
.send_queue = .{}, .socket_flags = socket_flags,
.read_pending = false,
.read_completion = undefined,
.write_pending = false,
.write_completion = undefined,
.timeout_pending = false,
.timeout_completion = undefined,
.connected = true,
.reader = .{ .allocator = server.allocator }, .reader = .{ .allocator = server.allocator },
.send_queue_node_pool = std.heap.MemoryPool(SendQueue.Node).init(server.allocator), .send_arena = ArenaAllocator.init(server.allocator),
}; };
} }
fn maybeDeinit(self: *Client) void { fn deinit(self: *Client) void {
if (self.read_pending or self.write_pending) { switch (self.mode) {
// We cannot do anything as long as we still have these pending .cdp => |*cdp| cdp.deinit(),
// They should not be pending for long as we're only here after .http => {},
// having shutdown the socket
return;
} }
// We don't have a read nor a write completion pending, we can start
// to shutdown.
self.reader.deinit(); self.reader.deinit();
var node = self.send_queue.first; self.send_arena.deinit();
while (node) |n| {
if (n.data.arena) |*arena| {
arena.deinit();
}
node = n.next;
}
if (self.cdp) |*cdp| {
cdp.deinit();
}
self.send_queue_node_pool.deinit();
posix.close(self.socket);
// let the client accept a new connection
self.server.queueAccept();
if (self.timeout_pending == false) {
// We also don't have a pending timeout, we can release the client.
// See callbackTimeout for more explanation about this. But, TL;DR
// we want to call `queueAccept` as soon as we have no more read/write
// but we don't want to wait for the timeout callback.
self.server.releaseClient(self);
}
}
fn close(self: *Client) void {
log.info(.app, "client disconnected", .{});
self.connected = false;
// recv only, because we might have pending writes we'd like to get
// out (like the HTTP error response)
posix.shutdown(self.socket, .recv) catch {};
self.maybeDeinit();
}
fn start(self: *Client) void {
self.queueRead();
self.queueTimeout();
}
fn queueRead(self: *Client) void {
self.server.loop.io.recv(
*Client,
self,
callbackRead,
&self.read_completion,
self.socket,
self.readBuf(),
);
self.read_pending = true;
}
fn callbackRead(self: *Client, _: *Completion, result: RecvError!usize) void {
self.read_pending = false;
if (self.connected == false) {
self.maybeDeinit();
return;
}
const size = result catch |err| {
log.err(.app, "server read error", .{ .err = err });
self.close();
return;
};
if (size == 0) {
self.close();
return;
}
const more = self.processData(size) catch {
self.close();
return;
};
// if more == false, the client is disconnecting
if (more) {
self.queueRead();
}
} }
fn readBuf(self: *Client) []u8 { fn readBuf(self: *Client) []u8 {
@@ -283,19 +212,15 @@ pub const Client = struct {
} }
fn processData(self: *Client, len: usize) !bool { fn processData(self: *Client, len: usize) !bool {
self.last_active = now();
self.reader.len += len; self.reader.len += len;
switch (self.mode) { switch (self.mode) {
.http => { .cdp => |*cdp| return self.processWebsocketMessage(cdp),
try self.processHTTPRequest(); .http => return self.processHTTPRequest(),
return true;
},
.websocket => return self.processWebsocketMessage(),
} }
} }
fn processHTTPRequest(self: *Client) !void { fn processHTTPRequest(self: *Client) !bool {
std.debug.assert(self.reader.pos == 0); std.debug.assert(self.reader.pos == 0);
const request = self.reader.buf[0..self.reader.len]; const request = self.reader.buf[0..self.reader.len];
@@ -307,10 +232,12 @@ pub const Client = struct {
// we're only expecting [body-less] GET requests. // we're only expecting [body-less] GET requests.
if (std.mem.endsWith(u8, request, "\r\n\r\n") == false) { if (std.mem.endsWith(u8, request, "\r\n\r\n") == false) {
// we need more data, put any more data here // we need more data, put any more data here
return; return true;
} }
self.handleHTTPRequest(request) catch |err| { // the next incoming data can go to the front of our buffer
defer self.reader.len = 0;
return self.handleHTTPRequest(request) catch |err| {
switch (err) { switch (err) {
error.NotFound => self.writeHTTPErrorResponse(404, "Not found"), error.NotFound => self.writeHTTPErrorResponse(404, "Not found"),
error.InvalidRequest => self.writeHTTPErrorResponse(400, "Invalid request"), error.InvalidRequest => self.writeHTTPErrorResponse(400, "Invalid request"),
@@ -326,12 +253,9 @@ pub const Client = struct {
} }
return err; return err;
}; };
// the next incoming data can go to the front of our buffer
self.reader.len = 0;
} }
fn handleHTTPRequest(self: *Client, request: []u8) !void { fn handleHTTPRequest(self: *Client, request: []u8) !bool {
if (request.len < 18) { if (request.len < 18) {
// 18 is [generously] the smallest acceptable HTTP request // 18 is [generously] the smallest acceptable HTTP request
return error.InvalidRequest; return error.InvalidRequest;
@@ -348,11 +272,12 @@ pub const Client = struct {
const url = request[4..url_end]; const url = request[4..url_end];
if (std.mem.eql(u8, url, "/")) { if (std.mem.eql(u8, url, "/")) {
return self.upgradeConnection(request); try self.upgradeConnection(request);
return true;
} }
if (std.mem.eql(u8, url, "/json/version")) { if (std.mem.eql(u8, url, "/json/version")) {
try self.send(null, self.server.json_version_response); try self.send(self.server.json_version_response);
// Chromedp (a Go driver) does an http request to /json/version // Chromedp (a Go driver) does an http request to /json/version
// then to / (websocket upgrade) using a different connection. // then to / (websocket upgrade) using a different connection.
// Since we only allow 1 connection at a time, the 2nd one (the // Since we only allow 1 connection at a time, the 2nd one (the
@@ -360,7 +285,7 @@ pub const Client = struct {
// We can avoid that by closing the connection. json_version_response // We can avoid that by closing the connection. json_version_response
// has a Connection: Close header too. // has a Connection: Close header too.
try posix.shutdown(self.socket, .recv); try posix.shutdown(self.socket, .recv);
return; return false;
} }
return error.NotFound; return error.NotFound;
@@ -426,8 +351,7 @@ pub const Client = struct {
// our caller has already made sure this request ended in \r\n\r\n // our caller has already made sure this request ended in \r\n\r\n
// so it isn't something we need to check again // so it isn't something we need to check again
var arena = ArenaAllocator.init(self.server.allocator); const allocator = self.send_arena.allocator();
errdefer arena.deinit();
const response = blk: { const response = blk: {
// Response to an ugprade request is always this, with // Response to an ugprade request is always this, with
@@ -442,7 +366,7 @@ pub const Client = struct {
// The response will be sent via the IO Loop and thus has to have its // The response will be sent via the IO Loop and thus has to have its
// own lifetime. // own lifetime.
const res = try arena.allocator().dupe(u8, template); const res = try allocator.dupe(u8, template);
// magic response // magic response
const key_pos = res.len - 32; const key_pos = res.len - 32;
@@ -458,9 +382,8 @@ pub const Client = struct {
break :blk res; break :blk res;
}; };
self.mode = .websocket; self.mode = .{ .cdp = try CDP.init(self.server.app, self) };
self.cdp = try CDP.init(self.server.app, self); return self.send(response);
return self.send(arena, response);
} }
fn writeHTTPErrorResponse(self: *Client, comptime status: u16, comptime body: []const u8) void { fn writeHTTPErrorResponse(self: *Client, comptime status: u16, comptime body: []const u8) void {
@@ -471,24 +394,21 @@ pub const Client = struct {
// we're going to close this connection anyways, swallowing any // we're going to close this connection anyways, swallowing any
// error seems safe // error seems safe
self.send(null, response) catch {}; self.send(response) catch {};
} }
fn processWebsocketMessage(self: *Client) !bool { fn processWebsocketMessage(self: *Client, cdp: *CDP) !bool {
errdefer self.close();
var reader = &self.reader; var reader = &self.reader;
while (true) { while (true) {
const msg = reader.next() catch |err| { const msg = reader.next() catch |err| {
switch (err) { switch (err) {
error.TooLarge => self.send(null, &CLOSE_TOO_BIG) catch {}, error.TooLarge => self.send(&CLOSE_TOO_BIG) catch {},
error.NotMasked => self.send(null, &CLOSE_PROTOCOL_ERROR) catch {}, error.NotMasked => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.ReservedFlags => self.send(null, &CLOSE_PROTOCOL_ERROR) catch {}, error.ReservedFlags => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.InvalidMessageType => self.send(null, &CLOSE_PROTOCOL_ERROR) catch {}, error.InvalidMessageType => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.ControlTooLarge => self.send(null, &CLOSE_PROTOCOL_ERROR) catch {}, error.ControlTooLarge => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.InvalidContinuation => self.send(null, &CLOSE_PROTOCOL_ERROR) catch {}, error.InvalidContinuation => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.NestedFragementation => self.send(null, &CLOSE_PROTOCOL_ERROR) catch {}, error.NestedFragementation => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.OutOfMemory => {}, // don't borther trying to send an error in this case error.OutOfMemory => {}, // don't borther trying to send an error in this case
} }
return err; return err;
@@ -498,12 +418,10 @@ pub const Client = struct {
.pong => {}, .pong => {},
.ping => try self.sendPong(msg.data), .ping => try self.sendPong(msg.data),
.close => { .close => {
self.send(null, &CLOSE_NORMAL) catch {}; self.send(&CLOSE_NORMAL) catch {};
self.close();
return false; return false;
}, },
.text, .binary => if (self.cdp.?.handleMessage(msg.data) == false) { .text, .binary => if (cdp.handleMessage(msg.data) == false) {
self.close();
return false; return false;
}, },
} }
@@ -520,18 +438,16 @@ pub const Client = struct {
fn sendPong(self: *Client, data: []const u8) !void { fn sendPong(self: *Client, data: []const u8) !void {
if (data.len == 0) { if (data.len == 0) {
return self.send(null, &EMPTY_PONG); return self.send(&EMPTY_PONG);
} }
var header_buf: [10]u8 = undefined; var header_buf: [10]u8 = undefined;
const header = websocketHeader(&header_buf, .pong, data.len); const header = websocketHeader(&header_buf, .pong, data.len);
var arena = ArenaAllocator.init(self.server.allocator); const allocator = self.send_arena.allocator();
errdefer arena.deinit(); var framed = try allocator.alloc(u8, header.len + data.len);
var framed = try arena.allocator().alloc(u8, header.len + data.len);
@memcpy(framed[0..header.len], header); @memcpy(framed[0..header.len], header);
@memcpy(framed[header.len..], data); @memcpy(framed[header.len..], data);
return self.send(arena, framed); return self.send(framed);
} }
// called by CDP // called by CDP
@@ -541,10 +457,7 @@ pub const Client = struct {
// buffer, where the first 10 bytes are reserved. We can then backfill // buffer, where the first 10 bytes are reserved. We can then backfill
// the header and send the slice. // the header and send the slice.
pub fn sendJSON(self: *Client, message: anytype, opts: std.json.StringifyOptions) !void { pub fn sendJSON(self: *Client, message: anytype, opts: std.json.StringifyOptions) !void {
var arena = ArenaAllocator.init(self.server.allocator); const allocator = self.send_arena.allocator();
errdefer arena.deinit();
const allocator = arena.allocator();
var buf: std.ArrayListUnmanaged(u8) = .{}; var buf: std.ArrayListUnmanaged(u8) = .{};
try buf.ensureTotalCapacity(allocator, 512); try buf.ensureTotalCapacity(allocator, 512);
@@ -554,144 +467,60 @@ pub const Client = struct {
try std.json.stringify(message, opts, buf.writer(allocator)); try std.json.stringify(message, opts, buf.writer(allocator));
const framed = fillWebsocketHeader(buf); const framed = fillWebsocketHeader(buf);
return self.send(arena, framed); return self.send(framed);
} }
pub fn sendJSONRaw( pub fn sendJSONRaw(
self: *Client, self: *Client,
arena: ArenaAllocator,
buf: std.ArrayListUnmanaged(u8), buf: std.ArrayListUnmanaged(u8),
) !void { ) !void {
// Dangerous API!. We assume the caller has reserved the first 10 // Dangerous API!. We assume the caller has reserved the first 10
// bytes in `buf`. // bytes in `buf`.
const framed = fillWebsocketHeader(buf); const framed = fillWebsocketHeader(buf);
return self.send(arena, framed); return self.send(framed);
} }
fn queueTimeout(self: *Client) void { fn send(self: *Client, data: []const u8) !void {
self.server.loop.io.timeout( var pos: usize = 0;
*Client, var changed_to_blocking: bool = false;
self, defer _ = self.send_arena.reset(.{ .retain_with_limit = 1024 * 32 });
callbackTimeout,
&self.timeout_completion,
TimeoutCheck,
);
self.timeout_pending = true;
}
fn callbackTimeout(self: *Client, _: *Completion, result: TimeoutError!void) void { defer if (changed_to_blocking) {
self.timeout_pending = false; // We had to change our socket to blocking me to get our write out
if (self.connected == false) { // We need to change it back to non-blocking.
if (self.read_pending == false and self.write_pending == false) { _ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags) catch |err| {
// Timeout is problematic. Ideally, we'd just call maybeDeinit log.err(.app, "CDP restore nonblocking", .{ .err = err });
// here and check for timeout_pending == true. But that would };
// mean not being able to accept a new connection until this
// callback fires - introducing a noticeable delay.
// So, when read_pending and write_pending are both false, we
// clean up as much as we can, and let the server accept a new
// connection but we keep the client around to handle this
// completion (if only we could cancel a completion!).
// If we're here, with connected == false, read_pending == false
// and write_pending == false, then everything has already been
// cleaned up, and we just need to release the client.
self.server.releaseClient(self);
}
return;
}
if (result) |_| {
if (now().since(self.last_active) >= self.server.timeout) {
log.info(.app, "client connection timeout", .{});
if (self.mode == .websocket) {
self.send(null, &CLOSE_TIMEOUT) catch {};
}
self.close();
return;
}
} else |err| {
log.err(.app, "server timeout error", .{ .err = err });
}
self.queueTimeout();
}
fn send(self: *Client, arena: ?ArenaAllocator, data: []const u8) !void {
const node = try self.send_queue_node_pool.create();
errdefer self.send_queue_node_pool.destroy(node);
node.data = Outgoing{
.arena = arena,
.to_send = data,
};
self.send_queue.append(node);
if (self.send_queue.len > 1) {
// if we already had a message in the queue, then our send loop
// is already setup.
return;
}
self.queueSend();
}
fn queueSend(self: *Client) void {
if (self.connected == false) {
return;
}
const node = self.send_queue.first orelse {
// no more messages to send;
return;
}; };
self.server.loop.io.send( LOOP: while (pos < data.len) {
*Client, const written = posix.write(self.socket, data[pos..]) catch |err| switch (err) {
self, error.WouldBlock => {
sendCallback, // self.socket is nonblocking, because we don't want to block
&self.write_completion, // reads. But our life is a lot easier if we block writes,
self.socket, // largely, because we don't have to maintain a queue of pending
node.data.to_send, // writes (which would each need their own allocations). So
); // if we get a WouldBlock error, we'll switch the socket to
self.write_pending = true; // blocking and switch it back to non-blocking after the write
} // is complete. Doesn't seem particularly efficiently, but
// this should virtually never happen.
std.debug.assert(changed_to_blocking == false);
log.debug(.app, "CDP write would block", .{});
changed_to_blocking = true;
_ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true })));
continue :LOOP;
},
else => return err,
};
fn sendCallback(self: *Client, _: *Completion, result: SendError!usize) void { if (written == 0) {
self.write_pending = false; return error.Closed;
if (self.connected == false) {
self.maybeDeinit();
return;
}
const sent = result catch |err| {
log.warn(.app, "server send error", .{ .err = err });
self.close();
return;
};
const node = self.send_queue.popFirst().?;
const outgoing = &node.data;
if (sent == outgoing.to_send.len) {
if (outgoing.arena) |*arena| {
arena.deinit();
} }
self.send_queue_node_pool.destroy(node); pos += written;
} else {
// oops, we shouldn't have popped this node off, we need
// to add it back to the front in order to send the unsent data
// (this is less likely to happen, which is why we eagerly
// pop it off)
std.debug.assert(sent < outgoing.to_send.len);
node.data.to_send = outgoing.to_send[sent..];
self.send_queue.prepend(node);
} }
self.queueSend();
} }
}; };
const Outgoing = struct {
to_send: []const u8,
arena: ?ArenaAllocator,
};
// WebSocket message reader. Given websocket message, acts as an iterator that
// can return zero or more Messages. When next returns null, any incomplete
// message will remain in reader.data
@@ -1008,72 +837,6 @@ fn websocketHeader(buf: []u8, op_code: OpCode, payload_len: usize) []const u8 {
    return buf[0..10];
}
pub fn run(
app: *App,
address: net.Address,
timeout: u64,
) !void {
// create socket
const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK;
const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP);
defer posix.close(listener);
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
// TODO: Broken on darwin
// https://github.com/ziglang/zig/issues/17260 (fixed in Zig 0.14)
// if (@hasDecl(os.TCP, "NODELAY")) {
// try os.setsockopt(socket.sockfd.?, os.IPPROTO.TCP, os.TCP.NODELAY, &std.mem.toBytes(@as(c_int, 1)));
// }
try posix.setsockopt(listener, posix.IPPROTO.TCP, 1, &std.mem.toBytes(@as(c_int, 1)));
// bind & listen
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 1);
var loop = app.loop;
const allocator = app.allocator;
const json_version_response = try buildJSONVersionResponse(allocator, address);
defer allocator.free(json_version_response);
var server = Server{
.app = app,
.loop = loop,
.timeout = timeout,
.listener = listener,
.allocator = allocator,
.accept_completion = undefined,
.json_version_response = json_version_response,
};
defer server.deinit();
// accept an connection
server.queueAccept();
log.info(.app, "server running", .{ .address = address });
// infinite loop on I/O events, either:
// - cmd from incoming connection on server socket
// - JS callbacks events from scripts
// var http_client = app.http_client;
while (true) {
// @newhttp. This is a hack. We used to just have 1 loop, so we could
// sleep it it "forever" and any activity (message to this server,
// JS callback, http data) would wake it up.
// Now we have 2 loops. If we block on one, the other won't get woken
// up. We don't block "forever" but even 10ms adds a bunch of latency
// since this is called in a loop.
// Hopefully this is temporary and we can remove the io loop and then
// only have 1 loop. But, until then, we need to check both loops and
// pay some blocking penalty.
if (server.client) |client| {
if (client.cdp) |*cdp| {
cdp.pageWait();
}
}
try loop.io.run_for_ns(10 * std.time.ns_per_ms);
}
}
// Utils
// --------
@@ -1099,9 +862,9 @@ fn buildJSONVersionResponse(
    return try std.fmt.allocPrint(allocator, response_format, .{ body_len, address });
}

-fn now() std.time.Instant {
-   // can only fail on platforms we don't support
-   return std.time.Instant.now() catch unreachable;
+fn timestamp() u32 {
+   const ts = std.posix.clock_gettime(std.posix.CLOCK.MONOTONIC) catch unreachable;
+   return @intCast(ts.sec);
}
// In-place string lowercase // In-place string lowercase
@@ -1465,8 +1228,7 @@ const MockCDP = struct {
    allocator: Allocator = testing.allocator,

-   fn init(_: Allocator, client: anytype, loop: *Loop) MockCDP {
-       _ = loop;
+   fn init(_: Allocator, client: anytype) MockCDP {
        _ = client;
        return .{};
    }