Move IO loops from Server to cdp Client

This commit is contained in:
Nikolay Govorov
2026-02-17 14:09:09 +00:00
parent 9296c10ca4
commit b4a40f1257

View File

@@ -38,7 +38,6 @@ const Server = @This();
app: *App, app: *App,
shutdown: std.atomic.Value(bool) = .init(false), shutdown: std.atomic.Value(bool) = .init(false),
allocator: Allocator, allocator: Allocator,
client: ?posix.socket_t,
listener: ?posix.socket_t, listener: ?posix.socket_t,
json_version_response: []const u8, json_version_response: []const u8,
@@ -49,7 +48,6 @@ pub fn init(app: *App, address: net.Address) !Server {
return .{ return .{
.app = app, .app = app,
.client = null,
.listener = null, .listener = null,
.allocator = allocator, .allocator = allocator,
.json_version_response = json_version_response, .json_version_response = json_version_response,
@@ -86,8 +84,6 @@ pub fn deinit(self: *Server) void {
posix.close(listener); posix.close(listener);
self.listener = null; self.listener = null;
} }
// *if* server.run is running, we should really wait for it to return
// before existing from here.
self.allocator.free(self.json_version_response); self.allocator.free(self.json_version_response);
} }
@@ -119,13 +115,74 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void {
}, },
} }
}; };
if (self.shutdown.load(.acquire)) {
return error.ShuttingDown;
}
self.client = socket; self.handleConnection(socket, timeout_ms);
defer if (self.client) |s| { }
posix.close(s); }
self.client = null;
fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void {
defer posix.close(socket);
var client = Client.init(
socket,
self.allocator,
self.app,
self.json_version_response,
timeout_ms,
) catch |err| {
log.err(.app, "CDP client init", .{ .err = err });
return;
}; };
defer client.deinit();
// Check shutdown after registering to avoid missing stop() signal.
// If stop() already iterated over clients, this client won't receive stop()
// and would block joinThreads() indefinitely.
if (self.shutdown.load(.acquire)) {
return;
}
client.start();
}
// Handle exactly one TCP connection.
pub const Client = struct {
// The client is initially serving HTTP requests but, under normal circumstances
// should eventually be upgraded to a websocket connections
mode: union(enum) {
http: void,
cdp: CDP,
},
allocator: Allocator,
app: *App,
http: *HttpClient,
json_version_response: []const u8,
reader: Reader(true),
socket: posix.socket_t,
socket_flags: usize,
send_arena: ArenaAllocator,
timeout_ms: u32,
const EMPTY_PONG = [_]u8{ 138, 0 };
// CLOSE, 2 length, code
const CLOSE_NORMAL = [_]u8{ 136, 2, 3, 232 }; // code: 1000
const CLOSE_TOO_BIG = [_]u8{ 136, 2, 3, 241 }; // 1009
const CLOSE_PROTOCOL_ERROR = [_]u8{ 136, 2, 3, 234 }; //code: 1002
// "private-use" close codes must be from 4000-49999
const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 }; // code: 4000
fn init(
socket: posix.socket_t,
allocator: Allocator,
app: *App,
json_version_response: []const u8,
timeout_ms: u32,
) !Client {
if (log.enabled(.app, .info)) { if (log.enabled(.app, .info)) {
var client_address: std.net.Address = undefined; var client_address: std.net.Address = undefined;
var socklen: posix.socklen_t = @sizeOf(net.Address); var socklen: posix.socklen_t = @sizeOf(net.Address);
@@ -133,34 +190,65 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void {
log.info(.app, "client connected", .{ .ip = client_address }); log.info(.app, "client connected", .{ .ip = client_address });
} }
self.readLoop(socket, timeout_ms) catch |err| { const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0);
log.err(.app, "CDP client loop", .{ .err = err }); const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true }));
// we expect the socket to come to us as nonblocking
lp.assert(socket_flags & nonblocking == nonblocking, "Client.init blocking", .{});
var reader = try Reader(true).init(allocator);
errdefer reader.deinit();
const http = try app.http.createClient(allocator);
errdefer http.deinit();
return .{
.socket = socket,
.allocator = allocator,
.app = app,
.http = http,
.json_version_response = json_version_response,
.reader = reader,
.mode = .{ .http = {} },
.socket_flags = socket_flags,
.send_arena = ArenaAllocator.init(allocator),
.timeout_ms = timeout_ms,
}; };
} }
fn stop(self: *Client) void {
posix.shutdown(self.socket, .recv) catch {};
} }
fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void { fn deinit(self: *Client) void {
// This shouldn't be necessary, but the Client is HUGE (> 512KB) because switch (self.mode) {
// it has a large read buffer. I don't know why, but v8 crashes if this .cdp => |*cdp| cdp.deinit(),
// is on the stack (and I assume it's related to its size). .http => {},
const client = try self.allocator.create(Client); }
defer self.allocator.destroy(client); self.reader.deinit();
self.send_arena.deinit();
self.http.deinit();
}
client.* = try Client.init(socket, self); fn start(self: *Client) void {
defer client.deinit(); const http = self.http;
http.cdp_client = .{
client.http.cdp_client = .{ .socket = self.socket,
.socket = client.socket, .ctx = self,
.ctx = client,
.blocking_read_start = Client.blockingReadStart, .blocking_read_start = Client.blockingReadStart,
.blocking_read = Client.blockingRead, .blocking_read = Client.blockingRead,
.blocking_read_end = Client.blockingReadStop, .blocking_read_end = Client.blockingReadStop,
}; };
defer client.http.cdp_client = null; defer http.cdp_client = null;
lp.assert(client.mode == .http, "Server.readLoop invalid mode", .{}); self.httpLoop(http) catch |err| {
log.err(.app, "CDP client loop", .{ .err = err });
};
}
fn httpLoop(self: *Client, http: *HttpClient) !void {
lp.assert(self.mode == .http, "Client.httpLoop invalid mode", .{});
while (true) { while (true) {
const status = client.http.tick(timeout_ms) catch |err| { const status = http.tick(self.timeout_ms) catch |err| {
log.err(.app, "http tick", .{ .err = err }); log.err(.app, "http tick", .{ .err = err });
return; return;
}; };
@@ -169,29 +257,34 @@ fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void {
return; return;
} }
if (client.readSocket() == false) { if (self.readSocket() == false) {
return; return;
} }
if (client.mode == .cdp) { if (self.mode == .cdp) {
break; // switch to our CDP loop break;
} }
} }
var cdp = &client.mode.cdp; return self.cdpLoop(http);
}
fn cdpLoop(self: *Client, http: *HttpClient) !void {
var cdp = &self.mode.cdp;
var last_message = timestamp(.monotonic); var last_message = timestamp(.monotonic);
var ms_remaining = timeout_ms; var ms_remaining = self.timeout_ms;
while (true) { while (true) {
switch (cdp.pageWait(ms_remaining)) { switch (cdp.pageWait(ms_remaining)) {
.cdp_socket => { .cdp_socket => {
if (client.readSocket() == false) { if (self.readSocket() == false) {
return; return;
} }
last_message = timestamp(.monotonic); last_message = timestamp(.monotonic);
ms_remaining = timeout_ms; ms_remaining = self.timeout_ms;
}, },
.no_page => { .no_page => {
const status = client.http.tick(ms_remaining) catch |err| { const status = http.tick(ms_remaining) catch |err| {
log.err(.app, "http tick", .{ .err = err }); log.err(.app, "http tick", .{ .err = err });
return; return;
}; };
@@ -199,11 +292,11 @@ fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void {
log.info(.app, "CDP timeout", .{}); log.info(.app, "CDP timeout", .{});
return; return;
} }
if (client.readSocket() == false) { if (self.readSocket() == false) {
return; return;
} }
last_message = timestamp(.monotonic); last_message = timestamp(.monotonic);
ms_remaining = timeout_ms; ms_remaining = self.timeout_ms;
}, },
.done => { .done => {
const elapsed = timestamp(.monotonic) - last_message; const elapsed = timestamp(.monotonic) - last_message;
@@ -217,62 +310,6 @@ fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void {
} }
} }
pub const Client = struct {
// The client is initially serving HTTP requests but, under normal circumstances
// should eventually be upgraded to a websocket connections
mode: union(enum) {
http: void,
cdp: CDP,
},
server: *Server,
http: *HttpClient,
reader: Reader(true),
socket: posix.socket_t,
socket_flags: usize,
send_arena: ArenaAllocator,
const EMPTY_PONG = [_]u8{ 138, 0 };
// CLOSE, 2 length, code
const CLOSE_NORMAL = [_]u8{ 136, 2, 3, 232 }; // code: 1000
const CLOSE_TOO_BIG = [_]u8{ 136, 2, 3, 241 }; // 1009
const CLOSE_PROTOCOL_ERROR = [_]u8{ 136, 2, 3, 234 }; //code: 1002
// "private-use" close codes must be from 4000-49999
const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 }; // code: 4000
fn init(socket: posix.socket_t, server: *Server) !Client {
const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0);
const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true }));
// we expect the socket to come to us as nonblocking
lp.assert(socket_flags & nonblocking == nonblocking, "Client.init blocking", .{});
var reader = try Reader(true).init(server.allocator);
errdefer reader.deinit();
const http = try server.app.http.createClient(server.allocator);
errdefer http.deinit();
return .{
.socket = socket,
.server = server,
.http = http,
.reader = reader,
.mode = .{ .http = {} },
.socket_flags = socket_flags,
.send_arena = ArenaAllocator.init(server.allocator),
};
}
fn deinit(self: *Client) void {
switch (self.mode) {
.cdp => |*cdp| cdp.deinit(),
.http => {},
}
self.reader.deinit();
self.send_arena.deinit();
}
fn blockingReadStart(ctx: *anyopaque) bool { fn blockingReadStart(ctx: *anyopaque) bool {
const self: *Client = @ptrCast(@alignCast(ctx)); const self: *Client = @ptrCast(@alignCast(ctx));
_ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true }))) catch |err| { _ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true }))) catch |err| {
@@ -380,7 +417,7 @@ pub const Client = struct {
} }
if (std.mem.eql(u8, url, "/json/version")) { if (std.mem.eql(u8, url, "/json/version")) {
try self.send(self.server.json_version_response); try self.send(self.json_version_response);
// Chromedp (a Go driver) does an http request to /json/version // Chromedp (a Go driver) does an http request to /json/version
// then to / (websocket upgrade) using a different connection. // then to / (websocket upgrade) using a different connection.
// Since we only allow 1 connection at a time, the 2nd one (the // Since we only allow 1 connection at a time, the 2nd one (the
@@ -485,7 +522,7 @@ pub const Client = struct {
break :blk res; break :blk res;
}; };
self.mode = .{ .cdp = try CDP.init(self.server.app, self.http, self) }; self.mode = .{ .cdp = try CDP.init(self.app, self.http, self) };
return self.send(response); return self.send(response);
} }