diff --git a/src/App.zig b/src/App.zig index 2d930fd6..9039cec5 100644 --- a/src/App.zig +++ b/src/App.zig @@ -25,23 +25,20 @@ const Config = @import("Config.zig"); const Snapshot = @import("browser/js/Snapshot.zig"); const Platform = @import("browser/js/Platform.zig"); const Telemetry = @import("telemetry/telemetry.zig").Telemetry; -const RobotStore = @import("browser/Robots.zig").RobotStore; -pub const Http = @import("http/Http.zig"); +const Network = @import("network/Runtime.zig"); pub const ArenaPool = @import("ArenaPool.zig"); const App = @This(); -http: Http, +network: Network, config: *const Config, platform: Platform, snapshot: Snapshot, telemetry: Telemetry, allocator: Allocator, arena_pool: ArenaPool, -robots: RobotStore, app_dir_path: ?[]const u8, -shutdown: bool = false, pub fn init(allocator: Allocator, config: *const Config) !*App { const app = try allocator.create(App); @@ -50,8 +47,7 @@ pub fn init(allocator: Allocator, config: *const Config) !*App { app.* = .{ .config = config, .allocator = allocator, - .robots = RobotStore.init(allocator), - .http = undefined, + .network = undefined, .platform = undefined, .snapshot = undefined, .app_dir_path = undefined, @@ -59,8 +55,8 @@ pub fn init(allocator: Allocator, config: *const Config) !*App { .arena_pool = undefined, }; - app.http = try Http.init(allocator, &app.robots, config); - errdefer app.http.deinit(); + app.network = try Network.init(allocator, config); + errdefer app.network.deinit(); app.platform = try Platform.init(); errdefer app.platform.deinit(); @@ -79,19 +75,18 @@ pub fn init(allocator: Allocator, config: *const Config) !*App { return app; } -pub fn deinit(self: *App) void { - if (@atomicRmw(bool, &self.shutdown, .Xchg, true, .monotonic)) { - return; - } +pub fn shutdown(self: *const App) bool { + return self.network.shutdown.load(.acquire); +} +pub fn deinit(self: *App) void { const allocator = self.allocator; if (self.app_dir_path) |app_dir_path| { allocator.free(app_dir_path); 
self.app_dir_path = null; } self.telemetry.deinit(); - self.robots.deinit(); - self.http.deinit(); + self.network.deinit(); self.snapshot.deinit(); self.platform.deinit(); self.arena_pool.deinit(); diff --git a/src/Config.zig b/src/Config.zig index 5a4cc58e..a06fcc51 100644 --- a/src/Config.zig +++ b/src/Config.zig @@ -31,6 +31,7 @@ pub const RunMode = enum { mcp, }; +pub const MAX_LISTENERS = 16; pub const CDP_MAX_HTTP_REQUEST_SIZE = 4096; // max message size @@ -153,6 +154,13 @@ pub fn userAgentSuffix(self: *const Config) ?[]const u8 { }; } +pub fn cdpTimeout(self: *const Config) usize { + return switch (self.mode) { + .serve => |opts| if (opts.timeout > 604_800) 604_800_000 else @as(usize, opts.timeout) * 1000, + else => unreachable, + }; +} + pub fn maxConnections(self: *const Config) u16 { return switch (self.mode) { .serve => |opts| opts.cdp_max_connections, diff --git a/src/Notification.zig b/src/Notification.zig index 186cc04e..e025820a 100644 --- a/src/Notification.zig +++ b/src/Notification.zig @@ -21,7 +21,7 @@ const lp = @import("lightpanda"); const log = @import("log.zig"); const Page = @import("browser/Page.zig"); -const Transfer = @import("http/Client.zig").Transfer; +const Transfer = @import("browser/HttpClient.zig").Transfer; const Allocator = std.mem.Allocator; diff --git a/src/Server.zig b/src/Server.zig index bd990560..23ddefb5 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -18,8 +18,6 @@ const std = @import("std"); const lp = @import("lightpanda"); -const builtin = @import("builtin"); - const net = std.net; const posix = std.posix; @@ -30,16 +28,13 @@ const log = @import("log.zig"); const App = @import("App.zig"); const Config = @import("Config.zig"); const CDP = @import("cdp/cdp.zig").CDP; -const Net = @import("Net.zig"); -const Http = @import("http/Http.zig"); -const HttpClient = @import("http/Client.zig"); +const Net = @import("network/websocket.zig"); +const HttpClient = @import("browser/HttpClient.zig"); const Server = @This(); app: 
*App, -shutdown: std.atomic.Value(bool) = .init(false), allocator: Allocator, -listener: ?posix.socket_t, json_version_response: []const u8, // Thread management @@ -48,103 +43,52 @@ clients: std.ArrayList(*Client) = .{}, client_mutex: std.Thread.Mutex = .{}, clients_pool: std.heap.MemoryPool(Client), -pub fn init(app: *App, address: net.Address) !Server { +pub fn init(app: *App, address: net.Address) !*Server { const allocator = app.allocator; const json_version_response = try buildJSONVersionResponse(allocator, address); errdefer allocator.free(json_version_response); - return .{ + const self = try allocator.create(Server); + errdefer allocator.destroy(self); + + self.* = .{ .app = app, - .listener = null, .allocator = allocator, .json_version_response = json_version_response, - .clients_pool = std.heap.MemoryPool(Client).init(app.allocator), + .clients_pool = std.heap.MemoryPool(Client).init(allocator), }; + + try self.app.network.bind(address, self, onAccept); + log.info(.app, "server running", .{ .address = address }); + + return self; } -/// Interrupts the server so that main can complete normally and call all defer handlers. -pub fn stop(self: *Server) void { - if (self.shutdown.swap(true, .release)) { - return; - } - - // Shutdown all active clients +pub fn deinit(self: *Server) void { + // Stop all active clients { self.client_mutex.lock(); defer self.client_mutex.unlock(); + for (self.clients.items) |client| { client.stop(); } } - // Linux and BSD/macOS handle canceling a socket blocked on accept differently. - // For Linux, we use std.shutdown, which will cause accept to return error.SocketNotListening (EINVAL). - // For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (BADF). 
- if (self.listener) |listener| switch (builtin.target.os.tag) { - .linux => posix.shutdown(listener, .recv) catch |err| { - log.warn(.app, "listener shutdown", .{ .err = err }); - }, - .macos, .freebsd, .netbsd, .openbsd => { - self.listener = null; - posix.close(listener); - }, - else => unreachable, - }; -} - -pub fn deinit(self: *Server) void { - if (!self.shutdown.load(.acquire)) { - self.stop(); - } - self.joinThreads(); - if (self.listener) |listener| { - posix.close(listener); - self.listener = null; - } self.clients.deinit(self.allocator); self.clients_pool.deinit(); self.allocator.free(self.json_version_response); + self.allocator.destroy(self); } -pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { - const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK; - const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP); - self.listener = listener; - - try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1))); - if (@hasDecl(posix.TCP, "NODELAY")) { - try posix.setsockopt(listener, posix.IPPROTO.TCP, posix.TCP.NODELAY, &std.mem.toBytes(@as(c_int, 1))); - } - - try posix.bind(listener, &address.any, address.getOsSockLen()); - try posix.listen(listener, self.app.config.maxPendingConnections()); - - log.info(.app, "server running", .{ .address = address }); - while (!self.shutdown.load(.acquire)) { - const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| { - switch (err) { - error.SocketNotListening, error.ConnectionAborted => { - log.info(.app, "server stopped", .{}); - break; - }, - error.WouldBlock => { - std.Thread.sleep(10 * std.time.ns_per_ms); - continue; - }, - else => { - log.err(.app, "CDP accept", .{ .err = err }); - std.Thread.sleep(std.time.ns_per_s); - continue; - }, - } - }; - - self.spawnWorker(socket, timeout_ms) catch |err| { - log.err(.app, "CDP spawn", .{ .err = err }); - posix.close(socket); - }; - } +fn 
onAccept(ctx: *anyopaque, socket: posix.socket_t) void { + const self: *Server = @ptrCast(@alignCast(ctx)); + const timeout_ms: u32 = @intCast(self.app.config.cdpTimeout()); + self.spawnWorker(socket, timeout_ms) catch |err| { + log.err(.app, "CDP spawn", .{ .err = err }); + posix.close(socket); + }; } fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void { @@ -173,10 +117,10 @@ fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void self.registerClient(client); defer self.unregisterClient(client); - // Check shutdown after registering to avoid missing stop() signal. - // If stop() already iterated over clients, this client won't receive stop() + // Check shutdown after registering to avoid missing the stop signal. + // If deinit() already iterated over clients, this client won't receive stop() // and would block joinThreads() indefinitely. - if (self.shutdown.load(.acquire)) { + if (self.app.shutdown()) { return; } @@ -213,7 +157,7 @@ fn unregisterClient(self: *Server, client: *Client) void { } fn spawnWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void { - if (self.shutdown.load(.acquire)) { + if (self.app.shutdown()) { return error.ShuttingDown; } @@ -283,7 +227,7 @@ pub const Client = struct { log.info(.app, "client connected", .{ .ip = client_address }); } - const http = try app.http.createClient(allocator); + const http = try HttpClient.init(allocator, &app.network); errdefer http.deinit(); return .{ diff --git a/src/browser/Browser.zig b/src/browser/Browser.zig index 503306d3..8f8c4aa2 100644 --- a/src/browser/Browser.zig +++ b/src/browser/Browser.zig @@ -24,7 +24,7 @@ const ArenaAllocator = std.heap.ArenaAllocator; const js = @import("js/js.zig"); const log = @import("../log.zig"); const App = @import("../App.zig"); -const HttpClient = @import("../http/Client.zig"); +const HttpClient = @import("HttpClient.zig"); const ArenaPool = App.ArenaPool; diff --git a/src/http/Client.zig 
b/src/browser/HttpClient.zig similarity index 97% rename from src/http/Client.zig rename to src/browser/HttpClient.zig index 5701de27..98292efc 100644 --- a/src/http/Client.zig +++ b/src/browser/HttpClient.zig @@ -17,28 +17,29 @@ // along with this program. If not, see . const std = @import("std"); -const lp = @import("lightpanda"); - -const log = @import("../log.zig"); const builtin = @import("builtin"); +const posix = std.posix; -const Net = @import("../Net.zig"); +const lp = @import("lightpanda"); +const log = @import("../log.zig"); +const Net = @import("../network/http.zig"); +const Network = @import("../network/Runtime.zig"); const Config = @import("../Config.zig"); const URL = @import("../browser/URL.zig"); const Notification = @import("../Notification.zig"); const CookieJar = @import("../browser/webapi/storage/Cookie.zig").Jar; -const Robots = @import("../browser/Robots.zig"); +const Robots = @import("../network/Robots.zig"); const RobotStore = Robots.RobotStore; -const posix = std.posix; const Allocator = std.mem.Allocator; const ArenaAllocator = std.heap.ArenaAllocator; const IS_DEBUG = builtin.mode == .Debug; -const Method = Net.Method; -const ResponseHead = Net.ResponseHead; -const HeaderIterator = Net.HeaderIterator; +pub const Method = Net.Method; +pub const Headers = Net.Headers; +pub const ResponseHead = Net.ResponseHead; +pub const HeaderIterator = Net.HeaderIterator; // This is loosely tied to a browser Page. Loading all the , doing // XHR requests, and loading imports all happens through here. Sine the app @@ -77,8 +78,7 @@ queue: TransferQueue, // The main app allocator allocator: Allocator, -// Reference to the App-owned Robot Store. -robot_store: *RobotStore, +network: *Network, // Queue of requests that depend on a robots.txt. // Allows us to fetch the robots.txt just once. pending_robots_queue: std.StringHashMapUnmanaged(std.ArrayList(Request)) = .empty, @@ -97,8 +97,6 @@ http_proxy: ?[:0]const u8 = null, // CDP. 
use_proxy: bool, -config: *const Config, - cdp_client: ?CDPClient = null, // libcurl can monitor arbitrary sockets, this lets us use libcurl to poll @@ -121,14 +119,14 @@ pub const CDPClient = struct { const TransferQueue = std.DoublyLinkedList; -pub fn init(allocator: Allocator, ca_blob: ?Net.Blob, robot_store: *RobotStore, config: *const Config) !*Client { +pub fn init(allocator: Allocator, network: *Network) !*Client { var transfer_pool = std.heap.MemoryPool(Transfer).init(allocator); errdefer transfer_pool.deinit(); const client = try allocator.create(Client); errdefer allocator.destroy(client); - var handles = try Net.Handles.init(allocator, ca_blob, config); + var handles = try Net.Handles.init(allocator, network.ca_blob, network.config); errdefer handles.deinit(allocator); // Set transfer callbacks on each connection. @@ -136,7 +134,7 @@ pub fn init(allocator: Allocator, ca_blob: ?Net.Blob, robot_store: *RobotStore, try conn.setCallbacks(Transfer.headerCallback, Transfer.dataCallback); } - const http_proxy = config.httpProxy(); + const http_proxy = network.config.httpProxy(); client.* = .{ .queue = .{}, @@ -144,10 +142,9 @@ pub fn init(allocator: Allocator, ca_blob: ?Net.Blob, robot_store: *RobotStore, .intercepted = 0, .handles = handles, .allocator = allocator, - .robot_store = robot_store, + .network = network, .http_proxy = http_proxy, .use_proxy = http_proxy != null, - .config = config, .transfer_pool = transfer_pool, }; @@ -170,7 +167,7 @@ pub fn deinit(self: *Client) void { } pub fn newHeaders(self: *const Client) !Net.Headers { - return Net.Headers.init(self.config.http_headers.user_agent_header); + return Net.Headers.init(self.network.config.http_headers.user_agent_header); } pub fn abort(self: *Client) void { @@ -255,12 +252,12 @@ pub fn tick(self: *Client, timeout_ms: u32) !PerformStatus { } pub fn request(self: *Client, req: Request) !void { - if (self.config.obeyRobots()) { + if (self.network.config.obeyRobots()) { const robots_url = try 
URL.getRobotsUrl(self.allocator, req.url); errdefer self.allocator.free(robots_url); // If we have this robots cached, we can take a fast path. - if (self.robot_store.get(robots_url)) |robot_entry| { + if (self.network.robot_store.get(robots_url)) |robot_entry| { defer self.allocator.free(robots_url); switch (robot_entry) { @@ -401,18 +398,18 @@ fn robotsDoneCallback(ctx_ptr: *anyopaque) !void { switch (ctx.status) { 200 => { if (ctx.buffer.items.len > 0) { - const robots: ?Robots = ctx.client.robot_store.robotsFromBytes( - ctx.client.config.http_headers.user_agent, + const robots: ?Robots = ctx.client.network.robot_store.robotsFromBytes( + ctx.client.network.config.http_headers.user_agent, ctx.buffer.items, ) catch blk: { log.warn(.browser, "failed to parse robots", .{ .robots_url = ctx.robots_url }); // If we fail to parse, we just insert it as absent and ignore. - try ctx.client.robot_store.putAbsent(ctx.robots_url); + try ctx.client.network.robot_store.putAbsent(ctx.robots_url); break :blk null; }; if (robots) |r| { - try ctx.client.robot_store.put(ctx.robots_url, r); + try ctx.client.network.robot_store.put(ctx.robots_url, r); const path = URL.getPathname(ctx.req.url); allowed = r.isAllowed(path); } @@ -421,12 +418,12 @@ fn robotsDoneCallback(ctx_ptr: *anyopaque) !void { 404 => { log.debug(.http, "robots not found", .{ .url = ctx.robots_url }); // If we get a 404, we just insert it as absent. - try ctx.client.robot_store.putAbsent(ctx.robots_url); + try ctx.client.network.robot_store.putAbsent(ctx.robots_url); }, else => { log.debug(.http, "unexpected status on robots", .{ .url = ctx.robots_url, .status = ctx.status }); // If we get an unexpected status, we just insert as absent. 
- try ctx.client.robot_store.putAbsent(ctx.robots_url); + try ctx.client.network.robot_store.putAbsent(ctx.robots_url); }, } @@ -609,7 +606,7 @@ fn makeTransfer(self: *Client, req: Request) !*Transfer { .req = req, .ctx = req.ctx, .client = self, - .max_response_size = self.config.httpMaxResponseSize(), + .max_response_size = self.network.config.httpMaxResponseSize(), }; return transfer; } @@ -706,7 +703,7 @@ fn makeRequest(self: *Client, conn: *Net.Connection, transfer: *Transfer) anyerr } var header_list = req.headers; - try conn.secretHeaders(&header_list, &self.config.http_headers); // Add headers that must be hidden from intercepts + try conn.secretHeaders(&header_list, &self.network.config.http_headers); // Add headers that must be hidden from intercepts try conn.setHeaders(&header_list); // Add cookies. diff --git a/src/browser/Page.zig b/src/browser/Page.zig index 014ebb62..47c63904 100644 --- a/src/browser/Page.zig +++ b/src/browser/Page.zig @@ -62,8 +62,7 @@ const PageTransitionEvent = @import("webapi/event/PageTransitionEvent.zig"); const NavigationKind = @import("webapi/navigation/root.zig").NavigationKind; const KeyboardEvent = @import("webapi/event/KeyboardEvent.zig"); -const Http = App.Http; -const Net = @import("../Net.zig"); +const HttpClient = @import("HttpClient.zig"); const ArenaPool = App.ArenaPool; const timestamp = @import("../datetime.zig").timestamp; @@ -396,7 +395,7 @@ pub fn getOrigin(self: *Page, allocator: Allocator) !?[]const u8 { // Add comon headers for a request: // * cookies // * referer -pub fn headersForRequest(self: *Page, temp: Allocator, url: [:0]const u8, headers: *Http.Headers) !void { +pub fn headersForRequest(self: *Page, temp: Allocator, url: [:0]const u8, headers: *HttpClient.Headers) !void { try self.requestCookie(.{}).headersForRequest(temp, url, headers); // Build the referer @@ -823,7 +822,7 @@ fn notifyParentLoadComplete(self: *Page) void { parent.iframeCompletedLoading(self.iframe.?); } -fn 
pageHeaderDoneCallback(transfer: *Http.Transfer) !bool { +fn pageHeaderDoneCallback(transfer: *HttpClient.Transfer) !bool { var self: *Page = @ptrCast(@alignCast(transfer.ctx)); // would be different than self.url in the case of a redirect @@ -845,7 +844,7 @@ fn pageHeaderDoneCallback(transfer: *Http.Transfer) !bool { return true; } -fn pageDataCallback(transfer: *Http.Transfer, data: []const u8) !void { +fn pageDataCallback(transfer: *HttpClient.Transfer, data: []const u8) !void { var self: *Page = @ptrCast(@alignCast(transfer.ctx)); if (self._parse_state == .pre) { @@ -3047,7 +3046,7 @@ pub const NavigateReason = enum { pub const NavigateOpts = struct { cdp_id: ?i64 = null, reason: NavigateReason = .address_bar, - method: Http.Method = .GET, + method: HttpClient.Method = .GET, body: ?[]const u8 = null, header: ?[:0]const u8 = null, force: bool = false, @@ -3057,7 +3056,7 @@ pub const NavigateOpts = struct { pub const NavigatedOpts = struct { cdp_id: ?i64 = null, reason: NavigateReason = .address_bar, - method: Http.Method = .GET, + method: HttpClient.Method = .GET, }; const NavigationType = enum { @@ -3302,7 +3301,7 @@ const RequestCookieOpts = struct { is_http: bool = true, is_navigation: bool = false, }; -pub fn requestCookie(self: *const Page, opts: RequestCookieOpts) Http.Client.RequestCookie { +pub fn requestCookie(self: *const Page, opts: RequestCookieOpts) HttpClient.RequestCookie { return .{ .jar = &self._session.cookie_jar, .origin = self.url, diff --git a/src/browser/ScriptManager.zig b/src/browser/ScriptManager.zig index 0466f125..6f55f43b 100644 --- a/src/browser/ScriptManager.zig +++ b/src/browser/ScriptManager.zig @@ -21,7 +21,8 @@ const lp = @import("lightpanda"); const builtin = @import("builtin"); const log = @import("../log.zig"); -const Http = @import("../http/Http.zig"); +const HttpClient = @import("HttpClient.zig"); +const net_http = @import("../network/http.zig"); const String = @import("../string.zig").String; const js = 
@import("js/js.zig"); @@ -60,7 +61,7 @@ ready_scripts: std.DoublyLinkedList, shutdown: bool = false, -client: *Http.Client, +client: *HttpClient, allocator: Allocator, buffer_pool: BufferPool, @@ -88,7 +89,7 @@ importmap: std.StringHashMapUnmanaged([:0]const u8), // event). page_notified_of_completion: bool, -pub fn init(allocator: Allocator, http_client: *Http.Client, page: *Page) ScriptManager { +pub fn init(allocator: Allocator, http_client: *HttpClient, page: *Page) ScriptManager { return .{ .page = page, .async_scripts = .{}, @@ -141,7 +142,7 @@ fn clearList(list: *std.DoublyLinkedList) void { } } -pub fn getHeaders(self: *ScriptManager, url: [:0]const u8) !Http.Headers { +pub fn getHeaders(self: *ScriptManager, url: [:0]const u8) !net_http.Headers { var headers = try self.client.newHeaders(); try self.page.headersForRequest(self.page.arena, url, &headers); return headers; @@ -675,11 +676,11 @@ pub const Script = struct { self.manager.script_pool.destroy(self); } - fn startCallback(transfer: *Http.Transfer) !void { + fn startCallback(transfer: *HttpClient.Transfer) !void { log.debug(.http, "script fetch start", .{ .req = transfer }); } - fn headerCallback(transfer: *Http.Transfer) !bool { + fn headerCallback(transfer: *HttpClient.Transfer) !bool { const self: *Script = @ptrCast(@alignCast(transfer.ctx)); const header = &transfer.response_header.?; self.status = header.status; @@ -746,14 +747,14 @@ pub const Script = struct { return true; } - fn dataCallback(transfer: *Http.Transfer, data: []const u8) !void { + fn dataCallback(transfer: *HttpClient.Transfer, data: []const u8) !void { const self: *Script = @ptrCast(@alignCast(transfer.ctx)); self._dataCallback(transfer, data) catch |err| { log.err(.http, "SM.dataCallback", .{ .err = err, .transfer = transfer, .len = data.len }); return err; }; } - fn _dataCallback(self: *Script, _: *Http.Transfer, data: []const u8) !void { + fn _dataCallback(self: *Script, _: *HttpClient.Transfer, data: []const u8) !void { try 
self.source.remote.appendSlice(self.manager.allocator, data); } diff --git a/src/browser/webapi/net/Fetch.zig b/src/browser/webapi/net/Fetch.zig index 35fce366..699cc9c4 100644 --- a/src/browser/webapi/net/Fetch.zig +++ b/src/browser/webapi/net/Fetch.zig @@ -19,7 +19,7 @@ const std = @import("std"); const log = @import("../../../log.zig"); -const Http = @import("../../../http/Http.zig"); +const HttpClient = @import("../../HttpClient.zig"); const js = @import("../../js/js.zig"); const Page = @import("../../Page.zig"); @@ -90,7 +90,7 @@ pub fn init(input: Input, options: ?InitOpts, page: *Page) !js.Promise { return resolver.promise(); } -fn httpStartCallback(transfer: *Http.Transfer) !void { +fn httpStartCallback(transfer: *HttpClient.Transfer) !void { const self: *Fetch = @ptrCast(@alignCast(transfer.ctx)); if (comptime IS_DEBUG) { log.debug(.http, "request start", .{ .url = self._url, .source = "fetch" }); @@ -98,7 +98,7 @@ fn httpStartCallback(transfer: *Http.Transfer) !void { self._response._transfer = transfer; } -fn httpHeaderDoneCallback(transfer: *Http.Transfer) !bool { +fn httpHeaderDoneCallback(transfer: *HttpClient.Transfer) !bool { const self: *Fetch = @ptrCast(@alignCast(transfer.ctx)); const arena = self._response._arena; @@ -148,7 +148,7 @@ fn httpHeaderDoneCallback(transfer: *Http.Transfer) !bool { return true; } -fn httpDataCallback(transfer: *Http.Transfer, data: []const u8) !void { +fn httpDataCallback(transfer: *HttpClient.Transfer, data: []const u8) !void { const self: *Fetch = @ptrCast(@alignCast(transfer.ctx)); try self._buf.appendSlice(self._response._arena, data); } diff --git a/src/browser/webapi/net/Headers.zig b/src/browser/webapi/net/Headers.zig index e5462d82..2c9879ab 100644 --- a/src/browser/webapi/net/Headers.zig +++ b/src/browser/webapi/net/Headers.zig @@ -86,8 +86,8 @@ pub fn forEach(self: *Headers, cb_: js.Function, js_this_: ?js.Object) !void { } // TODO: do we really need 2 different header structs?? 
-const Http = @import("../../../http/Http.zig"); -pub fn populateHttpHeader(self: *Headers, allocator: Allocator, http_headers: *Http.Headers) !void { +const net_http = @import("../../../network/http.zig"); +pub fn populateHttpHeader(self: *Headers, allocator: Allocator, http_headers: *net_http.Headers) !void { for (self._list._entries.items) |entry| { const merged = try std.mem.concatWithSentinel(allocator, u8, &.{ entry.name.str(), ": ", entry.value.str() }, 0); try http_headers.add(merged); diff --git a/src/browser/webapi/net/Request.zig b/src/browser/webapi/net/Request.zig index aa7e0dd7..3d3ca825 100644 --- a/src/browser/webapi/net/Request.zig +++ b/src/browser/webapi/net/Request.zig @@ -19,7 +19,7 @@ const std = @import("std"); const js = @import("../../js/js.zig"); -const Http = @import("../../../http/Http.zig"); +const net_http = @import("../../../network/http.zig"); const URL = @import("../URL.zig"); const Page = @import("../../Page.zig"); @@ -30,7 +30,7 @@ const Allocator = std.mem.Allocator; const Request = @This(); _url: [:0]const u8, -_method: Http.Method, +_method: net_http.Method, _headers: ?*Headers, _body: ?[]const u8, _arena: Allocator, @@ -108,14 +108,14 @@ pub fn init(input: Input, opts_: ?InitOpts, page: *Page) !*Request { }); } -fn parseMethod(method: []const u8, page: *Page) !Http.Method { +fn parseMethod(method: []const u8, page: *Page) !net_http.Method { if (method.len > "propfind".len) { return error.InvalidMethod; } const lower = std.ascii.lowerString(&page.buf, method); - const method_lookup = std.StaticStringMap(Http.Method).initComptime(.{ + const method_lookup = std.StaticStringMap(net_http.Method).initComptime(.{ .{ "get", .GET }, .{ "post", .POST }, .{ "delete", .DELETE }, diff --git a/src/browser/webapi/net/Response.zig b/src/browser/webapi/net/Response.zig index d2c270ce..13048f33 100644 --- a/src/browser/webapi/net/Response.zig +++ b/src/browser/webapi/net/Response.zig @@ -18,7 +18,7 @@ const std = @import("std"); const js = 
@import("../../js/js.zig"); -const Http = @import("../../../http/Http.zig"); +const HttpClient = @import("../../HttpClient.zig"); const Page = @import("../../Page.zig"); const Headers = @import("Headers.zig"); @@ -45,7 +45,7 @@ _type: Type, _status_text: []const u8, _url: [:0]const u8, _is_redirected: bool, -_transfer: ?*Http.Transfer = null, +_transfer: ?*HttpClient.Transfer = null, const InitOpts = struct { status: u16 = 200, diff --git a/src/browser/webapi/net/XMLHttpRequest.zig b/src/browser/webapi/net/XMLHttpRequest.zig index bf442c13..b9f053a8 100644 --- a/src/browser/webapi/net/XMLHttpRequest.zig +++ b/src/browser/webapi/net/XMLHttpRequest.zig @@ -20,7 +20,8 @@ const std = @import("std"); const js = @import("../../js/js.zig"); const log = @import("../../../log.zig"); -const Http = @import("../../../http/Http.zig"); +const HttpClient = @import("../../HttpClient.zig"); +const net_http = @import("../../../network/http.zig"); const URL = @import("../../URL.zig"); const Mime = @import("../../Mime.zig"); @@ -38,10 +39,10 @@ const XMLHttpRequest = @This(); _page: *Page, _proto: *XMLHttpRequestEventTarget, _arena: Allocator, -_transfer: ?*Http.Transfer = null, +_transfer: ?*HttpClient.Transfer = null, _url: [:0]const u8 = "", -_method: Http.Method = .GET, +_method: net_http.Method = .GET, _request_headers: *Headers, _request_body: ?[]const u8 = null, @@ -341,7 +342,7 @@ pub fn getResponseXML(self: *XMLHttpRequest, page: *Page) !?*Node.Document { }; } -fn httpStartCallback(transfer: *Http.Transfer) !void { +fn httpStartCallback(transfer: *HttpClient.Transfer) !void { const self: *XMLHttpRequest = @ptrCast(@alignCast(transfer.ctx)); if (comptime IS_DEBUG) { log.debug(.http, "request start", .{ .method = self._method, .url = self._url, .source = "xhr" }); @@ -349,13 +350,13 @@ fn httpStartCallback(transfer: *Http.Transfer) !void { self._transfer = transfer; } -fn httpHeaderCallback(transfer: *Http.Transfer, header: Http.Header) !void { +fn httpHeaderCallback(transfer: 
*HttpClient.Transfer, header: net_http.Header) !void { const self: *XMLHttpRequest = @ptrCast(@alignCast(transfer.ctx)); const joined = try std.fmt.allocPrint(self._arena, "{s}: {s}", .{ header.name, header.value }); try self._response_headers.append(self._arena, joined); } -fn httpHeaderDoneCallback(transfer: *Http.Transfer) !bool { +fn httpHeaderDoneCallback(transfer: *HttpClient.Transfer) !bool { const self: *XMLHttpRequest = @ptrCast(@alignCast(transfer.ctx)); const header = &transfer.response_header.?; @@ -405,7 +406,7 @@ fn httpHeaderDoneCallback(transfer: *Http.Transfer) !bool { return true; } -fn httpDataCallback(transfer: *Http.Transfer, data: []const u8) !void { +fn httpDataCallback(transfer: *HttpClient.Transfer, data: []const u8) !void { const self: *XMLHttpRequest = @ptrCast(@alignCast(transfer.ctx)); try self._response_data.appendSlice(self._arena, data); @@ -515,7 +516,7 @@ fn stateChanged(self: *XMLHttpRequest, state: ReadyState, page: *Page) !void { } } -fn parseMethod(method: []const u8) !Http.Method { +fn parseMethod(method: []const u8) !net_http.Method { if (std.ascii.eqlIgnoreCase(method, "get")) { return .GET; } diff --git a/src/cdp/cdp.zig b/src/cdp/cdp.zig index 6d14c627..78e5ab50 100644 --- a/src/cdp/cdp.zig +++ b/src/cdp/cdp.zig @@ -28,7 +28,7 @@ const js = @import("../browser/js/js.zig"); const App = @import("../App.zig"); const Browser = @import("../browser/Browser.zig"); const Session = @import("../browser/Session.zig"); -const HttpClient = @import("../http/Client.zig"); +const HttpClient = @import("../browser/HttpClient.zig"); const Page = @import("../browser/Page.zig"); const Incrementing = @import("id.zig").Incrementing; const Notification = @import("../Notification.zig"); diff --git a/src/cdp/domains/fetch.zig b/src/cdp/domains/fetch.zig index beed6d76..310479b2 100644 --- a/src/cdp/domains/fetch.zig +++ b/src/cdp/domains/fetch.zig @@ -23,7 +23,8 @@ const id = @import("../id.zig"); const log = @import("../../log.zig"); const network 
= @import("network.zig"); -const Http = @import("../../http/Http.zig"); +const HttpClient = @import("../../browser/HttpClient.zig"); +const net_http = @import("../../network/http.zig"); const Notification = @import("../../Notification.zig"); pub fn processMessage(cmd: anytype) !void { @@ -49,7 +50,7 @@ pub fn processMessage(cmd: anytype) !void { // Stored in CDP pub const InterceptState = struct { allocator: Allocator, - waiting: std.AutoArrayHashMapUnmanaged(u32, *Http.Transfer), + waiting: std.AutoArrayHashMapUnmanaged(u32, *HttpClient.Transfer), pub fn init(allocator: Allocator) !InterceptState { return .{ @@ -62,11 +63,11 @@ pub const InterceptState = struct { return self.waiting.count() == 0; } - pub fn put(self: *InterceptState, transfer: *Http.Transfer) !void { + pub fn put(self: *InterceptState, transfer: *HttpClient.Transfer) !void { return self.waiting.put(self.allocator, transfer.id, transfer); } - pub fn remove(self: *InterceptState, request_id: u32) ?*Http.Transfer { + pub fn remove(self: *InterceptState, request_id: u32) ?*HttpClient.Transfer { const entry = self.waiting.fetchSwapRemove(request_id) orelse return null; return entry.value; } @@ -75,7 +76,7 @@ pub const InterceptState = struct { self.waiting.deinit(self.allocator); } - pub fn pendingTransfers(self: *const InterceptState) []*Http.Transfer { + pub fn pendingTransfers(self: *const InterceptState) []*HttpClient.Transfer { return self.waiting.values(); } }; @@ -221,7 +222,7 @@ fn continueRequest(cmd: anytype) !void { url: ?[]const u8 = null, method: ?[]const u8 = null, postData: ?[]const u8 = null, - headers: ?[]const Http.Header = null, + headers: ?[]const net_http.Header = null, interceptResponse: bool = false, })) orelse return error.InvalidParams; @@ -246,7 +247,7 @@ fn continueRequest(cmd: anytype) !void { try transfer.updateURL(try arena.dupeZ(u8, url)); } if (params.method) |method| { - transfer.req.method = std.meta.stringToEnum(Http.Method, method) orelse return error.InvalidParams; 
+ transfer.req.method = std.meta.stringToEnum(net_http.Method, method) orelse return error.InvalidParams; } if (params.headers) |headers| { @@ -323,7 +324,7 @@ fn fulfillRequest(cmd: anytype) !void { const params = (try cmd.params(struct { requestId: []const u8, // "INT-{d}" responseCode: u16, - responseHeaders: ?[]const Http.Header = null, + responseHeaders: ?[]const net_http.Header = null, binaryResponseHeaders: ?[]const u8 = null, body: ?[]const u8 = null, responsePhrase: ?[]const u8 = null, diff --git a/src/cdp/domains/network.zig b/src/cdp/domains/network.zig index b353dc76..a2a36bbe 100644 --- a/src/cdp/domains/network.zig +++ b/src/cdp/domains/network.zig @@ -24,7 +24,7 @@ const CdpStorage = @import("storage.zig"); const id = @import("../id.zig"); const URL = @import("../../browser/URL.zig"); -const Transfer = @import("../../http/Client.zig").Transfer; +const Transfer = @import("../../browser/HttpClient.zig").Transfer; const Notification = @import("../../Notification.zig"); const Mime = @import("../../browser/Mime.zig"); diff --git a/src/http/Http.zig b/src/http/Http.zig deleted file mode 100644 index 778a1be4..00000000 --- a/src/http/Http.zig +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright (C) 2023-2026 Lightpanda (Selecy SAS) -// -// Francis Bouvier -// Pierre Tachoire -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . 
- -const std = @import("std"); -const Net = @import("../Net.zig"); - -const ENABLE_DEBUG = Net.ENABLE_DEBUG; -pub const Client = @import("Client.zig"); -pub const Transfer = Client.Transfer; - -pub const Method = Net.Method; -pub const Header = Net.Header; -pub const Headers = Net.Headers; - -const Config = @import("../Config.zig"); -const RobotStore = @import("../browser/Robots.zig").RobotStore; - -const Allocator = std.mem.Allocator; -const ArenaAllocator = std.heap.ArenaAllocator; - -// Client.zig does the bulk of the work and is loosely tied to a browser Page. -// But we still need something above Client.zig for the "utility" http stuff -// we need to do, like telemetry. The most important thing we want from this -// is to be able to share the ca_blob, which can be quite large - loading it -// once for all http connections is a win. -const Http = @This(); - -arena: ArenaAllocator, -allocator: Allocator, -config: *const Config, -ca_blob: ?Net.Blob, -robot_store: *RobotStore, - -pub fn init(allocator: Allocator, robot_store: *RobotStore, config: *const Config) !Http { - try Net.globalInit(); - errdefer Net.globalDeinit(); - - if (comptime ENABLE_DEBUG) { - std.debug.print("curl version: {s}\n\n", .{Net.curl_version()}); - } - - var arena = ArenaAllocator.init(allocator); - errdefer arena.deinit(); - - var ca_blob: ?Net.Blob = null; - if (config.tlsVerifyHost()) { - ca_blob = try Net.loadCerts(allocator); - } - - return .{ - .arena = arena, - .allocator = allocator, - .config = config, - .ca_blob = ca_blob, - .robot_store = robot_store, - }; -} - -pub fn deinit(self: *Http) void { - if (self.ca_blob) |ca_blob| { - const data: [*]u8 = @ptrCast(ca_blob.data); - self.allocator.free(data[0..ca_blob.len]); - } - Net.globalDeinit(); - self.arena.deinit(); -} - -pub fn createClient(self: *Http, allocator: Allocator) !*Client { - return Client.init(allocator, self.ca_blob, self.robot_store, self.config); -} - -pub fn newConnection(self: *Http) !Net.Connection { - return 
Net.Connection.init(self.ca_blob, self.config); -} diff --git a/src/lightpanda.zig b/src/lightpanda.zig index 29b44a4a..0b72e5ed 100644 --- a/src/lightpanda.zig +++ b/src/lightpanda.zig @@ -18,6 +18,7 @@ const std = @import("std"); pub const App = @import("App.zig"); +pub const Network = @import("network/Runtime.zig"); pub const Server = @import("Server.zig"); pub const Config = @import("Config.zig"); pub const URL = @import("browser/URL.zig"); @@ -36,6 +37,7 @@ pub const mcp = @import("mcp.zig"); pub const build_config = @import("build_config"); pub const crash_handler = @import("crash_handler.zig"); +pub const HttpClient = @import("browser/HttpClient.zig"); const IS_DEBUG = @import("builtin").mode == .Debug; pub const FetchOpts = struct { @@ -45,7 +47,7 @@ pub const FetchOpts = struct { writer: ?*std.Io.Writer = null, }; pub fn fetch(app: *App, url: [:0]const u8, opts: FetchOpts) !void { - const http_client = try app.http.createClient(app.allocator); + const http_client = try HttpClient.init(app.allocator, &app.network); defer http_client.deinit(); const notification = try Notification.init(app.allocator); diff --git a/src/main.zig b/src/main.zig index dd6a759a..26e29b22 100644 --- a/src/main.zig +++ b/src/main.zig @@ -93,18 +93,14 @@ fn run(allocator: Allocator, main_arena: Allocator) !void { return args.printUsageAndExit(false); }; - // _server is global to handle graceful shutdown. - var server = try lp.Server.init(app, address); - defer server.deinit(); - - try sighandler.on(lp.Server.stop, .{&server}); - - // max timeout of 1 week. 
- const timeout = if (opts.timeout > 604_800) 604_800_000 else @as(u32, opts.timeout) * 1000; - server.run(address, timeout) catch |err| { + var server = lp.Server.init(app, address) catch |err| { log.fatal(.app, "server run error", .{ .err = err }); return err; }; + defer server.deinit(); + + try sighandler.on(lp.Network.stop, .{&app.network}); + app.network.run(); }, .fetch => |opts| { const url = opts.url; diff --git a/src/main_legacy_test.zig b/src/main_legacy_test.zig index 11c7588e..a6d1593f 100644 --- a/src/main_legacy_test.zig +++ b/src/main_legacy_test.zig @@ -46,7 +46,7 @@ pub fn main() !void { var test_arena = std.heap.ArenaAllocator.init(allocator); defer test_arena.deinit(); - const http_client = try app.http.createClient(allocator); + const http_client = try lp.HttpClient.init(allocator, &app.network); defer http_client.deinit(); var browser = try lp.Browser.init(app, .{ .http_client = http_client }); diff --git a/src/mcp/Server.zig b/src/mcp/Server.zig index 80da4b56..caed9eef 100644 --- a/src/mcp/Server.zig +++ b/src/mcp/Server.zig @@ -3,7 +3,7 @@ const std = @import("std"); const lp = @import("lightpanda"); const App = @import("../App.zig"); -const HttpClient = @import("../http/Client.zig"); +const HttpClient = @import("../browser/HttpClient.zig"); const testing = @import("../testing.zig"); const protocol = @import("protocol.zig"); const router = @import("router.zig"); @@ -23,7 +23,7 @@ mutex: std.Thread.Mutex = .{}, aw: std.io.Writer.Allocating, pub fn init(allocator: std.mem.Allocator, app: *App, writer: *std.io.Writer) !*Self { - const http_client = try app.http.createClient(allocator); + const http_client = try HttpClient.init(allocator, &app.network); errdefer http_client.deinit(); const notification = try lp.Notification.init(allocator); diff --git a/src/browser/Robots.zig b/src/network/Robots.zig similarity index 100% rename from src/browser/Robots.zig rename to src/network/Robots.zig diff --git a/src/network/Runtime.zig 
b/src/network/Runtime.zig new file mode 100644 index 00000000..0112dc18 --- /dev/null +++ b/src/network/Runtime.zig @@ -0,0 +1,284 @@ +// Copyright (C) 2023-2026 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const std = @import("std"); +const builtin = @import("builtin"); +const net = std.net; +const posix = std.posix; +const Allocator = std.mem.Allocator; + +const lp = @import("lightpanda"); +const Config = @import("../Config.zig"); +const libcurl = @import("../sys/libcurl.zig"); + +const net_http = @import("http.zig"); +const RobotStore = @import("Robots.zig").RobotStore; + +const Runtime = @This(); + +const Listener = struct { + socket: posix.socket_t, + ctx: *anyopaque, + onAccept: *const fn (ctx: *anyopaque, socket: posix.socket_t) void, +}; + +allocator: Allocator, + +config: *const Config, +ca_blob: ?net_http.Blob, +robot_store: RobotStore, + +pollfds: []posix.pollfd, +listener: ?Listener = null, + +// Wakeup pipe: workers write to [1], main thread polls [0] +wakeup_pipe: [2]posix.fd_t = .{ -1, -1 }, + +shutdown: std.atomic.Value(bool) = .init(false), + +fn globalInit() void { + libcurl.curl_global_init(.{ .ssl = true }) catch |err| { + lp.assert(false, "curl global init", .{ .err = err }); + }; +} + +fn globalDeinit() void { + libcurl.curl_global_cleanup(); +} + +pub fn init(allocator: 
Allocator, config: *const Config) !Runtime { + globalInit(); + errdefer globalDeinit(); + + const pipe = try posix.pipe2(.{ .NONBLOCK = true, .CLOEXEC = true }); + + // 0 is wakeup, 1 is listener + const pollfds = try allocator.alloc(posix.pollfd, 2); + errdefer allocator.free(pollfds); + + @memset(pollfds, .{ .fd = -1, .events = 0, .revents = 0 }); + pollfds[0] = .{ .fd = pipe[0], .events = posix.POLL.IN, .revents = 0 }; + + var ca_blob: ?net_http.Blob = null; + if (config.tlsVerifyHost()) { + ca_blob = try loadCerts(allocator); + } + + return .{ + .allocator = allocator, + .config = config, + .ca_blob = ca_blob, + .robot_store = RobotStore.init(allocator), + .pollfds = pollfds, + .wakeup_pipe = pipe, + }; +} + +pub fn deinit(self: *Runtime) void { + for (&self.wakeup_pipe) |*fd| { + if (fd.* >= 0) { + posix.close(fd.*); + fd.* = -1; + } + } + + self.allocator.free(self.pollfds); + + if (self.ca_blob) |ca_blob| { + const data: [*]u8 = @ptrCast(ca_blob.data); + self.allocator.free(data[0..ca_blob.len]); + } + + self.robot_store.deinit(); + + globalDeinit(); +} + +pub fn bind( + self: *Runtime, + address: net.Address, + ctx: *anyopaque, + on_accept: *const fn (ctx: *anyopaque, socket: posix.socket_t) void, +) !void { + const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK; + const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP); + errdefer posix.close(listener); + + try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1))); + if (@hasDecl(posix.TCP, "NODELAY")) { + try posix.setsockopt(listener, posix.IPPROTO.TCP, posix.TCP.NODELAY, &std.mem.toBytes(@as(c_int, 1))); + } + + try posix.bind(listener, &address.any, address.getOsSockLen()); + try posix.listen(listener, self.config.maxPendingConnections()); + + if (self.listener != null) return error.TooManyListeners; + + self.listener = .{ + .socket = listener, + .ctx = ctx, + .onAccept = on_accept, + }; + self.pollfds[1] = .{ + 
.fd = listener, + .events = posix.POLL.IN, + .revents = 0, + }; +} + +pub fn run(self: *Runtime) void { + while (!self.shutdown.load(.acquire)) { + const listener = self.listener orelse return; + + _ = posix.poll(self.pollfds, -1) catch |err| { + lp.log.err(.app, "poll", .{ .err = err }); + continue; + }; + + // check wakeup socket + if (self.pollfds[0].revents != 0) { + self.pollfds[0].revents = 0; + + // If we were woken up, perhaps everything was cancelled and the iteration can be completed. + if (self.shutdown.load(.acquire)) break; + } + + // check new connections; + if (self.pollfds[1].revents == 0) continue; + self.pollfds[1].revents = 0; + + const socket = posix.accept(listener.socket, null, null, posix.SOCK.NONBLOCK) catch |err| { + switch (err) { + error.SocketNotListening, error.ConnectionAborted => { + self.pollfds[1] = .{ .fd = -1, .events = 0, .revents = 0 }; + self.listener = null; + }, + error.WouldBlock => {}, + else => { + lp.log.err(.app, "accept", .{ .err = err }); + }, + } + continue; + }; + + listener.onAccept(listener.ctx, socket); + } + + if (self.listener) |listener| { + posix.shutdown(listener.socket, .both) catch |err| { + lp.log.warn(.app, "listener shutdown", .{ .err = err }); + }; + posix.close(listener.socket); + } +} + +pub fn stop(self: *Runtime) void { + self.shutdown.store(true, .release); + _ = posix.write(self.wakeup_pipe[1], &.{1}) catch {}; +} + +pub fn newConnection(self: *Runtime) !net_http.Connection { + return net_http.Connection.init(self.ca_blob, self.config); +} + +// Wraps lines @ 64 columns. 
A PEM is basically a base64 encoded DER (which is +// what Zig has), with lines wrapped at 64 characters and with a basic header +// and footer +const LineWriter = struct { + col: usize = 0, + inner: std.ArrayList(u8).Writer, + + pub fn writeAll(self: *LineWriter, data: []const u8) !void { + var writer = self.inner; + + var col = self.col; + const len = 64 - col; + + var remain = data; + if (remain.len > len) { + col = 0; + try writer.writeAll(data[0..len]); + try writer.writeByte('\n'); + remain = data[len..]; + } + + while (remain.len > 64) { + try writer.writeAll(remain[0..64]); + try writer.writeByte('\n'); + remain = data[len..]; + } + try writer.writeAll(remain); + self.col = col + remain.len; + } +}; + +// TODO: on BSD / Linux, we could just read the PEM file directly. +// This whole rescan + decode is really just needed for MacOS. On Linux +// bundle.rescan does find the .pem file(s) which could be in a few different +// places, so it's still useful, just not efficient. +fn loadCerts(allocator: Allocator) !libcurl.CurlBlob { + var bundle: std.crypto.Certificate.Bundle = .{}; + try bundle.rescan(allocator); + defer bundle.deinit(allocator); + + const bytes = bundle.bytes.items; + if (bytes.len == 0) { + lp.log.warn(.app, "No system certificates", .{}); + return .{ + .len = 0, + .flags = 0, + .data = bytes.ptr, + }; + } + + const encoder = std.base64.standard.Encoder; + var arr: std.ArrayList(u8) = .empty; + + const encoded_size = encoder.calcSize(bytes.len); + const buffer_size = encoded_size + + (bundle.map.count() * 75) + // start / end per certificate + extra, just in case + (encoded_size / 64) // newline per 64 characters + ; + try arr.ensureTotalCapacity(allocator, buffer_size); + errdefer arr.deinit(allocator); + var writer = arr.writer(allocator); + + var it = bundle.map.valueIterator(); + while (it.next()) |index| { + const cert = try std.crypto.Certificate.der.Element.parse(bytes, index.*); + + try writer.writeAll("-----BEGIN CERTIFICATE-----\n"); + 
var line_writer = LineWriter{ .inner = writer }; + try encoder.encodeWriter(&line_writer, bytes[index.*..cert.slice.end]); + try writer.writeAll("\n-----END CERTIFICATE-----\n"); + } + + // Final encoding should not be larger than our initial size estimate + lp.assert(buffer_size > arr.items.len, "Http loadCerts", .{ .estimate = buffer_size, .len = arr.items.len }); + + // Allocate exactly the size needed and copy the data + const result = try allocator.dupe(u8, arr.items); + // Free the original oversized allocation + arr.deinit(allocator); + + return .{ + .len = result.len, + .data = result.ptr, + .flags = 0, + }; +} diff --git a/src/network/http.zig b/src/network/http.zig new file mode 100644 index 00000000..28fd7736 --- /dev/null +++ b/src/network/http.zig @@ -0,0 +1,610 @@ +// Copyright (C) 2023-2026 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+ +const std = @import("std"); +const builtin = @import("builtin"); +const posix = std.posix; +const Allocator = std.mem.Allocator; +const ArenaAllocator = std.heap.ArenaAllocator; + +const Config = @import("../Config.zig"); +const libcurl = @import("../sys/libcurl.zig"); + +const log = @import("lightpanda").log; +const assert = @import("lightpanda").assert; + +pub const ENABLE_DEBUG = false; +const IS_DEBUG = builtin.mode == .Debug; + +pub const Blob = libcurl.CurlBlob; +pub const WaitFd = libcurl.CurlWaitFd; +pub const writefunc_error = libcurl.curl_writefunc_error; + +const Error = libcurl.Error; +const ErrorMulti = libcurl.ErrorMulti; +const errorFromCode = libcurl.errorFromCode; +const errorMFromCode = libcurl.errorMFromCode; +const errorCheck = libcurl.errorCheck; +const errorMCheck = libcurl.errorMCheck; + +pub fn curl_version() [*c]const u8 { + return libcurl.curl_version(); +} + +pub const Method = enum(u8) { + GET = 0, + PUT = 1, + POST = 2, + DELETE = 3, + HEAD = 4, + OPTIONS = 5, + PATCH = 6, + PROPFIND = 7, +}; + +pub const Header = struct { + name: []const u8, + value: []const u8, +}; + +pub const Headers = struct { + headers: ?*libcurl.CurlSList, + cookies: ?[*c]const u8, + + pub fn init(user_agent: [:0]const u8) !Headers { + const header_list = libcurl.curl_slist_append(null, user_agent); + if (header_list == null) { + return error.OutOfMemory; + } + return .{ .headers = header_list, .cookies = null }; + } + + pub fn deinit(self: *const Headers) void { + if (self.headers) |hdr| { + libcurl.curl_slist_free_all(hdr); + } + } + + pub fn add(self: *Headers, header: [*c]const u8) !void { + // Copies the value + const updated_headers = libcurl.curl_slist_append(self.headers, header); + if (updated_headers == null) { + return error.OutOfMemory; + } + + self.headers = updated_headers; + } + + fn parseHeader(header_str: []const u8) ?Header { + const colon_pos = std.mem.indexOfScalar(u8, header_str, ':') orelse return null; + + const name = std.mem.trim(u8, 
header_str[0..colon_pos], " \t"); + const value = std.mem.trim(u8, header_str[colon_pos + 1 ..], " \t"); + + return .{ .name = name, .value = value }; + } + + pub fn iterator(self: *Headers) Iterator { + return .{ + .header = self.headers, + .cookies = self.cookies, + }; + } + + const Iterator = struct { + header: [*c]libcurl.CurlSList, + cookies: ?[*c]const u8, + + pub fn next(self: *Iterator) ?Header { + const h = self.header orelse { + const cookies = self.cookies orelse return null; + self.cookies = null; + return .{ .name = "Cookie", .value = std.mem.span(@as([*:0]const u8, cookies)) }; + }; + + self.header = h.*.next; + return parseHeader(std.mem.span(@as([*:0]const u8, @ptrCast(h.*.data)))); + } + }; +}; + +// In normal cases, the header iterator comes from the curl linked list. +// But it's also possible to inject a response, via `transfer.fulfill`. In that +// case, the response headers are a list, []const Http.Header. +// This union is an iterator that exposes the same API for either case. 
+pub const HeaderIterator = union(enum) { + curl: CurlHeaderIterator, + list: ListHeaderIterator, + + pub fn next(self: *HeaderIterator) ?Header { + switch (self.*) { + inline else => |*it| return it.next(), + } + } + + const CurlHeaderIterator = struct { + conn: *const Connection, + prev: ?*libcurl.CurlHeader = null, + + pub fn next(self: *CurlHeaderIterator) ?Header { + const h = libcurl.curl_easy_nextheader(self.conn.easy, .header, -1, self.prev) orelse return null; + self.prev = h; + + const header = h.*; + return .{ + .name = std.mem.span(header.name), + .value = std.mem.span(header.value), + }; + } + }; + + const ListHeaderIterator = struct { + index: usize = 0, + list: []const Header, + + pub fn next(self: *ListHeaderIterator) ?Header { + const idx = self.index; + if (idx == self.list.len) { + return null; + } + self.index = idx + 1; + return self.list[idx]; + } + }; +}; + +const HeaderValue = struct { + value: []const u8, + amount: usize, +}; + +pub const AuthChallenge = struct { + status: u16, + source: ?enum { server, proxy }, + scheme: ?enum { basic, digest }, + realm: ?[]const u8, + + pub fn parse(status: u16, header: []const u8) !AuthChallenge { + var ac: AuthChallenge = .{ + .status = status, + .source = null, + .realm = null, + .scheme = null, + }; + + const sep = std.mem.indexOfPos(u8, header, 0, ": ") orelse return error.InvalidHeader; + const hname = header[0..sep]; + const hvalue = header[sep + 2 ..]; + + if (std.ascii.eqlIgnoreCase("WWW-Authenticate", hname)) { + ac.source = .server; + } else if (std.ascii.eqlIgnoreCase("Proxy-Authenticate", hname)) { + ac.source = .proxy; + } else { + return error.InvalidAuthChallenge; + } + + const pos = std.mem.indexOfPos(u8, std.mem.trim(u8, hvalue, std.ascii.whitespace[0..]), 0, " ") orelse hvalue.len; + const _scheme = hvalue[0..pos]; + if (std.ascii.eqlIgnoreCase(_scheme, "basic")) { + ac.scheme = .basic; + } else if (std.ascii.eqlIgnoreCase(_scheme, "digest")) { + ac.scheme = .digest; + } else { + return 
error.UnknownAuthChallengeScheme; + } + + return ac; + } +}; + +pub const ResponseHead = struct { + pub const MAX_CONTENT_TYPE_LEN = 64; + + status: u16, + url: [*c]const u8, + redirect_count: u32, + _content_type_len: usize = 0, + _content_type: [MAX_CONTENT_TYPE_LEN]u8 = undefined, + // this is normally an empty list, but if the response is being injected + // then it'll be populated. It isn't meant to be used directly, but should + // be used through the transfer.responseHeaderIterator() which abstracts + // whether the headers are from a live curl easy handle, or injected. + _injected_headers: []const Header = &.{}, + + pub fn contentType(self: *ResponseHead) ?[]u8 { + if (self._content_type_len == 0) { + return null; + } + return self._content_type[0..self._content_type_len]; + } +}; + +pub const Connection = struct { + easy: *libcurl.Curl, + node: Handles.HandleList.Node = .{}, + + pub fn init( + ca_blob_: ?libcurl.CurlBlob, + config: *const Config, + ) !Connection { + const easy = libcurl.curl_easy_init() orelse return error.FailedToInitializeEasy; + errdefer libcurl.curl_easy_cleanup(easy); + + // timeouts + try libcurl.curl_easy_setopt(easy, .timeout_ms, config.httpTimeout()); + try libcurl.curl_easy_setopt(easy, .connect_timeout_ms, config.httpConnectTimeout()); + + // redirect behavior + try libcurl.curl_easy_setopt(easy, .max_redirs, config.httpMaxRedirects()); + try libcurl.curl_easy_setopt(easy, .follow_location, 2); + try libcurl.curl_easy_setopt(easy, .redir_protocols_str, "HTTP,HTTPS"); // remove FTP and FTPS from the default + + // proxy + const http_proxy = config.httpProxy(); + if (http_proxy) |proxy| { + try libcurl.curl_easy_setopt(easy, .proxy, proxy.ptr); + } + + // tls + if (ca_blob_) |ca_blob| { + try libcurl.curl_easy_setopt(easy, .ca_info_blob, ca_blob); + if (http_proxy != null) { + try libcurl.curl_easy_setopt(easy, .proxy_ca_info_blob, ca_blob); + } + } else { + assert(config.tlsVerifyHost() == false, "Http.init tls_verify_host", 
.{}); + + try libcurl.curl_easy_setopt(easy, .ssl_verify_host, false); + try libcurl.curl_easy_setopt(easy, .ssl_verify_peer, false); + + if (http_proxy != null) { + try libcurl.curl_easy_setopt(easy, .proxy_ssl_verify_host, false); + try libcurl.curl_easy_setopt(easy, .proxy_ssl_verify_peer, false); + } + } + + // compression, don't remove this. CloudFront will send gzip content + // even if we don't support it, and then it won't be decompressed. + // empty string means: use whatever's available + try libcurl.curl_easy_setopt(easy, .accept_encoding, ""); + + // debug + if (comptime ENABLE_DEBUG) { + try libcurl.curl_easy_setopt(easy, .verbose, true); + + // Sometimes the default debug output hides some useful data. You can + // uncomment the following line (BUT KEEP THE LINE ABOVE AS-IS), to + // get more control over the data (specifically, the `CURLINFO_TEXT` + // can include useful data). + + // try libcurl.curl_easy_setopt(easy, .debug_function, debugCallback); + } + + return .{ + .easy = easy, + }; + } + + pub fn deinit(self: *const Connection) void { + libcurl.curl_easy_cleanup(self.easy); + } + + pub fn setURL(self: *const Connection, url: [:0]const u8) !void { + try libcurl.curl_easy_setopt(self.easy, .url, url.ptr); + } + + // a libcurl request has 2 methods. The first is the method that + // controls how libcurl behaves. This specifically influences how redirects + // are handled. For example, if you do a POST and get a 301, libcurl will + // change that to a GET. But if you do a POST and get a 308, libcurl will + // keep the POST (and re-send the body). + // The second method is the actual string that's included in the request + // headers. + // These two methods can be different - you can tell curl to behave as though + // you made a GET, but include "POST" in the request header. + // + // Here, we're only concerned about the 2nd method. If we want, we'll set + // the first one based on whether or not we have a body. 
+ // + // It's important that, for each use of this connection, we set the 2nd + // method. Else, if we make a HEAD request and re-use the connection, but + // DON'T reset this, it'll keep making HEAD requests. + // (I don't know if it's as important to reset the 1st method, or if libcurl + // can infer that based on the presence of the body, but we also reset it + // to be safe); + pub fn setMethod(self: *const Connection, method: Method) !void { + const easy = self.easy; + const m: [:0]const u8 = switch (method) { + .GET => "GET", + .POST => "POST", + .PUT => "PUT", + .DELETE => "DELETE", + .HEAD => "HEAD", + .OPTIONS => "OPTIONS", + .PATCH => "PATCH", + .PROPFIND => "PROPFIND", + }; + try libcurl.curl_easy_setopt(easy, .custom_request, m.ptr); + } + + pub fn setBody(self: *const Connection, body: []const u8) !void { + const easy = self.easy; + try libcurl.curl_easy_setopt(easy, .post, true); + try libcurl.curl_easy_setopt(easy, .post_field_size, body.len); + try libcurl.curl_easy_setopt(easy, .copy_post_fields, body.ptr); + } + + pub fn setGetMode(self: *const Connection) !void { + try libcurl.curl_easy_setopt(self.easy, .http_get, true); + } + + pub fn setHeaders(self: *const Connection, headers: *Headers) !void { + try libcurl.curl_easy_setopt(self.easy, .http_header, headers.headers); + } + + pub fn setCookies(self: *const Connection, cookies: [*c]const u8) !void { + try libcurl.curl_easy_setopt(self.easy, .cookie, cookies); + } + + pub fn setPrivate(self: *const Connection, ptr: *anyopaque) !void { + try libcurl.curl_easy_setopt(self.easy, .private, ptr); + } + + pub fn setProxyCredentials(self: *const Connection, creds: [:0]const u8) !void { + try libcurl.curl_easy_setopt(self.easy, .proxy_user_pwd, creds.ptr); + } + + pub fn setCredentials(self: *const Connection, creds: [:0]const u8) !void { + try libcurl.curl_easy_setopt(self.easy, .user_pwd, creds.ptr); + } + + pub fn setCallbacks( + self: *const Connection, + comptime header_cb: 
libcurl.CurlHeaderFunction, + comptime data_cb: libcurl.CurlWriteFunction, + ) !void { + try libcurl.curl_easy_setopt(self.easy, .header_data, self.easy); + try libcurl.curl_easy_setopt(self.easy, .header_function, header_cb); + try libcurl.curl_easy_setopt(self.easy, .write_data, self.easy); + try libcurl.curl_easy_setopt(self.easy, .write_function, data_cb); + } + + pub fn setProxy(self: *const Connection, proxy: ?[*:0]const u8) !void { + try libcurl.curl_easy_setopt(self.easy, .proxy, proxy); + } + + pub fn setTlsVerify(self: *const Connection, verify: bool, use_proxy: bool) !void { + try libcurl.curl_easy_setopt(self.easy, .ssl_verify_host, verify); + try libcurl.curl_easy_setopt(self.easy, .ssl_verify_peer, verify); + if (use_proxy) { + try libcurl.curl_easy_setopt(self.easy, .proxy_ssl_verify_host, verify); + try libcurl.curl_easy_setopt(self.easy, .proxy_ssl_verify_peer, verify); + } + } + + pub fn getEffectiveUrl(self: *const Connection) ![*c]const u8 { + var url: [*c]u8 = undefined; + try libcurl.curl_easy_getinfo(self.easy, .effective_url, &url); + return url; + } + + pub fn getResponseCode(self: *const Connection) !u16 { + var status: c_long = undefined; + try libcurl.curl_easy_getinfo(self.easy, .response_code, &status); + if (status < 0 or status > std.math.maxInt(u16)) { + return 0; + } + return @intCast(status); + } + + pub fn getRedirectCount(self: *const Connection) !u32 { + var count: c_long = undefined; + try libcurl.curl_easy_getinfo(self.easy, .redirect_count, &count); + return @intCast(count); + } + + pub fn getResponseHeader(self: *const Connection, name: [:0]const u8, index: usize) ?HeaderValue { + var hdr: ?*libcurl.CurlHeader = null; + libcurl.curl_easy_header(self.easy, name, index, .header, -1, &hdr) catch |err| { + // ErrorHeader includes OutOfMemory — rare but real errors from curl internals. + // Logged and returned as null since callers don't expect errors. 
log.err(.http, "get response header", .{ + .name = name, + .err = err, + }); + return null; + }; + const h = hdr orelse return null; + return .{ + .amount = h.amount, + .value = std.mem.span(h.value), + }; + } + + pub fn getPrivate(self: *const Connection) !*anyopaque { + var private: *anyopaque = undefined; + try libcurl.curl_easy_getinfo(self.easy, .private, &private); + return private; + } + + // These are headers that may not be sent to the users for interception. + pub fn secretHeaders(_: *const Connection, headers: *Headers, http_headers: *const Config.HttpHeaders) !void { + if (http_headers.proxy_bearer_header) |hdr| { + try headers.add(hdr); + } + } + + pub fn request(self: *const Connection, http_headers: *const Config.HttpHeaders) !u16 { + var header_list = try Headers.init(http_headers.user_agent_header); + defer header_list.deinit(); + try self.secretHeaders(&header_list, http_headers); + try self.setHeaders(&header_list); + + // Add cookies. + if (header_list.cookies) |cookies| { + try self.setCookies(cookies); + } + + try libcurl.curl_easy_perform(self.easy); + return self.getResponseCode(); + } +}; + +pub const Handles = struct { + connections: []Connection, + dirty: HandleList, + in_use: HandleList, + available: HandleList, + multi: *libcurl.CurlM, + performing: bool = false, + + pub const HandleList = std.DoublyLinkedList; + + pub fn init( + allocator: Allocator, + ca_blob: ?libcurl.CurlBlob, + config: *const Config, + ) !Handles { + const count: usize = config.httpMaxConcurrent(); + if (count == 0) return error.InvalidMaxConcurrent; + + const multi = libcurl.curl_multi_init() orelse return error.FailedToInitializeMulti; + errdefer libcurl.curl_multi_cleanup(multi) catch {}; + + try libcurl.curl_multi_setopt(multi, .max_host_connections, config.httpMaxHostOpen()); + + const connections = try allocator.alloc(Connection, count); + errdefer allocator.free(connections); + + var available: HandleList = .{}; + for (0..count) |i| { + connections[i] = try 
Connection.init(ca_blob, config); + available.append(&connections[i].node); + } + + return .{ + .dirty = .{}, + .in_use = .{}, + .connections = connections, + .available = available, + .multi = multi, + }; + } + + pub fn deinit(self: *Handles, allocator: Allocator) void { + for (self.connections) |*conn| { + conn.deinit(); + } + allocator.free(self.connections); + libcurl.curl_multi_cleanup(self.multi) catch {}; + } + + pub fn hasAvailable(self: *const Handles) bool { + return self.available.first != null; + } + + pub fn get(self: *Handles) ?*Connection { + if (self.available.popFirst()) |node| { + self.in_use.append(node); + return @as(*Connection, @fieldParentPtr("node", node)); + } + return null; + } + + pub fn add(self: *Handles, conn: *const Connection) !void { + try libcurl.curl_multi_add_handle(self.multi, conn.easy); + } + + pub fn remove(self: *Handles, conn: *Connection) void { + if (libcurl.curl_multi_remove_handle(self.multi, conn.easy)) { + self.isAvailable(conn); + } else |err| { + // can happen if we're in a perform() call, so we'll queue this + // for cleanup later. 
+ const node = &conn.node; + self.in_use.remove(node); + self.dirty.append(node); + log.warn(.http, "multi remove handle", .{ .err = err }); + } + } + + pub fn isAvailable(self: *Handles, conn: *Connection) void { + const node = &conn.node; + self.in_use.remove(node); + self.available.append(node); + } + + pub fn perform(self: *Handles) !c_int { + self.performing = true; + defer self.performing = false; + + const multi = self.multi; + var running: c_int = undefined; + try libcurl.curl_multi_perform(self.multi, &running); + + { + const list = &self.dirty; + while (list.first) |node| { + list.remove(node); + const conn: *Connection = @fieldParentPtr("node", node); + if (libcurl.curl_multi_remove_handle(multi, conn.easy)) { + self.available.append(node); + } else |err| { + log.fatal(.http, "multi remove handle", .{ .err = err, .src = "perform" }); + @panic("multi_remove_handle"); + } + } + } + + return running; + } + + pub fn poll(self: *Handles, extra_fds: []libcurl.CurlWaitFd, timeout_ms: c_int) !void { + try libcurl.curl_multi_poll(self.multi, extra_fds, timeout_ms, null); + } + + pub const MultiMessage = struct { + conn: Connection, + err: ?Error, + }; + + pub fn readMessage(self: *Handles) ?MultiMessage { + var messages_count: c_int = 0; + const msg = libcurl.curl_multi_info_read(self.multi, &messages_count) orelse return null; + return switch (msg.data) { + .done => |err| .{ + .conn = .{ .easy = msg.easy_handle }, + .err = err, + }, + else => unreachable, + }; + } +}; + +fn debugCallback(_: *libcurl.Curl, msg_type: libcurl.CurlInfoType, raw: [*c]u8, len: usize, _: *anyopaque) c_int { + const data = raw[0..len]; + switch (msg_type) { + .text => std.debug.print("libcurl [text]: {s}\n", .{data}), + .header_out => std.debug.print("libcurl [req-h]: {s}\n", .{data}), + .header_in => std.debug.print("libcurl [res-h]: {s}\n", .{data}), + // .data_in => std.debug.print("libcurl [res-b]: {s}\n", .{data}), + else => std.debug.print("libcurl ?? 
{d}\n", .{msg_type}), + } + return 0; +} diff --git a/src/Net.zig b/src/network/websocket.zig similarity index 53% rename from src/Net.zig rename to src/network/websocket.zig index c45707e4..5a5b4747 100644 --- a/src/Net.zig +++ b/src/network/websocket.zig @@ -21,721 +21,10 @@ const builtin = @import("builtin"); const posix = std.posix; const Allocator = std.mem.Allocator; const ArenaAllocator = std.heap.ArenaAllocator; -const libcurl = @import("sys/libcurl.zig"); -const log = @import("log.zig"); -const Config = @import("Config.zig"); +const log = @import("lightpanda").log; const assert = @import("lightpanda").assert; - -pub const ENABLE_DEBUG = false; -const IS_DEBUG = builtin.mode == .Debug; - -pub const Blob = libcurl.CurlBlob; -pub const WaitFd = libcurl.CurlWaitFd; -pub const writefunc_error = libcurl.curl_writefunc_error; - -const Error = libcurl.Error; -const ErrorMulti = libcurl.ErrorMulti; -const errorFromCode = libcurl.errorFromCode; -const errorMFromCode = libcurl.errorMFromCode; -const errorCheck = libcurl.errorCheck; -const errorMCheck = libcurl.errorMCheck; - -pub fn curl_version() [*c]const u8 { - return libcurl.curl_version(); -} - -pub const Method = enum(u8) { - GET = 0, - PUT = 1, - POST = 2, - DELETE = 3, - HEAD = 4, - OPTIONS = 5, - PATCH = 6, - PROPFIND = 7, -}; - -pub const Header = struct { - name: []const u8, - value: []const u8, -}; - -pub const Headers = struct { - headers: ?*libcurl.CurlSList, - cookies: ?[*c]const u8, - - pub fn init(user_agent: [:0]const u8) !Headers { - const header_list = libcurl.curl_slist_append(null, user_agent); - if (header_list == null) { - return error.OutOfMemory; - } - return .{ .headers = header_list, .cookies = null }; - } - - pub fn deinit(self: *const Headers) void { - if (self.headers) |hdr| { - libcurl.curl_slist_free_all(hdr); - } - } - - pub fn add(self: *Headers, header: [*c]const u8) !void { - // Copies the value - const updated_headers = libcurl.curl_slist_append(self.headers, header); - if 
(updated_headers == null) { - return error.OutOfMemory; - } - - self.headers = updated_headers; - } - - fn parseHeader(header_str: []const u8) ?Header { - const colon_pos = std.mem.indexOfScalar(u8, header_str, ':') orelse return null; - - const name = std.mem.trim(u8, header_str[0..colon_pos], " \t"); - const value = std.mem.trim(u8, header_str[colon_pos + 1 ..], " \t"); - - return .{ .name = name, .value = value }; - } - - pub fn iterator(self: *Headers) Iterator { - return .{ - .header = self.headers, - .cookies = self.cookies, - }; - } - - const Iterator = struct { - header: [*c]libcurl.CurlSList, - cookies: ?[*c]const u8, - - pub fn next(self: *Iterator) ?Header { - const h = self.header orelse { - const cookies = self.cookies orelse return null; - self.cookies = null; - return .{ .name = "Cookie", .value = std.mem.span(@as([*:0]const u8, cookies)) }; - }; - - self.header = h.*.next; - return parseHeader(std.mem.span(@as([*:0]const u8, @ptrCast(h.*.data)))); - } - }; -}; - -// In normal cases, the header iterator comes from the curl linked list. -// But it's also possible to inject a response, via `transfer.fulfill`. In that -// case, the resposne headers are a list, []const Http.Header. -// This union, is an iterator that exposes the same API for either case. 
-pub const HeaderIterator = union(enum) { - curl: CurlHeaderIterator, - list: ListHeaderIterator, - - pub fn next(self: *HeaderIterator) ?Header { - switch (self.*) { - inline else => |*it| return it.next(), - } - } - - const CurlHeaderIterator = struct { - conn: *const Connection, - prev: ?*libcurl.CurlHeader = null, - - pub fn next(self: *CurlHeaderIterator) ?Header { - const h = libcurl.curl_easy_nextheader(self.conn.easy, .header, -1, self.prev) orelse return null; - self.prev = h; - - const header = h.*; - return .{ - .name = std.mem.span(header.name), - .value = std.mem.span(header.value), - }; - } - }; - - const ListHeaderIterator = struct { - index: usize = 0, - list: []const Header, - - pub fn next(self: *ListHeaderIterator) ?Header { - const idx = self.index; - if (idx == self.list.len) { - return null; - } - self.index = idx + 1; - return self.list[idx]; - } - }; -}; - -const HeaderValue = struct { - value: []const u8, - amount: usize, -}; - -pub const AuthChallenge = struct { - status: u16, - source: ?enum { server, proxy }, - scheme: ?enum { basic, digest }, - realm: ?[]const u8, - - pub fn parse(status: u16, header: []const u8) !AuthChallenge { - var ac: AuthChallenge = .{ - .status = status, - .source = null, - .realm = null, - .scheme = null, - }; - - const sep = std.mem.indexOfPos(u8, header, 0, ": ") orelse return error.InvalidHeader; - const hname = header[0..sep]; - const hvalue = header[sep + 2 ..]; - - if (std.ascii.eqlIgnoreCase("WWW-Authenticate", hname)) { - ac.source = .server; - } else if (std.ascii.eqlIgnoreCase("Proxy-Authenticate", hname)) { - ac.source = .proxy; - } else { - return error.InvalidAuthChallenge; - } - - const pos = std.mem.indexOfPos(u8, std.mem.trim(u8, hvalue, std.ascii.whitespace[0..]), 0, " ") orelse hvalue.len; - const _scheme = hvalue[0..pos]; - if (std.ascii.eqlIgnoreCase(_scheme, "basic")) { - ac.scheme = .basic; - } else if (std.ascii.eqlIgnoreCase(_scheme, "digest")) { - ac.scheme = .digest; - } else { - return 
error.UnknownAuthChallengeScheme; - } - - return ac; - } -}; - -pub const ResponseHead = struct { - pub const MAX_CONTENT_TYPE_LEN = 64; - - status: u16, - url: [*c]const u8, - redirect_count: u32, - _content_type_len: usize = 0, - _content_type: [MAX_CONTENT_TYPE_LEN]u8 = undefined, - // this is normally an empty list, but if the response is being injected - // than it'll be populated. It isn't meant to be used directly, but should - // be used through the transfer.responseHeaderIterator() which abstracts - // whether the headers are from a live curl easy handle, or injected. - _injected_headers: []const Header = &.{}, - - pub fn contentType(self: *ResponseHead) ?[]u8 { - if (self._content_type_len == 0) { - return null; - } - return self._content_type[0..self._content_type_len]; - } -}; - -pub fn globalInit() Error!void { - try libcurl.curl_global_init(.{ .ssl = true }); -} - -pub fn globalDeinit() void { - libcurl.curl_global_cleanup(); -} - -pub const Connection = struct { - easy: *libcurl.Curl, - node: Handles.HandleList.Node = .{}, - - pub fn init( - ca_blob_: ?libcurl.CurlBlob, - config: *const Config, - ) !Connection { - const easy = libcurl.curl_easy_init() orelse return error.FailedToInitializeEasy; - errdefer libcurl.curl_easy_cleanup(easy); - - // timeouts - try libcurl.curl_easy_setopt(easy, .timeout_ms, config.httpTimeout()); - try libcurl.curl_easy_setopt(easy, .connect_timeout_ms, config.httpConnectTimeout()); - - // redirect behavior - try libcurl.curl_easy_setopt(easy, .max_redirs, config.httpMaxRedirects()); - try libcurl.curl_easy_setopt(easy, .follow_location, 2); - try libcurl.curl_easy_setopt(easy, .redir_protocols_str, "HTTP,HTTPS"); // remove FTP and FTPS from the default - - // proxy - const http_proxy = config.httpProxy(); - if (http_proxy) |proxy| { - try libcurl.curl_easy_setopt(easy, .proxy, proxy.ptr); - } - - // tls - if (ca_blob_) |ca_blob| { - try libcurl.curl_easy_setopt(easy, .ca_info_blob, ca_blob); - if (http_proxy != null) { - 
try libcurl.curl_easy_setopt(easy, .proxy_ca_info_blob, ca_blob); - } - } else { - assert(config.tlsVerifyHost() == false, "Http.init tls_verify_host", .{}); - - try libcurl.curl_easy_setopt(easy, .ssl_verify_host, false); - try libcurl.curl_easy_setopt(easy, .ssl_verify_peer, false); - - if (http_proxy != null) { - try libcurl.curl_easy_setopt(easy, .proxy_ssl_verify_host, false); - try libcurl.curl_easy_setopt(easy, .proxy_ssl_verify_peer, false); - } - } - - // compression, don't remove this. CloudFront will send gzip content - // even if we don't support it, and then it won't be decompressed. - // empty string means: use whatever's available - try libcurl.curl_easy_setopt(easy, .accept_encoding, ""); - - // debug - if (comptime ENABLE_DEBUG) { - try libcurl.curl_easy_setopt(easy, .verbose, true); - - // Sometimes the default debug output hides some useful data. You can - // uncomment the following line (BUT KEEP THE LIVE ABOVE AS-IS), to - // get more control over the data (specifically, the `CURLINFO_TEXT` - // can include useful data). - - // try libcurl.curl_easy_setopt(easy, .debug_function, debugCallback); - } - - return .{ - .easy = easy, - }; - } - - pub fn deinit(self: *const Connection) void { - libcurl.curl_easy_cleanup(self.easy); - } - - pub fn setURL(self: *const Connection, url: [:0]const u8) !void { - try libcurl.curl_easy_setopt(self.easy, .url, url.ptr); - } - - // a libcurl request has 2 methods. The first is the method that - // controls how libcurl behaves. This specifically influences how redirects - // are handled. For example, if you do a POST and get a 301, libcurl will - // change that to a GET. But if you do a POST and get a 308, libcurl will - // keep the POST (and re-send the body). - // The second method is the actual string that's included in the request - // headers. - // These two methods can be different - you can tell curl to behave as though - // you made a GET, but include "POST" in the request header. 
- // - // Here, we're only concerned about the 2nd method. If we want, we'll set - // the first one based on whether or not we have a body. - // - // It's important that, for each use of this connection, we set the 2nd - // method. Else, if we make a HEAD request and re-use the connection, but - // DON'T reset this, it'll keep making HEAD requests. - // (I don't know if it's as important to reset the 1st method, or if libcurl - // can infer that based on the presence of the body, but we also reset it - // to be safe); - pub fn setMethod(self: *const Connection, method: Method) !void { - const easy = self.easy; - const m: [:0]const u8 = switch (method) { - .GET => "GET", - .POST => "POST", - .PUT => "PUT", - .DELETE => "DELETE", - .HEAD => "HEAD", - .OPTIONS => "OPTIONS", - .PATCH => "PATCH", - .PROPFIND => "PROPFIND", - }; - try libcurl.curl_easy_setopt(easy, .custom_request, m.ptr); - } - - pub fn setBody(self: *const Connection, body: []const u8) !void { - const easy = self.easy; - try libcurl.curl_easy_setopt(easy, .post, true); - try libcurl.curl_easy_setopt(easy, .post_field_size, body.len); - try libcurl.curl_easy_setopt(easy, .copy_post_fields, body.ptr); - } - - pub fn setGetMode(self: *const Connection) !void { - try libcurl.curl_easy_setopt(self.easy, .http_get, true); - } - - pub fn setHeaders(self: *const Connection, headers: *Headers) !void { - try libcurl.curl_easy_setopt(self.easy, .http_header, headers.headers); - } - - pub fn setCookies(self: *const Connection, cookies: [*c]const u8) !void { - try libcurl.curl_easy_setopt(self.easy, .cookie, cookies); - } - - pub fn setPrivate(self: *const Connection, ptr: *anyopaque) !void { - try libcurl.curl_easy_setopt(self.easy, .private, ptr); - } - - pub fn setProxyCredentials(self: *const Connection, creds: [:0]const u8) !void { - try libcurl.curl_easy_setopt(self.easy, .proxy_user_pwd, creds.ptr); - } - - pub fn setCredentials(self: *const Connection, creds: [:0]const u8) !void { - try 
libcurl.curl_easy_setopt(self.easy, .user_pwd, creds.ptr); - } - - pub fn setCallbacks( - self: *const Connection, - comptime header_cb: libcurl.CurlHeaderFunction, - comptime data_cb: libcurl.CurlWriteFunction, - ) !void { - try libcurl.curl_easy_setopt(self.easy, .header_data, self.easy); - try libcurl.curl_easy_setopt(self.easy, .header_function, header_cb); - try libcurl.curl_easy_setopt(self.easy, .write_data, self.easy); - try libcurl.curl_easy_setopt(self.easy, .write_function, data_cb); - } - - pub fn setProxy(self: *const Connection, proxy: ?[*:0]const u8) !void { - try libcurl.curl_easy_setopt(self.easy, .proxy, proxy); - } - - pub fn setTlsVerify(self: *const Connection, verify: bool, use_proxy: bool) !void { - try libcurl.curl_easy_setopt(self.easy, .ssl_verify_host, verify); - try libcurl.curl_easy_setopt(self.easy, .ssl_verify_peer, verify); - if (use_proxy) { - try libcurl.curl_easy_setopt(self.easy, .proxy_ssl_verify_host, verify); - try libcurl.curl_easy_setopt(self.easy, .proxy_ssl_verify_peer, verify); - } - } - - pub fn getEffectiveUrl(self: *const Connection) ![*c]const u8 { - var url: [*c]u8 = undefined; - try libcurl.curl_easy_getinfo(self.easy, .effective_url, &url); - return url; - } - - pub fn getResponseCode(self: *const Connection) !u16 { - var status: c_long = undefined; - try libcurl.curl_easy_getinfo(self.easy, .response_code, &status); - if (status < 0 or status > std.math.maxInt(u16)) { - return 0; - } - return @intCast(status); - } - - pub fn getRedirectCount(self: *const Connection) !u32 { - var count: c_long = undefined; - try libcurl.curl_easy_getinfo(self.easy, .redirect_count, &count); - return @intCast(count); - } - - pub fn getResponseHeader(self: *const Connection, name: [:0]const u8, index: usize) ?HeaderValue { - var hdr: ?*libcurl.CurlHeader = null; - libcurl.curl_easy_header(self.easy, name, index, .header, -1, &hdr) catch |err| { - // ErrorHeader includes OutOfMemory — rare but real errors from curl internals. 
- // Logged and returned as null since callers don't expect errors. - log.err(.http, "get response header", .{ - .name = name, - .err = err, - }); - return null; - }; - const h = hdr orelse return null; - return .{ - .amount = h.amount, - .value = std.mem.span(h.value), - }; - } - - pub fn getPrivate(self: *const Connection) !*anyopaque { - var private: *anyopaque = undefined; - try libcurl.curl_easy_getinfo(self.easy, .private, &private); - return private; - } - - // These are headers that may not be send to the users for inteception. - pub fn secretHeaders(_: *const Connection, headers: *Headers, http_headers: *const Config.HttpHeaders) !void { - if (http_headers.proxy_bearer_header) |hdr| { - try headers.add(hdr); - } - } - - pub fn request(self: *const Connection, http_headers: *const Config.HttpHeaders) !u16 { - var header_list = try Headers.init(http_headers.user_agent_header); - defer header_list.deinit(); - try self.secretHeaders(&header_list, http_headers); - try self.setHeaders(&header_list); - - // Add cookies. 
- if (header_list.cookies) |cookies| { - try self.setCookies(cookies); - } - - try libcurl.curl_easy_perform(self.easy); - return self.getResponseCode(); - } -}; - -pub const Handles = struct { - connections: []Connection, - dirty: HandleList, - in_use: HandleList, - available: HandleList, - multi: *libcurl.CurlM, - performing: bool = false, - - pub const HandleList = std.DoublyLinkedList; - - pub fn init( - allocator: Allocator, - ca_blob: ?libcurl.CurlBlob, - config: *const Config, - ) !Handles { - const count: usize = config.httpMaxConcurrent(); - if (count == 0) return error.InvalidMaxConcurrent; - - const multi = libcurl.curl_multi_init() orelse return error.FailedToInitializeMulti; - errdefer libcurl.curl_multi_cleanup(multi) catch {}; - - try libcurl.curl_multi_setopt(multi, .max_host_connections, config.httpMaxHostOpen()); - - const connections = try allocator.alloc(Connection, count); - errdefer allocator.free(connections); - - var available: HandleList = .{}; - for (0..count) |i| { - connections[i] = try Connection.init(ca_blob, config); - available.append(&connections[i].node); - } - - return .{ - .dirty = .{}, - .in_use = .{}, - .connections = connections, - .available = available, - .multi = multi, - }; - } - - pub fn deinit(self: *Handles, allocator: Allocator) void { - for (self.connections) |*conn| { - conn.deinit(); - } - allocator.free(self.connections); - libcurl.curl_multi_cleanup(self.multi) catch {}; - } - - pub fn hasAvailable(self: *const Handles) bool { - return self.available.first != null; - } - - pub fn get(self: *Handles) ?*Connection { - if (self.available.popFirst()) |node| { - self.in_use.append(node); - return @as(*Connection, @fieldParentPtr("node", node)); - } - return null; - } - - pub fn add(self: *Handles, conn: *const Connection) !void { - try libcurl.curl_multi_add_handle(self.multi, conn.easy); - } - - pub fn remove(self: *Handles, conn: *Connection) void { - if (libcurl.curl_multi_remove_handle(self.multi, conn.easy)) { - 
self.isAvailable(conn); - } else |err| { - // can happen if we're in a perform() call, so we'll queue this - // for cleanup later. - const node = &conn.node; - self.in_use.remove(node); - self.dirty.append(node); - log.warn(.http, "multi remove handle", .{ .err = err }); - } - } - - pub fn isAvailable(self: *Handles, conn: *Connection) void { - const node = &conn.node; - self.in_use.remove(node); - self.available.append(node); - } - - pub fn perform(self: *Handles) !c_int { - self.performing = true; - defer self.performing = false; - - const multi = self.multi; - var running: c_int = undefined; - try libcurl.curl_multi_perform(self.multi, &running); - - { - const list = &self.dirty; - while (list.first) |node| { - list.remove(node); - const conn: *Connection = @fieldParentPtr("node", node); - if (libcurl.curl_multi_remove_handle(multi, conn.easy)) { - self.available.append(node); - } else |err| { - log.fatal(.http, "multi remove handle", .{ .err = err, .src = "perform" }); - @panic("multi_remove_handle"); - } - } - } - - return running; - } - - pub fn poll(self: *Handles, extra_fds: []libcurl.CurlWaitFd, timeout_ms: c_int) !void { - try libcurl.curl_multi_poll(self.multi, extra_fds, timeout_ms, null); - } - - pub const MultiMessage = struct { - conn: Connection, - err: ?Error, - }; - - pub fn readMessage(self: *Handles) ?MultiMessage { - var messages_count: c_int = 0; - const msg = libcurl.curl_multi_info_read(self.multi, &messages_count) orelse return null; - return switch (msg.data) { - .done => |err| .{ - .conn = .{ .easy = msg.easy_handle }, - .err = err, - }, - else => unreachable, - }; - } -}; - -// TODO: on BSD / Linux, we could just read the PEM file directly. -// This whole rescan + decode is really just needed for MacOS. On Linux -// bundle.rescan does find the .pem file(s) which could be in a few different -// places, so it's still useful, just not efficient. 
-pub fn loadCerts(allocator: Allocator) !libcurl.CurlBlob { - var bundle: std.crypto.Certificate.Bundle = .{}; - try bundle.rescan(allocator); - defer bundle.deinit(allocator); - - const bytes = bundle.bytes.items; - if (bytes.len == 0) { - log.warn(.app, "No system certificates", .{}); - return .{ - .len = 0, - .flags = 0, - .data = bytes.ptr, - }; - } - - const encoder = std.base64.standard.Encoder; - var arr: std.ArrayList(u8) = .empty; - - const encoded_size = encoder.calcSize(bytes.len); - const buffer_size = encoded_size + - (bundle.map.count() * 75) + // start / end per certificate + extra, just in case - (encoded_size / 64) // newline per 64 characters - ; - try arr.ensureTotalCapacity(allocator, buffer_size); - errdefer arr.deinit(allocator); - var writer = arr.writer(allocator); - - var it = bundle.map.valueIterator(); - while (it.next()) |index| { - const cert = try std.crypto.Certificate.der.Element.parse(bytes, index.*); - - try writer.writeAll("-----BEGIN CERTIFICATE-----\n"); - var line_writer = LineWriter{ .inner = writer }; - try encoder.encodeWriter(&line_writer, bytes[index.*..cert.slice.end]); - try writer.writeAll("\n-----END CERTIFICATE-----\n"); - } - - // Final encoding should not be larger than our initial size estimate - assert(buffer_size > arr.items.len, "Http loadCerts", .{ .estimate = buffer_size, .len = arr.items.len }); - - // Allocate exactly the size needed and copy the data - const result = try allocator.dupe(u8, arr.items); - // Free the original oversized allocation - arr.deinit(allocator); - - return .{ - .len = result.len, - .data = result.ptr, - .flags = 0, - }; -} - -// Wraps lines @ 64 columns. 
A PEM is basically a base64 encoded DER (which is -// what Zig has), with lines wrapped at 64 characters and with a basic header -// and footer -const LineWriter = struct { - col: usize = 0, - inner: std.ArrayList(u8).Writer, - - pub fn writeAll(self: *LineWriter, data: []const u8) !void { - var writer = self.inner; - - var col = self.col; - const len = 64 - col; - - var remain = data; - if (remain.len > len) { - col = 0; - try writer.writeAll(data[0..len]); - try writer.writeByte('\n'); - remain = data[len..]; - } - - while (remain.len > 64) { - try writer.writeAll(remain[0..64]); - try writer.writeByte('\n'); - remain = data[len..]; - } - try writer.writeAll(remain); - self.col = col + remain.len; - } -}; - -fn debugCallback(_: *libcurl.Curl, msg_type: libcurl.CurlInfoType, raw: [*c]u8, len: usize, _: *anyopaque) c_int { - const data = raw[0..len]; - switch (msg_type) { - .text => std.debug.print("libcurl [text]: {s}\n", .{data}), - .header_out => std.debug.print("libcurl [req-h]: {s}\n", .{data}), - .header_in => std.debug.print("libcurl [res-h]: {s}\n", .{data}), - // .data_in => std.debug.print("libcurl [res-b]: {s}\n", .{data}), - else => std.debug.print("libcurl ?? {d}\n", .{msg_type}), - } - return 0; -} - -// Zig is in a weird backend transition right now. Need to determine if -// SIMD is even available. 
-const backend_supports_vectors = switch (builtin.zig_backend) { - .stage2_llvm, .stage2_c => true, - else => false, -}; - -// Websocket messages from client->server are masked using a 4 byte XOR mask -fn mask(m: []const u8, payload: []u8) void { - var data = payload; - - if (!comptime backend_supports_vectors) return simpleMask(m, data); - - const vector_size = std.simd.suggestVectorLength(u8) orelse @sizeOf(usize); - if (data.len >= vector_size) { - const mask_vector = std.simd.repeat(vector_size, @as(@Vector(4, u8), m[0..4].*)); - while (data.len >= vector_size) { - const slice = data[0..vector_size]; - const masked_data_slice: @Vector(vector_size, u8) = slice.*; - slice.* = masked_data_slice ^ mask_vector; - data = data[vector_size..]; - } - } - simpleMask(m, data); -} - -// Used when SIMD isn't available, or for any remaining part of the message -// which is too small to effectively use SIMD. -fn simpleMask(m: []const u8, payload: []u8) void { - for (payload, 0..) |b, i| { - payload[i] = b ^ m[i & 3]; - } -} +const CDP_MAX_MESSAGE_SIZE = @import("../Config.zig").CDP_MAX_MESSAGE_SIZE; const Fragments = struct { type: Message.Type, @@ -763,76 +52,6 @@ const OpCode = enum(u8) { pong = 128 | 10, }; -fn fillWebsocketHeader(buf: std.ArrayList(u8)) []const u8 { - // can't use buf[0..10] here, because the header length - // is variable. 
If it's just 2 bytes, for example, we need the - // framed message to be: - // h1, h2, data - // If we use buf[0..10], we'd get: - // h1, h2, 0, 0, 0, 0, 0, 0, 0, 0, data - - var header_buf: [10]u8 = undefined; - - // -10 because we reserved 10 bytes for the header above - const header = websocketHeader(&header_buf, .text, buf.items.len - 10); - const start = 10 - header.len; - - const message = buf.items; - @memcpy(message[start..10], header); - return message[start..]; -} - -// makes the assumption that our caller reserved the first -// 10 bytes for the header -fn websocketHeader(buf: []u8, op_code: OpCode, payload_len: usize) []const u8 { - assert(buf.len == 10, "Websocket.Header", .{ .len = buf.len }); - - const len = payload_len; - buf[0] = 128 | @intFromEnum(op_code); // fin | opcode - - if (len <= 125) { - buf[1] = @intCast(len); - return buf[0..2]; - } - - if (len < 65536) { - buf[1] = 126; - buf[2] = @intCast((len >> 8) & 0xFF); - buf[3] = @intCast(len & 0xFF); - return buf[0..4]; - } - - buf[1] = 127; - buf[2] = 0; - buf[3] = 0; - buf[4] = 0; - buf[5] = 0; - buf[6] = @intCast((len >> 24) & 0xFF); - buf[7] = @intCast((len >> 16) & 0xFF); - buf[8] = @intCast((len >> 8) & 0xFF); - buf[9] = @intCast(len & 0xFF); - return buf[0..10]; -} - -fn growBuffer(allocator: Allocator, buf: []u8, required_capacity: usize) ![]u8 { - // from std.ArrayList - var new_capacity = buf.len; - while (true) { - new_capacity +|= new_capacity / 2 + 8; - if (new_capacity >= required_capacity) break; - } - - log.debug(.app, "CDP buffer growth", .{ .from = buf.len, .to = new_capacity }); - - if (allocator.resize(buf, new_capacity)) { - return buf.ptr[0..new_capacity]; - } - const new_buffer = try allocator.alloc(u8, new_capacity); - @memcpy(new_buffer[0..buf.len], buf); - allocator.free(buf); - return new_buffer; -} - // WebSocket message reader. Given websocket message, acts as an iterator that // can return zero or more Messages. 
When next returns null, any incomplete // message will remain in reader.data @@ -932,7 +151,7 @@ pub fn Reader(comptime EXPECT_MASK: bool) type { if (message_len > 125) { return error.ControlTooLarge; } - } else if (message_len > Config.CDP_MAX_MESSAGE_SIZE) { + } else if (message_len > CDP_MAX_MESSAGE_SIZE) { return error.TooLarge; } else if (message_len > self.buf.len) { const len = self.buf.len; @@ -960,7 +179,7 @@ pub fn Reader(comptime EXPECT_MASK: bool) type { if (is_continuation) { const fragments = &(self.fragments orelse return error.InvalidContinuation); - if (fragments.message.items.len + message_len > Config.CDP_MAX_MESSAGE_SIZE) { + if (fragments.message.items.len + message_len > CDP_MAX_MESSAGE_SIZE) { return error.TooLarge; } @@ -1086,14 +305,6 @@ pub fn Reader(comptime EXPECT_MASK: bool) type { }; } -// In-place string lowercase -fn toLower(str: []u8) []u8 { - for (str, 0..) |ch, i| { - str[i] = std.ascii.toLower(ch); - } - return str; -} - pub const WsConnection = struct { // CLOSE, 2 length, code const CLOSE_NORMAL = [_]u8{ 136, 2, 3, 232 }; // code: 1000 @@ -1385,6 +596,118 @@ pub const WsConnection = struct { } }; +fn fillWebsocketHeader(buf: std.ArrayList(u8)) []const u8 { + // can't use buf[0..10] here, because the header length + // is variable. 
If it's just 2 bytes, for example, we need the + // framed message to be: + // h1, h2, data + // If we use buf[0..10], we'd get: + // h1, h2, 0, 0, 0, 0, 0, 0, 0, 0, data + + var header_buf: [10]u8 = undefined; + + // -10 because we reserved 10 bytes for the header above + const header = websocketHeader(&header_buf, .text, buf.items.len - 10); + const start = 10 - header.len; + + const message = buf.items; + @memcpy(message[start..10], header); + return message[start..]; +} + +// makes the assumption that our caller reserved the first +// 10 bytes for the header +fn websocketHeader(buf: []u8, op_code: OpCode, payload_len: usize) []const u8 { + assert(buf.len == 10, "Websocket.Header", .{ .len = buf.len }); + + const len = payload_len; + buf[0] = 128 | @intFromEnum(op_code); // fin | opcode + + if (len <= 125) { + buf[1] = @intCast(len); + return buf[0..2]; + } + + if (len < 65536) { + buf[1] = 126; + buf[2] = @intCast((len >> 8) & 0xFF); + buf[3] = @intCast(len & 0xFF); + return buf[0..4]; + } + + buf[1] = 127; + buf[2] = 0; + buf[3] = 0; + buf[4] = 0; + buf[5] = 0; + buf[6] = @intCast((len >> 24) & 0xFF); + buf[7] = @intCast((len >> 16) & 0xFF); + buf[8] = @intCast((len >> 8) & 0xFF); + buf[9] = @intCast(len & 0xFF); + return buf[0..10]; +} + +fn growBuffer(allocator: Allocator, buf: []u8, required_capacity: usize) ![]u8 { + // from std.ArrayList + var new_capacity = buf.len; + while (true) { + new_capacity +|= new_capacity / 2 + 8; + if (new_capacity >= required_capacity) break; + } + + log.debug(.app, "CDP buffer growth", .{ .from = buf.len, .to = new_capacity }); + + if (allocator.resize(buf, new_capacity)) { + return buf.ptr[0..new_capacity]; + } + const new_buffer = try allocator.alloc(u8, new_capacity); + @memcpy(new_buffer[0..buf.len], buf); + allocator.free(buf); + return new_buffer; +} + +// In-place string lowercase +fn toLower(str: []u8) []u8 { + for (str, 0..) 
|ch, i| { + str[i] = std.ascii.toLower(ch); + } + return str; +} + +// Used when SIMD isn't available, or for any remaining part of the message +// which is too small to effectively use SIMD. +fn simpleMask(m: []const u8, payload: []u8) void { + for (payload, 0..) |b, i| { + payload[i] = b ^ m[i & 3]; + } +} + +// Zig is in a weird backend transition right now. Need to determine if +// SIMD is even available. +const backend_supports_vectors = switch (builtin.zig_backend) { + .stage2_llvm, .stage2_c => true, + else => false, +}; + +// Websocket messages from client->server are masked using a 4 byte XOR mask +fn mask(m: []const u8, payload: []u8) void { + var data = payload; + + if (!comptime backend_supports_vectors) return simpleMask(m, data); + + const vector_size = std.simd.suggestVectorLength(u8) orelse @sizeOf(usize); + if (data.len >= vector_size) { + const mask_vector = std.simd.repeat(vector_size, @as(@Vector(4, u8), m[0..4].*)); + while (data.len >= vector_size) { + const slice = data[0..vector_size]; + const masked_data_slice: @Vector(vector_size, u8) = slice.*; + slice.* = masked_data_slice ^ mask_vector; + data = data[vector_size..]; + } + } + simpleMask(m, data); +} + const testing = std.testing; test "mask" { diff --git a/src/telemetry/lightpanda.zig b/src/telemetry/lightpanda.zig index d141e060..75552eeb 100644 --- a/src/telemetry/lightpanda.zig +++ b/src/telemetry/lightpanda.zig @@ -7,9 +7,9 @@ const Allocator = std.mem.Allocator; const log = @import("../log.zig"); const App = @import("../App.zig"); -const Net = @import("../Net.zig"); const Config = @import("../Config.zig"); const telemetry = @import("telemetry.zig"); +const Connection = @import("../network/http.zig").Connection; const URL = "https://telemetry.lightpanda.io"; const MAX_BATCH_SIZE = 20; @@ -20,13 +20,13 @@ pub const LightPanda = struct { allocator: Allocator, mutex: std.Thread.Mutex, cond: Thread.Condition, - connection: Net.Connection, + connection: Connection, config: *const Config, 
pending: std.DoublyLinkedList, mem_pool: std.heap.MemoryPool(LightPandaEvent), pub fn init(app: *App) !LightPanda { - const connection = try app.http.newConnection(); + const connection = try app.network.newConnection(); errdefer connection.deinit(); try connection.setURL(URL); diff --git a/src/testing.zig b/src/testing.zig index 16b06a35..a398f824 100644 --- a/src/testing.zig +++ b/src/testing.zig @@ -39,7 +39,7 @@ pub fn reset() void { const App = @import("App.zig"); const js = @import("browser/js/js.zig"); const Config = @import("Config.zig"); -const Client = @import("http/Client.zig"); +const HttpClient = @import("browser/HttpClient.zig"); const Page = @import("browser/Page.zig"); const Browser = @import("browser/Browser.zig"); const Session = @import("browser/Session.zig"); @@ -335,7 +335,7 @@ fn isJsonValue(a: std.json.Value, b: std.json.Value) bool { } pub var test_app: *App = undefined; -pub var test_http: *Client = undefined; +pub var test_http: *HttpClient = undefined; pub var test_browser: Browser = undefined; pub var test_notification: *Notification = undefined; pub var test_session: *Session = undefined; @@ -460,7 +460,7 @@ const log = @import("log.zig"); const TestHTTPServer = @import("TestHTTPServer.zig"); const Server = @import("Server.zig"); -var test_cdp_server: ?Server = null; +var test_cdp_server: ?*Server = null; var test_cdp_server_thread: ?std.Thread = null; var test_http_server: ?TestHTTPServer = null; var test_http_server_thread: ?std.Thread = null; @@ -483,7 +483,7 @@ test "tests:beforeAll" { test_app = try App.init(test_allocator, &test_config); errdefer test_app.deinit(); - test_http = try test_app.http.createClient(test_allocator); + test_http = try HttpClient.init(test_allocator, &test_app.network); errdefer test_http.deinit(); test_browser = try Browser.init(test_app, .{ .http_client = test_http }); @@ -509,13 +509,11 @@ test "tests:beforeAll" { } test "tests:afterAll" { - if (test_cdp_server) |*server| { - server.stop(); - } + 
test_app.network.stop(); if (test_cdp_server_thread) |thread| { thread.join(); } - if (test_cdp_server) |*server| { + if (test_cdp_server) |server| { server.deinit(); } @@ -540,14 +538,14 @@ test "tests:afterAll" { fn serveCDP(wg: *std.Thread.WaitGroup) !void { const address = try std.net.Address.parseIp("127.0.0.1", 9583); - test_cdp_server = try Server.init(test_app, address); - wg.finish(); - - test_cdp_server.?.run(address, 5) catch |err| { + test_cdp_server = Server.init(test_app, address) catch |err| { std.debug.print("CDP server error: {}", .{err}); return err; }; + wg.finish(); + + test_app.network.run(); } fn testHTTPHandler(req: *std.http.Server.Request) !void {