diff --git a/src/Server.zig b/src/Server.zig index 7393447b..14a16726 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -38,7 +38,6 @@ const Server = @This(); app: *App, shutdown: std.atomic.Value(bool) = .init(false), allocator: Allocator, -client: ?posix.socket_t, listener: ?posix.socket_t, json_version_response: []const u8, @@ -49,7 +48,6 @@ pub fn init(app: *App, address: net.Address) !Server { return .{ .app = app, - .client = null, .listener = null, .allocator = allocator, .json_version_response = json_version_response, @@ -86,8 +84,6 @@ pub fn deinit(self: *Server) void { posix.close(listener); self.listener = null; } - // *if* server.run is running, we should really wait for it to return - // before existing from here. self.allocator.free(self.json_version_response); } @@ -119,104 +115,40 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { }, } }; - - self.client = socket; - defer if (self.client) |s| { - posix.close(s); - self.client = null; - }; - - if (log.enabled(.app, .info)) { - var client_address: std.net.Address = undefined; - var socklen: posix.socklen_t = @sizeOf(net.Address); - try std.posix.getsockname(socket, &client_address.any, &socklen); - log.info(.app, "client connected", .{ .ip = client_address }); + if (self.shutdown.load(.acquire)) { + return error.ShuttingDown; } - self.readLoop(socket, timeout_ms) catch |err| { - log.err(.app, "CDP client loop", .{ .err = err }); - }; + self.handleConnection(socket, timeout_ms); } } -fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void { - // This shouldn't be necessary, but the Client is HUGE (> 512KB) because - // it has a large read buffer. I don't know why, but v8 crashes if this - // is on the stack (and I assume it's related to its size). - const client = try self.allocator.create(Client); - defer self.allocator.destroy(client); +fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void { + defer posix.close(socket); - client.* = try Client.init(socket, self); + var client = Client.init( + socket, + self.allocator, + self.app, + self.json_version_response, + timeout_ms, + ) catch |err| { + log.err(.app, "CDP client init", .{ .err = err }); + return; + }; defer client.deinit(); - client.http.cdp_client = .{ - .socket = client.socket, - .ctx = client, - .blocking_read_start = Client.blockingReadStart, - .blocking_read = Client.blockingRead, - .blocking_read_end = Client.blockingReadStop, - }; - defer client.http.cdp_client = null; - - lp.assert(client.mode == .http, "Server.readLoop invalid mode", .{}); - while (true) { - const status = client.http.tick(timeout_ms) catch |err| { - log.err(.app, "http tick", .{ .err = err }); - return; - }; - if (status != .cdp_socket) { - log.info(.app, "CDP timeout", .{}); - return; - } - - if (client.readSocket() == false) { - return; - } - - if (client.mode == .cdp) { - break; // switch to our CDP loop - } + // Check shutdown after registering to avoid missing stop() signal. + // If stop() already iterated over clients, this client won't receive stop() + // and would block joinThreads() indefinitely. + if (self.shutdown.load(.acquire)) { + return; } - var cdp = &client.mode.cdp; - var last_message = timestamp(.monotonic); - var ms_remaining = timeout_ms; - while (true) { - switch (cdp.pageWait(ms_remaining)) { - .cdp_socket => { - if (client.readSocket() == false) { - return; - } - last_message = timestamp(.monotonic); - ms_remaining = timeout_ms; - }, - .no_page => { - const status = client.http.tick(ms_remaining) catch |err| { - log.err(.app, "http tick", .{ .err = err }); - return; - }; - if (status != .cdp_socket) { - log.info(.app, "CDP timeout", .{}); - return; - } - if (client.readSocket() == false) { - return; - } - last_message = timestamp(.monotonic); - ms_remaining = timeout_ms; - }, - .done => { - const elapsed = timestamp(.monotonic) - last_message; - if (elapsed > ms_remaining) { - log.info(.app, "CDP timeout", .{}); - return; - } - ms_remaining -= @intCast(elapsed); - }, - } - } + client.start(); } +// Handle exactly one TCP connection. pub const Client = struct { // The client is initially serving HTTP requests but, under normal circumstances // should eventually be upgraded to a websocket connections @@ -225,12 +157,15 @@ pub const Client = struct { cdp: CDP, }, - server: *Server, + allocator: Allocator, + app: *App, http: *HttpClient, + json_version_response: []const u8, reader: Reader(true), socket: posix.socket_t, socket_flags: usize, send_arena: ArenaAllocator, + timeout_ms: u32, const EMPTY_PONG = [_]u8{ 138, 0 }; @@ -241,29 +176,49 @@ pub const Client = struct { // "private-use" close codes must be from 4000-49999 const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 }; // code: 4000 - fn init(socket: posix.socket_t, server: *Server) !Client { + fn init( + socket: posix.socket_t, + allocator: Allocator, + app: *App, + json_version_response: []const u8, + timeout_ms: u32, + ) !Client { + if (log.enabled(.app, .info)) { + var client_address: std.net.Address = undefined; + var socklen: posix.socklen_t = @sizeOf(net.Address); + try std.posix.getsockname(socket, &client_address.any, &socklen); + log.info(.app, "client connected", .{ .ip = client_address }); + } + const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0); const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true })); // we expect the socket to come to us as nonblocking lp.assert(socket_flags & nonblocking == nonblocking, "Client.init blocking", .{}); - var reader = try Reader(true).init(server.allocator); + var reader = try Reader(true).init(allocator); errdefer reader.deinit(); - const http = try server.app.http.createClient(server.allocator); + const http = try app.http.createClient(allocator); errdefer http.deinit(); return .{ .socket = socket, - .server = server, + .allocator = allocator, + .app = app, .http = http, + .json_version_response = json_version_response, .reader = reader, .mode = .{ .http = {} }, .socket_flags = socket_flags, - .send_arena = ArenaAllocator.init(server.allocator), + .send_arena = ArenaAllocator.init(allocator), + .timeout_ms = timeout_ms, }; } + fn stop(self: *Client) void { + posix.shutdown(self.socket, .recv) catch {}; + } + fn deinit(self: *Client) void { switch (self.mode) { .cdp => |*cdp| cdp.deinit(), @@ -271,6 +226,88 @@ pub const Client = struct { } self.reader.deinit(); self.send_arena.deinit(); + self.http.deinit(); + } + + fn start(self: *Client) void { + const http = self.http; + http.cdp_client = .{ + .socket = self.socket, + .ctx = self, + .blocking_read_start = Client.blockingReadStart, + .blocking_read = Client.blockingRead, + .blocking_read_end = Client.blockingReadStop, + }; + defer http.cdp_client = null; + + self.httpLoop(http) catch |err| { + log.err(.app, "CDP client loop", .{ .err = err }); + }; + } + + fn httpLoop(self: *Client, http: *HttpClient) !void { + lp.assert(self.mode == .http, "Client.httpLoop invalid mode", .{}); + while (true) { + const status = http.tick(self.timeout_ms) catch |err| { + log.err(.app, "http tick", .{ .err = err }); + return; + }; + if (status != .cdp_socket) { + log.info(.app, "CDP timeout", .{}); + return; + } + + if (self.readSocket() == false) { + return; + } + + if (self.mode == .cdp) { + break; + } + } + + return self.cdpLoop(http); + } + + fn cdpLoop(self: *Client, http: *HttpClient) !void { + var cdp = &self.mode.cdp; + var last_message = timestamp(.monotonic); + var ms_remaining = self.timeout_ms; + + while (true) { + switch (cdp.pageWait(ms_remaining)) { + .cdp_socket => { + if (self.readSocket() == false) { + return; + } + last_message = timestamp(.monotonic); + ms_remaining = self.timeout_ms; + }, + .no_page => { + const status = http.tick(ms_remaining) catch |err| { + log.err(.app, "http tick", .{ .err = err }); + return; + }; + if (status != .cdp_socket) { + log.info(.app, "CDP timeout", .{}); + return; + } + if (self.readSocket() == false) { + return; + } + last_message = timestamp(.monotonic); + ms_remaining = self.timeout_ms; + }, + .done => { + const elapsed = timestamp(.monotonic) - last_message; + if (elapsed > ms_remaining) { + log.info(.app, "CDP timeout", .{}); + return; + } + ms_remaining -= @intCast(elapsed); + }, + } + } } fn blockingReadStart(ctx: *anyopaque) bool { @@ -380,7 +417,7 @@ pub const Client = struct { } if (std.mem.eql(u8, url, "/json/version")) { - try self.send(self.server.json_version_response); + try self.send(self.json_version_response); // Chromedp (a Go driver) does an http request to /json/version // then to / (websocket upgrade) using a different connection. // Since we only allow 1 connection at a time, the 2nd one (the @@ -485,7 +522,7 @@ pub const Client = struct { break :blk res; }; - self.mode = .{ .cdp = try CDP.init(self.server.app, self.http, self) }; + self.mode = .{ .cdp = try CDP.init(self.app, self.http, self) }; return self.send(response); }