Remove the loop

Previously, the IO loop was doing three things:
1 - Managing timeouts (either from scripts or for our own needs)
2 - Handling browser IO events (page/script/xhr)
3 - Handling CDP events (accept, read, write, timeout)

With the libcurl merge, 1 was moved to an in-process scheduler and 2 was moved
to libcurl's own event loop. That means the entire loop code, including
the dependency on tigerbeetle-io existed for handling a single TCP client.
Not only is that a lot of code, there was also friction between the two loops
(the libcurl one and our IO loop), which would result in latency - while one
loop is waiting for the events, any events on the other loop go un-processed.

This PR removes our IO loop. To accomplish this:

1 - The main accept loop is blocking. This is simpler and works perfectly well,
given we only allow 1 active connection.
2 - The client socket is passed to libcurl - yes, libcurl's loop can take
arbitrary FDs and poll them along with its own.

In addition to having one less dependency, the CDP code is quite a bit simpler,
especially around shutdowns and writes. This also removes _some_ of the latency
caused by the friction between page process and CDP processing. Specifically,
when CDP now blocks for input, http page events (script loading, xhr, ...) will
still be processed.

There's still friction. For one, the reverse isn't true: when the page is
waiting for events, CDP events aren't going to be processed. But the page.wait
already have some sensitivity to this (e.g. the page.request_intercepted flag).
Also, when CDP waits, while we will process network events, page timeouts are
still not processed. Because of both these remaining issues, we still need to
jump between the two loops - but being able to block on CDP (even for a short
time) WITHOUT stopping the page's network I/O, should reduce some latency.
This commit is contained in:
Karl Seguin
2025-08-25 17:27:28 +08:00
parent 390a21e4aa
commit 0959eea677
15 changed files with 266 additions and 828 deletions

View File

@@ -153,8 +153,6 @@ fn addDependencies(b: *Build, mod: *Build.Module, opts: *Build.Step.Options) !vo
.optimize = mod.optimize.?,
};
mod.addImport("tigerbeetle-io", b.dependency("tigerbeetle_io", .{}).module("tigerbeetle_io"));
mod.addIncludePath(b.path("vendor/lightpanda"));
{

View File

@@ -4,15 +4,10 @@
.version = "0.0.0",
.fingerprint = 0xda130f3af836cea0,
.dependencies = .{
.tigerbeetle_io = .{
.url = "https://github.com/lightpanda-io/tigerbeetle-io/archive/19ae89eb3814d48c202ac9e0495fc5cadb29dfe7.tar.gz",
.hash = "tigerbeetle_io-0.0.0-ViLgxjqSBADhuHO_RZm4yNzuoKDXWP39hDn60Kht40OC",
},
.v8 = .{
.url = "https://github.com/lightpanda-io/zig-v8-fork/archive/cf412d5b3d9d608582571d821e0d552337ef690d.tar.gz",
.hash = "v8-0.0.0-xddH69zDAwA4fp1dBo_jEDjS5bhXycPwRlZHp6_X890t",
},
//.v8 = .{ .path = "../zig-v8-fork" },
//.tigerbeetle_io = .{ .path = "../tigerbeetle-io" },
},
}

View File

@@ -4,7 +4,6 @@ const Allocator = std.mem.Allocator;
const log = @import("log.zig");
const Http = @import("http/Http.zig");
const Loop = @import("runtime/loop.zig").Loop;
const Platform = @import("runtime/js.zig").Platform;
const Telemetry = @import("telemetry/telemetry.zig").Telemetry;
@@ -14,7 +13,6 @@ const Notification = @import("notification.zig").Notification;
// might need.
pub const App = struct {
http: Http,
loop: *Loop,
config: Config,
platform: ?*const Platform,
allocator: Allocator,
@@ -45,12 +43,6 @@ pub const App = struct {
const app = try allocator.create(App);
errdefer allocator.destroy(app);
const loop = try allocator.create(Loop);
errdefer allocator.destroy(loop);
loop.* = try Loop.init(allocator);
errdefer loop.deinit();
const notification = try Notification.init(allocator, null);
errdefer notification.deinit();
@@ -68,7 +60,6 @@ pub const App = struct {
const app_dir_path = getAndMakeAppDir(allocator);
app.* = .{
.loop = loop,
.http = http,
.allocator = allocator,
.telemetry = undefined,
@@ -92,8 +83,6 @@ pub const App = struct {
allocator.free(app_dir_path);
}
self.telemetry.deinit();
self.loop.deinit();
allocator.destroy(self.loop);
self.notification.deinit();
self.http.deinit();
allocator.destroy(self);

View File

@@ -59,15 +59,15 @@ pub fn add(self: *Scheduler, ctx: *anyopaque, func: Task.Func, ms: u32, opts: Ad
});
}
pub fn runHighPriority(self: *Scheduler) !?u32 {
pub fn runHighPriority(self: *Scheduler) !?i32 {
return self.runQueue(&self.primary);
}
pub fn runLowPriority(self: *Scheduler) !?u32 {
pub fn runLowPriority(self: *Scheduler) !?i32 {
return self.runQueue(&self.secondary);
}
fn runQueue(self: *Scheduler, queue: *Queue) !?u32 {
fn runQueue(self: *Scheduler, queue: *Queue) !?i32 {
// this is O(1)
if (queue.count() == 0) {
return null;

View File

@@ -317,7 +317,7 @@ pub fn blockingGet(self: *ScriptManager, url: [:0]const u8) !BlockingResult {
// rely on http's timeout settings to avoid an endless/long loop.
while (true) {
try client.tick(200);
_ = try client.tick(.{ .timeout_ms = 200 });
switch (blocking.state) {
.running => {},
.done => |result| return result,

View File

@@ -51,10 +51,6 @@ const polyfill = @import("polyfill/polyfill.zig");
pub const Page = struct {
cookie_jar: *storage.CookieJar,
// Pre-configured http/cilent.zig used to make HTTP requests.
// @newhttp
// request_factory: RequestFactory,
session: *Session,
// An arena with a lifetime for the entire duration of the page
@@ -146,12 +142,9 @@ pub const Page = struct {
.scheduler = Scheduler.init(arena),
.keydown_event_node = .{ .func = keydownCallback },
.window_clicked_event_node = .{ .func = windowClicked },
// @newhttp
// .request_factory = browser.http_client.requestFactory(.{
// .notification = browser.notification,
// }),
.main_context = undefined,
};
self.main_context = try session.executor.createJsContext(&self.window, self, self, true, Env.GlobalMissingCallback.init(&self.polyfill_loader));
try polyfill.preload(self.arena, self.main_context);
@@ -269,7 +262,7 @@ pub const Page = struct {
return self.script_manager.blockingGet(src);
}
pub fn wait(self: *Page, wait_sec: usize) void {
pub fn wait(self: *Page, wait_sec: u16) void {
self._wait(wait_sec) catch |err| switch (err) {
error.JsError => {}, // already logged (with hopefully more context)
else => {
@@ -283,9 +276,9 @@ pub const Page = struct {
};
}
fn _wait(self: *Page, wait_sec: usize) !void {
var ms_remaining = wait_sec * 1000;
fn _wait(self: *Page, wait_sec: u16) !void {
var timer = try std.time.Timer.start();
var ms_remaining: i32 = @intCast(wait_sec * 1000);
var try_catch: Env.TryCatch = undefined;
try_catch.init(self.main_context);
@@ -320,7 +313,7 @@ pub const Page = struct {
}
// There should only be 1 active http transfer, the main page
try http_client.tick(ms_remaining);
_ = try http_client.tick(.{ .timeout_ms = ms_remaining });
},
.html, .parsed => {
// The HTML page was parsed. We now either have JS scripts to
@@ -381,7 +374,7 @@ pub const Page = struct {
// inflight requests
else @min(ms_remaining, ms_to_next_task orelse 1000);
try http_client.tick(ms_to_wait);
_ = try http_client.tick(.{ .timeout_ms = ms_to_wait });
if (request_intercepted) {
// Again, proritizing intercepted requests. Exit this
@@ -401,7 +394,7 @@ pub const Page = struct {
if (ms_elapsed >= ms_remaining) {
return;
}
ms_remaining -= ms_elapsed;
ms_remaining -= @intCast(ms_elapsed);
}
}

View File

@@ -137,7 +137,7 @@ pub const Session = struct {
return &(self.page orelse return null);
}
pub fn wait(self: *Session, wait_sec: usize) void {
pub fn wait(self: *Session, wait_sec: u16) void {
if (self.queued_navigation) |qn| {
// This was already aborted on the page, but it would be pretty
// bad if old requests went to the new page, so let's make double sure

View File

@@ -114,12 +114,9 @@ pub fn CDPT(comptime TypeProvider: type) type {
}
// @newhttp
// A bit hacky right now. The main server loop blocks only for CDP
// messages. It no longer blocks for page timeouts of page HTTP
// transfers. So we need to call this more ourselves.
// This is called after every message and [very hackily] from the server
// loop.
// This is hopefully temporary.
// A bit hacky right now. The main server loop doesn't unblock for
// scheduled task. So we run this directly in order to process any
// timeouts (or http events) which are ready to be processed.
pub fn pageWait(self: *Self) void {
const session = &(self.browser.session orelse return);
// exits early if there's nothing to do, so a large value like
@@ -592,8 +589,7 @@ pub fn BrowserContext(comptime CDP_T: type) type {
};
const cdp = self.cdp;
var arena = std.heap.ArenaAllocator.init(cdp.allocator);
errdefer arena.deinit();
const allocator = cdp.client.send_arena.allocator();
const field = ",\"sessionId\":\"";
@@ -602,7 +598,7 @@ pub fn BrowserContext(comptime CDP_T: type) type {
const message_len = msg.len + session_id.len + 1 + field.len + 10;
var buf: std.ArrayListUnmanaged(u8) = .{};
buf.ensureTotalCapacity(arena.allocator(), message_len) catch |err| {
buf.ensureTotalCapacity(allocator, message_len) catch |err| {
log.err(.cdp, "inspector buffer", .{ .err = err });
return;
};
@@ -617,7 +613,7 @@ pub fn BrowserContext(comptime CDP_T: type) type {
buf.appendSliceAssumeCapacity("\"}");
std.debug.assert(buf.items.len == message_len);
try cdp.client.sendJSONRaw(arena, buf);
try cdp.client.sendJSONRaw(buf);
}
};
}

View File

@@ -39,12 +39,14 @@ pub const Document = @import("../testing.zig").Document;
const Client = struct {
allocator: Allocator,
send_arena: ArenaAllocator,
sent: std.ArrayListUnmanaged(json.Value) = .{},
serialized: std.ArrayListUnmanaged([]const u8) = .{},
fn init(alloc: Allocator) Client {
return .{
.allocator = alloc,
.send_arena = ArenaAllocator.init(alloc),
};
}
@@ -58,7 +60,7 @@ const Client = struct {
try self.sent.append(self.allocator, value);
}
pub fn sendJSONRaw(self: *Client, _: ArenaAllocator, buf: std.ArrayListUnmanaged(u8)) !void {
pub fn sendJSONRaw(self: *Client, buf: std.ArrayListUnmanaged(u8)) !void {
const value = try json.parseFromSliceLeaky(json.Value, self.allocator, buf.items, .{});
try self.sent.append(self.allocator, value);
}

View File

@@ -27,6 +27,7 @@ const CookieJar = @import("../browser/storage/storage.zig").CookieJar;
const urlStitch = @import("../url.zig").stitch;
const c = Http.c;
const posix = std.posix;
const Allocator = std.mem.Allocator;
const ArenaAllocator = std.heap.ArenaAllocator;
@@ -167,10 +168,13 @@ pub fn abort(self: *Client) void {
}
}
pub fn tick(self: *Client, timeout_ms: usize) !void {
var handles = &self.handles;
const TickOpts = struct {
timeout_ms: i32 = 0,
poll_socket: ?posix.socket_t = null,
};
pub fn tick(self: *Client, opts: TickOpts) !bool {
while (true) {
if (handles.hasAvailable() == false) {
if (self.handles.hasAvailable() == false) {
break;
}
const queue_node = self.queue.popFirst() orelse break;
@@ -178,11 +182,10 @@ pub fn tick(self: *Client, timeout_ms: usize) !void {
self.queue_node_pool.destroy(queue_node);
// we know this exists, because we checked isEmpty() above
const handle = handles.getFreeHandle().?;
const handle = self.handles.getFreeHandle().?;
try self.makeRequest(handle, req);
}
try self.perform(@intCast(timeout_ms));
return self.perform(opts.timeout_ms, opts.poll_socket);
}
pub fn request(self: *Client, req: Request) !void {
@@ -343,16 +346,26 @@ fn makeRequest(self: *Client, handle: *Handle, transfer: *Transfer) !void {
}
self.active += 1;
return self.perform(0);
_ = try self.perform(0, null);
}
fn perform(self: *Client, timeout_ms: c_int) !void {
fn perform(self: *Client, timeout_ms: c_int, socket: ?posix.socket_t) !bool {
const multi = self.multi;
var running: c_int = undefined;
try errorMCheck(c.curl_multi_perform(multi, &running));
if (running > 0 and timeout_ms > 0) {
if (socket) |s| {
var wait_fd = c.curl_waitfd{
.fd = s,
.events = c.CURL_WAIT_POLLIN,
.revents = 0,
};
try errorMCheck(c.curl_multi_poll(multi, &wait_fd, 1, timeout_ms, null));
if (wait_fd.revents != 0) {
// the extra socket we passed in is ready, let's signal our caller
return true;
}
} else if (running > 0 and timeout_ms > 0) {
try errorMCheck(c.curl_multi_poll(multi, null, 0, timeout_ms, null));
}
@@ -388,6 +401,8 @@ fn perform(self: *Client, timeout_ms: c_int) !void {
self.requestFailed(transfer, err);
}
}
return false;
}
fn endTransfer(self: *Client, transfer: *Transfer) void {

View File

@@ -17,6 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const posix = std.posix;
pub const c = @cImport({
@cInclude("curl/curl.h");
@@ -26,6 +27,7 @@ pub const ENABLE_DEBUG = false;
pub const Client = @import("Client.zig");
pub const Transfer = Client.Transfer;
const log = @import("../log.zig");
const errors = @import("errors.zig");
const Allocator = std.mem.Allocator;
@@ -85,6 +87,16 @@ pub fn deinit(self: *Http) void {
self.arena.deinit();
}
pub fn poll(self: *Http, timeout_ms: i32, socket: posix.socket_t) bool {
return self.client.tick(.{
.timeout_ms = timeout_ms,
.poll_socket = socket,
}) catch |err| {
log.err(.app, "http poll", .{ .err = err });
return false;
};
}
pub fn newConnection(self: *Http) !Connection {
return Connection.init(self.ca_blob, &self.opts);
}

View File

@@ -21,8 +21,8 @@ const builtin = @import("builtin");
const Allocator = std.mem.Allocator;
const log = @import("log.zig");
const server = @import("server.zig");
const App = @import("app.zig").App;
const Server = @import("server.zig").Server;
const Http = @import("http/Http.zig");
const Platform = @import("runtime/js.zig").Platform;
const Browser = @import("browser/browser.zig").Browser;
@@ -103,8 +103,10 @@ fn run(alloc: Allocator) !void {
return args.printUsageAndExit(false);
};
const timeout = std.time.ns_per_s * @as(u64, opts.timeout);
server.run(app, address, timeout) catch |err| {
var server = try Server.init(app, address);
defer server.deinit();
server.run(address, opts.timeout) catch |err| {
log.fatal(.app, "server run error", .{ .err = err });
return err;
};
@@ -773,7 +775,9 @@ fn serveCDP(address: std.net.Address, platform: *const Platform) !void {
defer app.deinit();
test_wg.finish();
server.run(app, address, std.time.ns_per_s * 2) catch |err| {
var server = try Server.init(app, address);
defer server.deinit();
server.run(address, 5) catch |err| {
std.debug.print("CDP server error: {}", .{err});
return err;
};

View File

@@ -167,7 +167,7 @@ fn run(
var try_catch: Env.TryCatch = undefined;
try_catch.init(runner.page.main_context);
defer try_catch.deinit();
runner.page.wait(std.time.ns_per_ms * 200);
runner.page.wait(2);
if (try_catch.hasCaught()) {
err_out.* = (try try_catch.err(arena)) orelse "unknwon error";

View File

@@ -1,331 +0,0 @@
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const builtin = @import("builtin");
const MemoryPool = std.heap.MemoryPool;
const log = @import("../log.zig");
pub const IO = @import("tigerbeetle-io").IO;
const RUN_DURATION = 10 * std.time.ns_per_ms;
// SingleThreaded I/O Loop based on Tigerbeetle io_uring loop.
// On Linux it's using io_uring.
// On MacOS and Windows it's using kqueue/IOCP with a ring design.
// This is a thread-unsafe version without any lock on shared resources,
// use it only on a single thread.
// The loop provides I/O APIs based on callbacks.
// I/O APIs based on async/await might be added in the future.
pub const Loop = struct {
alloc: std.mem.Allocator, // TODO: unmanaged version ?
io: IO,
// number of pending network events we have
pending_network_count: usize,
// number of pending timeout events we have
pending_timeout_count: usize,
// Used to stop repeating timeouts when loop.run is called.
stopping: bool,
// ctx_id is incremented each time the loop is reset.
// All callbacks store an initial ctx_id and compare before execution.
// If a ctx is outdated, the callback is ignored.
// This is a weak way to cancel all future callbacks.
ctx_id: u32 = 0,
// We use this to track cancellation ids and, on the timeout callback,
// we can can check here to see if it's been cancelled.
cancelled: std.AutoHashMapUnmanaged(usize, void),
timeout_pool: MemoryPool(ContextTimeout),
event_callback_pool: MemoryPool(EventCallbackContext),
const Self = @This();
pub const Completion = IO.Completion;
pub const RecvError = IO.RecvError;
pub const SendError = IO.SendError;
pub const ConnectError = IO.ConnectError;
pub fn init(alloc: std.mem.Allocator) !Self {
return .{
.alloc = alloc,
.cancelled = .{},
.io = try IO.init(32, 0),
.stopping = false,
.pending_network_count = 0,
.pending_timeout_count = 0,
.timeout_pool = MemoryPool(ContextTimeout).init(alloc),
.event_callback_pool = MemoryPool(EventCallbackContext).init(alloc),
};
}
pub fn deinit(self: *Self) void {
self.reset();
// run tail events. We do run the tail events to ensure all the
// contexts are correcly free.
while (self.pending_network_count != 0 or self.pending_timeout_count != 0) {
self.io.run_for_ns(RUN_DURATION) catch |err| {
log.err(.loop, "deinit", .{ .err = err });
break;
};
}
if (comptime CANCEL_SUPPORTED) {
self.io.cancel_all();
}
self.io.deinit();
self.timeout_pool.deinit();
self.event_callback_pool.deinit();
self.cancelled.deinit(self.alloc);
}
// Retrieve all registred I/O events completed by OS kernel,
// and execute sequentially their callbacks.
// Stops when there is no more I/O events registered on the loop.
// Note that I/O events callbacks might register more I/O events
// on the go when they are executed (ie. nested I/O events).
pub fn run(self: *Self, wait_ns: usize) !void {
// stop repeating / interval timeouts from re-registering
self.stopping = true;
defer self.stopping = false;
const max_iterations = wait_ns / (RUN_DURATION);
for (0..max_iterations) |_| {
if (self.pending_network_count == 0 and self.pending_timeout_count == 0) {
break;
}
self.io.run_for_ns(std.time.ns_per_ms * 10) catch |err| {
log.err(.loop, "deinit", .{ .err = err });
break;
};
}
}
pub fn hasPendingTimeout(self: *Self) bool {
return self.pending_timeout_count > 0;
}
// JS callbacks APIs
// -----------------
// Timeout
// The state that we add to a timeout. This is what we get back from a
// timeoutCallback. It contains the function to execute. The user is expected
// to be able to turn a reference to this into whatever state it needs,
// probably by inserting this node into its own stae and using @fieldParentPtr
pub const CallbackNode = struct {
func: *const fn (node: *CallbackNode, repeat: *?u63) void,
};
const ContextTimeout = struct {
loop: *Self,
ctx_id: u32,
initial: bool = true,
callback_node: ?*CallbackNode,
};
fn timeoutCallback(
ctx: *ContextTimeout,
completion: *IO.Completion,
result: IO.TimeoutError!void,
) void {
var repeating = false;
const loop = ctx.loop;
if (ctx.initial) {
loop.pending_timeout_count -= 1;
}
defer {
if (repeating == false) {
loop.timeout_pool.destroy(ctx);
loop.alloc.destroy(completion);
}
}
if (loop.cancelled.remove(@intFromPtr(completion))) {
return;
}
// Abort if this completion was created for a different version of the loop.
if (ctx.ctx_id != loop.ctx_id) {
return;
}
// TODO: return the error to the callback
result catch |err| {
switch (err) {
error.Canceled => {},
else => log.err(.loop, "timeout callback error", .{ .err = err }),
}
return;
};
if (ctx.callback_node) |cn| {
var repeat_in: ?u63 = null;
cn.func(cn, &repeat_in);
if (loop.stopping == false) {
if (repeat_in) |r| {
// prevents our context and completion from being cleaned up
repeating = true;
ctx.initial = false;
loop.scheduleTimeout(r, ctx, completion);
}
}
}
}
pub fn timeout(self: *Self, nanoseconds: u63, callback_node: ?*CallbackNode) !usize {
if (self.stopping and nanoseconds > std.time.ns_per_ms * 500) {
// we're trying to shutdown, we probably don't want to wait for a new
// long timeout
return 0;
}
const completion = try self.alloc.create(Completion);
errdefer self.alloc.destroy(completion);
completion.* = undefined;
const ctx = try self.timeout_pool.create();
errdefer self.timeout_pool.destroy(ctx);
ctx.* = .{
.loop = self,
.ctx_id = self.ctx_id,
.callback_node = callback_node,
};
self.pending_timeout_count += 1;
self.scheduleTimeout(nanoseconds, ctx, completion);
return @intFromPtr(completion);
}
fn scheduleTimeout(self: *Self, nanoseconds: u63, ctx: *ContextTimeout, completion: *Completion) void {
self.io.timeout(*ContextTimeout, ctx, timeoutCallback, completion, nanoseconds);
}
pub fn cancel(self: *Self, id: usize) !void {
try self.cancelled.put(self.alloc, id, {});
}
// Reset all existing callbacks.
// The existing events will happen and their memory will be cleanup but the
// corresponding callbacks will not be called.
pub fn reset(self: *Self) void {
self.ctx_id += 1;
self.cancelled.clearRetainingCapacity();
}
// IO callbacks APIs
// -----------------
// Connect
pub fn connect(
self: *Self,
comptime Ctx: type,
ctx: *Ctx,
completion: *Completion,
comptime cbk: fn (ctx: *Ctx, _: *Completion, res: ConnectError!void) void,
socket: std.posix.socket_t,
address: std.net.Address,
) !void {
const onConnect = struct {
fn onConnect(callback: *EventCallbackContext, completion_: *Completion, res: ConnectError!void) void {
callback.loop.pending_network_count -= 1;
defer callback.loop.event_callback_pool.destroy(callback);
cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res);
}
}.onConnect;
const callback = try self.event_callback_pool.create();
errdefer self.event_callback_pool.destroy(callback);
callback.* = .{ .loop = self, .ctx = ctx };
self.pending_network_count += 1;
self.io.connect(*EventCallbackContext, callback, onConnect, completion, socket, address);
}
// Send
pub fn send(
self: *Self,
comptime Ctx: type,
ctx: *Ctx,
completion: *Completion,
comptime cbk: fn (ctx: *Ctx, completion: *Completion, res: SendError!usize) void,
socket: std.posix.socket_t,
buf: []const u8,
) !void {
const onSend = struct {
fn onSend(callback: *EventCallbackContext, completion_: *Completion, res: SendError!usize) void {
callback.loop.pending_network_count -= 1;
defer callback.loop.event_callback_pool.destroy(callback);
cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res);
}
}.onSend;
const callback = try self.event_callback_pool.create();
errdefer self.event_callback_pool.destroy(callback);
callback.* = .{ .loop = self, .ctx = ctx };
self.pending_network_count += 1;
self.io.send(*EventCallbackContext, callback, onSend, completion, socket, buf);
}
// Recv
pub fn recv(
self: *Self,
comptime Ctx: type,
ctx: *Ctx,
completion: *Completion,
comptime cbk: fn (ctx: *Ctx, completion: *Completion, res: RecvError!usize) void,
socket: std.posix.socket_t,
buf: []u8,
) !void {
const onRecv = struct {
fn onRecv(callback: *EventCallbackContext, completion_: *Completion, res: RecvError!usize) void {
callback.loop.pending_network_count -= 1;
defer callback.loop.event_callback_pool.destroy(callback);
cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res);
}
}.onRecv;
const callback = try self.event_callback_pool.create();
errdefer self.event_callback_pool.destroy(callback);
callback.* = .{ .loop = self, .ctx = ctx };
self.pending_network_count += 1;
self.io.recv(*EventCallbackContext, callback, onRecv, completion, socket, buf);
}
};
const EventCallbackContext = struct {
ctx: *anyopaque,
loop: *Loop,
};
const CANCEL_SUPPORTED = switch (builtin.target.os.tag) {
.linux => true,
.macos, .tvos, .watchos, .ios => false,
else => @compileError("IO is not supported for platform"),
};

View File

@@ -26,14 +26,6 @@ const Allocator = std.mem.Allocator;
const ArenaAllocator = std.heap.ArenaAllocator;
const log = @import("log.zig");
const IO = @import("runtime/loop.zig").IO;
const Completion = IO.Completion;
const AcceptError = IO.AcceptError;
const RecvError = IO.RecvError;
const SendError = IO.SendError;
const TimeoutError = IO.TimeoutError;
const Loop = @import("runtime/loop.zig").Loop;
const App = @import("app.zig").App;
const CDP = @import("cdp/cdp.zig").CDP;
@@ -46,114 +38,143 @@ const MAX_HTTP_REQUEST_SIZE = 4096;
// +140 for the max control packet that might be interleaved in a message
const MAX_MESSAGE_SIZE = 512 * 1024 + 14;
const Server = struct {
pub const Server = struct {
app: *App,
loop: *Loop,
allocator: Allocator,
client: ?*Client = null,
// internal fields
listener: posix.socket_t,
timeout: u64,
// I/O fields
accept_completion: Completion,
// The response to send on a GET /json/version request
client: ?posix.socket_t,
listener: ?posix.socket_t,
json_version_response: []const u8,
fn deinit(self: *Server) void {
_ = self;
}
pub fn init(app: *App, address: net.Address) !Server {
const allocator = app.allocator;
const json_version_response = try buildJSONVersionResponse(allocator, address);
errdefer allocator.free(json_version_response);
fn queueAccept(self: *Server) void {
log.debug(.app, "accepting connection", .{});
self.loop.io.accept(
*Server,
self,
callbackAccept,
&self.accept_completion,
self.listener,
);
}
fn callbackAccept(
self: *Server,
completion: *Completion,
result: AcceptError!posix.socket_t,
) void {
std.debug.assert(completion == &self.accept_completion);
self.doCallbackAccept(result) catch |err| {
log.err(.app, "server accept error", .{ .err = err });
self.queueAccept();
return .{
.app = app,
.client = null,
.listener = null,
.allocator = allocator,
.json_version_response = json_version_response,
};
}
fn doCallbackAccept(
self: *Server,
result: AcceptError!posix.socket_t,
) !void {
const socket = try result;
const client = try self.allocator.create(Client);
client.* = Client.init(socket, self);
client.start();
self.client = client;
pub fn deinit(self: *Server) void {
self.allocator.free(self.json_version_response);
if (self.listener) |listener| {
posix.close(listener);
}
}
pub fn run(self: *Server, address: net.Address, timeout_ms: i32) !void {
const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC;
const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP);
self.listener = listener;
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
// TODO: Broken on darwin
// https://github.com/ziglang/zig/issues/17260 (fixed in Zig 0.14)
// if (@hasDecl(os.TCP, "NODELAY")) {
// try os.setsockopt(socket.sockfd.?, os.IPPROTO.TCP, os.TCP.NODELAY, &std.mem.toBytes(@as(c_int, 1)));
// }
try posix.setsockopt(listener, posix.IPPROTO.TCP, 1, &std.mem.toBytes(@as(c_int, 1)));
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 1);
log.info(.app, "server running", .{ .address = address });
while (true) {
const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| {
log.err(.app, "CDP accept", .{ .err = err });
std.time.sleep(std.time.ns_per_s);
continue;
};
self.client = socket;
defer if (self.client) |s| {
posix.close(s);
self.client = null;
};
if (log.enabled(.app, .info)) {
var address: std.net.Address = undefined;
var client_address: std.net.Address = undefined;
var socklen: posix.socklen_t = @sizeOf(net.Address);
try std.posix.getsockname(socket, &address.any, &socklen);
log.info(.app, "client connected", .{ .ip = address });
try std.posix.getsockname(socket, &client_address.any, &socklen);
log.info(.app, "client connected", .{ .ip = client_address });
}
self.readLoop(socket, timeout_ms) catch |err| {
log.err(.app, "CDP client loop", .{ .err = err });
};
}
}
fn releaseClient(self: *Server, client: *Client) void {
self.allocator.destroy(client);
self.client = null;
fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: i32) !void {
// This shouldn't be necessary, but the Client is HUGE (> 512KB) because
// it has a large read buffer. I don't know why, but v8 crashes if this
// is on the stack (and I assume it's related to its size).
const client = try self.allocator.create(Client);
defer self.allocator.destroy(client);
client.* = try Client.init(socket, self);
defer client.deinit();
var last_message = timestamp();
var http = &self.app.http;
while (true) {
if (http.poll(20, socket)) {
const n = posix.read(socket, client.readBuf()) catch |err| {
log.warn(.app, "CDP read", .{ .err = err });
return;
};
if (n == 0) {
log.info(.app, "CDP disconnect", .{});
return;
}
const more = client.processData(n) catch false;
if (!more) {
return;
}
last_message = timestamp();
} else if (timestamp() - last_message > timeout_ms) {
log.info(.app, "CDP timeout", .{});
return;
}
// We have 3 types of "events":
// - Incoming CDP messages
// - Network events from the browser
// - Timeouts from the browser
// The call to http.poll above handles the first two (which is why
// we pass the client socket to it). But browser timeouts aren't
// hooked into that. So we need to go to the browser page (if there
// is one), and ask it to process any pending events. That action
// doesn't starve #2 (Network events from the browser), because
// page.wait() handles that too. But it does starve #1 (Incoming CDP
// messages). The good news is that, while the Page is mostly
// unaware of CDP, it will only block if it actually has something to
// do AND it knows if we're waiting on an intercept request, and will
// eagerly return control here in those cases.
if (client.mode == .cdp) {
client.mode.cdp.pageWait();
}
}
}
};
// Client
// --------
pub const Client = struct {
// The client is initially serving HTTP requests but, under normal circumstances
// should eventually be upgraded to a websocket connections
mode: Mode,
mode: union(enum) {
http: void,
cdp: CDP,
},
// The CDP instance that processes messages from this client
// (a generic so we can test with a mock
// null until mode == .websocket
cdp: ?CDP,
// Our Server (a generic so we can test with a mock)
server: *Server,
reader: Reader(true),
socket: posix.socket_t,
last_active: std.time.Instant,
// queue of messages to send
send_queue: SendQueue,
send_queue_node_pool: std.heap.MemoryPool(SendQueue.Node),
read_pending: bool,
read_completion: Completion,
write_pending: bool,
write_completion: Completion,
timeout_pending: bool,
timeout_completion: Completion,
// Used along with xyx_pending to figure out the lifetime of
// the client. When connected == false and we have no more pending
// completions, we can kill the client
connected: bool,
const Mode = enum {
http,
websocket,
};
socket_flags: usize,
send_arena: ArenaAllocator,
const EMPTY_PONG = [_]u8{ 138, 0 };
@@ -164,118 +185,29 @@ pub const Client = struct {
// "private-use" close codes must be from 4000-49999
const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 }; // code: 4000
const SendQueue = std.DoublyLinkedList(Outgoing);
fn init(socket: posix.socket_t, server: *Server) !Client {
const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0);
const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true }));
// we expect the socket to come to us as nonblocking
std.debug.assert(socket_flags & nonblocking == nonblocking);
fn init(socket: posix.socket_t, server: *Server) Client {
return .{
.cdp = null,
.mode = .http,
.socket = socket,
.server = server,
.last_active = now(),
.send_queue = .{},
.read_pending = false,
.read_completion = undefined,
.write_pending = false,
.write_completion = undefined,
.timeout_pending = false,
.timeout_completion = undefined,
.connected = true,
.mode = .{ .http = {} },
.socket_flags = socket_flags,
.reader = .{ .allocator = server.allocator },
.send_queue_node_pool = std.heap.MemoryPool(SendQueue.Node).init(server.allocator),
.send_arena = ArenaAllocator.init(server.allocator),
};
}
fn maybeDeinit(self: *Client) void {
if (self.read_pending or self.write_pending) {
// We cannot do anything as long as we still have these pending
// They should not be pending for long as we're only here after
// having shutdown the socket
return;
fn deinit(self: *Client) void {
switch (self.mode) {
.cdp => |*cdp| cdp.deinit(),
.http => {},
}
// We don't have a read nor a write completion pending, we can start
// to shutdown.
self.reader.deinit();
var node = self.send_queue.first;
while (node) |n| {
if (n.data.arena) |*arena| {
arena.deinit();
}
node = n.next;
}
if (self.cdp) |*cdp| {
cdp.deinit();
}
self.send_queue_node_pool.deinit();
posix.close(self.socket);
// let the client accept a new connection
self.server.queueAccept();
if (self.timeout_pending == false) {
// We also don't have a pending timeout, we can release the client.
// See callbackTimeout for more explanation about this. But, TL;DR
// we want to call `queueAccept` as soon as we have no more read/write
// but we don't want to wait for the timeout callback.
self.server.releaseClient(self);
}
}
fn close(self: *Client) void {
log.info(.app, "client disconnected", .{});
self.connected = false;
// recv only, because we might have pending writes we'd like to get
// out (like the HTTP error response)
posix.shutdown(self.socket, .recv) catch {};
self.maybeDeinit();
}
fn start(self: *Client) void {
self.queueRead();
self.queueTimeout();
}
fn queueRead(self: *Client) void {
self.server.loop.io.recv(
*Client,
self,
callbackRead,
&self.read_completion,
self.socket,
self.readBuf(),
);
self.read_pending = true;
}
fn callbackRead(self: *Client, _: *Completion, result: RecvError!usize) void {
self.read_pending = false;
if (self.connected == false) {
self.maybeDeinit();
return;
}
const size = result catch |err| {
log.err(.app, "server read error", .{ .err = err });
self.close();
return;
};
if (size == 0) {
self.close();
return;
}
const more = self.processData(size) catch {
self.close();
return;
};
// if more == false, the client is disconnecting
if (more) {
self.queueRead();
}
self.send_arena.deinit();
}
fn readBuf(self: *Client) []u8 {
@@ -283,19 +215,15 @@ pub const Client = struct {
}
fn processData(self: *Client, len: usize) !bool {
self.last_active = now();
self.reader.len += len;
switch (self.mode) {
.http => {
try self.processHTTPRequest();
return true;
},
.websocket => return self.processWebsocketMessage(),
.cdp => |*cdp| return self.processWebsocketMessage(cdp),
.http => return self.processHTTPRequest(),
}
}
fn processHTTPRequest(self: *Client) !void {
fn processHTTPRequest(self: *Client) !bool {
std.debug.assert(self.reader.pos == 0);
const request = self.reader.buf[0..self.reader.len];
@@ -307,10 +235,12 @@ pub const Client = struct {
// we're only expecting [body-less] GET requests.
if (std.mem.endsWith(u8, request, "\r\n\r\n") == false) {
// we need more data, put any more data here
return;
return true;
}
self.handleHTTPRequest(request) catch |err| {
// the next incoming data can go to the front of our buffer
defer self.reader.len = 0;
return self.handleHTTPRequest(request) catch |err| {
switch (err) {
error.NotFound => self.writeHTTPErrorResponse(404, "Not found"),
error.InvalidRequest => self.writeHTTPErrorResponse(400, "Invalid request"),
@@ -326,12 +256,9 @@ pub const Client = struct {
}
return err;
};
// the next incoming data can go to the front of our buffer
self.reader.len = 0;
}
fn handleHTTPRequest(self: *Client, request: []u8) !void {
fn handleHTTPRequest(self: *Client, request: []u8) !bool {
if (request.len < 18) {
// 18 is [generously] the smallest acceptable HTTP request
return error.InvalidRequest;
@@ -348,11 +275,12 @@ pub const Client = struct {
const url = request[4..url_end];
if (std.mem.eql(u8, url, "/")) {
return self.upgradeConnection(request);
try self.upgradeConnection(request);
return true;
}
if (std.mem.eql(u8, url, "/json/version")) {
try self.send(null, self.server.json_version_response);
try self.send(self.server.json_version_response);
// Chromedp (a Go driver) does an http request to /json/version
// then to / (websocket upgrade) using a different connection.
// Since we only allow 1 connection at a time, the 2nd one (the
@@ -360,7 +288,7 @@ pub const Client = struct {
// We can avoid that by closing the connection. json_version_response
// has a Connection: Close header too.
try posix.shutdown(self.socket, .recv);
return;
return false;
}
return error.NotFound;
@@ -426,8 +354,7 @@ pub const Client = struct {
// our caller has already made sure this request ended in \r\n\r\n
// so it isn't something we need to check again
var arena = ArenaAllocator.init(self.server.allocator);
errdefer arena.deinit();
const allocator = self.send_arena.allocator();
const response = blk: {
// Response to an ugprade request is always this, with
@@ -442,7 +369,7 @@ pub const Client = struct {
// The response will be sent via the IO Loop and thus has to have its
// own lifetime.
const res = try arena.allocator().dupe(u8, template);
const res = try allocator.dupe(u8, template);
// magic response
const key_pos = res.len - 32;
@@ -458,9 +385,8 @@ pub const Client = struct {
break :blk res;
};
self.mode = .websocket;
self.cdp = try CDP.init(self.server.app, self);
return self.send(arena, response);
self.mode = .{ .cdp = try CDP.init(self.server.app, self) };
return self.send(response);
}
fn writeHTTPErrorResponse(self: *Client, comptime status: u16, comptime body: []const u8) void {
@@ -471,24 +397,21 @@ pub const Client = struct {
// we're going to close this connection anyways, swallowing any
// error seems safe
self.send(null, response) catch {};
self.send(response) catch {};
}
fn processWebsocketMessage(self: *Client) !bool {
errdefer self.close();
fn processWebsocketMessage(self: *Client, cdp: *CDP) !bool {
var reader = &self.reader;
while (true) {
const msg = reader.next() catch |err| {
switch (err) {
error.TooLarge => self.send(null, &CLOSE_TOO_BIG) catch {},
error.NotMasked => self.send(null, &CLOSE_PROTOCOL_ERROR) catch {},
error.ReservedFlags => self.send(null, &CLOSE_PROTOCOL_ERROR) catch {},
error.InvalidMessageType => self.send(null, &CLOSE_PROTOCOL_ERROR) catch {},
error.ControlTooLarge => self.send(null, &CLOSE_PROTOCOL_ERROR) catch {},
error.InvalidContinuation => self.send(null, &CLOSE_PROTOCOL_ERROR) catch {},
error.NestedFragementation => self.send(null, &CLOSE_PROTOCOL_ERROR) catch {},
error.TooLarge => self.send(&CLOSE_TOO_BIG) catch {},
error.NotMasked => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.ReservedFlags => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.InvalidMessageType => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.ControlTooLarge => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.InvalidContinuation => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.NestedFragementation => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.OutOfMemory => {}, // don't borther trying to send an error in this case
}
return err;
@@ -498,12 +421,10 @@ pub const Client = struct {
.pong => {},
.ping => try self.sendPong(msg.data),
.close => {
self.send(null, &CLOSE_NORMAL) catch {};
self.close();
self.send(&CLOSE_NORMAL) catch {};
return false;
},
.text, .binary => if (self.cdp.?.handleMessage(msg.data) == false) {
self.close();
.text, .binary => if (cdp.handleMessage(msg.data) == false) {
return false;
},
}
@@ -520,18 +441,16 @@ pub const Client = struct {
fn sendPong(self: *Client, data: []const u8) !void {
if (data.len == 0) {
return self.send(null, &EMPTY_PONG);
return self.send(&EMPTY_PONG);
}
var header_buf: [10]u8 = undefined;
const header = websocketHeader(&header_buf, .pong, data.len);
var arena = ArenaAllocator.init(self.server.allocator);
errdefer arena.deinit();
var framed = try arena.allocator().alloc(u8, header.len + data.len);
const allocator = self.send_arena.allocator();
var framed = try allocator.alloc(u8, header.len + data.len);
@memcpy(framed[0..header.len], header);
@memcpy(framed[header.len..], data);
return self.send(arena, framed);
return self.send(framed);
}
// called by CDP
@@ -541,10 +460,7 @@ pub const Client = struct {
// buffer, where the first 10 bytes are reserved. We can then backfill
// the header and send the slice.
pub fn sendJSON(self: *Client, message: anytype, opts: std.json.StringifyOptions) !void {
var arena = ArenaAllocator.init(self.server.allocator);
errdefer arena.deinit();
const allocator = arena.allocator();
const allocator = self.send_arena.allocator();
var buf: std.ArrayListUnmanaged(u8) = .{};
try buf.ensureTotalCapacity(allocator, 512);
@@ -554,144 +470,60 @@ pub const Client = struct {
try std.json.stringify(message, opts, buf.writer(allocator));
const framed = fillWebsocketHeader(buf);
return self.send(arena, framed);
return self.send(framed);
}
pub fn sendJSONRaw(
self: *Client,
arena: ArenaAllocator,
buf: std.ArrayListUnmanaged(u8),
) !void {
// Dangerous API!. We assume the caller has reserved the first 10
// bytes in `buf`.
const framed = fillWebsocketHeader(buf);
return self.send(arena, framed);
return self.send(framed);
}
fn queueTimeout(self: *Client) void {
self.server.loop.io.timeout(
*Client,
self,
callbackTimeout,
&self.timeout_completion,
TimeoutCheck,
);
self.timeout_pending = true;
}
fn send(self: *Client, data: []const u8) !void {
var pos: usize = 0;
var changed_to_blocking: bool = false;
defer _ = self.send_arena.reset(.{ .retain_with_limit = 1024 * 32 });
fn callbackTimeout(self: *Client, _: *Completion, result: TimeoutError!void) void {
self.timeout_pending = false;
if (self.connected == false) {
if (self.read_pending == false and self.write_pending == false) {
// Timeout is problematic. Ideally, we'd just call maybeDeinit
// here and check for timeout_pending == true. But that would
// mean not being able to accept a new connection until this
// callback fires - introducing a noticeable delay.
// So, when read_pending and write_pending are both false, we
// clean up as much as we can, and let the server accept a new
// connection but we keep the client around to handle this
// completion (if only we could cancel a completion!).
// If we're here, with connected == false, read_pending == false
// and write_pending == false, then everything has already been
// cleaned up, and we just need to release the client.
self.server.releaseClient(self);
}
return;
}
if (result) |_| {
if (now().since(self.last_active) >= self.server.timeout) {
log.info(.app, "client connection timeout", .{});
if (self.mode == .websocket) {
self.send(null, &CLOSE_TIMEOUT) catch {};
}
self.close();
return;
}
} else |err| {
log.err(.app, "server timeout error", .{ .err = err });
}
self.queueTimeout();
}
fn send(self: *Client, arena: ?ArenaAllocator, data: []const u8) !void {
const node = try self.send_queue_node_pool.create();
errdefer self.send_queue_node_pool.destroy(node);
node.data = Outgoing{
.arena = arena,
.to_send = data,
defer if (changed_to_blocking) {
// We had to change our socket to blocking me to get our write out
// We need to change it back to non-blocking.
_ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags) catch |err| {
log.err(.app, "CDP restore nonblocking", .{ .err = err });
};
self.send_queue.append(node);
if (self.send_queue.len > 1) {
// if we already had a message in the queue, then our send loop
// is already setup.
return;
}
self.queueSend();
}
fn queueSend(self: *Client) void {
if (self.connected == false) {
return;
}
const node = self.send_queue.first orelse {
// no more messages to send;
return;
};
self.server.loop.io.send(
*Client,
self,
sendCallback,
&self.write_completion,
self.socket,
node.data.to_send,
);
self.write_pending = true;
}
fn sendCallback(self: *Client, _: *Completion, result: SendError!usize) void {
self.write_pending = false;
if (self.connected == false) {
self.maybeDeinit();
return;
}
const sent = result catch |err| {
log.warn(.app, "server send error", .{ .err = err });
self.close();
return;
LOOP: while (pos < data.len) {
const written = posix.write(self.socket, data[pos..]) catch |err| switch (err) {
error.WouldBlock => {
// self.socket is nonblocking, because we don't want to block
// reads. But our life is a lot easier if we block writes,
// largely, because we don't have to maintain a queue of pending
// writes (which would each need their own allocations). So
// if we get a WouldBlock error, we'll switch the socket to
// blocking and switch it back to non-blocking after the write
// is complete. Doesn't seem particularly efficiently, but
// this should virtually never happen.
std.debug.assert(changed_to_blocking == false);
log.debug(.app, "CDP write would block", .{});
changed_to_blocking = true;
_ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true })));
continue :LOOP;
},
else => return err,
};
const node = self.send_queue.popFirst().?;
const outgoing = &node.data;
if (sent == outgoing.to_send.len) {
if (outgoing.arena) |*arena| {
arena.deinit();
if (written == 0) {
return error.Closed;
}
self.send_queue_node_pool.destroy(node);
} else {
// oops, we shouldn't have popped this node off, we need
// to add it back to the front in order to send the unsent data
// (this is less likely to happen, which is why we eagerly
// pop it off)
std.debug.assert(sent < outgoing.to_send.len);
node.data.to_send = outgoing.to_send[sent..];
self.send_queue.prepend(node);
pos += written;
}
self.queueSend();
}
};
const Outgoing = struct {
to_send: []const u8,
arena: ?ArenaAllocator,
};
// WebSocket message reader. Given websocket message, acts as an iterator that
// can return zero or more Messages. When next returns null, any incomplete
// message will remain in reader.data
@@ -1008,72 +840,6 @@ fn websocketHeader(buf: []u8, op_code: OpCode, payload_len: usize) []const u8 {
return buf[0..10];
}
pub fn run(
app: *App,
address: net.Address,
timeout: u64,
) !void {
// create socket
const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK;
const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP);
defer posix.close(listener);
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
// TODO: Broken on darwin
// https://github.com/ziglang/zig/issues/17260 (fixed in Zig 0.14)
// if (@hasDecl(os.TCP, "NODELAY")) {
// try os.setsockopt(socket.sockfd.?, os.IPPROTO.TCP, os.TCP.NODELAY, &std.mem.toBytes(@as(c_int, 1)));
// }
try posix.setsockopt(listener, posix.IPPROTO.TCP, 1, &std.mem.toBytes(@as(c_int, 1)));
// bind & listen
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 1);
var loop = app.loop;
const allocator = app.allocator;
const json_version_response = try buildJSONVersionResponse(allocator, address);
defer allocator.free(json_version_response);
var server = Server{
.app = app,
.loop = loop,
.timeout = timeout,
.listener = listener,
.allocator = allocator,
.accept_completion = undefined,
.json_version_response = json_version_response,
};
defer server.deinit();
// accept an connection
server.queueAccept();
log.info(.app, "server running", .{ .address = address });
// infinite loop on I/O events, either:
// - cmd from incoming connection on server socket
// - JS callbacks events from scripts
// var http_client = app.http_client;
while (true) {
// @newhttp. This is a hack. We used to just have 1 loop, so we could
// sleep it it "forever" and any activity (message to this server,
// JS callback, http data) would wake it up.
// Now we have 2 loops. If we block on one, the other won't get woken
// up. We don't block "forever" but even 10ms adds a bunch of latency
// since this is called in a loop.
// Hopefully this is temporary and we can remove the io loop and then
// only have 1 loop. But, until then, we need to check both loops and
// pay some blocking penalty.
if (server.client) |client| {
if (client.cdp) |*cdp| {
cdp.pageWait();
}
}
try loop.io.run_for_ns(10 * std.time.ns_per_ms);
}
}
// Utils
// --------
@@ -1099,9 +865,9 @@ fn buildJSONVersionResponse(
return try std.fmt.allocPrint(allocator, response_format, .{ body_len, address });
}
fn now() std.time.Instant {
// can only fail on platforms we don't support
return std.time.Instant.now() catch unreachable;
fn timestamp() u32 {
const ts = std.posix.clock_gettime(std.posix.CLOCK.MONOTONIC) catch unreachable;
return @intCast(ts.sec);
}
// In-place string lowercase
@@ -1465,8 +1231,7 @@ const MockCDP = struct {
allocator: Allocator = testing.allocator,
fn init(_: Allocator, client: anytype, loop: *Loop) MockCDP {
_ = loop;
fn init(_: Allocator, client: anytype) MockCDP {
_ = client;
return .{};
}