diff --git a/build.zig b/build.zig
index 141c3859..f7972134 100644
--- a/build.zig
+++ b/build.zig
@@ -153,8 +153,6 @@ fn addDependencies(b: *Build, mod: *Build.Module, opts: *Build.Step.Options) !vo
.optimize = mod.optimize.?,
};
- mod.addImport("tigerbeetle-io", b.dependency("tigerbeetle_io", .{}).module("tigerbeetle_io"));
-
mod.addIncludePath(b.path("vendor/lightpanda"));
{
diff --git a/build.zig.zon b/build.zig.zon
index 9940a9bb..05165cab 100644
--- a/build.zig.zon
+++ b/build.zig.zon
@@ -4,15 +4,10 @@
.version = "0.0.0",
.fingerprint = 0xda130f3af836cea0,
.dependencies = .{
- .tigerbeetle_io = .{
- .url = "https://github.com/lightpanda-io/tigerbeetle-io/archive/19ae89eb3814d48c202ac9e0495fc5cadb29dfe7.tar.gz",
- .hash = "tigerbeetle_io-0.0.0-ViLgxjqSBADhuHO_RZm4yNzuoKDXWP39hDn60Kht40OC",
- },
.v8 = .{
.url = "https://github.com/lightpanda-io/zig-v8-fork/archive/cf412d5b3d9d608582571d821e0d552337ef690d.tar.gz",
.hash = "v8-0.0.0-xddH69zDAwA4fp1dBo_jEDjS5bhXycPwRlZHp6_X890t",
},
//.v8 = .{ .path = "../zig-v8-fork" },
- //.tigerbeetle_io = .{ .path = "../tigerbeetle-io" },
},
}
diff --git a/src/app.zig b/src/app.zig
index a1d7ee30..1ba6e234 100644
--- a/src/app.zig
+++ b/src/app.zig
@@ -4,7 +4,6 @@ const Allocator = std.mem.Allocator;
const log = @import("log.zig");
const Http = @import("http/Http.zig");
-const Loop = @import("runtime/loop.zig").Loop;
const Platform = @import("runtime/js.zig").Platform;
const Telemetry = @import("telemetry/telemetry.zig").Telemetry;
@@ -14,7 +13,6 @@ const Notification = @import("notification.zig").Notification;
// might need.
pub const App = struct {
http: Http,
- loop: *Loop,
config: Config,
platform: ?*const Platform,
allocator: Allocator,
@@ -45,12 +43,6 @@ pub const App = struct {
const app = try allocator.create(App);
errdefer allocator.destroy(app);
- const loop = try allocator.create(Loop);
- errdefer allocator.destroy(loop);
-
- loop.* = try Loop.init(allocator);
- errdefer loop.deinit();
-
const notification = try Notification.init(allocator, null);
errdefer notification.deinit();
@@ -68,7 +60,6 @@ pub const App = struct {
const app_dir_path = getAndMakeAppDir(allocator);
app.* = .{
- .loop = loop,
.http = http,
.allocator = allocator,
.telemetry = undefined,
@@ -92,8 +83,6 @@ pub const App = struct {
allocator.free(app_dir_path);
}
self.telemetry.deinit();
- self.loop.deinit();
- allocator.destroy(self.loop);
self.notification.deinit();
self.http.deinit();
allocator.destroy(self);
diff --git a/src/browser/Scheduler.zig b/src/browser/Scheduler.zig
index fbdb8105..2c2d90ed 100644
--- a/src/browser/Scheduler.zig
+++ b/src/browser/Scheduler.zig
@@ -59,15 +59,15 @@ pub fn add(self: *Scheduler, ctx: *anyopaque, func: Task.Func, ms: u32, opts: Ad
});
}
-pub fn runHighPriority(self: *Scheduler) !?u32 {
+pub fn runHighPriority(self: *Scheduler) !?i32 {
return self.runQueue(&self.primary);
}
-pub fn runLowPriority(self: *Scheduler) !?u32 {
+pub fn runLowPriority(self: *Scheduler) !?i32 {
return self.runQueue(&self.secondary);
}
-fn runQueue(self: *Scheduler, queue: *Queue) !?u32 {
+fn runQueue(self: *Scheduler, queue: *Queue) !?i32 {
// this is O(1)
if (queue.count() == 0) {
return null;
diff --git a/src/browser/ScriptManager.zig b/src/browser/ScriptManager.zig
index bbed5633..ebd98b9d 100644
--- a/src/browser/ScriptManager.zig
+++ b/src/browser/ScriptManager.zig
@@ -317,7 +317,7 @@ pub fn blockingGet(self: *ScriptManager, url: [:0]const u8) !BlockingResult {
// rely on http's timeout settings to avoid an endless/long loop.
while (true) {
- try client.tick(200);
+ _ = try client.tick(.{ .timeout_ms = 200 });
switch (blocking.state) {
.running => {},
.done => |result| return result,
diff --git a/src/browser/page.zig b/src/browser/page.zig
index a533091f..726e0ee3 100644
--- a/src/browser/page.zig
+++ b/src/browser/page.zig
@@ -51,10 +51,6 @@ const polyfill = @import("polyfill/polyfill.zig");
pub const Page = struct {
cookie_jar: *storage.CookieJar,
- // Pre-configured http/cilent.zig used to make HTTP requests.
- // @newhttp
- // request_factory: RequestFactory,
-
session: *Session,
// An arena with a lifetime for the entire duration of the page
@@ -146,12 +142,9 @@ pub const Page = struct {
.scheduler = Scheduler.init(arena),
.keydown_event_node = .{ .func = keydownCallback },
.window_clicked_event_node = .{ .func = windowClicked },
- // @newhttp
- // .request_factory = browser.http_client.requestFactory(.{
- // .notification = browser.notification,
- // }),
.main_context = undefined,
};
+
self.main_context = try session.executor.createJsContext(&self.window, self, self, true, Env.GlobalMissingCallback.init(&self.polyfill_loader));
try polyfill.preload(self.arena, self.main_context);
@@ -269,7 +262,7 @@ pub const Page = struct {
return self.script_manager.blockingGet(src);
}
- pub fn wait(self: *Page, wait_sec: usize) void {
+ pub fn wait(self: *Page, wait_sec: u16) void {
self._wait(wait_sec) catch |err| switch (err) {
error.JsError => {}, // already logged (with hopefully more context)
else => {
@@ -283,9 +276,9 @@ pub const Page = struct {
};
}
- fn _wait(self: *Page, wait_sec: usize) !void {
- var ms_remaining = wait_sec * 1000;
+ fn _wait(self: *Page, wait_sec: u16) !void {
var timer = try std.time.Timer.start();
+ var ms_remaining: i32 = @intCast(wait_sec * 1000);
var try_catch: Env.TryCatch = undefined;
try_catch.init(self.main_context);
@@ -320,7 +313,7 @@ pub const Page = struct {
}
// There should only be 1 active http transfer, the main page
- try http_client.tick(ms_remaining);
+ _ = try http_client.tick(.{ .timeout_ms = ms_remaining });
},
.html, .parsed => {
// The HTML page was parsed. We now either have JS scripts to
@@ -381,7 +374,7 @@ pub const Page = struct {
// inflight requests
else @min(ms_remaining, ms_to_next_task orelse 1000);
- try http_client.tick(ms_to_wait);
+ _ = try http_client.tick(.{ .timeout_ms = ms_to_wait });
if (request_intercepted) {
// Again, proritizing intercepted requests. Exit this
@@ -401,7 +394,7 @@ pub const Page = struct {
if (ms_elapsed >= ms_remaining) {
return;
}
- ms_remaining -= ms_elapsed;
+ ms_remaining -= @intCast(ms_elapsed);
}
}
diff --git a/src/browser/session.zig b/src/browser/session.zig
index 21b5f67f..0ca39cce 100644
--- a/src/browser/session.zig
+++ b/src/browser/session.zig
@@ -137,7 +137,7 @@ pub const Session = struct {
return &(self.page orelse return null);
}
- pub fn wait(self: *Session, wait_sec: usize) void {
+ pub fn wait(self: *Session, wait_sec: u16) void {
if (self.queued_navigation) |qn| {
// This was already aborted on the page, but it would be pretty
// bad if old requests went to the new page, so let's make double sure
diff --git a/src/cdp/cdp.zig b/src/cdp/cdp.zig
index 536d4b99..bb38786d 100644
--- a/src/cdp/cdp.zig
+++ b/src/cdp/cdp.zig
@@ -114,12 +114,9 @@ pub fn CDPT(comptime TypeProvider: type) type {
}
// @newhttp
- // A bit hacky right now. The main server loop blocks only for CDP
- // messages. It no longer blocks for page timeouts of page HTTP
- // transfers. So we need to call this more ourselves.
- // This is called after every message and [very hackily] from the server
- // loop.
- // This is hopefully temporary.
+ // A bit hacky right now. The main server loop doesn't unblock for
+ // scheduled task. So we run this directly in order to process any
+ // timeouts (or http events) which are ready to be processed.
pub fn pageWait(self: *Self) void {
const session = &(self.browser.session orelse return);
// exits early if there's nothing to do, so a large value like
@@ -592,8 +589,7 @@ pub fn BrowserContext(comptime CDP_T: type) type {
};
const cdp = self.cdp;
- var arena = std.heap.ArenaAllocator.init(cdp.allocator);
- errdefer arena.deinit();
+ const allocator = cdp.client.send_arena.allocator();
const field = ",\"sessionId\":\"";
@@ -602,7 +598,7 @@ pub fn BrowserContext(comptime CDP_T: type) type {
const message_len = msg.len + session_id.len + 1 + field.len + 10;
var buf: std.ArrayListUnmanaged(u8) = .{};
- buf.ensureTotalCapacity(arena.allocator(), message_len) catch |err| {
+ buf.ensureTotalCapacity(allocator, message_len) catch |err| {
log.err(.cdp, "inspector buffer", .{ .err = err });
return;
};
@@ -617,7 +613,7 @@ pub fn BrowserContext(comptime CDP_T: type) type {
buf.appendSliceAssumeCapacity("\"}");
std.debug.assert(buf.items.len == message_len);
- try cdp.client.sendJSONRaw(arena, buf);
+ try cdp.client.sendJSONRaw(buf);
}
};
}
diff --git a/src/cdp/testing.zig b/src/cdp/testing.zig
index ccb93334..3d029568 100644
--- a/src/cdp/testing.zig
+++ b/src/cdp/testing.zig
@@ -39,12 +39,14 @@ pub const Document = @import("../testing.zig").Document;
const Client = struct {
allocator: Allocator,
+ send_arena: ArenaAllocator,
sent: std.ArrayListUnmanaged(json.Value) = .{},
serialized: std.ArrayListUnmanaged([]const u8) = .{},
fn init(alloc: Allocator) Client {
return .{
.allocator = alloc,
+ .send_arena = ArenaAllocator.init(alloc),
};
}
@@ -58,7 +60,7 @@ const Client = struct {
try self.sent.append(self.allocator, value);
}
- pub fn sendJSONRaw(self: *Client, _: ArenaAllocator, buf: std.ArrayListUnmanaged(u8)) !void {
+ pub fn sendJSONRaw(self: *Client, buf: std.ArrayListUnmanaged(u8)) !void {
const value = try json.parseFromSliceLeaky(json.Value, self.allocator, buf.items, .{});
try self.sent.append(self.allocator, value);
}
diff --git a/src/http/Client.zig b/src/http/Client.zig
index b38525d4..68952005 100644
--- a/src/http/Client.zig
+++ b/src/http/Client.zig
@@ -27,6 +27,7 @@ const CookieJar = @import("../browser/storage/storage.zig").CookieJar;
const urlStitch = @import("../url.zig").stitch;
const c = Http.c;
+const posix = std.posix;
const Allocator = std.mem.Allocator;
const ArenaAllocator = std.heap.ArenaAllocator;
@@ -167,10 +168,13 @@ pub fn abort(self: *Client) void {
}
}
-pub fn tick(self: *Client, timeout_ms: usize) !void {
- var handles = &self.handles;
+const TickOpts = struct {
+ timeout_ms: i32 = 0,
+ poll_socket: ?posix.socket_t = null,
+};
+pub fn tick(self: *Client, opts: TickOpts) !bool {
while (true) {
- if (handles.hasAvailable() == false) {
+ if (self.handles.hasAvailable() == false) {
break;
}
const queue_node = self.queue.popFirst() orelse break;
@@ -178,11 +182,10 @@ pub fn tick(self: *Client, timeout_ms: usize) !void {
self.queue_node_pool.destroy(queue_node);
// we know this exists, because we checked isEmpty() above
- const handle = handles.getFreeHandle().?;
+ const handle = self.handles.getFreeHandle().?;
try self.makeRequest(handle, req);
}
-
- try self.perform(@intCast(timeout_ms));
+ return self.perform(opts.timeout_ms, opts.poll_socket);
}
pub fn request(self: *Client, req: Request) !void {
@@ -343,16 +346,26 @@ fn makeRequest(self: *Client, handle: *Handle, transfer: *Transfer) !void {
}
self.active += 1;
- return self.perform(0);
+ _ = try self.perform(0, null);
}
-fn perform(self: *Client, timeout_ms: c_int) !void {
+fn perform(self: *Client, timeout_ms: c_int, socket: ?posix.socket_t) !bool {
const multi = self.multi;
-
var running: c_int = undefined;
try errorMCheck(c.curl_multi_perform(multi, &running));
- if (running > 0 and timeout_ms > 0) {
+ if (socket) |s| {
+ var wait_fd = c.curl_waitfd{
+ .fd = s,
+ .events = c.CURL_WAIT_POLLIN,
+ .revents = 0,
+ };
+ try errorMCheck(c.curl_multi_poll(multi, &wait_fd, 1, timeout_ms, null));
+ if (wait_fd.revents != 0) {
+ // the extra socket we passed in is ready, let's signal our caller
+ return true;
+ }
+ } else if (running > 0 and timeout_ms > 0) {
try errorMCheck(c.curl_multi_poll(multi, null, 0, timeout_ms, null));
}
@@ -388,6 +401,8 @@ fn perform(self: *Client, timeout_ms: c_int) !void {
self.requestFailed(transfer, err);
}
}
+
+ return false;
}
fn endTransfer(self: *Client, transfer: *Transfer) void {
diff --git a/src/http/Http.zig b/src/http/Http.zig
index 59b5b621..cf170c54 100644
--- a/src/http/Http.zig
+++ b/src/http/Http.zig
@@ -17,6 +17,7 @@
// along with this program. If not, see .
const std = @import("std");
+const posix = std.posix;
pub const c = @cImport({
@cInclude("curl/curl.h");
@@ -26,6 +27,7 @@ pub const ENABLE_DEBUG = false;
pub const Client = @import("Client.zig");
pub const Transfer = Client.Transfer;
+const log = @import("../log.zig");
const errors = @import("errors.zig");
const Allocator = std.mem.Allocator;
@@ -85,6 +87,16 @@ pub fn deinit(self: *Http) void {
self.arena.deinit();
}
+pub fn poll(self: *Http, timeout_ms: i32, socket: posix.socket_t) bool {
+ return self.client.tick(.{
+ .timeout_ms = timeout_ms,
+ .poll_socket = socket,
+ }) catch |err| {
+ log.err(.app, "http poll", .{ .err = err });
+ return false;
+ };
+}
+
pub fn newConnection(self: *Http) !Connection {
return Connection.init(self.ca_blob, &self.opts);
}
diff --git a/src/main.zig b/src/main.zig
index 9b95cf5c..bd20ba72 100644
--- a/src/main.zig
+++ b/src/main.zig
@@ -21,8 +21,8 @@ const builtin = @import("builtin");
const Allocator = std.mem.Allocator;
const log = @import("log.zig");
-const server = @import("server.zig");
const App = @import("app.zig").App;
+const Server = @import("server.zig").Server;
const Http = @import("http/Http.zig");
const Platform = @import("runtime/js.zig").Platform;
const Browser = @import("browser/browser.zig").Browser;
@@ -103,8 +103,10 @@ fn run(alloc: Allocator) !void {
return args.printUsageAndExit(false);
};
- const timeout = std.time.ns_per_s * @as(u64, opts.timeout);
- server.run(app, address, timeout) catch |err| {
+ var server = try Server.init(app, address);
+ defer server.deinit();
+
+ server.run(address, opts.timeout) catch |err| {
log.fatal(.app, "server run error", .{ .err = err });
return err;
};
@@ -773,7 +775,9 @@ fn serveCDP(address: std.net.Address, platform: *const Platform) !void {
defer app.deinit();
test_wg.finish();
- server.run(app, address, std.time.ns_per_s * 2) catch |err| {
+ var server = try Server.init(app, address);
+ defer server.deinit();
+ server.run(address, 5) catch |err| {
std.debug.print("CDP server error: {}", .{err});
return err;
};
diff --git a/src/main_wpt.zig b/src/main_wpt.zig
index 0dfc86c1..4ebbe3ab 100644
--- a/src/main_wpt.zig
+++ b/src/main_wpt.zig
@@ -167,7 +167,7 @@ fn run(
var try_catch: Env.TryCatch = undefined;
try_catch.init(runner.page.main_context);
defer try_catch.deinit();
- runner.page.wait(std.time.ns_per_ms * 200);
+ runner.page.wait(2);
if (try_catch.hasCaught()) {
err_out.* = (try try_catch.err(arena)) orelse "unknwon error";
diff --git a/src/runtime/loop.zig b/src/runtime/loop.zig
deleted file mode 100644
index c2c76645..00000000
--- a/src/runtime/loop.zig
+++ /dev/null
@@ -1,331 +0,0 @@
-// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
-//
-// Francis Bouvier
-// Pierre Tachoire
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as
-// published by the Free Software Foundation, either version 3 of the
-// License, or (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-const std = @import("std");
-const builtin = @import("builtin");
-const MemoryPool = std.heap.MemoryPool;
-
-const log = @import("../log.zig");
-pub const IO = @import("tigerbeetle-io").IO;
-
-const RUN_DURATION = 10 * std.time.ns_per_ms;
-
-// SingleThreaded I/O Loop based on Tigerbeetle io_uring loop.
-// On Linux it's using io_uring.
-// On MacOS and Windows it's using kqueue/IOCP with a ring design.
-// This is a thread-unsafe version without any lock on shared resources,
-// use it only on a single thread.
-// The loop provides I/O APIs based on callbacks.
-// I/O APIs based on async/await might be added in the future.
-pub const Loop = struct {
- alloc: std.mem.Allocator, // TODO: unmanaged version ?
- io: IO,
-
- // number of pending network events we have
- pending_network_count: usize,
-
- // number of pending timeout events we have
- pending_timeout_count: usize,
-
- // Used to stop repeating timeouts when loop.run is called.
- stopping: bool,
-
- // ctx_id is incremented each time the loop is reset.
- // All callbacks store an initial ctx_id and compare before execution.
- // If a ctx is outdated, the callback is ignored.
- // This is a weak way to cancel all future callbacks.
- ctx_id: u32 = 0,
-
- // We use this to track cancellation ids and, on the timeout callback,
- // we can can check here to see if it's been cancelled.
- cancelled: std.AutoHashMapUnmanaged(usize, void),
-
- timeout_pool: MemoryPool(ContextTimeout),
- event_callback_pool: MemoryPool(EventCallbackContext),
-
- const Self = @This();
- pub const Completion = IO.Completion;
-
- pub const RecvError = IO.RecvError;
- pub const SendError = IO.SendError;
- pub const ConnectError = IO.ConnectError;
-
- pub fn init(alloc: std.mem.Allocator) !Self {
- return .{
- .alloc = alloc,
- .cancelled = .{},
- .io = try IO.init(32, 0),
- .stopping = false,
- .pending_network_count = 0,
- .pending_timeout_count = 0,
- .timeout_pool = MemoryPool(ContextTimeout).init(alloc),
- .event_callback_pool = MemoryPool(EventCallbackContext).init(alloc),
- };
- }
-
- pub fn deinit(self: *Self) void {
- self.reset();
-
- // run tail events. We do run the tail events to ensure all the
- // contexts are correcly free.
- while (self.pending_network_count != 0 or self.pending_timeout_count != 0) {
- self.io.run_for_ns(RUN_DURATION) catch |err| {
- log.err(.loop, "deinit", .{ .err = err });
- break;
- };
- }
-
- if (comptime CANCEL_SUPPORTED) {
- self.io.cancel_all();
- }
- self.io.deinit();
- self.timeout_pool.deinit();
- self.event_callback_pool.deinit();
- self.cancelled.deinit(self.alloc);
- }
-
- // Retrieve all registred I/O events completed by OS kernel,
- // and execute sequentially their callbacks.
- // Stops when there is no more I/O events registered on the loop.
- // Note that I/O events callbacks might register more I/O events
- // on the go when they are executed (ie. nested I/O events).
- pub fn run(self: *Self, wait_ns: usize) !void {
- // stop repeating / interval timeouts from re-registering
- self.stopping = true;
- defer self.stopping = false;
-
- const max_iterations = wait_ns / (RUN_DURATION);
- for (0..max_iterations) |_| {
- if (self.pending_network_count == 0 and self.pending_timeout_count == 0) {
- break;
- }
- self.io.run_for_ns(std.time.ns_per_ms * 10) catch |err| {
- log.err(.loop, "deinit", .{ .err = err });
- break;
- };
- }
- }
-
- pub fn hasPendingTimeout(self: *Self) bool {
- return self.pending_timeout_count > 0;
- }
-
- // JS callbacks APIs
- // -----------------
-
- // Timeout
-
- // The state that we add to a timeout. This is what we get back from a
- // timeoutCallback. It contains the function to execute. The user is expected
- // to be able to turn a reference to this into whatever state it needs,
- // probably by inserting this node into its own stae and using @fieldParentPtr
- pub const CallbackNode = struct {
- func: *const fn (node: *CallbackNode, repeat: *?u63) void,
- };
-
- const ContextTimeout = struct {
- loop: *Self,
- ctx_id: u32,
- initial: bool = true,
- callback_node: ?*CallbackNode,
- };
-
- fn timeoutCallback(
- ctx: *ContextTimeout,
- completion: *IO.Completion,
- result: IO.TimeoutError!void,
- ) void {
- var repeating = false;
- const loop = ctx.loop;
-
- if (ctx.initial) {
- loop.pending_timeout_count -= 1;
- }
-
- defer {
- if (repeating == false) {
- loop.timeout_pool.destroy(ctx);
- loop.alloc.destroy(completion);
- }
- }
-
- if (loop.cancelled.remove(@intFromPtr(completion))) {
- return;
- }
-
- // Abort if this completion was created for a different version of the loop.
- if (ctx.ctx_id != loop.ctx_id) {
- return;
- }
-
- // TODO: return the error to the callback
- result catch |err| {
- switch (err) {
- error.Canceled => {},
- else => log.err(.loop, "timeout callback error", .{ .err = err }),
- }
- return;
- };
-
- if (ctx.callback_node) |cn| {
- var repeat_in: ?u63 = null;
- cn.func(cn, &repeat_in);
- if (loop.stopping == false) {
- if (repeat_in) |r| {
- // prevents our context and completion from being cleaned up
- repeating = true;
- ctx.initial = false;
- loop.scheduleTimeout(r, ctx, completion);
- }
- }
- }
- }
-
- pub fn timeout(self: *Self, nanoseconds: u63, callback_node: ?*CallbackNode) !usize {
- if (self.stopping and nanoseconds > std.time.ns_per_ms * 500) {
- // we're trying to shutdown, we probably don't want to wait for a new
- // long timeout
- return 0;
- }
- const completion = try self.alloc.create(Completion);
- errdefer self.alloc.destroy(completion);
- completion.* = undefined;
-
- const ctx = try self.timeout_pool.create();
- errdefer self.timeout_pool.destroy(ctx);
- ctx.* = .{
- .loop = self,
- .ctx_id = self.ctx_id,
- .callback_node = callback_node,
- };
-
- self.pending_timeout_count += 1;
- self.scheduleTimeout(nanoseconds, ctx, completion);
- return @intFromPtr(completion);
- }
-
- fn scheduleTimeout(self: *Self, nanoseconds: u63, ctx: *ContextTimeout, completion: *Completion) void {
- self.io.timeout(*ContextTimeout, ctx, timeoutCallback, completion, nanoseconds);
- }
-
- pub fn cancel(self: *Self, id: usize) !void {
- try self.cancelled.put(self.alloc, id, {});
- }
-
- // Reset all existing callbacks.
- // The existing events will happen and their memory will be cleanup but the
- // corresponding callbacks will not be called.
- pub fn reset(self: *Self) void {
- self.ctx_id += 1;
- self.cancelled.clearRetainingCapacity();
- }
-
- // IO callbacks APIs
- // -----------------
-
- // Connect
-
- pub fn connect(
- self: *Self,
- comptime Ctx: type,
- ctx: *Ctx,
- completion: *Completion,
- comptime cbk: fn (ctx: *Ctx, _: *Completion, res: ConnectError!void) void,
- socket: std.posix.socket_t,
- address: std.net.Address,
- ) !void {
- const onConnect = struct {
- fn onConnect(callback: *EventCallbackContext, completion_: *Completion, res: ConnectError!void) void {
- callback.loop.pending_network_count -= 1;
- defer callback.loop.event_callback_pool.destroy(callback);
- cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res);
- }
- }.onConnect;
-
- const callback = try self.event_callback_pool.create();
- errdefer self.event_callback_pool.destroy(callback);
- callback.* = .{ .loop = self, .ctx = ctx };
-
- self.pending_network_count += 1;
- self.io.connect(*EventCallbackContext, callback, onConnect, completion, socket, address);
- }
-
- // Send
-
- pub fn send(
- self: *Self,
- comptime Ctx: type,
- ctx: *Ctx,
- completion: *Completion,
- comptime cbk: fn (ctx: *Ctx, completion: *Completion, res: SendError!usize) void,
- socket: std.posix.socket_t,
- buf: []const u8,
- ) !void {
- const onSend = struct {
- fn onSend(callback: *EventCallbackContext, completion_: *Completion, res: SendError!usize) void {
- callback.loop.pending_network_count -= 1;
- defer callback.loop.event_callback_pool.destroy(callback);
- cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res);
- }
- }.onSend;
-
- const callback = try self.event_callback_pool.create();
- errdefer self.event_callback_pool.destroy(callback);
- callback.* = .{ .loop = self, .ctx = ctx };
-
- self.pending_network_count += 1;
- self.io.send(*EventCallbackContext, callback, onSend, completion, socket, buf);
- }
-
- // Recv
-
- pub fn recv(
- self: *Self,
- comptime Ctx: type,
- ctx: *Ctx,
- completion: *Completion,
- comptime cbk: fn (ctx: *Ctx, completion: *Completion, res: RecvError!usize) void,
- socket: std.posix.socket_t,
- buf: []u8,
- ) !void {
- const onRecv = struct {
- fn onRecv(callback: *EventCallbackContext, completion_: *Completion, res: RecvError!usize) void {
- callback.loop.pending_network_count -= 1;
- defer callback.loop.event_callback_pool.destroy(callback);
- cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res);
- }
- }.onRecv;
-
- const callback = try self.event_callback_pool.create();
- errdefer self.event_callback_pool.destroy(callback);
- callback.* = .{ .loop = self, .ctx = ctx };
- self.pending_network_count += 1;
- self.io.recv(*EventCallbackContext, callback, onRecv, completion, socket, buf);
- }
-};
-
-const EventCallbackContext = struct {
- ctx: *anyopaque,
- loop: *Loop,
-};
-
-const CANCEL_SUPPORTED = switch (builtin.target.os.tag) {
- .linux => true,
- .macos, .tvos, .watchos, .ios => false,
- else => @compileError("IO is not supported for platform"),
-};
diff --git a/src/server.zig b/src/server.zig
index 2ab357c4..64bfcab1 100644
--- a/src/server.zig
+++ b/src/server.zig
@@ -26,14 +26,6 @@ const Allocator = std.mem.Allocator;
const ArenaAllocator = std.heap.ArenaAllocator;
const log = @import("log.zig");
-const IO = @import("runtime/loop.zig").IO;
-const Completion = IO.Completion;
-const AcceptError = IO.AcceptError;
-const RecvError = IO.RecvError;
-const SendError = IO.SendError;
-const TimeoutError = IO.TimeoutError;
-const Loop = @import("runtime/loop.zig").Loop;
-
const App = @import("app.zig").App;
const CDP = @import("cdp/cdp.zig").CDP;
@@ -46,114 +38,143 @@ const MAX_HTTP_REQUEST_SIZE = 4096;
// +140 for the max control packet that might be interleaved in a message
const MAX_MESSAGE_SIZE = 512 * 1024 + 14;
-const Server = struct {
+pub const Server = struct {
app: *App,
- loop: *Loop,
allocator: Allocator,
- client: ?*Client = null,
-
- // internal fields
- listener: posix.socket_t,
- timeout: u64,
-
- // I/O fields
- accept_completion: Completion,
-
- // The response to send on a GET /json/version request
+ client: ?posix.socket_t,
+ listener: ?posix.socket_t,
json_version_response: []const u8,
- fn deinit(self: *Server) void {
- _ = self;
- }
+ pub fn init(app: *App, address: net.Address) !Server {
+ const allocator = app.allocator;
+ const json_version_response = try buildJSONVersionResponse(allocator, address);
+ errdefer allocator.free(json_version_response);
- fn queueAccept(self: *Server) void {
- log.debug(.app, "accepting connection", .{});
- self.loop.io.accept(
- *Server,
- self,
- callbackAccept,
- &self.accept_completion,
- self.listener,
- );
- }
-
- fn callbackAccept(
- self: *Server,
- completion: *Completion,
- result: AcceptError!posix.socket_t,
- ) void {
- std.debug.assert(completion == &self.accept_completion);
- self.doCallbackAccept(result) catch |err| {
- log.err(.app, "server accept error", .{ .err = err });
- self.queueAccept();
+ return .{
+ .app = app,
+ .client = null,
+ .listener = null,
+ .allocator = allocator,
+ .json_version_response = json_version_response,
};
}
- fn doCallbackAccept(
- self: *Server,
- result: AcceptError!posix.socket_t,
- ) !void {
- const socket = try result;
- const client = try self.allocator.create(Client);
- client.* = Client.init(socket, self);
- client.start();
- self.client = client;
-
- if (log.enabled(.app, .info)) {
- var address: std.net.Address = undefined;
- var socklen: posix.socklen_t = @sizeOf(net.Address);
- try std.posix.getsockname(socket, &address.any, &socklen);
- log.info(.app, "client connected", .{ .ip = address });
+ pub fn deinit(self: *Server) void {
+ self.allocator.free(self.json_version_response);
+ if (self.listener) |listener| {
+ posix.close(listener);
}
}
- fn releaseClient(self: *Server, client: *Client) void {
- self.allocator.destroy(client);
- self.client = null;
+ pub fn run(self: *Server, address: net.Address, timeout_ms: i32) !void {
+ const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC;
+ const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP);
+ self.listener = listener;
+
+ try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
+ // TODO: Broken on darwin
+ // https://github.com/ziglang/zig/issues/17260 (fixed in Zig 0.14)
+ // if (@hasDecl(os.TCP, "NODELAY")) {
+ // try os.setsockopt(socket.sockfd.?, os.IPPROTO.TCP, os.TCP.NODELAY, &std.mem.toBytes(@as(c_int, 1)));
+ // }
+ try posix.setsockopt(listener, posix.IPPROTO.TCP, 1, &std.mem.toBytes(@as(c_int, 1)));
+
+ try posix.bind(listener, &address.any, address.getOsSockLen());
+ try posix.listen(listener, 1);
+
+ log.info(.app, "server running", .{ .address = address });
+ while (true) {
+ const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| {
+ log.err(.app, "CDP accept", .{ .err = err });
+ std.time.sleep(std.time.ns_per_s);
+ continue;
+ };
+
+ self.client = socket;
+ defer if (self.client) |s| {
+ posix.close(s);
+ self.client = null;
+ };
+
+ if (log.enabled(.app, .info)) {
+ var client_address: std.net.Address = undefined;
+ var socklen: posix.socklen_t = @sizeOf(net.Address);
+ try std.posix.getsockname(socket, &client_address.any, &socklen);
+ log.info(.app, "client connected", .{ .ip = client_address });
+ }
+
+ self.readLoop(socket, timeout_ms) catch |err| {
+ log.err(.app, "CDP client loop", .{ .err = err });
+ };
+ }
+ }
+
+ fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: i32) !void {
+ // This shouldn't be necessary, but the Client is HUGE (> 512KB) because
+ // it has a large read buffer. I don't know why, but v8 crashes if this
+ // is on the stack (and I assume it's related to its size).
+ const client = try self.allocator.create(Client);
+ defer self.allocator.destroy(client);
+
+ client.* = try Client.init(socket, self);
+ defer client.deinit();
+
+ var last_message = timestamp();
+ var http = &self.app.http;
+ while (true) {
+ if (http.poll(20, socket)) {
+ const n = posix.read(socket, client.readBuf()) catch |err| {
+ log.warn(.app, "CDP read", .{ .err = err });
+ return;
+ };
+ if (n == 0) {
+ log.info(.app, "CDP disconnect", .{});
+ return;
+ }
+ const more = client.processData(n) catch false;
+ if (!more) {
+ return;
+ }
+ last_message = timestamp();
+ } else if (timestamp() - last_message > timeout_ms) {
+ log.info(.app, "CDP timeout", .{});
+ return;
+ }
+ // We have 3 types of "events":
+ // - Incoming CDP messages
+ // - Network events from the browser
+ // - Timeouts from the browser
+
+ // The call to http.poll above handles the first two (which is why
+ // we pass the client socket to it). But browser timeouts aren't
+ // hooked into that. So we need to go to the browser page (if there
+ // is one), and ask it to process any pending events. That action
+ // doesn't starve #2 (Network events from the browser), because
+ // page.wait() handles that too. But it does starve #1 (Incoming CDP
+ // messages). The good news is that, while the Page is mostly
+ // unaware of CDP, it will only block if it actually has something to
+ // do AND it knows if we're waiting on an intercept request, and will
+ // eagerly return control here in those cases.
+ if (client.mode == .cdp) {
+ client.mode.cdp.pageWait();
+ }
+ }
}
};
-// Client
-// --------
-
pub const Client = struct {
// The client is initially serving HTTP requests but, under normal circumstances
// should eventually be upgraded to a websocket connections
- mode: Mode,
+ mode: union(enum) {
+ http: void,
+ cdp: CDP,
+ },
- // The CDP instance that processes messages from this client
- // (a generic so we can test with a mock
- // null until mode == .websocket
- cdp: ?CDP,
-
- // Our Server (a generic so we can test with a mock)
server: *Server,
reader: Reader(true),
socket: posix.socket_t,
- last_active: std.time.Instant,
-
- // queue of messages to send
- send_queue: SendQueue,
- send_queue_node_pool: std.heap.MemoryPool(SendQueue.Node),
-
- read_pending: bool,
- read_completion: Completion,
-
- write_pending: bool,
- write_completion: Completion,
-
- timeout_pending: bool,
- timeout_completion: Completion,
-
- // Used along with xyx_pending to figure out the lifetime of
- // the client. When connected == false and we have no more pending
- // completions, we can kill the client
- connected: bool,
-
- const Mode = enum {
- http,
- websocket,
- };
+ socket_flags: usize,
+ send_arena: ArenaAllocator,
const EMPTY_PONG = [_]u8{ 138, 0 };
@@ -164,118 +185,29 @@ pub const Client = struct {
// "private-use" close codes must be from 4000-49999
const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 }; // code: 4000
- const SendQueue = std.DoublyLinkedList(Outgoing);
+ fn init(socket: posix.socket_t, server: *Server) !Client {
+ const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0);
+ const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true }));
+ // we expect the socket to come to us as nonblocking
+ std.debug.assert(socket_flags & nonblocking == nonblocking);
- fn init(socket: posix.socket_t, server: *Server) Client {
return .{
- .cdp = null,
- .mode = .http,
.socket = socket,
.server = server,
- .last_active = now(),
- .send_queue = .{},
- .read_pending = false,
- .read_completion = undefined,
- .write_pending = false,
- .write_completion = undefined,
- .timeout_pending = false,
- .timeout_completion = undefined,
- .connected = true,
+ .mode = .{ .http = {} },
+ .socket_flags = socket_flags,
.reader = .{ .allocator = server.allocator },
- .send_queue_node_pool = std.heap.MemoryPool(SendQueue.Node).init(server.allocator),
+ .send_arena = ArenaAllocator.init(server.allocator),
};
}
- fn maybeDeinit(self: *Client) void {
- if (self.read_pending or self.write_pending) {
- // We cannot do anything as long as we still have these pending
- // They should not be pending for long as we're only here after
- // having shutdown the socket
- return;
+ fn deinit(self: *Client) void {
+ switch (self.mode) {
+ .cdp => |*cdp| cdp.deinit(),
+ .http => {},
}
-
- // We don't have a read nor a write completion pending, we can start
- // to shutdown.
-
self.reader.deinit();
- var node = self.send_queue.first;
- while (node) |n| {
- if (n.data.arena) |*arena| {
- arena.deinit();
- }
- node = n.next;
- }
- if (self.cdp) |*cdp| {
- cdp.deinit();
- }
- self.send_queue_node_pool.deinit();
- posix.close(self.socket);
-
- // let the client accept a new connection
- self.server.queueAccept();
-
- if (self.timeout_pending == false) {
- // We also don't have a pending timeout, we can release the client.
- // See callbackTimeout for more explanation about this. But, TL;DR
- // we want to call `queueAccept` as soon as we have no more read/write
- // but we don't want to wait for the timeout callback.
- self.server.releaseClient(self);
- }
- }
-
- fn close(self: *Client) void {
- log.info(.app, "client disconnected", .{});
- self.connected = false;
- // recv only, because we might have pending writes we'd like to get
- // out (like the HTTP error response)
- posix.shutdown(self.socket, .recv) catch {};
- self.maybeDeinit();
- }
-
- fn start(self: *Client) void {
- self.queueRead();
- self.queueTimeout();
- }
-
- fn queueRead(self: *Client) void {
- self.server.loop.io.recv(
- *Client,
- self,
- callbackRead,
- &self.read_completion,
- self.socket,
- self.readBuf(),
- );
- self.read_pending = true;
- }
-
- fn callbackRead(self: *Client, _: *Completion, result: RecvError!usize) void {
- self.read_pending = false;
- if (self.connected == false) {
- self.maybeDeinit();
- return;
- }
-
- const size = result catch |err| {
- log.err(.app, "server read error", .{ .err = err });
- self.close();
- return;
- };
-
- if (size == 0) {
- self.close();
- return;
- }
-
- const more = self.processData(size) catch {
- self.close();
- return;
- };
-
- // if more == false, the client is disconnecting
- if (more) {
- self.queueRead();
- }
+ self.send_arena.deinit();
}
fn readBuf(self: *Client) []u8 {
@@ -283,19 +215,15 @@ pub const Client = struct {
}
fn processData(self: *Client, len: usize) !bool {
- self.last_active = now();
self.reader.len += len;
switch (self.mode) {
- .http => {
- try self.processHTTPRequest();
- return true;
- },
- .websocket => return self.processWebsocketMessage(),
+ .cdp => |*cdp| return self.processWebsocketMessage(cdp),
+ .http => return self.processHTTPRequest(),
}
}
- fn processHTTPRequest(self: *Client) !void {
+ fn processHTTPRequest(self: *Client) !bool {
std.debug.assert(self.reader.pos == 0);
const request = self.reader.buf[0..self.reader.len];
@@ -307,10 +235,12 @@ pub const Client = struct {
// we're only expecting [body-less] GET requests.
if (std.mem.endsWith(u8, request, "\r\n\r\n") == false) {
// we need more data, put any more data here
- return;
+ return true;
}
- self.handleHTTPRequest(request) catch |err| {
+ // the next incoming data can go to the front of our buffer
+ defer self.reader.len = 0;
+ return self.handleHTTPRequest(request) catch |err| {
switch (err) {
error.NotFound => self.writeHTTPErrorResponse(404, "Not found"),
error.InvalidRequest => self.writeHTTPErrorResponse(400, "Invalid request"),
@@ -326,12 +256,9 @@ pub const Client = struct {
}
return err;
};
-
- // the next incoming data can go to the front of our buffer
- self.reader.len = 0;
}
- fn handleHTTPRequest(self: *Client, request: []u8) !void {
+ fn handleHTTPRequest(self: *Client, request: []u8) !bool {
if (request.len < 18) {
// 18 is [generously] the smallest acceptable HTTP request
return error.InvalidRequest;
@@ -348,11 +275,12 @@ pub const Client = struct {
const url = request[4..url_end];
if (std.mem.eql(u8, url, "/")) {
- return self.upgradeConnection(request);
+ try self.upgradeConnection(request);
+ return true;
}
if (std.mem.eql(u8, url, "/json/version")) {
- try self.send(null, self.server.json_version_response);
+ try self.send(self.server.json_version_response);
// Chromedp (a Go driver) does an http request to /json/version
// then to / (websocket upgrade) using a different connection.
// Since we only allow 1 connection at a time, the 2nd one (the
@@ -360,7 +288,7 @@ pub const Client = struct {
// We can avoid that by closing the connection. json_version_response
// has a Connection: Close header too.
try posix.shutdown(self.socket, .recv);
- return;
+ return false;
}
return error.NotFound;
@@ -426,8 +354,7 @@ pub const Client = struct {
// our caller has already made sure this request ended in \r\n\r\n
// so it isn't something we need to check again
- var arena = ArenaAllocator.init(self.server.allocator);
- errdefer arena.deinit();
+ const allocator = self.send_arena.allocator();
const response = blk: {
// Response to an ugprade request is always this, with
@@ -442,7 +369,7 @@ pub const Client = struct {
// The response will be sent via the IO Loop and thus has to have its
// own lifetime.
- const res = try arena.allocator().dupe(u8, template);
+ const res = try allocator.dupe(u8, template);
// magic response
const key_pos = res.len - 32;
@@ -458,9 +385,8 @@ pub const Client = struct {
break :blk res;
};
- self.mode = .websocket;
- self.cdp = try CDP.init(self.server.app, self);
- return self.send(arena, response);
+ self.mode = .{ .cdp = try CDP.init(self.server.app, self) };
+ return self.send(response);
}
fn writeHTTPErrorResponse(self: *Client, comptime status: u16, comptime body: []const u8) void {
@@ -471,24 +397,21 @@ pub const Client = struct {
// we're going to close this connection anyways, swallowing any
// error seems safe
- self.send(null, response) catch {};
+ self.send(response) catch {};
}
- fn processWebsocketMessage(self: *Client) !bool {
- errdefer self.close();
-
+ fn processWebsocketMessage(self: *Client, cdp: *CDP) !bool {
var reader = &self.reader;
-
while (true) {
const msg = reader.next() catch |err| {
switch (err) {
- error.TooLarge => self.send(null, &CLOSE_TOO_BIG) catch {},
- error.NotMasked => self.send(null, &CLOSE_PROTOCOL_ERROR) catch {},
- error.ReservedFlags => self.send(null, &CLOSE_PROTOCOL_ERROR) catch {},
- error.InvalidMessageType => self.send(null, &CLOSE_PROTOCOL_ERROR) catch {},
- error.ControlTooLarge => self.send(null, &CLOSE_PROTOCOL_ERROR) catch {},
- error.InvalidContinuation => self.send(null, &CLOSE_PROTOCOL_ERROR) catch {},
- error.NestedFragementation => self.send(null, &CLOSE_PROTOCOL_ERROR) catch {},
+ error.TooLarge => self.send(&CLOSE_TOO_BIG) catch {},
+ error.NotMasked => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
+ error.ReservedFlags => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
+ error.InvalidMessageType => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
+ error.ControlTooLarge => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
+ error.InvalidContinuation => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
+ error.NestedFragementation => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.OutOfMemory => {}, // don't borther trying to send an error in this case
}
return err;
@@ -498,12 +421,10 @@ pub const Client = struct {
.pong => {},
.ping => try self.sendPong(msg.data),
.close => {
- self.send(null, &CLOSE_NORMAL) catch {};
- self.close();
+ self.send(&CLOSE_NORMAL) catch {};
return false;
},
- .text, .binary => if (self.cdp.?.handleMessage(msg.data) == false) {
- self.close();
+ .text, .binary => if (cdp.handleMessage(msg.data) == false) {
return false;
},
}
@@ -520,18 +441,16 @@ pub const Client = struct {
fn sendPong(self: *Client, data: []const u8) !void {
if (data.len == 0) {
- return self.send(null, &EMPTY_PONG);
+ return self.send(&EMPTY_PONG);
}
var header_buf: [10]u8 = undefined;
const header = websocketHeader(&header_buf, .pong, data.len);
- var arena = ArenaAllocator.init(self.server.allocator);
- errdefer arena.deinit();
-
- var framed = try arena.allocator().alloc(u8, header.len + data.len);
+ const allocator = self.send_arena.allocator();
+ var framed = try allocator.alloc(u8, header.len + data.len);
@memcpy(framed[0..header.len], header);
@memcpy(framed[header.len..], data);
- return self.send(arena, framed);
+ return self.send(framed);
}
// called by CDP
@@ -541,10 +460,7 @@ pub const Client = struct {
// buffer, where the first 10 bytes are reserved. We can then backfill
// the header and send the slice.
pub fn sendJSON(self: *Client, message: anytype, opts: std.json.StringifyOptions) !void {
- var arena = ArenaAllocator.init(self.server.allocator);
- errdefer arena.deinit();
-
- const allocator = arena.allocator();
+ const allocator = self.send_arena.allocator();
var buf: std.ArrayListUnmanaged(u8) = .{};
try buf.ensureTotalCapacity(allocator, 512);
@@ -554,144 +470,60 @@ pub const Client = struct {
try std.json.stringify(message, opts, buf.writer(allocator));
const framed = fillWebsocketHeader(buf);
- return self.send(arena, framed);
+ return self.send(framed);
}
pub fn sendJSONRaw(
self: *Client,
- arena: ArenaAllocator,
buf: std.ArrayListUnmanaged(u8),
) !void {
// Dangerous API!. We assume the caller has reserved the first 10
// bytes in `buf`.
const framed = fillWebsocketHeader(buf);
- return self.send(arena, framed);
+ return self.send(framed);
}
- fn queueTimeout(self: *Client) void {
- self.server.loop.io.timeout(
- *Client,
- self,
- callbackTimeout,
- &self.timeout_completion,
- TimeoutCheck,
- );
- self.timeout_pending = true;
- }
+ fn send(self: *Client, data: []const u8) !void {
+ var pos: usize = 0;
+ var changed_to_blocking: bool = false;
+ defer _ = self.send_arena.reset(.{ .retain_with_limit = 1024 * 32 });
- fn callbackTimeout(self: *Client, _: *Completion, result: TimeoutError!void) void {
- self.timeout_pending = false;
- if (self.connected == false) {
- if (self.read_pending == false and self.write_pending == false) {
- // Timeout is problematic. Ideally, we'd just call maybeDeinit
- // here and check for timeout_pending == true. But that would
- // mean not being able to accept a new connection until this
- // callback fires - introducing a noticeable delay.
- // So, when read_pending and write_pending are both false, we
- // clean up as much as we can, and let the server accept a new
- // connection but we keep the client around to handle this
- // completion (if only we could cancel a completion!).
- // If we're here, with connected == false, read_pending == false
- // and write_pending == false, then everything has already been
- // cleaned up, and we just need to release the client.
- self.server.releaseClient(self);
- }
- return;
- }
-
- if (result) |_| {
- if (now().since(self.last_active) >= self.server.timeout) {
- log.info(.app, "client connection timeout", .{});
- if (self.mode == .websocket) {
- self.send(null, &CLOSE_TIMEOUT) catch {};
- }
- self.close();
- return;
- }
- } else |err| {
- log.err(.app, "server timeout error", .{ .err = err });
- }
-
- self.queueTimeout();
- }
-
- fn send(self: *Client, arena: ?ArenaAllocator, data: []const u8) !void {
- const node = try self.send_queue_node_pool.create();
- errdefer self.send_queue_node_pool.destroy(node);
-
- node.data = Outgoing{
- .arena = arena,
- .to_send = data,
- };
- self.send_queue.append(node);
-
- if (self.send_queue.len > 1) {
- // if we already had a message in the queue, then our send loop
- // is already setup.
- return;
- }
- self.queueSend();
- }
-
- fn queueSend(self: *Client) void {
- if (self.connected == false) {
- return;
- }
-
- const node = self.send_queue.first orelse {
- // no more messages to send;
- return;
+ defer if (changed_to_blocking) {
+ // We had to change our socket to blocking me to get our write out
+ // We need to change it back to non-blocking.
+ _ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags) catch |err| {
+ log.err(.app, "CDP restore nonblocking", .{ .err = err });
+ };
};
- self.server.loop.io.send(
- *Client,
- self,
- sendCallback,
- &self.write_completion,
- self.socket,
- node.data.to_send,
- );
- self.write_pending = true;
- }
+ LOOP: while (pos < data.len) {
+ const written = posix.write(self.socket, data[pos..]) catch |err| switch (err) {
+ error.WouldBlock => {
+ // self.socket is nonblocking, because we don't want to block
+ // reads. But our life is a lot easier if we block writes,
+ // largely, because we don't have to maintain a queue of pending
+ // writes (which would each need their own allocations). So
+ // if we get a WouldBlock error, we'll switch the socket to
+ // blocking and switch it back to non-blocking after the write
+ // is complete. Doesn't seem particularly efficiently, but
+ // this should virtually never happen.
+ std.debug.assert(changed_to_blocking == false);
+ log.debug(.app, "CDP write would block", .{});
+ changed_to_blocking = true;
+ _ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true })));
+ continue :LOOP;
+ },
+ else => return err,
+ };
- fn sendCallback(self: *Client, _: *Completion, result: SendError!usize) void {
- self.write_pending = false;
- if (self.connected == false) {
- self.maybeDeinit();
- return;
- }
-
- const sent = result catch |err| {
- log.warn(.app, "server send error", .{ .err = err });
- self.close();
- return;
- };
-
- const node = self.send_queue.popFirst().?;
- const outgoing = &node.data;
- if (sent == outgoing.to_send.len) {
- if (outgoing.arena) |*arena| {
- arena.deinit();
+ if (written == 0) {
+ return error.Closed;
}
- self.send_queue_node_pool.destroy(node);
- } else {
- // oops, we shouldn't have popped this node off, we need
- // to add it back to the front in order to send the unsent data
- // (this is less likely to happen, which is why we eagerly
- // pop it off)
- std.debug.assert(sent < outgoing.to_send.len);
- node.data.to_send = outgoing.to_send[sent..];
- self.send_queue.prepend(node);
+ pos += written;
}
- self.queueSend();
}
};
-const Outgoing = struct {
- to_send: []const u8,
- arena: ?ArenaAllocator,
-};
-
// WebSocket message reader. Given websocket message, acts as an iterator that
// can return zero or more Messages. When next returns null, any incomplete
// message will remain in reader.data
@@ -1008,72 +840,6 @@ fn websocketHeader(buf: []u8, op_code: OpCode, payload_len: usize) []const u8 {
return buf[0..10];
}
-pub fn run(
- app: *App,
- address: net.Address,
- timeout: u64,
-) !void {
- // create socket
- const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK;
- const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP);
- defer posix.close(listener);
-
- try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
- // TODO: Broken on darwin
- // https://github.com/ziglang/zig/issues/17260 (fixed in Zig 0.14)
- // if (@hasDecl(os.TCP, "NODELAY")) {
- // try os.setsockopt(socket.sockfd.?, os.IPPROTO.TCP, os.TCP.NODELAY, &std.mem.toBytes(@as(c_int, 1)));
- // }
- try posix.setsockopt(listener, posix.IPPROTO.TCP, 1, &std.mem.toBytes(@as(c_int, 1)));
-
- // bind & listen
- try posix.bind(listener, &address.any, address.getOsSockLen());
- try posix.listen(listener, 1);
-
- var loop = app.loop;
- const allocator = app.allocator;
- const json_version_response = try buildJSONVersionResponse(allocator, address);
- defer allocator.free(json_version_response);
-
- var server = Server{
- .app = app,
- .loop = loop,
- .timeout = timeout,
- .listener = listener,
- .allocator = allocator,
- .accept_completion = undefined,
- .json_version_response = json_version_response,
- };
- defer server.deinit();
-
- // accept an connection
- server.queueAccept();
- log.info(.app, "server running", .{ .address = address });
-
- // infinite loop on I/O events, either:
- // - cmd from incoming connection on server socket
- // - JS callbacks events from scripts
- // var http_client = app.http_client;
- while (true) {
- // @newhttp. This is a hack. We used to just have 1 loop, so we could
- // sleep it it "forever" and any activity (message to this server,
- // JS callback, http data) would wake it up.
- // Now we have 2 loops. If we block on one, the other won't get woken
- // up. We don't block "forever" but even 10ms adds a bunch of latency
- // since this is called in a loop.
- // Hopefully this is temporary and we can remove the io loop and then
- // only have 1 loop. But, until then, we need to check both loops and
- // pay some blocking penalty.
- if (server.client) |client| {
- if (client.cdp) |*cdp| {
- cdp.pageWait();
- }
- }
-
- try loop.io.run_for_ns(10 * std.time.ns_per_ms);
- }
-}
-
// Utils
// --------
@@ -1099,9 +865,9 @@ fn buildJSONVersionResponse(
return try std.fmt.allocPrint(allocator, response_format, .{ body_len, address });
}
-fn now() std.time.Instant {
- // can only fail on platforms we don't support
- return std.time.Instant.now() catch unreachable;
+fn timestamp() u32 {
+ const ts = std.posix.clock_gettime(std.posix.CLOCK.MONOTONIC) catch unreachable;
+ return @intCast(ts.sec);
}
// In-place string lowercase
@@ -1465,8 +1231,7 @@ const MockCDP = struct {
allocator: Allocator = testing.allocator,
- fn init(_: Allocator, client: anytype, loop: *Loop) MockCDP {
- _ = loop;
+ fn init(_: Allocator, client: anytype) MockCDP {
_ = client;
return .{};
}