diff --git a/.github/workflows/e2e-test.yml b/.github/workflows/e2e-test.yml index 6f9055cc..74eda203 100644 --- a/.github/workflows/e2e-test.yml +++ b/.github/workflows/e2e-test.yml @@ -122,8 +122,8 @@ jobs: needs: zig-build-release env: - MAX_VmHWM: 26000 # 26MB (KB) - MAX_CG_PEAK: 6000 # 6MB (KB) + MAX_VmHWM: 28000 # 28MB (KB) + MAX_CG_PEAK: 8000 # 8MB (KB) MAX_AVG_DURATION: 17 LIGHTPANDA_DISABLE_TELEMETRY: true diff --git a/src/ArenaPool.zig b/src/ArenaPool.zig index 32257e30..7f1d38dc 100644 --- a/src/ArenaPool.zig +++ b/src/ArenaPool.zig @@ -29,6 +29,7 @@ free_list_len: u16 = 0, free_list: ?*Entry = null, free_list_max: u16, entry_pool: std.heap.MemoryPool(Entry), +mutex: std.Thread.Mutex = .{}, const Entry = struct { next: ?*Entry, @@ -54,6 +55,9 @@ pub fn deinit(self: *ArenaPool) void { } pub fn acquire(self: *ArenaPool) !Allocator { + self.mutex.lock(); + defer self.mutex.unlock(); + if (self.free_list) |entry| { self.free_list = entry.next; self.free_list_len -= 1; @@ -73,6 +77,12 @@ pub fn release(self: *ArenaPool, allocator: Allocator) void { const arena: *std.heap.ArenaAllocator = @ptrCast(@alignCast(allocator.ptr)); const entry: *Entry = @fieldParentPtr("arena", arena); + // Reset the arena before acquiring the lock to minimize lock hold time + _ = arena.reset(.{ .retain_with_limit = self.retain_bytes }); + + self.mutex.lock(); + defer self.mutex.unlock(); + const free_list_len = self.free_list_len; if (free_list_len == self.free_list_max) { arena.deinit(); @@ -80,7 +90,6 @@ pub fn release(self: *ArenaPool, allocator: Allocator) void { return; } - _ = arena.reset(.{ .retain_with_limit = self.retain_bytes }); entry.next = self.free_list; self.free_list_len = free_list_len + 1; self.free_list = entry; diff --git a/src/Config.zig b/src/Config.zig index c9725168..39422791 100644 --- a/src/Config.zig +++ b/src/Config.zig @@ -30,6 +30,13 @@ pub const RunMode = enum { version, }; +pub const CDP_MAX_HTTP_REQUEST_SIZE = 4096; + +// max message size +// +14 for max websocket payload overhead +// +140 for the max control packet that might be interleaved in a message +pub const CDP_MAX_MESSAGE_SIZE = 512 * 1024 + 14 + 140; + mode: Mode, exec_name: []const u8, http_headers: HttpHeaders, @@ -145,6 +152,20 @@ pub fn userAgentSuffix(self: *const Config) ?[]const u8 { }; } +pub fn maxConnections(self: *const Config) u16 { + return switch (self.mode) { + .serve => |opts| opts.cdp_max_connections, + else => unreachable, + }; +} + +pub fn maxPendingConnections(self: *const Config) u31 { + return switch (self.mode) { + .serve => |opts| opts.cdp_max_pending_connections, + else => unreachable, + }; +} + pub const Mode = union(RunMode) { help: bool, // false when being printed because of an error fetch: Fetch, @@ -156,10 +177,8 @@ pub const Serve = struct { host: []const u8 = "127.0.0.1", port: u16 = 9222, timeout: u31 = 10, - max_connections: u16 = 16, - max_tabs_per_connection: u16 = 8, - max_memory_per_tab: u64 = 512 * 1024 * 1024, - max_pending_connections: u16 = 128, + cdp_max_connections: u16 = 16, + cdp_max_pending_connections: u16 = 128, common: Common = .{}, }; @@ -333,18 +352,11 @@ pub fn printUsageAndExit(self: *const Config, success: bool) void { \\--timeout Inactivity timeout in seconds before disconnecting clients \\ Defaults to 10 (seconds). Limited to 604800 (1 week). \\ - \\--max_connections + \\--cdp_max_connections \\ Maximum number of simultaneous CDP connections. \\ Defaults to 16. \\ - \\--max_tabs Maximum number of tabs per CDP connection. - \\ Defaults to 8. - \\ - \\--max_tab_memory - \\ Maximum memory per tab in bytes. - \\ Defaults to 536870912 (512 MB). - \\ - \\--max_pending_connections + \\--cdp_max_pending_connections \\ Maximum pending connections in the accept queue. \\ Defaults to 128. \\ @@ -479,53 +491,27 @@ fn parseServeArgs( continue; } - if (std.mem.eql(u8, "--max_connections", opt)) { + if (std.mem.eql(u8, "--cdp_max_connections", opt)) { const str = args.next() orelse { - log.fatal(.app, "missing argument value", .{ .arg = "--max_connections" }); + log.fatal(.app, "missing argument value", .{ .arg = "--cdp_max_connections" }); return error.InvalidArgument; }; - serve.max_connections = std.fmt.parseInt(u16, str, 10) catch |err| { - log.fatal(.app, "invalid argument value", .{ .arg = "--max_connections", .err = err }); + serve.cdp_max_connections = std.fmt.parseInt(u16, str, 10) catch |err| { + log.fatal(.app, "invalid argument value", .{ .arg = "--cdp_max_connections", .err = err }); return error.InvalidArgument; }; continue; } - if (std.mem.eql(u8, "--max_tabs", opt)) { + if (std.mem.eql(u8, "--cdp_max_pending_connections", opt)) { const str = args.next() orelse { - log.fatal(.app, "missing argument value", .{ .arg = "--max_tabs" }); + log.fatal(.app, "missing argument value", .{ .arg = "--cdp_max_pending_connections" }); return error.InvalidArgument; }; - serve.max_tabs_per_connection = std.fmt.parseInt(u16, str, 10) catch |err| { - log.fatal(.app, "invalid argument value", .{ .arg = "--max_tabs", .err = err }); - return error.InvalidArgument; - }; - continue; - } - - if (std.mem.eql(u8, "--max_tab_memory", opt)) { - const str = args.next() orelse { - log.fatal(.app, "missing argument value", .{ .arg = "--max_tab_memory" }); - return error.InvalidArgument; - }; - - serve.max_memory_per_tab = std.fmt.parseInt(u64, str, 10) catch |err| { - log.fatal(.app, "invalid argument value", .{ .arg = "--max_tab_memory", .err = err }); - return error.InvalidArgument; - }; - continue; - } - - if (std.mem.eql(u8, "--max_pending_connections", opt)) { - const str = args.next() orelse { - log.fatal(.app, "missing argument value", .{ .arg = "--max_pending_connections" }); - return error.InvalidArgument; - }; - - serve.max_pending_connections = std.fmt.parseInt(u16, str, 10) catch |err| { - log.fatal(.app, "invalid argument value", .{ .arg = "--max_pending_connections", .err = err }); + serve.cdp_max_pending_connections = std.fmt.parseInt(u16, str, 10) catch |err| { + log.fatal(.app, "invalid argument value", .{ .arg = "--cdp_max_pending_connections", .err = err }); return error.InvalidArgument; }; continue; diff --git a/src/Server.zig b/src/Server.zig index 1ac3084c..9f060a1c 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -28,23 +28,25 @@ const ArenaAllocator = std.heap.ArenaAllocator; const log = @import("log.zig"); const App = @import("App.zig"); +const Config = @import("Config.zig"); const CDP = @import("cdp/cdp.zig").CDP; - -const MAX_HTTP_REQUEST_SIZE = 4096; - -// max message size -// +14 for max websocket payload overhead -// +140 for the max control packet that might be interleaved in a message -const MAX_MESSAGE_SIZE = 512 * 1024 + 14 + 140; +const Http = @import("http/Http.zig"); +const HttpClient = @import("http/Client.zig"); const Server = @This(); + app: *App, -shutdown: bool = false, +shutdown: std.atomic.Value(bool) = .init(false), allocator: Allocator, -client: ?posix.socket_t, listener: ?posix.socket_t, json_version_response: []const u8, +// Thread management +active_threads: std.atomic.Value(u32) = .init(0), +clients: std.ArrayList(*Client) = .{}, +client_mutex: std.Thread.Mutex = .{}, +clients_pool: std.heap.MemoryPool(Client), + pub fn init(app: *App, address: net.Address) !Server { const allocator = app.allocator; const json_version_response = try buildJSONVersionResponse(allocator, address); @@ -52,19 +54,28 @@ pub fn init(app: *App, address: net.Address) !Server { return .{ .app = app, - .client = null, .listener = null, .allocator = allocator, .json_version_response = json_version_response, + .clients_pool = std.heap.MemoryPool(Client).init(app.allocator), }; } /// Interrupts the server so that main can complete normally and call all defer handlers. pub fn stop(self: *Server) void { - if (@atomicRmw(bool, &self.shutdown, .Xchg, true, .monotonic)) { + if (self.shutdown.swap(true, .release)) { return; } + // Shutdown all active clients + { + self.client_mutex.lock(); + defer self.client_mutex.unlock(); + for (self.clients.items) |client| { + client.stop(); + } + } + // Linux and BSD/macOS handle canceling a socket blocked on accept differently. // For Linux, we use std.shutdown, which will cause accept to return error.SocketNotListening (EINVAL). // For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (BADF). @@ -81,17 +92,22 @@ pub fn stop(self: *Server) void { } pub fn deinit(self: *Server) void { + if (!self.shutdown.load(.acquire)) { + self.stop(); + } + + self.joinThreads(); if (self.listener) |listener| { posix.close(listener); self.listener = null; } - // *if* server.run is running, we should really wait for it to return - // before existing from here. + self.clients.deinit(self.allocator); + self.clients_pool.deinit(); self.allocator.free(self.json_version_response); } pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { - const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC; + const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK; const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP); self.listener = listener; @@ -101,16 +117,20 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { } try posix.bind(listener, &address.any, address.getOsSockLen()); - try posix.listen(listener, 1); + try posix.listen(listener, self.app.config.maxPendingConnections()); log.info(.app, "server running", .{ .address = address }); - while (!@atomicLoad(bool, &self.shutdown, .monotonic)) { + while (!self.shutdown.load(.acquire)) { const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| { switch (err) { error.SocketNotListening, error.ConnectionAborted => { log.info(.app, "server stopped", .{}); break; }, + error.WouldBlock => { + std.Thread.sleep(10 * std.time.ns_per_ms); + continue; + }, else => { log.err(.app, "CDP accept", .{ .err = err }); std.Thread.sleep(std.time.ns_per_s); @@ -119,96 +139,121 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { } }; - self.client = socket; - defer if (self.client) |s| { - posix.close(s); - self.client = null; - }; - - if (log.enabled(.app, .info)) { - var client_address: std.net.Address = undefined; - var socklen: posix.socklen_t = @sizeOf(net.Address); - try std.posix.getsockname(socket, &client_address.any, &socklen); - log.info(.app, "client connected", .{ .ip = client_address }); - } - - self.readLoop(socket, timeout_ms) catch |err| { - log.err(.app, "CDP client loop", .{ .err = err }); + self.spawnWorker(socket, timeout_ms) catch |err| { + log.err(.app, "CDP spawn", .{ .err = err }); + posix.close(socket); }; } } -fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void { - // This shouldn't be necessary, but the Client is HUGE (> 512KB) because - // it has a large read buffer. I don't know why, but v8 crashes if this - // is on the stack (and I assume it's related to its size). - const client = try self.allocator.create(Client); - defer self.allocator.destroy(client); +fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void { + defer posix.close(socket); - client.* = try Client.init(socket, self); + // Client is HUGE (> 512KB) because it has a large read buffer. + // V8 crashes if this is on the stack (likely related to its size). + const client = self.getClient() catch |err| { + log.err(.app, "CDP client create", .{ .err = err }); + return; + }; + defer self.releaseClient(client); + + client.* = Client.init( + socket, + self.allocator, + self.app, + self.json_version_response, + timeout_ms, + ) catch |err| { + log.err(.app, "CDP client init", .{ .err = err }); + return; + }; defer client.deinit(); - var http = &self.app.http; - http.addCDPClient(.{ - .socket = socket, - .ctx = client, - .blocking_read_start = Client.blockingReadStart, - .blocking_read = Client.blockingRead, - .blocking_read_end = Client.blockingReadStop, - }); - defer http.removeCDPClient(); + self.registerClient(client); + defer self.unregisterClient(client); - lp.assert(client.mode == .http, "Server.readLoop invalid mode", .{}); - while (true) { - if (http.poll(timeout_ms) != .cdp_socket) { - log.info(.app, "CDP timeout", .{}); - return; - } - - if (client.readSocket() == false) { - return; - } - - if (client.mode == .cdp) { - break; // switch to our CDP loop - } + // Check shutdown after registering to avoid missing stop() signal. + // If stop() already iterated over clients, this client won't receive stop() + // and would block joinThreads() indefinitely. + if (self.shutdown.load(.acquire)) { + return; } - var cdp = &client.mode.cdp; - var last_message = timestamp(.monotonic); - var ms_remaining = timeout_ms; - while (true) { - switch (cdp.pageWait(ms_remaining)) { - .cdp_socket => { - if (client.readSocket() == false) { - return; - } - last_message = timestamp(.monotonic); - ms_remaining = timeout_ms; - }, - .no_page => { - if (http.poll(ms_remaining) != .cdp_socket) { - log.info(.app, "CDP timeout", .{}); - return; - } - if (client.readSocket() == false) { - return; - } - last_message = timestamp(.monotonic); - ms_remaining = timeout_ms; - }, - .done => { - const elapsed = timestamp(.monotonic) - last_message; - if (elapsed > ms_remaining) { - log.info(.app, "CDP timeout", .{}); - return; - } - ms_remaining -= @intCast(elapsed); - }, + client.start(); +} + +fn getClient(self: *Server) !*Client { + self.client_mutex.lock(); + defer self.client_mutex.unlock(); + return self.clients_pool.create(); +} + +fn releaseClient(self: *Server, client: *Client) void { + self.client_mutex.lock(); + defer self.client_mutex.unlock(); + self.clients_pool.destroy(client); +} + +fn registerClient(self: *Server, client: *Client) void { + self.client_mutex.lock(); + defer self.client_mutex.unlock(); + self.clients.append(self.allocator, client) catch {}; +} + +fn unregisterClient(self: *Server, client: *Client) void { + self.client_mutex.lock(); + defer self.client_mutex.unlock(); + for (self.clients.items, 0..) |c, i| { + if (c == client) { + _ = self.clients.swapRemove(i); + break; } } } +fn spawnWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void { + if (self.shutdown.load(.acquire)) { + return error.ShuttingDown; + } + + // Atomically increment active_threads only if below max_connections. + // Uses CAS loop to avoid race between checking the limit and incrementing. + // + // cmpxchgWeak may fail for two reasons: + // 1. Another thread changed the value (increment or decrement) + // 2. Spurious failure on some architectures (e.g. ARM) + // + // We use Weak instead of Strong because we need a retry loop anyway: + // if CAS fails because a thread finished (counter decreased), we should + // retry rather than return an error - there may now be room for a new connection. + // + // On failure, cmpxchgWeak returns the actual value, which we reuse to avoid + // an extra load on the next iteration. + const max_connections = self.app.config.maxConnections(); + var current = self.active_threads.load(.monotonic); + while (current < max_connections) { + current = self.active_threads.cmpxchgWeak(current, current + 1, .monotonic, .monotonic) orelse break; + } else { + return error.MaxThreadsReached; + } + errdefer _ = self.active_threads.fetchSub(1, .monotonic); + + const thread = try std.Thread.spawn(.{}, runWorker, .{ self, socket, timeout_ms }); + thread.detach(); +} + +fn runWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) void { + defer _ = self.active_threads.fetchSub(1, .monotonic); + handleConnection(self, socket, timeout_ms); +} + +fn joinThreads(self: *Server) void { + while (self.active_threads.load(.monotonic) > 0) { + std.Thread.sleep(10 * std.time.ns_per_ms); + } +} + +// Handle exactly one TCP connection. pub const Client = struct { // The client is initially serving HTTP requests but, under normal circumstances // should eventually be upgraded to a websocket connections @@ -217,11 +262,15 @@ pub const Client = struct { cdp: CDP, }, - server: *Server, + allocator: Allocator, + app: *App, + http: *HttpClient, + json_version_response: []const u8, reader: Reader(true), socket: posix.socket_t, socket_flags: usize, send_arena: ArenaAllocator, + timeout_ms: u32, const EMPTY_PONG = [_]u8{ 138, 0 }; @@ -232,25 +281,49 @@ pub const Client = struct { // "private-use" close codes must be from 4000-49999 const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 }; // code: 4000 - fn init(socket: posix.socket_t, server: *Server) !Client { + fn init( + socket: posix.socket_t, + allocator: Allocator, + app: *App, + json_version_response: []const u8, + timeout_ms: u32, + ) !Client { + if (log.enabled(.app, .info)) { + var client_address: std.net.Address = undefined; + var socklen: posix.socklen_t = @sizeOf(net.Address); + try std.posix.getsockname(socket, &client_address.any, &socklen); + log.info(.app, "client connected", .{ .ip = client_address }); + } + const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0); const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true })); // we expect the socket to come to us as nonblocking lp.assert(socket_flags & nonblocking == nonblocking, "Client.init blocking", .{}); - var reader = try Reader(true).init(server.allocator); + var reader = try Reader(true).init(allocator); errdefer reader.deinit(); + const http = try app.http.createClient(allocator); + errdefer http.deinit(); + return .{ .socket = socket, - .server = server, + .allocator = allocator, + .app = app, + .http = http, + .json_version_response = json_version_response, .reader = reader, .mode = .{ .http = {} }, .socket_flags = socket_flags, - .send_arena = ArenaAllocator.init(server.allocator), + .send_arena = ArenaAllocator.init(allocator), + .timeout_ms = timeout_ms, }; } + fn stop(self: *Client) void { + posix.shutdown(self.socket, .recv) catch {}; + } + fn deinit(self: *Client) void { switch (self.mode) { .cdp => |*cdp| cdp.deinit(), @@ -258,6 +331,88 @@ pub const Client = struct { } self.reader.deinit(); self.send_arena.deinit(); + self.http.deinit(); + } + + fn start(self: *Client) void { + const http = self.http; + http.cdp_client = .{ + .socket = self.socket, + .ctx = self, + .blocking_read_start = Client.blockingReadStart, + .blocking_read = Client.blockingRead, + .blocking_read_end = Client.blockingReadStop, + }; + defer http.cdp_client = null; + + self.httpLoop(http) catch |err| { + log.err(.app, "CDP client loop", .{ .err = err }); + }; + } + + fn httpLoop(self: *Client, http: *HttpClient) !void { + lp.assert(self.mode == .http, "Client.httpLoop invalid mode", .{}); + while (true) { + const status = http.tick(self.timeout_ms) catch |err| { + log.err(.app, "http tick", .{ .err = err }); + return; + }; + if (status != .cdp_socket) { + log.info(.app, "CDP timeout", .{}); + return; + } + + if (self.readSocket() == false) { + return; + } + + if (self.mode == .cdp) { + break; + } + } + + return self.cdpLoop(http); + } + + fn cdpLoop(self: *Client, http: *HttpClient) !void { + var cdp = &self.mode.cdp; + var last_message = timestamp(.monotonic); + var ms_remaining = self.timeout_ms; + + while (true) { + switch (cdp.pageWait(ms_remaining)) { + .cdp_socket => { + if (self.readSocket() == false) { + return; + } + last_message = timestamp(.monotonic); + ms_remaining = self.timeout_ms; + }, + .no_page => { + const status = http.tick(ms_remaining) catch |err| { + log.err(.app, "http tick", .{ .err = err }); + return; + }; + if (status != .cdp_socket) { + log.info(.app, "CDP timeout", .{}); + return; + } + if (self.readSocket() == false) { + return; + } + last_message = timestamp(.monotonic); + ms_remaining = self.timeout_ms; + }, + .done => { + const elapsed = timestamp(.monotonic) - last_message; + if (elapsed > ms_remaining) { + log.info(.app, "CDP timeout", .{}); + return; + } + ms_remaining -= @intCast(elapsed); + }, + } + } } fn blockingReadStart(ctx: *anyopaque) bool { @@ -314,7 +469,7 @@ pub const Client = struct { lp.assert(self.reader.pos == 0, "Client.HTTP pos", .{ .pos = self.reader.pos }); const request = self.reader.buf[0..self.reader.len]; - if (request.len > MAX_HTTP_REQUEST_SIZE) { + if (request.len > Config.CDP_MAX_HTTP_REQUEST_SIZE) { self.writeHTTPErrorResponse(413, "Request too large"); return error.RequestTooLarge; } @@ -367,7 +522,7 @@ pub const Client = struct { } if (std.mem.eql(u8, url, "/json/version")) { - try self.send(self.server.json_version_response); + try self.send(self.json_version_response); // Chromedp (a Go driver) does an http request to /json/version // then to / (websocket upgrade) using a different connection. // Since we only allow 1 connection at a time, the 2nd one (the @@ -472,7 +627,7 @@ pub const Client = struct { break :blk res; }; - self.mode = .{ .cdp = try CDP.init(self.server.app, self) }; + self.mode = .{ .cdp = try CDP.init(self.app, self.http, self) }; return self.send(response); } @@ -707,7 +862,7 @@ fn Reader(comptime EXPECT_MASK: bool) type { if (message_len > 125) { return error.ControlTooLarge; } - } else if (message_len > MAX_MESSAGE_SIZE) { + } else if (message_len > Config.CDP_MAX_MESSAGE_SIZE) { return error.TooLarge; } else if (message_len > self.buf.len) { const len = self.buf.len; @@ -735,7 +890,7 @@ fn Reader(comptime EXPECT_MASK: bool) type { if (is_continuation) { const fragments = &(self.fragments orelse return error.InvalidContinuation); - if (fragments.message.items.len + message_len > MAX_MESSAGE_SIZE) { + if (fragments.message.items.len + message_len > Config.CDP_MAX_MESSAGE_SIZE) { return error.TooLarge; } diff --git a/src/browser/Browser.zig b/src/browser/Browser.zig index b89e23c1..09a78cab 100644 --- a/src/browser/Browser.zig +++ b/src/browser/Browser.zig @@ -24,9 +24,9 @@ const ArenaAllocator = std.heap.ArenaAllocator; const js = @import("js/js.zig"); const log = @import("../log.zig"); const App = @import("../App.zig"); +const HttpClient = @import("../http/Client.zig"); const ArenaPool = App.ArenaPool; -const HttpClient = App.Http.Client; const IS_DEBUG = @import("builtin").mode == .Debug; @@ -47,6 +47,7 @@ http_client: *HttpClient, const InitOpts = struct { env: js.Env.InitOpts = .{}, + http_client: *HttpClient, }; pub fn init(app: *App, opts: InitOpts) !Browser { @@ -61,7 +62,7 @@ pub fn init(app: *App, opts: InitOpts) !Browser { .session = null, .allocator = allocator, .arena_pool = &app.arena_pool, - .http_client = app.http.client, + .http_client = opts.http_client, }; } diff --git a/src/browser/Robots.zig b/src/browser/Robots.zig index 5c0428fa..0318ef30 100644 --- a/src/browser/Robots.zig +++ b/src/browser/Robots.zig @@ -111,12 +111,16 @@ pub const RobotStore = struct { allocator: std.mem.Allocator, map: RobotsMap, + mutex: std.Thread.Mutex = .{}, pub fn init(allocator: std.mem.Allocator) RobotStore { return .{ .allocator = allocator, .map = .empty }; } pub fn deinit(self: *RobotStore) void { + self.mutex.lock(); + defer self.mutex.unlock(); + var iter = self.map.iterator(); while (iter.next()) |entry| { @@ -132,6 +136,9 @@ pub const RobotStore = struct { } pub fn get(self: *RobotStore, url: []const u8) ?RobotsEntry { + self.mutex.lock(); + defer self.mutex.unlock(); + return self.map.get(url); } @@ -140,11 +147,17 @@ pub const RobotStore = struct { } pub fn put(self: *RobotStore, url: []const u8, robots: Robots) !void { + self.mutex.lock(); + defer self.mutex.unlock(); + const duped = try self.allocator.dupe(u8, url); try self.map.put(self.allocator, duped, .{ .present = robots }); } pub fn putAbsent(self: *RobotStore, url: []const u8) !void { + self.mutex.lock(); + defer self.mutex.unlock(); + const duped = try self.allocator.dupe(u8, url); try self.map.put(self.allocator, duped, .absent); } diff --git a/src/browser/js/Env.zig b/src/browser/js/Env.zig index 8b64ad1f..ef0cc1f6 100644 --- a/src/browser/js/Env.zig +++ b/src/browser/js/Env.zig @@ -39,6 +39,14 @@ const JsApis = bridge.JsApis; const Allocator = std.mem.Allocator; const IS_DEBUG = builtin.mode == .Debug; +fn initClassIds() void { + inline for (JsApis, 0..) |JsApi, i| { + JsApi.Meta.class_id = i; + } +} + +var class_id_once = std.once(initClassIds); + // The Env maps to a V8 isolate, which represents a isolated sandbox for // executing JavaScript. The Env is where we'll define our V8 <-> Zig bindings, // and it's where we'll start ExecutionWorlds, which actually execute JavaScript. @@ -90,6 +98,9 @@ pub fn init(app: *App, opts: InitOpts) !Env { } } + // Initialize class IDs once before any V8 work + class_id_once.call(); + const allocator = app.allocator; const snapshot = &app.snapshot; @@ -132,8 +143,7 @@ pub fn init(app: *App, opts: InitOpts) !Env { temp_scope.init(isolate); defer temp_scope.deinit(); - inline for (JsApis, 0..) |JsApi, i| { - JsApi.Meta.class_id = i; + inline for (JsApis, 0..) |_, i| { const data = v8.v8__Isolate__GetDataFromSnapshotOnce(isolate_handle, snapshot.data_start + i); const function_handle: *const v8.FunctionTemplate = @ptrCast(data); // Make function template eternal diff --git a/src/browser/webapi/net/Fetch.zig b/src/browser/webapi/net/Fetch.zig index da191d79..2eea37e3 100644 --- a/src/browser/webapi/net/Fetch.zig +++ b/src/browser/webapi/net/Fetch.zig @@ -205,7 +205,8 @@ fn httpShutdownCallback(ctx: *anyopaque) void { var response = self._response; response._transfer = null; response.deinit(true); - self._owns_response = false; + // Do not access `self` after this point: the Fetch struct was + // allocated from response._arena which has been released. } } diff --git a/src/cdp/cdp.zig b/src/cdp/cdp.zig index f8ff36d1..cb4cce92 100644 --- a/src/cdp/cdp.zig +++ b/src/cdp/cdp.zig @@ -28,6 +28,7 @@ const js = @import("../browser/js/js.zig"); const App = @import("../App.zig"); const Browser = @import("../browser/Browser.zig"); const Session = @import("../browser/Session.zig"); +const HttpClient = @import("../http/Client.zig"); const Page = @import("../browser/Page.zig"); const Incrementing = @import("../id.zig").Incrementing; const Notification = @import("../Notification.zig"); @@ -84,10 +85,11 @@ pub fn CDPT(comptime TypeProvider: type) type { const Self = @This(); - pub fn init(app: *App, client: TypeProvider.Client) !Self { + pub fn init(app: *App, http_client: *HttpClient, client: TypeProvider.Client) !Self { const allocator = app.allocator; const browser = try Browser.init(app, .{ .env = .{ .with_inspector = true }, + .http_client = http_client, }); errdefer browser.deinit(); diff --git a/src/cdp/testing.zig b/src/cdp/testing.zig index 3c9e0588..6a98a71e 100644 --- a/src/cdp/testing.zig +++ b/src/cdp/testing.zig @@ -85,7 +85,7 @@ const TestContext = struct { self.client = Client.init(self.arena.allocator()); // Don't use the arena here. We want to detect leaks in CDP. // The arena is only for test-specific stuff - self.cdp_ = TestCDP.init(base.test_app, &self.client.?) catch unreachable; + self.cdp_ = TestCDP.init(base.test_app, base.test_http, &self.client.?) catch unreachable; } return &self.cdp_.?; } diff --git a/src/http/Http.zig b/src/http/Http.zig index d9943a74..0c20faa8 100644 --- a/src/http/Http.zig +++ b/src/http/Http.zig @@ -17,8 +17,6 @@ // along with this program. If not, see . const std = @import("std"); -const lp = @import("lightpanda"); -const Config = @import("../Config.zig"); pub const c = @cImport({ @cInclude("curl/curl.h"); @@ -28,6 +26,8 @@ pub const ENABLE_DEBUG = false; pub const Client = @import("Client.zig"); pub const Transfer = Client.Transfer; +const lp = @import("lightpanda"); +const Config = @import("../Config.zig"); const log = @import("../log.zig"); const errors = @import("errors.zig"); const RobotStore = @import("../browser/Robots.zig").RobotStore; @@ -42,10 +42,11 @@ const ArenaAllocator = std.heap.ArenaAllocator; // once for all http connections is a win. const Http = @This(); -config: *const Config, -client: *Client, -ca_blob: ?c.curl_blob, arena: ArenaAllocator, +allocator: Allocator, +config: *const Config, +ca_blob: ?c.curl_blob, +robot_store: *RobotStore, pub fn init(allocator: Allocator, robot_store: *RobotStore, config: *const Config) !Http { try errorCheck(c.curl_global_init(c.CURL_GLOBAL_SSL)); @@ -60,40 +61,29 @@ pub fn init(allocator: Allocator, robot_store: *RobotStore, config: *const Confi var ca_blob: ?c.curl_blob = null; if (config.tlsVerifyHost()) { - ca_blob = try loadCerts(allocator, arena.allocator()); + ca_blob = try loadCerts(allocator); } - var client = try Client.init(allocator, ca_blob, robot_store, config); - errdefer client.deinit(); - return .{ .arena = arena, - .client = client, - .ca_blob = ca_blob, + .allocator = allocator, .config = config, + .ca_blob = ca_blob, + .robot_store = robot_store, }; } pub fn deinit(self: *Http) void { - self.client.deinit(); + if (self.ca_blob) |ca_blob| { + const data: [*]u8 = @ptrCast(ca_blob.data); + self.allocator.free(data[0..ca_blob.len]); + } c.curl_global_cleanup(); self.arena.deinit(); } -pub fn poll(self: *Http, timeout_ms: u32) Client.PerformStatus { - return self.client.tick(timeout_ms) catch |err| { - log.err(.app, "http poll", .{ .err = err }); - return .normal; - }; -} - -pub fn addCDPClient(self: *Http, cdp_client: Client.CDPClient) void { - lp.assert(self.client.cdp_client == null, "Http addCDPClient existing", .{}); - self.client.cdp_client = cdp_client; -} - -pub fn removeCDPClient(self: *Http) void { - self.client.cdp_client = null; +pub fn createClient(self: *Http, allocator: Allocator) !*Client { + return Client.init(allocator, self.ca_blob, self.robot_store, self.config); } pub fn newConnection(self: *Http) !Connection { @@ -351,7 +341,7 @@ pub const Method = enum(u8) { // This whole rescan + decode is really just needed for MacOS. On Linux // bundle.rescan does find the .pem file(s) which could be in a few different // places, so it's still useful, just not efficient. -fn loadCerts(allocator: Allocator, arena: Allocator) !c.curl_blob { +fn loadCerts(allocator: Allocator) !c.curl_blob { var bundle: std.crypto.Certificate.Bundle = .{}; try bundle.rescan(allocator); defer bundle.deinit(allocator); @@ -374,8 +364,9 @@ fn loadCerts(allocator: Allocator, arena: Allocator) !c.curl_blob { (bundle.map.count() * 75) + // start / end per certificate + extra, just in case (encoded_size / 64) // newline per 64 characters ; - try arr.ensureTotalCapacity(arena, buffer_size); - var writer = arr.writer(arena); + try arr.ensureTotalCapacity(allocator, buffer_size); + errdefer arr.deinit(allocator); + var writer = arr.writer(allocator); var it = bundle.map.valueIterator(); while (it.next()) |index| { @@ -388,11 +379,16 @@ fn loadCerts(allocator: Allocator, arena: Allocator) !c.curl_blob { } // Final encoding should not be larger than our initial size estimate - lp.assert(buffer_size > arr.items.len, "Http loadCerts", .{ .estiate = buffer_size, .len = arr.items.len }); + lp.assert(buffer_size > arr.items.len, "Http loadCerts", .{ .estimate = buffer_size, .len = arr.items.len }); + + // Allocate exactly the size needed and copy the data + const result = try allocator.dupe(u8, arr.items); + // Free the original oversized allocation + arr.deinit(allocator); return .{ - .len = arr.items.len, - .data = arr.items.ptr, + .len = result.len, + .data = result.ptr, .flags = 0, }; } diff --git a/src/lightpanda.zig b/src/lightpanda.zig index c40120cc..d2736689 100644 --- a/src/lightpanda.zig +++ b/src/lightpanda.zig @@ -39,10 +39,13 @@ pub const FetchOpts = struct { writer: ?*std.Io.Writer = null, }; pub fn fetch(app: *App, url: [:0]const u8, opts: FetchOpts) !void { + const http_client = try app.http.createClient(app.allocator); + defer http_client.deinit(); + const notification = try Notification.init(app.allocator); defer notification.deinit(); - var browser = try Browser.init(app, .{}); + var browser = try Browser.init(app, .{ .http_client = http_client }); defer browser.deinit(); var session = try browser.newSession(notification); diff --git a/src/main_legacy_test.zig b/src/main_legacy_test.zig index 35332863..11c7588e 100644 --- a/src/main_legacy_test.zig +++ b/src/main_legacy_test.zig @@ -46,17 +46,24 @@ pub fn main() !void { var test_arena = std.heap.ArenaAllocator.init(allocator); defer test_arena.deinit(); - var browser = try lp.Browser.init(app, .{}); - const notification = try lp.Notification.init(app.allocator); - defer notification.deinit(); + const http_client = try app.http.createClient(allocator); + defer http_client.deinit(); + + var browser = try lp.Browser.init(app, .{ .http_client = http_client }); defer browser.deinit(); + const notification = try lp.Notification.init(allocator); + defer notification.deinit(); + const session = try browser.newSession(notification); + defer session.deinit(); var dir = try std.fs.cwd().openDir("src/browser/tests/legacy/", .{ .iterate = true, .no_follow = true }); defer dir.close(); + var walker = try dir.walk(allocator); defer walker.deinit(); + while (try walker.next()) |entry| { _ = test_arena.reset(.retain_capacity); if (entry.kind != .file) { diff --git a/src/main_wpt.zig b/src/main_wpt.zig index 12846fef..bf63c6c2 100644 --- a/src/main_wpt.zig +++ b/src/main_wpt.zig @@ -69,7 +69,10 @@ pub fn main() !void { var app = try lp.App.init(allocator, &config); defer app.deinit(); - var browser = try lp.Browser.init(app, .{}); + const http_client = try app.http.createClient(allocator); + defer http_client.deinit(); + + var browser = try lp.Browser.init(app, .{ .http_client = http_client }); defer browser.deinit(); // An arena for running each tests. Is reset after every test. diff --git a/src/testing.zig b/src/testing.zig index 41f67e96..b79eacd4 100644 --- a/src/testing.zig +++ b/src/testing.zig @@ -39,6 +39,7 @@ pub fn reset() void { const App = @import("App.zig"); const js = @import("browser/js/js.zig"); const Config = @import("Config.zig"); +const Client = @import("http/Client.zig"); const Page = @import("browser/Page.zig"); const Browser = @import("browser/Browser.zig"); const Session = @import("browser/Session.zig"); @@ -334,6 +335,7 @@ fn isJsonValue(a: std.json.Value, b: std.json.Value) bool { } pub var test_app: *App = undefined; +pub var test_http: *Client = undefined; pub var test_browser: Browser = undefined; pub var test_notification: *Notification = undefined; pub var test_session: *Session = undefined; @@ -472,7 +474,10 @@ test "tests:beforeAll" { test_app = try App.init(test_allocator, &test_config); errdefer test_app.deinit(); - test_browser = try Browser.init(test_app, .{}); + test_http = try test_app.http.createClient(test_allocator); + errdefer test_http.deinit(); + + test_browser = try Browser.init(test_app, .{ .http_client = test_http }); errdefer test_browser.deinit(); // Create notification for testing @@ -519,6 +524,7 @@ test "tests:afterAll" { test_notification.deinit(); test_browser.deinit(); + test_http.deinit(); test_app.deinit(); test_config.deinit(@import("root").tracking_allocator); }