diff --git a/build.zig b/build.zig index bcf34bee..e5ab3491 100644 --- a/build.zig +++ b/build.zig @@ -189,7 +189,7 @@ fn addDependencies( .prebuilt_v8_path = prebuilt_v8_path, .is_asan = is_asan, .is_tsan = is_tsan, - .v8_enable_sandbox = is_tsan, + .v8_enable_sandbox = is_tsan, // v8 contains a bug and cannot be compiled with tsan without a sandbox. }; mod.addIncludePath(b.path("vendor/lightpanda")); diff --git a/src/ArenaPool.zig b/src/ArenaPool.zig index f7d7015f..93a3271d 100644 --- a/src/ArenaPool.zig +++ b/src/ArenaPool.zig @@ -29,6 +29,7 @@ free_list_len: u16 = 0, free_list: ?*Entry = null, free_list_max: u16, entry_pool: std.heap.MemoryPool(Entry), +mutex: std.Thread.Mutex, const Entry = struct { next: ?*Entry, @@ -41,6 +42,7 @@ pub fn init(allocator: Allocator) ArenaPool { .free_list_max = 512, // TODO make configurable .retain_bytes = 1024 * 16, // TODO make configurable .entry_pool = std.heap.MemoryPool(Entry).init(allocator), + .mutex = .{}, }; } @@ -54,6 +56,9 @@ pub fn deinit(self: *ArenaPool) void { } pub fn acquire(self: *ArenaPool) !Allocator { + self.mutex.lock(); + defer self.mutex.unlock(); + if (self.free_list) |entry| { self.free_list = entry.next; self.free_list_len -= 1; @@ -73,6 +78,12 @@ pub fn release(self: *ArenaPool, allocator: Allocator) void { const arena: *std.heap.ArenaAllocator = @ptrCast(@alignCast(allocator.ptr)); const entry: *Entry = @fieldParentPtr("arena", arena); + // Reset the arena before acquiring the lock to minimize lock hold time + _ = arena.reset(.{ .retain_with_limit = self.retain_bytes }); + + self.mutex.lock(); + defer self.mutex.unlock(); + const free_list_len = self.free_list_len; if (free_list_len == self.free_list_max) { arena.deinit(); @@ -80,7 +91,6 @@ pub fn release(self: *ArenaPool, allocator: Allocator) void { return; } - _ = arena.reset(.{ .retain_with_limit = self.retain_bytes }); entry.next = self.free_list; self.free_list_len = free_list_len + 1; self.free_list = entry; diff --git a/src/Config.zig b/src/Config.zig index fc4ebcdd..66e26219 100644 --- a/src/Config.zig +++ b/src/Config.zig @@ -30,6 +30,13 @@ pub const RunMode = enum { version, }; +pub const MAX_HTTP_REQUEST_SIZE = 4096; + +// max message size +// +14 for max websocket payload overhead +// +140 for the max control packet that might be interleaved in a message +pub const MAX_MESSAGE_SIZE = 512 * 1024 + 14 + 140; + mode: Mode, exec_name: []const u8, http_headers: HttpHeaders, @@ -131,6 +138,20 @@ pub fn userAgentSuffix(self: *const Config) ?[]const u8 { }; } +pub fn maxConnections(self: *const Config) u16 { + return switch (self.mode) { + .serve => |opts| opts.max_connections, + else => unreachable, + }; +} + +pub fn maxPendingConnections(self: *const Config) u31 { + return switch (self.mode) { + .serve => |opts| opts.max_pending_connections, + else => unreachable, + }; +} + pub const Mode = union(RunMode) { help: bool, // false when being printed because of an error fetch: Fetch, @@ -144,7 +165,6 @@ pub const Serve = struct { timeout: u31 = 10, max_connections: u16 = 16, max_tabs_per_connection: u16 = 8, - max_memory_per_tab: u64 = 512 * 1024 * 1024, max_pending_connections: u16 = 128, common: Common = .{}, }; @@ -479,19 +499,6 @@ fn parseServeArgs( continue; } - if (std.mem.eql(u8, "--max_tab_memory", opt)) { - const str = args.next() orelse { - log.fatal(.app, "missing argument value", .{ .arg = "--max_tab_memory" }); - return error.InvalidArgument; - }; - - serve.max_memory_per_tab = std.fmt.parseInt(u64, str, 10) catch |err| { - log.fatal(.app, "invalid argument value", .{ .arg = "--max_tab_memory", .err = err }); - return error.InvalidArgument; - }; - continue; - } - if (std.mem.eql(u8, "--max_pending_connections", opt)) { const str = args.next() orelse { log.fatal(.app, "missing argument value", .{ .arg = "--max_pending_connections" }); diff --git a/src/Server.zig b/src/Server.zig index a557d8ed..b547e4aa 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -28,43 +28,51 @@ const ArenaAllocator = std.heap.ArenaAllocator; const log = @import("log.zig"); const App = @import("App.zig"); +const Config = @import("Config.zig"); const CDP = @import("cdp/cdp.zig").CDP; - -const MAX_HTTP_REQUEST_SIZE = 4096; - -// max message size -// +14 for max websocket payload overhead -// +140 for the max control packet that might be interleaved in a message -const MAX_MESSAGE_SIZE = 512 * 1024 + 14 + 140; +const Http = @import("http/Http.zig"); +const HttpClient = @import("http/Client.zig"); const Server = @This(); + app: *App, -shutdown: bool = false, +shutdown: std.atomic.Value(bool) = .init(false), allocator: Allocator, -client: ?posix.socket_t, listener: ?posix.socket_t, json_version_response: []const u8, -pub fn init(app: *App, address: net.Address) !Server { - const allocator = app.allocator; +// Thread management +active_threads: std.atomic.Value(u32) = .init(0), +clients: std.ArrayListUnmanaged(*Client) = .{}, +clients_mu: std.Thread.Mutex = .{}, +clients_pool: std.heap.MemoryPool(Client), + +pub fn init(allocator: Allocator, app: *App, address: net.Address) !Server { const json_version_response = try buildJSONVersionResponse(allocator, address); errdefer allocator.free(json_version_response); return .{ .app = app, - .client = null, .listener = null, .allocator = allocator, .json_version_response = json_version_response, + .clients_pool = std.heap.MemoryPool(Client).init(allocator), }; } /// Interrupts the server so that main can complete normally and call all defer handlers. pub fn stop(self: *Server) void { - if (@atomicRmw(bool, &self.shutdown, .Xchg, true, .monotonic)) { + if (self.shutdown.swap(true, .release)) { return; } + // Shutdown all active clients + self.clients_mu.lock(); + for (self.clients.items) |client| { + client.stop(); + } + self.clients_mu.unlock(); + // Linux and BSD/macOS handle canceling a socket blocked on accept differently. // For Linux, we use std.shutdown, which will cause accept to return error.SocketNotListening (EINVAL). // For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (BADF). @@ -81,17 +89,18 @@ pub fn stop(self: *Server) void { } pub fn deinit(self: *Server) void { + self.joinThreads(); if (self.listener) |listener| { posix.close(listener); self.listener = null; } - // *if* server.run is running, we should really wait for it to return - // before existing from here. + self.clients.deinit(self.allocator); + self.clients_pool.deinit(); self.allocator.free(self.json_version_response); } pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { - const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC; + const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK; const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP); self.listener = listener; @@ -101,16 +110,20 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { } try posix.bind(listener, &address.any, address.getOsSockLen()); - try posix.listen(listener, 1); + try posix.listen(listener, self.app.config.maxPendingConnections()); log.info(.app, "server running", .{ .address = address }); - while (!@atomicLoad(bool, &self.shutdown, .monotonic)) { + while (!self.shutdown.load(.acquire)) { const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| { switch (err) { error.SocketNotListening, error.ConnectionAborted => { log.info(.app, "server stopped", .{}); break; }, + error.WouldBlock => { + std.Thread.sleep(10 * std.time.ns_per_ms); + continue; + }, else => { log.err(.app, "CDP accept", .{ .err = err }); std.Thread.sleep(std.time.ns_per_s); @@ -119,97 +132,99 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { } }; - self.client = socket; - defer if (self.client) |s| { - posix.close(s); - self.client = null; - }; - - if (log.enabled(.app, .info)) { - var client_address: std.net.Address = undefined; - var socklen: posix.socklen_t = @sizeOf(net.Address); - try std.posix.getsockname(socket, &client_address.any, &socklen); - log.info(.app, "client connected", .{ .ip = client_address }); - } - - self.readLoop(socket, timeout_ms) catch |err| { - log.err(.app, "CDP client loop", .{ .err = err }); + self.spawnWorker(socket, timeout_ms) catch |err| { + log.err(.app, "CDP spawn", .{ .err = err }); + posix.close(socket); }; } } -fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void { - // This shouldn't be necessary, but the Client is HUGE (> 512KB) because - // it has a large read buffer. I don't know why, but v8 crashes if this - // is on the stack (and I assume it's related to its size). - const client = try self.allocator.create(Client); - defer self.allocator.destroy(client); +fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void { + defer posix.close(socket); - client.* = try Client.init(socket, self); + // Client is HUGE (> 512KB) because it has a large read buffer. + // V8 crashes if this is on the stack (likely related to its size). + const client = self.getClient() catch |err| { + log.err(.app, "CDP client create", .{ .err = err }); + return; + }; + defer self.releaseClient(client); + + client.* = Client.init( + socket, + self.allocator, + self.app, + self.json_version_response, + timeout_ms, + ) catch |err| { + log.err(.app, "CDP client init", .{ .err = err }); + return; + }; defer client.deinit(); - var http = &self.app.http; - http.addCDPClient(.{ - .socket = socket, - .ctx = client, - .blocking_read_start = Client.blockingReadStart, - .blocking_read = Client.blockingRead, - .blocking_read_end = Client.blockingReadStop, - }); - defer http.removeCDPClient(); + self.registerClient(client); + defer self.unregisterClient(client); - lp.assert(client.mode == .http, "Server.readLoop invalid mode", .{}); - while (true) { - if (http.poll(timeout_ms) != .cdp_socket) { - log.info(.app, "CDP timeout", .{}); - return; - } + client.start(); +} - if (client.readSocket() == false) { - return; - } +fn getClient(self: *Server) !*Client { + self.clients_mu.lock(); + defer self.clients_mu.unlock(); + return self.clients_pool.create(); +} - if (client.mode == .cdp) { - break; // switch to our CDP loop - } - } +fn releaseClient(self: *Server, client: *Client) void { + self.clients_mu.lock(); + defer self.clients_mu.unlock(); + self.clients_pool.destroy(client); +} - var cdp = &client.mode.cdp; - var last_message = timestamp(.monotonic); - var ms_remaining = timeout_ms; - while (true) { - switch (cdp.pageWait(ms_remaining)) { - .cdp_socket => { - if (client.readSocket() == false) { - return; - } - last_message = timestamp(.monotonic); - ms_remaining = timeout_ms; - }, - .no_page => { - if (http.poll(ms_remaining) != .cdp_socket) { - log.info(.app, "CDP timeout", .{}); - return; - } - if (client.readSocket() == false) { - return; - } - last_message = timestamp(.monotonic); - ms_remaining = timeout_ms; - }, - .done => { - const elapsed = timestamp(.monotonic) - last_message; - if (elapsed > ms_remaining) { - log.info(.app, "CDP timeout", .{}); - return; - } - ms_remaining -= @intCast(elapsed); - }, - .navigate => unreachable, // must have been handled by the session +fn registerClient(self: *Server, client: *Client) void { + self.clients_mu.lock(); + defer self.clients_mu.unlock(); + self.clients.append(self.allocator, client) catch {}; +} + +fn unregisterClient(self: *Server, client: *Client) void { + self.clients_mu.lock(); + defer self.clients_mu.unlock(); + for (self.clients.items, 0..) |c, i| { + if (c == client) { + _ = self.clients.swapRemove(i); + break; } } } +fn spawnWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void { + if (self.shutdown.load(.acquire)) { + return error.ShuttingDown; + } + + if (self.active_threads.load(.monotonic) >= self.app.config.maxConnections()) { + return error.MaxThreadsReached; + } + + _ = self.active_threads.fetchAdd(1, .monotonic); + errdefer _ = self.active_threads.fetchSub(1, .monotonic); + + const thread = try std.Thread.spawn(.{}, runWorker, .{ self, socket, timeout_ms }); + thread.detach(); +} + +fn runWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) void { + defer _ = self.active_threads.fetchSub(1, .monotonic); + handleConnection(self, socket, timeout_ms); +} + +fn joinThreads(self: *Server) void { + while (self.active_threads.load(.monotonic) > 0) { + std.Thread.sleep(10 * std.time.ns_per_ms); + } +} + +// Handle exactly one TCP connection. pub const Client = struct { // The client is initially serving HTTP requests but, under normal circumstances // should eventually be upgraded to a websocket connections @@ -218,11 +233,15 @@ pub const Client = struct { cdp: CDP, }, - server: *Server, + allocator: Allocator, + app: *App, + http: *HttpClient, + json_version_response: []const u8, reader: Reader(true), socket: posix.socket_t, socket_flags: usize, send_arena: ArenaAllocator, + timeout_ms: u32, const EMPTY_PONG = [_]u8{ 138, 0 }; @@ -233,22 +252,42 @@ pub const Client = struct { // "private-use" close codes must be from 4000-49999 const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 }; // code: 4000 - fn init(socket: posix.socket_t, server: *Server) !Client { + fn init( + socket: posix.socket_t, + allocator: Allocator, + app: *App, + json_version_response: []const u8, + timeout_ms: u32, + ) !Client { + if (log.enabled(.app, .info)) { + var client_address: std.net.Address = undefined; + var socklen: posix.socklen_t = @sizeOf(net.Address); + try std.posix.getsockname(socket, &client_address.any, &socklen); + log.info(.app, "client connected", .{ .ip = client_address }); + } + const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0); const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true })); // we expect the socket to come to us as nonblocking lp.assert(socket_flags & nonblocking == nonblocking, "Client.init blocking", .{}); - var reader = try Reader(true).init(server.allocator); + var reader = try Reader(true).init(allocator); errdefer reader.deinit(); + const http = try app.http.createClient(allocator); + errdefer http.deinit(); + return .{ .socket = socket, - .server = server, + .allocator = allocator, + .app = app, + .http = http, + .json_version_response = json_version_response, .reader = reader, .mode = .{ .http = {} }, .socket_flags = socket_flags, - .send_arena = ArenaAllocator.init(server.allocator), + .send_arena = ArenaAllocator.init(allocator), + .timeout_ms = timeout_ms, }; } @@ -259,6 +298,93 @@ pub const Client = struct { } self.reader.deinit(); self.send_arena.deinit(); + self.http.deinit(); + } + + fn start(self: *Client) void { + const http = self.http; + http.cdp_client = .{ + .socket = self.socket, + .ctx = self, + .blocking_read_start = Client.blockingReadStart, + .blocking_read = Client.blockingRead, + .blocking_read_end = Client.blockingReadStop, + }; + defer http.cdp_client = null; + + self.httpLoop(http) catch |err| { + log.err(.app, "CDP client loop", .{ .err = err }); + }; + } + + fn stop(self: *Client) void { + posix.shutdown(self.socket, .recv) catch {}; + } + + fn httpLoop(self: *Client, http: *HttpClient) !void { + lp.assert(self.mode == .http, "Client.httpLoop invalid mode", .{}); + while (true) { + const status = http.tick(self.timeout_ms) catch |err| { + log.err(.app, "http tick", .{ .err = err }); + return; + }; + if (status != .cdp_socket) { + log.info(.app, "CDP timeout", .{}); + return; + } + + if (self.readSocket() == false) { + return; + } + + if (self.mode == .cdp) { + break; + } + } + + return self.cdpLoop(http); + } + + fn cdpLoop(self: *Client, http: *HttpClient) !void { + var cdp = &self.mode.cdp; + var last_message = timestamp(.monotonic); + var ms_remaining = self.timeout_ms; + + while (true) { + switch (cdp.pageWait(ms_remaining)) { + .cdp_socket => { + if (self.readSocket() == false) { + return; + } + last_message = timestamp(.monotonic); + ms_remaining = self.timeout_ms; + }, + .no_page => { + const status = http.tick(ms_remaining) catch |err| { + log.err(.app, "http tick", .{ .err = err }); + return; + }; + if (status != .cdp_socket) { + log.info(.app, "CDP timeout", .{}); + return; + } + if (self.readSocket() == false) { + return; + } + last_message = timestamp(.monotonic); + ms_remaining = self.timeout_ms; + }, + .done => { + const elapsed = timestamp(.monotonic) - last_message; + if (elapsed > ms_remaining) { + log.info(.app, "CDP timeout", .{}); + return; + } + ms_remaining -= @intCast(elapsed); + }, + .navigate => unreachable, // must have been handled by the session + } + } } fn blockingReadStart(ctx: *anyopaque) bool { @@ -315,7 +441,7 @@ pub const Client = struct { lp.assert(self.reader.pos == 0, "Client.HTTP pos", .{ .pos = self.reader.pos }); const request = self.reader.buf[0..self.reader.len]; - if (request.len > MAX_HTTP_REQUEST_SIZE) { + if (request.len > Config.MAX_HTTP_REQUEST_SIZE) { self.writeHTTPErrorResponse(413, "Request too large"); return error.RequestTooLarge; } @@ -368,7 +494,7 @@ pub const Client = struct { } if (std.mem.eql(u8, url, "/json/version")) { - try self.send(self.server.json_version_response); + try self.send(self.json_version_response); // Chromedp (a Go driver) does an http request to /json/version // then to / (websocket upgrade) using a different connection. // Since we only allow 1 connection at a time, the 2nd one (the @@ -473,7 +599,7 @@ pub const Client = struct { break :blk res; }; - self.mode = .{ .cdp = try CDP.init(self.server.app, self) }; + self.mode = .{ .cdp = try CDP.init(self.app, self.http, self) }; return self.send(response); } @@ -708,7 +834,7 @@ fn Reader(comptime EXPECT_MASK: bool) type { if (message_len > 125) { return error.ControlTooLarge; } - } else if (message_len > MAX_MESSAGE_SIZE) { + } else if (message_len > Config.MAX_MESSAGE_SIZE) { return error.TooLarge; } else if (message_len > self.buf.len) { const len = self.buf.len; @@ -736,7 +862,7 @@ fn Reader(comptime EXPECT_MASK: bool) type { if (is_continuation) { const fragments = &(self.fragments orelse return error.InvalidContinuation); - if (fragments.message.items.len + message_len > MAX_MESSAGE_SIZE) { + if (fragments.message.items.len + message_len > Config.MAX_MESSAGE_SIZE) { return error.TooLarge; } diff --git a/src/TestHTTPServer.zig b/src/TestHTTPServer.zig index 19c5bbc9..d492f53f 100644 --- a/src/TestHTTPServer.zig +++ b/src/TestHTTPServer.zig @@ -56,9 +56,9 @@ pub fn run(self: *TestHTTPServer, wg: *std.Thread.WaitGroup) !void { wg.finish(); - while (true) { + while (!self.shutdown.load(.acquire)) { const conn = listener.accept() catch |err| { - if (self.shutdown.load(.acquire) or err == error.SocketNotListening) { + if (err == error.SocketNotListening) { return; } return err; diff --git a/src/browser/Browser.zig b/src/browser/Browser.zig index 54668987..5da04f24 100644 --- a/src/browser/Browser.zig +++ b/src/browser/Browser.zig @@ -24,9 +24,9 @@ const ArenaAllocator = std.heap.ArenaAllocator; const js = @import("js/js.zig"); const log = @import("../log.zig"); const App = @import("../App.zig"); +const HttpClient = @import("../http/Client.zig"); const ArenaPool = App.ArenaPool; -const HttpClient = App.Http.Client; const IS_DEBUG = @import("builtin").mode == .Debug; @@ -51,6 +51,7 @@ transfer_arena: ArenaAllocator, const InitOpts = struct { env: js.Env.InitOpts = .{}, + http_client: *HttpClient, }; pub fn init(app: *App, opts: InitOpts) !Browser { @@ -65,7 +66,7 @@ pub fn init(app: *App, opts: InitOpts) !Browser { .session = null, .allocator = allocator, .arena_pool = &app.arena_pool, - .http_client = app.http.client, + .http_client = opts.http_client, .call_arena = ArenaAllocator.init(allocator), .page_arena = ArenaAllocator.init(allocator), .session_arena = ArenaAllocator.init(allocator), diff --git a/src/browser/js/Env.zig b/src/browser/js/Env.zig index 9001d343..110d43c1 100644 --- a/src/browser/js/Env.zig +++ b/src/browser/js/Env.zig @@ -37,6 +37,14 @@ const JsApis = bridge.JsApis; const Allocator = std.mem.Allocator; const IS_DEBUG = @import("builtin").mode == .Debug; +fn initClassIds() void { + inline for (JsApis, 0..) |JsApi, i| { + JsApi.Meta.class_id = i; + } +} + +var class_id_once = std.once(initClassIds); + // The Env maps to a V8 isolate, which represents a isolated sandbox for // executing JavaScript. The Env is where we'll define our V8 <-> Zig bindings, // and it's where we'll start ExecutionWorlds, which actually execute JavaScript. @@ -76,6 +84,9 @@ pub const InitOpts = struct { }; pub fn init(app: *App, opts: InitOpts) !Env { + // Initialize class IDs once before any V8 work + class_id_once.call(); + const allocator = app.allocator; const snapshot = &app.snapshot; @@ -117,8 +128,7 @@ pub fn init(app: *App, opts: InitOpts) !Env { temp_scope.init(isolate); defer temp_scope.deinit(); - inline for (JsApis, 0..) |JsApi, i| { - JsApi.Meta.class_id = i; + inline for (JsApis, 0..) |_, i| { const data = v8.v8__Isolate__GetDataFromSnapshotOnce(isolate_handle, snapshot.data_start + i); const function_handle: *const v8.FunctionTemplate = @ptrCast(data); // Make function template eternal diff --git a/src/browser/js/Inspector.zig b/src/browser/js/Inspector.zig index 7ba97257..6fab9c91 100644 --- a/src/browser/js/Inspector.zig +++ b/src/browser/js/Inspector.zig @@ -38,11 +38,12 @@ const IS_DEBUG = @import("builtin").mode == .Debug; const Inspector = @This(); unique_id: i64, +allocator: Allocator, isolate: *v8.Isolate, handle: *v8.Inspector, client: *v8.InspectorClientImpl, default_context: ?v8.Global, -session: ?Session, +sessions: std.ArrayListUnmanaged(*Session), pub fn init(allocator: Allocator, isolate: *v8.Isolate) !*Inspector { const self = try allocator.create(Inspector); @@ -50,7 +51,8 @@ pub fn init(allocator: Allocator, isolate: *v8.Isolate) !*Inspector { self.* = .{ .unique_id = 1, - .session = null, + .allocator = allocator, + .sessions = .empty, .isolate = isolate, .client = undefined, .handle = undefined, @@ -67,32 +69,43 @@ pub fn init(allocator: Allocator, isolate: *v8.Isolate) !*Inspector { return self; } -pub fn deinit(self: *const Inspector, allocator: Allocator) void { +pub fn deinit(self: *Inspector, allocator: Allocator) void { var hs: v8.HandleScope = undefined; v8.v8__HandleScope__CONSTRUCT(&hs, self.isolate); defer v8.v8__HandleScope__DESTRUCT(&hs); - if (self.session) |*s| { - s.deinit(); + for (self.sessions.items) |session| { + session.deinit(); + self.allocator.destroy(session); } + self.sessions.deinit(self.allocator); + v8.v8_inspector__Client__IMPL__DELETE(self.client); v8.v8_inspector__Inspector__DELETE(self.handle); allocator.destroy(self); } -pub fn startSession(self: *Inspector, ctx: anytype) *Session { - if (comptime IS_DEBUG) { - std.debug.assert(self.session == null); - } +pub fn startSession(self: *Inspector, ctx: anytype) !*Session { + const session = try self.allocator.create(Session); + errdefer self.allocator.destroy(session); - self.session = @as(Session, undefined); - Session.init(&self.session.?, self, ctx); - return &self.session.?; + Session.init(session, self, ctx); + try self.sessions.append(self.allocator, session); + return session; } -pub fn stopSession(self: *Inspector) void { - self.session.?.deinit(); - self.session = null; +pub fn stopSession(self: *Inspector, session: *Session) void { + for (self.sessions.items, 0..) |s, i| { + if (s == session) { + _ = self.sessions.swapRemove(i); + session.deinit(); + self.allocator.destroy(session); + return; + } + } + if (comptime IS_DEBUG) { + @panic("Tried to stop unknown inspector session"); + } } // From CDP docs diff --git a/src/cdp/cdp.zig b/src/cdp/cdp.zig index 99310255..bd0574b7 100644 --- a/src/cdp/cdp.zig +++ b/src/cdp/cdp.zig @@ -28,6 +28,7 @@ const js = @import("../browser/js/js.zig"); const App = @import("../App.zig"); const Browser = @import("../browser/Browser.zig"); const Session = @import("../browser/Session.zig"); +const HttpClient = @import("../http/Client.zig"); const Page = @import("../browser/Page.zig"); const Incrementing = @import("../id.zig").Incrementing; const Notification = @import("../Notification.zig"); @@ -85,10 +86,11 @@ pub fn CDPT(comptime TypeProvider: type) type { const Self = @This(); - pub fn init(app: *App, client: TypeProvider.Client) !Self { + pub fn init(app: *App, http_client: *HttpClient, client: TypeProvider.Client) !Self { const allocator = app.allocator; const browser = try Browser.init(app, .{ .env = .{ .with_inspector = true }, + .http_client = http_client, }); errdefer browser.deinit(); @@ -403,8 +405,9 @@ pub fn BrowserContext(comptime CDP_T: type) type { const session = try cdp.browser.newSession(notification); const browser = &cdp.browser; - const inspector_session = browser.env.inspector.?.startSession(self); - errdefer browser.env.inspector.?.stopSession(); + const inspector = browser.env.inspector.?; + const inspector_session = try inspector.startSession(self); + errdefer inspector.stopSession(inspector_session); var registry = Node.Registry.init(allocator); errdefer registry.deinit(); @@ -455,7 +458,7 @@ pub fn BrowserContext(comptime CDP_T: type) type { // before deinit it. browser.env.inspector.?.resetContextGroup(); browser.runMessageLoop(); - browser.env.inspector.?.stopSession(); + browser.env.inspector.?.stopSession(self.inspector_session); // abort all intercepted requests before closing the sesion/page // since some of these might callback into the page/scriptmanager diff --git a/src/cdp/testing.zig b/src/cdp/testing.zig index 4de1e59f..a0369d0e 100644 --- a/src/cdp/testing.zig +++ b/src/cdp/testing.zig @@ -85,7 +85,7 @@ const TestContext = struct { self.client = Client.init(self.arena.allocator()); // Don't use the arena here. We want to detect leaks in CDP. // The arena is only for test-specific stuff - self.cdp_ = TestCDP.init(base.test_app, &self.client.?) catch unreachable; + self.cdp_ = TestCDP.init(base.test_app, base.test_http, &self.client.?) catch unreachable; } return &self.cdp_.?; } diff --git a/src/http/Http.zig b/src/http/Http.zig index 3d488f95..42255120 100644 --- a/src/http/Http.zig +++ b/src/http/Http.zig @@ -17,8 +17,6 @@ // along with this program. If not, see . const std = @import("std"); -const lp = @import("lightpanda"); -const Config = @import("../Config.zig"); pub const c = @cImport({ @cInclude("curl/curl.h"); @@ -28,6 +26,8 @@ pub const ENABLE_DEBUG = false; pub const Client = @import("Client.zig"); pub const Transfer = Client.Transfer; +const lp = @import("lightpanda"); +const Config = @import("../Config.zig"); const log = @import("../log.zig"); const errors = @import("errors.zig"); @@ -41,8 +41,8 @@ const ArenaAllocator = std.heap.ArenaAllocator; // once for all http connections is a win. const Http = @This(); +allocator: Allocator, config: *const Config, -client: *Client, ca_blob: ?c.curl_blob, arena: ArenaAllocator, @@ -59,40 +59,28 @@ pub fn init(allocator: Allocator, config: *const Config) !Http { var ca_blob: ?c.curl_blob = null; if (config.tlsVerifyHost()) { - ca_blob = try loadCerts(allocator, arena.allocator()); + ca_blob = try loadCerts(allocator); } - var client = try Client.init(allocator, ca_blob, config); - errdefer client.deinit(); - return .{ .arena = arena, - .client = client, .ca_blob = ca_blob, + .allocator = allocator, .config = config, }; } pub fn deinit(self: *Http) void { - self.client.deinit(); + if (self.ca_blob) |ca_blob| { + const data: [*]u8 = @ptrCast(ca_blob.data); + self.allocator.free(data[0..ca_blob.len]); + } c.curl_global_cleanup(); self.arena.deinit(); } -pub fn poll(self: *Http, timeout_ms: u32) Client.PerformStatus { - return self.client.tick(timeout_ms) catch |err| { - log.err(.app, "http poll", .{ .err = err }); - return .normal; - }; -} - -pub fn addCDPClient(self: *Http, cdp_client: Client.CDPClient) void { - lp.assert(self.client.cdp_client == null, "Http addCDPClient existing", .{}); - self.client.cdp_client = cdp_client; -} - -pub fn removeCDPClient(self: *Http) void { - self.client.cdp_client = null; +pub fn createClient(self: *Http, allocator: Allocator) !*Client { + return Client.init(allocator, self.ca_blob, self.config); } pub fn newConnection(self: *Http) !Connection { @@ -348,7 +336,7 @@ pub const Method = enum(u8) { // This whole rescan + decode is really just needed for MacOS. On Linux // bundle.rescan does find the .pem file(s) which could be in a few different // places, so it's still useful, just not efficient. -fn loadCerts(allocator: Allocator, arena: Allocator) !c.curl_blob { +fn loadCerts(allocator: Allocator) !c.curl_blob { var bundle: std.crypto.Certificate.Bundle = .{}; try bundle.rescan(allocator); defer bundle.deinit(allocator); @@ -371,8 +359,9 @@ fn loadCerts(allocator: Allocator, arena: Allocator) !c.curl_blob { (bundle.map.count() * 75) + // start / end per certificate + extra, just in case (encoded_size / 64) // newline per 64 characters ; - try arr.ensureTotalCapacity(arena, buffer_size); - var writer = arr.writer(arena); + try arr.ensureTotalCapacity(allocator, buffer_size); + errdefer arr.deinit(allocator); + var writer = arr.writer(allocator); var it = bundle.map.valueIterator(); while (it.next()) |index| { @@ -385,11 +374,16 @@ fn loadCerts(allocator: Allocator, arena: Allocator) !c.curl_blob { } // Final encoding should not be larger than our initial size estimate - lp.assert(buffer_size > arr.items.len, "Http loadCerts", .{ .estiate = buffer_size, .len = arr.items.len }); + lp.assert(buffer_size > arr.items.len, "Http loadCerts", .{ .estimate = buffer_size, .len = arr.items.len }); + + // Allocate exactly the size needed and copy the data + const result = try allocator.dupe(u8, arr.items); + // Free the original oversized allocation + arr.deinit(allocator); return .{ - .len = arr.items.len, - .data = arr.items.ptr, + .len = result.len, + .data = result.ptr, .flags = 0, }; } diff --git a/src/lightpanda.zig b/src/lightpanda.zig index 59ac4dc9..12481465 100644 --- a/src/lightpanda.zig +++ b/src/lightpanda.zig @@ -38,9 +38,12 @@ pub const FetchOpts = struct { dump: dump.RootOpts, writer: ?*std.Io.Writer = null, }; -pub fn fetch(app: *App, url: [:0]const u8, opts: FetchOpts) !void { - var browser = try Browser.init(app, .{}); - const notification = try Notification.init(app.allocator); +pub fn fetch(allocator: std.mem.Allocator, app: *App, url: [:0]const u8, opts: FetchOpts) !void { + const http_client = try app.http.createClient(allocator); + defer http_client.deinit(); + + var browser = try Browser.init(app, .{ .http_client = http_client }); + const notification = try Notification.init(allocator); defer notification.deinit(); defer browser.deinit(); diff --git a/src/log.zig b/src/log.zig index 9936cbb0..168867ed 100644 --- a/src/log.zig +++ b/src/log.zig @@ -49,7 +49,7 @@ const Opts = struct { pub var opts = Opts{}; -// synchronizes writes to the output +// synchronizes access to _interceptor var out_lock: Thread.Mutex = .{}; // synchronizes access to last_log @@ -146,7 +146,14 @@ fn logTo(comptime scope: Scope, level: Level, comptime msg: []const u8, data: an } out.flush() catch return; - const interceptor = _interceptor orelse return; + // Copy the interceptor under lock, then release before doing I/O + const interceptor = blk: { + out_lock.lock(); + defer out_lock.unlock(); + break :blk _interceptor orelse return; + }; + + // I/O operations happen without holding the lock to minimize contention if (interceptor.writer(interceptor.ctx, scope, level)) |iwriter| { try logLogfmt(scope, level, msg, data, iwriter); try iwriter.flush(); @@ -368,10 +375,14 @@ fn timestamp(comptime mode: datetime.TimestampMode) u64 { var _interceptor: ?Interceptor = null; pub fn registerInterceptor(interceptor: Interceptor) void { + out_lock.lock(); + defer out_lock.unlock(); _interceptor = interceptor; } pub fn unregisterInterceptor() void { + out_lock.lock(); + defer out_lock.unlock(); _interceptor = null; } diff --git a/src/main.zig b/src/main.zig index 76c23d74..e7aa184f 100644 --- a/src/main.zig +++ b/src/main.zig @@ -93,7 +93,7 @@ fn run(allocator: Allocator, main_arena: Allocator, sighandler: *SigHandler) !vo }; // _server is global to handle graceful shutdown. - var server = try lp.Server.init(app, address); + var server = try lp.Server.init(allocator, app, address); defer server.deinit(); try sighandler.on(lp.Server.stop, .{&server}); @@ -123,7 +123,7 @@ fn run(allocator: Allocator, main_arena: Allocator, sighandler: *SigHandler) !vo fetch_opts.writer = &writer.interface; } - lp.fetch(app, url, fetch_opts) catch |err| { + lp.fetch(allocator, app, url, fetch_opts) catch |err| { log.fatal(.app, "fetch error", .{ .err = err, .url = url }); return err; }; diff --git a/src/main_legacy_test.zig b/src/main_legacy_test.zig index bc59492a..d8011120 100644 --- a/src/main_legacy_test.zig +++ b/src/main_legacy_test.zig @@ -44,8 +44,11 @@ pub fn main() !void { var test_arena = std.heap.ArenaAllocator.init(allocator); defer test_arena.deinit(); - var browser = try lp.Browser.init(app, .{}); - const notification = try lp.Notification.init(app.allocator); + const http_client = try app.http.createClient(allocator); + defer http_client.deinit(); + + var browser = try lp.Browser.init(app, .{ .http_client = http_client }); + const notification = try lp.Notification.init(allocator); defer notification.deinit(); defer browser.deinit(); diff --git a/src/main_wpt.zig b/src/main_wpt.zig index 8647df96..562c57ea 100644 --- a/src/main_wpt.zig +++ b/src/main_wpt.zig @@ -67,7 +67,10 @@ pub fn main() !void { var app = try lp.App.init(allocator, &config); defer app.deinit(); - var browser = try lp.Browser.init(app, .{}); + const http_client = try app.http.createClient(allocator); + defer http_client.deinit(); + + var browser = try lp.Browser.init(app, .{ .http_client = http_client }); defer browser.deinit(); // An arena for running each tests. Is reset after every test. @@ -109,7 +112,7 @@ fn run( test_file: []const u8, err_out: *?[]const u8, ) ![]const u8 { - const notification = try lp.Notification.init(browser.allocator); + const notification = try lp.Notification.init(browser.app.allocator); defer notification.deinit(); const session = try browser.newSession(notification); diff --git a/src/testing.zig b/src/testing.zig index 41f67e96..c6af35d6 100644 --- a/src/testing.zig +++ b/src/testing.zig @@ -39,6 +39,7 @@ pub fn reset() void { const App = @import("App.zig"); const js = @import("browser/js/js.zig"); const Config = @import("Config.zig"); +const Client = @import("http/Client.zig"); const Page = @import("browser/Page.zig"); const Browser = @import("browser/Browser.zig"); const Session = @import("browser/Session.zig"); @@ -334,6 +335,7 @@ fn isJsonValue(a: std.json.Value, b: std.json.Value) bool { } pub var test_app: *App = undefined; +pub var test_http: *Client = undefined; pub var test_browser: Browser = undefined; pub var test_notification: *Notification = undefined; pub var test_session: *Session = undefined; @@ -472,7 +474,10 @@ test "tests:beforeAll" { test_app = try App.init(test_allocator, &test_config); errdefer test_app.deinit(); - test_browser = try Browser.init(test_app, .{}); + test_http = try test_app.http.createClient(test_allocator); + errdefer test_http.deinit(); + + test_browser = try Browser.init(test_app, .{ .http_client = test_http }); errdefer test_browser.deinit(); // Create notification for testing @@ -519,13 +524,15 @@ test "tests:afterAll" { test_notification.deinit(); test_browser.deinit(); + test_http.deinit(); test_app.deinit(); test_config.deinit(@import("root").tracking_allocator); } fn serveCDP(wg: *std.Thread.WaitGroup) !void { const address = try std.net.Address.parseIp("127.0.0.1", 9583); - test_cdp_server = try Server.init(test_app, address); + const test_allocator = @import("root").tracking_allocator; + test_cdp_server = try Server.init(test_allocator, test_app, address); wg.finish();