diff --git a/LICENSING.md b/LICENSING.md
index e6988ade..fc131484 100644
--- a/LICENSING.md
+++ b/LICENSING.md
@@ -10,7 +10,6 @@ The default license for this project is [AGPL-3.0-only](LICENSE).
 The following files are licensed under MIT:
 
 ```
-src/http/Client.zig
 src/polyfill/fetch.js
 ```
 
diff --git a/build.zig b/build.zig
index 6349e1f2..ca480681 100644
--- a/build.zig
+++ b/build.zig
@@ -177,6 +177,9 @@ fn common(
     options: jsruntime.Options,
 ) !void {
     const target = step.root_module.resolved_target.?;
+    const optimize = step.root_module.optimize.?;
+    const dep_opts = .{ .target = target, .optimize = optimize };
+
     const jsruntimemod = try jsruntime_pkgs.module(
         b,
         options,
@@ -189,15 +192,7 @@ fn common(
     netsurf.addImport("jsruntime", jsruntimemod);
     step.root_module.addImport("netsurf", netsurf);
 
-    const asyncio = b.addModule("asyncio", .{
-        .root_source_file = b.path("vendor/zig-async-io/src/lib.zig"),
-    });
-    step.root_module.addImport("asyncio", asyncio);
-
-    const tlsmod = b.addModule("tls", .{
-        .root_source_file = b.path("vendor/tls.zig/src/root.zig"),
-    });
-    step.root_module.addImport("tls", tlsmod);
+    step.root_module.addImport("tls", b.dependency("tls", dep_opts).module("tls"));
 }
 
 fn moduleNetSurf(b: *std.Build, target: std.Build.ResolvedTarget) !*std.Build.Module {
diff --git a/build.zig.zon b/build.zig.zon
new file mode 100644
index 00000000..5f3ea5cb
--- /dev/null
+++ b/build.zig.zon
@@ -0,0 +1,12 @@
+.{
+    .name = .browser,
+    .paths = .{""},
+    .version = "0.0.0",
+    .fingerprint = 0xda130f3af836cea0,
+    .dependencies = .{
+        .tls = .{
+            .url = "https://github.com/karlseguin/tls.zig/archive/e39d40150f10464992da11352fb3955b3345272f.tar.gz",
+            .hash = "122039cd3abe387b69d23930bf12154c2c84fc894874e10129a1fc5e8ac75ca0ddc0"
+        },
+    },
+}
diff --git a/src/browser/browser.zig b/src/browser/browser.zig
index bb51d9bb..8b227296 100644
--- a/src/browser/browser.zig
+++ b/src/browser/browser.zig
@@ -24,7 +24,6 @@ const Allocator = std.mem.Allocator;
 const Types = @import("root").Types;
 
 const parser = @import("netsurf");
-const Loader = @import("loader.zig").Loader;
 const Dump = @import("dump.zig");
 const Mime = @import("mime.zig").Mime;
 
@@ -44,10 +43,8 @@ const Location = @import("../html/location.zig").Location;
 
 const storage = @import("../storage/storage.zig");
 
-const FetchResult = @import("../http/Client.zig").Client.FetchResult;
-
+const http = @import("../http/client.zig");
 const UserContext = @import("../user_context.zig").UserContext;
-const HttpClient = @import("asyncio").Client;
 
 const polyfill = @import("../polyfill/polyfill.zig");
 
@@ -63,20 +60,20 @@ pub const Browser = struct {
     app: *App,
     session: ?*Session,
     allocator: Allocator,
-    http_client: *HttpClient,
+    http_client: *http.Client,
     session_pool: SessionPool,
     page_arena: std.heap.ArenaAllocator,
 
     const SessionPool = std.heap.MemoryPool(Session);
 
-    pub fn init(app: *App) Browser {
+    pub fn init(app: *App) !Browser {
         const allocator = app.allocator;
         return .{
            .app = app,
            .session = null,
            .allocator = allocator,
-            .http_client = @ptrCast(&app.http_client),
+            .http_client = &app.http_client,
            .session_pool = SessionPool.init(allocator),
            .page_arena = std.heap.ArenaAllocator.init(allocator),
         };
     }
@@ -121,9 +119,6 @@ pub const Session = struct {
     // all others Session deps use directly self.alloc and not the arena.
     arena: std.heap.ArenaAllocator,
 
-    // TODO handle proxy
-    loader: Loader,
-
     env: Env,
     inspector: jsruntime.Inspector,
 
@@ -132,6 +127,7 @@ pub const Session = struct {
     // TODO move the shed to the browser?
     storage_shed: storage.Shed,
     page: ?Page = null,
+    http_client: *http.Client,
 
     jstypes: [Types.len]usize = undefined,
 
@@ -143,7 +139,7 @@
            .env = undefined,
            .browser = browser,
            .inspector = undefined,
-            .loader = Loader.init(allocator),
+            .http_client = &browser.http_client,
            .storage_shed = storage.Shed.init(allocator),
            .arena = std.heap.ArenaAllocator.init(allocator),
            .window = Window.create(null, .{ .agent = user_agent }),
@@ -181,7 +177,6 @@
         }
         self.env.deinit();
         self.arena.deinit();
-        self.loader.deinit();
         self.storage_shed.deinit();
     }
 
@@ -370,32 +365,14 @@ pub const Page = struct {
         } });
 
         // load the data
-        var resp = try self.session.loader.get(arena, self.uri);
-        defer resp.deinit();
+        var request = try self.session.http_client.request(.GET, self.uri);
+        defer request.deinit();
+        var response = try request.sendSync(.{});
 
-        const req = resp.req;
+        const header = response.header;
+        log.info("GET {any} {d}", .{ self.uri, header.status });
 
-        log.info("GET {any} {d}", .{ self.uri, @intFromEnum(req.response.status) });
-
-        // TODO handle redirection
-        log.debug("{?} {d} {s}", .{
-            req.response.version,
-            @intFromEnum(req.response.status),
-            req.response.reason,
-            // TODO log headers
-        });
-
-        // TODO handle charset
-        // https://html.spec.whatwg.org/#content-type
-        var it = req.response.iterateHeaders();
-        var ct_: ?[]const u8 = null;
-        while (true) {
-            const h = it.next() orelse break;
-            if (std.ascii.eqlIgnoreCase(h.name, "Content-Type")) {
-                ct_ = try arena.dupe(u8, h.value);
-            }
-        }
-        const ct = ct_ orelse {
+        const ct = response.header.get("content-type") orelse {
             // no content type in HTTP headers.
            // TODO try to sniff mime type from the body.
            log.info("no content-type HTTP header", .{});
@@ -404,14 +381,18 @@
         log.debug("header content-type: {s}", .{ct});
 
         var mime = try Mime.parse(arena, ct);
+        defer mime.deinit();
 
         if (mime.isHTML()) {
-            try self.loadHTMLDoc(req.reader(), mime.charset orelse "utf-8", aux_data);
+            try self.loadHTMLDoc(&response, mime.charset orelse "utf-8", aux_data);
         } else {
             log.info("non-HTML document: {s}", .{ct});
-
+            var arr: std.ArrayListUnmanaged(u8) = .{};
+            while (try response.next()) |data| {
+                try arr.appendSlice(arena, try arena.dupe(u8, data));
+            }
             // save the body into the page.
-            self.raw_data = try req.reader().readAllAlloc(arena, 16 * 1024 * 1024);
+            self.raw_data = arr.items;
         }
     }
 
@@ -453,7 +434,7 @@
         // replace the user context document with the new one.
         try session.env.setUserContext(.{
             .document = html_doc,
-            .httpClient = self.session.browser.http_client,
+            .http_client = self.session.browser.http_client,
         });
 
         // browse the DOM tree to retrieve scripts
@@ -628,30 +609,32 @@
         const u = try std.Uri.resolve_inplace(self.uri, res_src, &b);
 
-        var fetchres = try self.session.loader.get(arena, u);
-        defer fetchres.deinit();
+        var request = try self.session.http_client.request(.GET, u);
+        defer request.deinit();
+        var response = try request.sendSync(.{});
 
-        const resp = fetchres.req.response;
+        log.info("fetch {any}: {d}", .{ u, response.header.status });
 
-        log.info("fetch {any}: {d}", .{ u, resp.status });
-
-        if (resp.status != .ok) {
+        if (response.header.status != 200) {
             return FetchError.BadStatusCode;
         }
 
+        var arr: std.ArrayListUnmanaged(u8) = .{};
+        while (try response.next()) |data| {
+            try arr.appendSlice(arena, try arena.dupe(u8, data));
+        }
+
         // TODO check content-type
-        const body = try fetchres.req.reader().readAllAlloc(arena, 16 * 1024 * 1024);
 
         // check no body
-        if (body.len == 0) {
+        if (arr.items.len == 0) {
             return FetchError.NoBody;
         }
 
-        return body;
+        return arr.items;
     }
 
-    // fetchScript sends a GET request to the src and executes the script
-    // received.
+
     fn fetchScript(self: *const Page, s: *const Script) !void {
         const arena = self.arena;
         const body = try self.fetchData(arena, s.src, null);
diff --git a/src/browser/loader.zig b/src/browser/loader.zig
deleted file mode 100644
index 6df6d9ad..00000000
--- a/src/browser/loader.zig
+++ /dev/null
@@ -1,97 +0,0 @@
-// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
-//
-// Francis Bouvier
-// Pierre Tachoire
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as
-// published by the Free Software Foundation, either version 3 of the
-// License, or (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <https://www.gnu.org/licenses/>.
-
-const std = @import("std");
-const Client = @import("../http/Client.zig");
-
-const user_agent = @import("browser.zig").user_agent;
-
-pub const Loader = struct {
-    client: Client,
-    // use 64KB for headers buffer size.
-    server_header_buffer: [1024 * 64]u8 = undefined,
-
-    pub const Response = struct {
-        alloc: std.mem.Allocator,
-        req: *Client.Request,
-
-        pub fn deinit(self: *Response) void {
-            self.req.deinit();
-            self.alloc.destroy(self.req);
-        }
-    };
-
-    pub fn init(alloc: std.mem.Allocator) Loader {
-        return Loader{
-            .client = Client{
-                .allocator = alloc,
-            },
-        };
-    }
-
-    pub fn deinit(self: *Loader) void {
-        self.client.deinit();
-    }
-
-    // see
-    // https://ziglang.org/documentation/master/std/#A;std:http.Client.fetch
-    // for reference.
-    // The caller is responsible for calling `deinit()` on the `Response`.
- pub fn get(self: *Loader, alloc: std.mem.Allocator, uri: std.Uri) !Response { - var resp = Response{ - .alloc = alloc, - .req = try alloc.create(Client.Request), - }; - errdefer alloc.destroy(resp.req); - - resp.req.* = try self.client.open(.GET, uri, .{ - .headers = .{ - .user_agent = .{ .override = user_agent }, - }, - .extra_headers = &.{ - .{ .name = "Accept", .value = "*/*" }, - .{ .name = "Accept-Language", .value = "en-US,en;q=0.5" }, - }, - .server_header_buffer = &self.server_header_buffer, - }); - errdefer resp.req.deinit(); - - try resp.req.send(); - try resp.req.finish(); - try resp.req.wait(); - - return resp; - } -}; - -test "loader: get" { - const alloc = std.testing.allocator; - var loader = Loader.init(alloc); - defer loader.deinit(); - - const uri = try std.Uri.parse("http://localhost:9582/loader"); - var result = try loader.get(alloc, uri); - defer result.deinit(); - - try std.testing.expectEqual(.ok, result.req.response.status); - - var res: [128]u8 = undefined; - const size = try result.req.readAll(&res); - try std.testing.expectEqual(6, size); - try std.testing.expectEqualStrings("Hello!", res[0..6]); -} diff --git a/src/cdp/cdp.zig b/src/cdp/cdp.zig index d7975664..7d0b6bf6 100644 --- a/src/cdp/cdp.zig +++ b/src/cdp/cdp.zig @@ -73,13 +73,13 @@ pub fn CDPT(comptime TypeProvider: type) type { pub const Browser = TypeProvider.Browser; pub const Session = TypeProvider.Session; - pub fn init(app: *App, client: TypeProvider.Client) Self { + pub fn init(app: *App, client: TypeProvider.Client) !Self { const allocator = app.allocator; return .{ .client = client, .allocator = allocator, .browser_context = null, - .browser = Browser.init(app), + .browser = try Browser.init(app), .message_arena = std.heap.ArenaAllocator.init(allocator), .browser_context_pool = std.heap.MemoryPool(BrowserContext(Self)).init(allocator), }; diff --git a/src/cdp/testing.zig b/src/cdp/testing.zig index 051309de..bd90cd7c 100644 --- a/src/cdp/testing.zig +++ b/src/cdp/testing.zig @@ -17,7 +17,7 @@ const Browser = struct { session: ?*Session = null, arena: std.heap.ArenaAllocator, - pub fn init(app: *App) Browser { + pub fn init(app: *App) !Browser { return .{ .arena = std.heap.ArenaAllocator.init(app.allocator), }; diff --git a/src/http/Client.zig b/src/http/Client.zig deleted file mode 100644 index cc234e4a..00000000 --- a/src/http/Client.zig +++ /dev/null @@ -1,1776 +0,0 @@ -//! HTTP(S) Client implementation. -//! -//! Connections are opened in a thread-safe manner, but individual Requests are not. -//! -//! TLS support may be disabled via `std.options.http_disable_tls`. - -const std = @import("std"); -const builtin = @import("builtin"); -const testing = std.testing; -const http = std.http; -const mem = std.mem; -const net = std.net; -const Uri = std.Uri; -const Allocator = mem.Allocator; -const assert = std.debug.assert; -const use_vectors = builtin.zig_backend != .stage2_x86_64; - -const Client = @This(); -const proto = std.http.protocol; - -const tls23 = @import("tls"); - -pub const disable_tls = std.options.http_disable_tls; - -/// Used for all client allocations. Must be thread-safe. -allocator: Allocator, - -ca_bundle: if (disable_tls) void else std.crypto.Certificate.Bundle = if (disable_tls) {} else .{}, -ca_bundle_mutex: std.Thread.Mutex = .{}, - -/// When this is `true`, the next time this client performs an HTTPS request, -/// it will first rescan the system for root certificates. 
-next_https_rescan_certs: bool = true, - -/// The pool of connections that can be reused (and currently in use). -connection_pool: ConnectionPool = .{}, - -/// If populated, all http traffic travels through this third party. -/// This field cannot be modified while the client has active connections. -/// Pointer to externally-owned memory. -http_proxy: ?*Proxy = null, -/// If populated, all https traffic travels through this third party. -/// This field cannot be modified while the client has active connections. -/// Pointer to externally-owned memory. -https_proxy: ?*Proxy = null, - -/// A set of linked lists of connections that can be reused. -pub const ConnectionPool = struct { - mutex: std.Thread.Mutex = .{}, - /// Open connections that are currently in use. - used: Queue = .{}, - /// Open connections that are not currently in use. - free: Queue = .{}, - free_len: usize = 0, - free_size: usize = 32, - - /// The criteria for a connection to be considered a match. - pub const Criteria = struct { - host: []const u8, - port: u16, - protocol: Connection.Protocol, - }; - - const Queue = std.DoublyLinkedList(Connection); - pub const Node = Queue.Node; - - /// Finds and acquires a connection from the connection pool matching the criteria. This function is threadsafe. - /// If no connection is found, null is returned. - pub fn findConnection(pool: *ConnectionPool, criteria: Criteria) ?*Connection { - pool.mutex.lock(); - defer pool.mutex.unlock(); - - var next = pool.free.last; - while (next) |node| : (next = node.prev) { - if (node.data.protocol != criteria.protocol) continue; - if (node.data.port != criteria.port) continue; - - // Domain names are case-insensitive (RFC 5890, Section 2.3.2.4) - if (!std.ascii.eqlIgnoreCase(node.data.host, criteria.host)) continue; - - pool.acquireUnsafe(node); - return &node.data; - } - - return null; - } - - /// Acquires an existing connection from the connection pool. This function is not threadsafe. - pub fn acquireUnsafe(pool: *ConnectionPool, node: *Node) void { - pool.free.remove(node); - pool.free_len -= 1; - - pool.used.append(node); - } - - /// Acquires an existing connection from the connection pool. This function is threadsafe. - pub fn acquire(pool: *ConnectionPool, node: *Node) void { - pool.mutex.lock(); - defer pool.mutex.unlock(); - - return pool.acquireUnsafe(node); - } - - /// Tries to release a connection back to the connection pool. This function is threadsafe. - /// If the connection is marked as closing, it will be closed instead. - /// - /// The allocator must be the owner of all nodes in this pool. - /// The allocator must be the owner of all resources associated with the connection. - pub fn release(pool: *ConnectionPool, allocator: Allocator, connection: *Connection) void { - pool.mutex.lock(); - defer pool.mutex.unlock(); - - const node: *Node = @fieldParentPtr("data", connection); - - pool.used.remove(node); - - if (node.data.closing or pool.free_size == 0) { - node.data.close(allocator); - return allocator.destroy(node); - } - - if (pool.free_len >= pool.free_size) { - const popped = pool.free.popFirst() orelse unreachable; - pool.free_len -= 1; - - popped.data.close(allocator); - allocator.destroy(popped); - } - - if (node.data.proxied) { - pool.free.prepend(node); // proxied connections go to the end of the queue, always try direct connections first - } else { - pool.free.append(node); - } - - pool.free_len += 1; - } - - /// Adds a newly created node to the pool of used connections. This function is threadsafe. 
- pub fn addUsed(pool: *ConnectionPool, node: *Node) void { - pool.mutex.lock(); - defer pool.mutex.unlock(); - - pool.used.append(node); - } - - /// Resizes the connection pool. This function is threadsafe. - /// - /// If the new size is smaller than the current size, then idle connections will be closed until the pool is the new size. - pub fn resize(pool: *ConnectionPool, allocator: Allocator, new_size: usize) void { - pool.mutex.lock(); - defer pool.mutex.unlock(); - - const next = pool.free.first; - _ = next; - while (pool.free_len > new_size) { - const popped = pool.free.popFirst() orelse unreachable; - pool.free_len -= 1; - - popped.data.close(allocator); - allocator.destroy(popped); - } - - pool.free_size = new_size; - } - - /// Frees the connection pool and closes all connections within. This function is threadsafe. - /// - /// All future operations on the connection pool will deadlock. - pub fn deinit(pool: *ConnectionPool, allocator: Allocator) void { - pool.mutex.lock(); - - var next = pool.free.first; - while (next) |node| { - defer allocator.destroy(node); - next = node.next; - - node.data.close(allocator); - } - - next = pool.used.first; - while (next) |node| { - defer allocator.destroy(node); - next = node.next; - - node.data.close(allocator); - } - - pool.* = undefined; - } -}; - -/// An interface to either a plain or TLS connection. -pub const Connection = struct { - stream: net.Stream, - /// undefined unless protocol is tls. - tls_client: if (!disable_tls) *tls23.Connection(net.Stream) else void, - - /// The protocol that this connection is using. - protocol: Protocol, - - /// The host that this connection is connected to. - host: []u8, - - /// The port that this connection is connected to. - port: u16, - - /// Whether this connection is proxied and is not directly connected. - proxied: bool = false, - - /// Whether this connection is closing when we're done with it. - closing: bool = false, - - read_start: BufferSize = 0, - read_end: BufferSize = 0, - write_end: BufferSize = 0, - read_buf: [buffer_size]u8 = undefined, - write_buf: [buffer_size]u8 = undefined, - - pub const buffer_size = std.crypto.tls.max_ciphertext_record_len; - const BufferSize = std.math.IntFittingRange(0, buffer_size); - - pub const Protocol = enum { plain, tls }; - - pub fn readvDirectTls(conn: *Connection, buffers: []std.posix.iovec) ReadError!usize { - return conn.tls_client.readv(buffers) catch |err| { - // https://github.com/ziglang/zig/issues/2473 - if (mem.startsWith(u8, @errorName(err), "TlsAlert")) return error.TlsAlert; - - switch (err) { - error.TlsRecordOverflow, error.TlsBadRecordMac, error.TlsUnexpectedMessage => return error.TlsFailure, - error.ConnectionTimedOut => return error.ConnectionTimedOut, - error.ConnectionResetByPeer, error.BrokenPipe => return error.ConnectionResetByPeer, - else => return error.UnexpectedReadFailure, - } - }; - } - - pub fn readvDirect(conn: *Connection, buffers: []std.posix.iovec) ReadError!usize { - if (conn.protocol == .tls) { - if (disable_tls) unreachable; - - return conn.readvDirectTls(buffers); - } - - return conn.stream.readv(buffers) catch |err| switch (err) { - error.ConnectionTimedOut => return error.ConnectionTimedOut, - error.ConnectionResetByPeer, error.BrokenPipe => return error.ConnectionResetByPeer, - else => return error.UnexpectedReadFailure, - }; - } - - /// Refills the read buffer with data from the connection. 
- pub fn fill(conn: *Connection) ReadError!void { - if (conn.read_end != conn.read_start) return; - - var iovecs = [1]std.posix.iovec{ - .{ .base = &conn.read_buf, .len = conn.read_buf.len }, - }; - const nread = try conn.readvDirect(&iovecs); - if (nread == 0) return error.EndOfStream; - conn.read_start = 0; - conn.read_end = @intCast(nread); - } - - /// Returns the current slice of buffered data. - pub fn peek(conn: *Connection) []const u8 { - return conn.read_buf[conn.read_start..conn.read_end]; - } - - /// Discards the given number of bytes from the read buffer. - pub fn drop(conn: *Connection, num: BufferSize) void { - conn.read_start += num; - } - - /// Reads data from the connection into the given buffer. - pub fn read(conn: *Connection, buffer: []u8) ReadError!usize { - const available_read = conn.read_end - conn.read_start; - const available_buffer = buffer.len; - - if (available_read > available_buffer) { // partially read buffered data - @memcpy(buffer[0..available_buffer], conn.read_buf[conn.read_start..conn.read_end][0..available_buffer]); - conn.read_start += @intCast(available_buffer); - - return available_buffer; - } else if (available_read > 0) { // fully read buffered data - @memcpy(buffer[0..available_read], conn.read_buf[conn.read_start..conn.read_end]); - conn.read_start += available_read; - - return available_read; - } - - var iovecs = [2]std.posix.iovec{ - .{ .base = buffer.ptr, .len = buffer.len }, - .{ .base = &conn.read_buf, .len = conn.read_buf.len }, - }; - const nread = try conn.readvDirect(&iovecs); - - if (nread > buffer.len) { - conn.read_start = 0; - conn.read_end = @intCast(nread - buffer.len); - return buffer.len; - } - - return nread; - } - - pub const ReadError = error{ - TlsFailure, - TlsAlert, - ConnectionTimedOut, - ConnectionResetByPeer, - UnexpectedReadFailure, - EndOfStream, - }; - - pub const Reader = std.io.Reader(*Connection, ReadError, read); - - pub fn reader(conn: *Connection) Reader { - return Reader{ .context = conn }; - } - - pub fn writeAllDirectTls(conn: *Connection, buffer: []const u8) WriteError!void { - return conn.tls_client.writeAll(buffer) catch |err| switch (err) { - error.BrokenPipe, error.ConnectionResetByPeer => return error.ConnectionResetByPeer, - else => return error.UnexpectedWriteFailure, - }; - } - - pub fn writeAllDirect(conn: *Connection, buffer: []const u8) WriteError!void { - if (conn.protocol == .tls) { - if (disable_tls) unreachable; - - return conn.writeAllDirectTls(buffer); - } - - return conn.stream.writeAll(buffer) catch |err| switch (err) { - error.BrokenPipe, error.ConnectionResetByPeer => return error.ConnectionResetByPeer, - else => return error.UnexpectedWriteFailure, - }; - } - - /// Writes the given buffer to the connection. - pub fn write(conn: *Connection, buffer: []const u8) WriteError!usize { - if (conn.write_buf.len - conn.write_end < buffer.len) { - try conn.flush(); - - if (buffer.len > conn.write_buf.len) { - try conn.writeAllDirect(buffer); - return buffer.len; - } - } - - @memcpy(conn.write_buf[conn.write_end..][0..buffer.len], buffer); - conn.write_end += @intCast(buffer.len); - - return buffer.len; - } - - /// Returns a buffer to be filled with exactly len bytes to write to the connection. - pub fn allocWriteBuffer(conn: *Connection, len: BufferSize) WriteError![]u8 { - if (conn.write_buf.len - conn.write_end < len) try conn.flush(); - defer conn.write_end += len; - return conn.write_buf[conn.write_end..][0..len]; - } - - /// Flushes the write buffer to the connection. 
- pub fn flush(conn: *Connection) WriteError!void { - if (conn.write_end == 0) return; - - try conn.writeAllDirect(conn.write_buf[0..conn.write_end]); - conn.write_end = 0; - } - - pub const WriteError = error{ - ConnectionResetByPeer, - UnexpectedWriteFailure, - }; - - pub const Writer = std.io.Writer(*Connection, WriteError, write); - - pub fn writer(conn: *Connection) Writer { - return Writer{ .context = conn }; - } - - /// Closes the connection. - pub fn close(conn: *Connection, allocator: Allocator) void { - if (conn.protocol == .tls) { - if (disable_tls) unreachable; - - // try to cleanly close the TLS connection, for any server that cares. - conn.tls_client.close() catch {}; - allocator.destroy(conn.tls_client); - } - - conn.stream.close(); - allocator.free(conn.host); - } -}; - -/// The mode of transport for requests. -pub const RequestTransfer = union(enum) { - content_length: u64, - chunked: void, - none: void, -}; - -/// The decompressor for response messages. -pub const Compression = union(enum) { - pub const DeflateDecompressor = std.compress.zlib.Decompressor(Request.TransferReader); - pub const GzipDecompressor = std.compress.gzip.Decompressor(Request.TransferReader); - // https://github.com/ziglang/zig/issues/18937 - //pub const ZstdDecompressor = std.compress.zstd.DecompressStream(Request.TransferReader, .{}); - - deflate: DeflateDecompressor, - gzip: GzipDecompressor, - // https://github.com/ziglang/zig/issues/18937 - //zstd: ZstdDecompressor, - none: void, -}; - -/// A HTTP response originating from a server. -pub const Response = struct { - version: http.Version, - status: http.Status, - reason: []const u8, - - /// Points into the user-provided `server_header_buffer`. - location: ?[]const u8 = null, - /// Points into the user-provided `server_header_buffer`. - content_type: ?[]const u8 = null, - /// Points into the user-provided `server_header_buffer`. - content_disposition: ?[]const u8 = null, - - keep_alive: bool, - - /// If present, the number of bytes in the response body. - content_length: ?u64 = null, - - /// If present, the transfer encoding of the response body, otherwise none. - transfer_encoding: http.TransferEncoding = .none, - - /// If present, the compression of the response body, otherwise identity (no compression). - transfer_compression: http.ContentEncoding = .identity, - - parser: proto.HeadersParser, - compression: Compression = .none, - - /// Whether the response body should be skipped. Any data read from the - /// response body will be discarded. 
- skip: bool = false, - - pub const ParseError = error{ - HttpHeadersInvalid, - HttpHeaderContinuationsUnsupported, - HttpTransferEncodingUnsupported, - HttpConnectionHeaderUnsupported, - InvalidContentLength, - CompressionUnsupported, - }; - - pub fn parse(res: *Response, bytes: []const u8) ParseError!void { - var it = mem.splitSequence(u8, bytes, "\r\n"); - - const first_line = it.next().?; - if (first_line.len < 12) { - return error.HttpHeadersInvalid; - } - - const version: http.Version = switch (int64(first_line[0..8])) { - int64("HTTP/1.0") => .@"HTTP/1.0", - int64("HTTP/1.1") => .@"HTTP/1.1", - else => return error.HttpHeadersInvalid, - }; - if (first_line[8] != ' ') return error.HttpHeadersInvalid; - const status: http.Status = @enumFromInt(parseInt3(first_line[9..12])); - const reason = mem.trimLeft(u8, first_line[12..], " "); - - res.version = version; - res.status = status; - res.reason = reason; - res.keep_alive = switch (version) { - .@"HTTP/1.0" => false, - .@"HTTP/1.1" => true, - }; - - while (it.next()) |line| { - if (line.len == 0) return; - switch (line[0]) { - ' ', '\t' => return error.HttpHeaderContinuationsUnsupported, - else => {}, - } - - var line_it = mem.splitScalar(u8, line, ':'); - const header_name = line_it.next().?; - const header_value = mem.trim(u8, line_it.rest(), " \t"); - if (header_name.len == 0) return error.HttpHeadersInvalid; - - if (std.ascii.eqlIgnoreCase(header_name, "connection")) { - res.keep_alive = !std.ascii.eqlIgnoreCase(header_value, "close"); - } else if (std.ascii.eqlIgnoreCase(header_name, "content-type")) { - res.content_type = header_value; - } else if (std.ascii.eqlIgnoreCase(header_name, "location")) { - res.location = header_value; - } else if (std.ascii.eqlIgnoreCase(header_name, "content-disposition")) { - res.content_disposition = header_value; - } else if (std.ascii.eqlIgnoreCase(header_name, "transfer-encoding")) { - // Transfer-Encoding: second, first - // Transfer-Encoding: deflate, chunked - var iter = mem.splitBackwardsScalar(u8, header_value, ','); - - const first = iter.first(); - const trimmed_first = mem.trim(u8, first, " "); - - var next: ?[]const u8 = first; - if (std.meta.stringToEnum(http.TransferEncoding, trimmed_first)) |transfer| { - if (res.transfer_encoding != .none) return error.HttpHeadersInvalid; // we already have a transfer encoding - res.transfer_encoding = transfer; - - next = iter.next(); - } - - if (next) |second| { - const trimmed_second = mem.trim(u8, second, " "); - - if (std.meta.stringToEnum(http.ContentEncoding, trimmed_second)) |transfer| { - if (res.transfer_compression != .identity) return error.HttpHeadersInvalid; // double compression is not supported - res.transfer_compression = transfer; - } else { - return error.HttpTransferEncodingUnsupported; - } - } - - if (iter.next()) |_| return error.HttpTransferEncodingUnsupported; - } else if (std.ascii.eqlIgnoreCase(header_name, "content-length")) { - const content_length = std.fmt.parseInt(u64, header_value, 10) catch return error.InvalidContentLength; - - if (res.content_length != null and res.content_length != content_length) return error.HttpHeadersInvalid; - - res.content_length = content_length; - } else if (std.ascii.eqlIgnoreCase(header_name, "content-encoding")) { - if (res.transfer_compression != .identity) return error.HttpHeadersInvalid; - - const trimmed = mem.trim(u8, header_value, " "); - - if (std.meta.stringToEnum(http.ContentEncoding, trimmed)) |ce| { - res.transfer_compression = ce; - } else { - return 
error.HttpTransferEncodingUnsupported; - } - } - } - return error.HttpHeadersInvalid; // missing empty line - } - - test parse { - const response_bytes = "HTTP/1.1 200 OK\r\n" ++ - "LOcation:url\r\n" ++ - "content-tYpe: text/plain\r\n" ++ - "content-disposition:attachment; filename=example.txt \r\n" ++ - "content-Length:10\r\n" ++ - "TRansfer-encoding:\tdeflate, chunked \r\n" ++ - "connectioN:\t keep-alive \r\n\r\n"; - - var header_buffer: [1024]u8 = undefined; - var res = Response{ - .status = undefined, - .reason = undefined, - .version = undefined, - .keep_alive = false, - .parser = proto.HeadersParser.init(&header_buffer), - }; - - @memcpy(header_buffer[0..response_bytes.len], response_bytes); - res.parser.header_bytes_len = response_bytes.len; - - try res.parse(response_bytes); - - try testing.expectEqual(.@"HTTP/1.1", res.version); - try testing.expectEqualStrings("OK", res.reason); - try testing.expectEqual(.ok, res.status); - - try testing.expectEqualStrings("url", res.location.?); - try testing.expectEqualStrings("text/plain", res.content_type.?); - try testing.expectEqualStrings("attachment; filename=example.txt", res.content_disposition.?); - - try testing.expectEqual(true, res.keep_alive); - try testing.expectEqual(10, res.content_length.?); - try testing.expectEqual(.chunked, res.transfer_encoding); - try testing.expectEqual(.deflate, res.transfer_compression); - } - - inline fn int64(array: *const [8]u8) u64 { - return @bitCast(array.*); - } - - fn parseInt3(text: *const [3]u8) u10 { - if (use_vectors) { - const nnn: @Vector(3, u8) = text.*; - const zero: @Vector(3, u8) = .{ '0', '0', '0' }; - const mmm: @Vector(3, u10) = .{ 100, 10, 1 }; - return @reduce(.Add, @as(@Vector(3, u10), nnn -% zero) *% mmm); - } - return std.fmt.parseInt(u10, text, 10) catch unreachable; - } - - test parseInt3 { - const expectEqual = testing.expectEqual; - try expectEqual(@as(u10, 0), parseInt3("000")); - try expectEqual(@as(u10, 418), parseInt3("418")); - try expectEqual(@as(u10, 999), parseInt3("999")); - } - - pub fn iterateHeaders(r: Response) http.HeaderIterator { - return http.HeaderIterator.init(r.parser.get()); - } - - test iterateHeaders { - const response_bytes = "HTTP/1.1 200 OK\r\n" ++ - "LOcation:url\r\n" ++ - "content-tYpe: text/plain\r\n" ++ - "content-disposition:attachment; filename=example.txt \r\n" ++ - "content-Length:10\r\n" ++ - "TRansfer-encoding:\tdeflate, chunked \r\n" ++ - "connectioN:\t keep-alive \r\n\r\n"; - - var header_buffer: [1024]u8 = undefined; - var res = Response{ - .status = undefined, - .reason = undefined, - .version = undefined, - .keep_alive = false, - .parser = proto.HeadersParser.init(&header_buffer), - }; - - @memcpy(header_buffer[0..response_bytes.len], response_bytes); - res.parser.header_bytes_len = response_bytes.len; - - var it = res.iterateHeaders(); - { - const header = it.next().?; - try testing.expectEqualStrings("LOcation", header.name); - try testing.expectEqualStrings("url", header.value); - try testing.expect(!it.is_trailer); - } - { - const header = it.next().?; - try testing.expectEqualStrings("content-tYpe", header.name); - try testing.expectEqualStrings("text/plain", header.value); - try testing.expect(!it.is_trailer); - } - { - const header = it.next().?; - try testing.expectEqualStrings("content-disposition", header.name); - try testing.expectEqualStrings("attachment; filename=example.txt", header.value); - try testing.expect(!it.is_trailer); - } - { - const header = it.next().?; - try testing.expectEqualStrings("content-Length", 
header.name); - try testing.expectEqualStrings("10", header.value); - try testing.expect(!it.is_trailer); - } - { - const header = it.next().?; - try testing.expectEqualStrings("TRansfer-encoding", header.name); - try testing.expectEqualStrings("deflate, chunked", header.value); - try testing.expect(!it.is_trailer); - } - { - const header = it.next().?; - try testing.expectEqualStrings("connectioN", header.name); - try testing.expectEqualStrings("keep-alive", header.value); - try testing.expect(!it.is_trailer); - } - try testing.expectEqual(null, it.next()); - } -}; - -/// A HTTP request that has been sent. -/// -/// Order of operations: open -> send[ -> write -> finish] -> wait -> read -pub const Request = struct { - uri: Uri, - client: *Client, - /// This is null when the connection is released. - connection: ?*Connection, - keep_alive: bool, - - method: http.Method, - version: http.Version = .@"HTTP/1.1", - transfer_encoding: RequestTransfer, - redirect_behavior: RedirectBehavior, - - /// Whether the request should handle a 100-continue response before sending the request body. - handle_continue: bool, - - /// The response associated with this request. - /// - /// This field is undefined until `wait` is called. - response: Response, - - /// Standard headers that have default, but overridable, behavior. - headers: Headers, - - /// These headers are kept including when following a redirect to a - /// different domain. - /// Externally-owned; must outlive the Request. - extra_headers: []const http.Header, - - /// These headers are stripped when following a redirect to a different - /// domain. - /// Externally-owned; must outlive the Request. - privileged_headers: []const http.Header, - - pub const Headers = struct { - host: Value = .default, - authorization: Value = .default, - user_agent: Value = .default, - connection: Value = .default, - accept_encoding: Value = .default, - content_type: Value = .default, - - pub const Value = union(enum) { - default, - omit, - override: []const u8, - }; - }; - - /// Any value other than `not_allowed` or `unhandled` means that integer represents - /// how many remaining redirects are allowed. - pub const RedirectBehavior = enum(u16) { - /// The next redirect will cause an error. - not_allowed = 0, - /// Redirects are passed to the client to analyze the redirect response - /// directly. - unhandled = std.math.maxInt(u16), - _, - - pub fn subtractOne(rb: *RedirectBehavior) void { - switch (rb.*) { - .not_allowed => unreachable, - .unhandled => unreachable, - _ => rb.* = @enumFromInt(@intFromEnum(rb.*) - 1), - } - } - - pub fn remaining(rb: RedirectBehavior) u16 { - assert(rb != .unhandled); - return @intFromEnum(rb); - } - }; - - /// Frees all resources associated with the request. - pub fn deinit(req: *Request) void { - if (req.connection) |connection| { - if (!req.response.parser.done) { - // If the response wasn't fully read, then we need to close the connection. - connection.closing = true; - } - req.client.connection_pool.release(req.client.allocator, connection); - } - req.* = undefined; - } - - // This function must deallocate all resources associated with the request, - // or keep those which will be used. - // This needs to be kept in sync with deinit and request. 
- fn redirect(req: *Request, uri: Uri) !void { - assert(req.response.parser.done); - - req.client.connection_pool.release(req.client.allocator, req.connection.?); - req.connection = null; - - var server_header = std.heap.FixedBufferAllocator.init(req.response.parser.header_bytes_buffer); - defer req.response.parser.header_bytes_buffer = server_header.buffer[server_header.end_index..]; - const protocol, const valid_uri = try validateUri(uri, server_header.allocator()); - - const new_host = valid_uri.host.?.raw; - const prev_host = req.uri.host.?.raw; - const keep_privileged_headers = - std.ascii.eqlIgnoreCase(valid_uri.scheme, req.uri.scheme) and - std.ascii.endsWithIgnoreCase(new_host, prev_host) and - (new_host.len == prev_host.len or new_host[new_host.len - prev_host.len - 1] == '.'); - if (!keep_privileged_headers) { - // When redirecting to a different domain, strip privileged headers. - req.privileged_headers = &.{}; - } - - if (switch (req.response.status) { - .see_other => true, - .moved_permanently, .found => req.method == .POST, - else => false, - }) { - // A redirect to a GET must change the method and remove the body. - req.method = .GET; - req.transfer_encoding = .none; - req.headers.content_type = .omit; - } - - if (req.transfer_encoding != .none) { - // The request body has already been sent. The request is - // still in a valid state, but the redirect must be handled - // manually. - return error.RedirectRequiresResend; - } - - req.uri = valid_uri; - req.connection = try req.client.connect(new_host, uriPort(valid_uri, protocol), protocol); - req.redirect_behavior.subtractOne(); - req.response.parser.reset(); - - req.response = .{ - .version = undefined, - .status = undefined, - .reason = undefined, - .keep_alive = undefined, - .parser = req.response.parser, - }; - } - - pub const SendError = Connection.WriteError || error{ InvalidContentLength, UnsupportedTransferEncoding }; - - /// Send the HTTP request headers to the server. 
- pub fn send(req: *Request) SendError!void { - if (!req.method.requestHasBody() and req.transfer_encoding != .none) - return error.UnsupportedTransferEncoding; - - const connection = req.connection.?; - const w = connection.writer(); - - try req.method.write(w); - try w.writeByte(' '); - - if (req.method == .CONNECT) { - try req.uri.writeToStream(.{ .authority = true }, w); - } else { - try req.uri.writeToStream(.{ - .scheme = connection.proxied, - .authentication = connection.proxied, - .authority = connection.proxied, - .path = true, - .query = true, - }, w); - } - try w.writeByte(' '); - try w.writeAll(@tagName(req.version)); - try w.writeAll("\r\n"); - - if (try emitOverridableHeader("host: ", req.headers.host, w)) { - try w.writeAll("host: "); - try req.uri.writeToStream(.{ .authority = true }, w); - try w.writeAll("\r\n"); - } - - if (try emitOverridableHeader("authorization: ", req.headers.authorization, w)) { - if (req.uri.user != null or req.uri.password != null) { - try w.writeAll("authorization: "); - const authorization = try connection.allocWriteBuffer( - @intCast(basic_authorization.valueLengthFromUri(req.uri)), - ); - assert(basic_authorization.value(req.uri, authorization).len == authorization.len); - try w.writeAll("\r\n"); - } - } - - if (try emitOverridableHeader("user-agent: ", req.headers.user_agent, w)) { - try w.writeAll("user-agent: zig/"); - try w.writeAll(builtin.zig_version_string); - try w.writeAll(" (std.http)\r\n"); - } - - if (try emitOverridableHeader("connection: ", req.headers.connection, w)) { - if (req.keep_alive) { - try w.writeAll("connection: keep-alive\r\n"); - } else { - try w.writeAll("connection: close\r\n"); - } - } - - if (try emitOverridableHeader("accept-encoding: ", req.headers.accept_encoding, w)) { - // https://github.com/ziglang/zig/issues/18937 - //try w.writeAll("accept-encoding: gzip, deflate, zstd\r\n"); - try w.writeAll("accept-encoding: gzip, deflate\r\n"); - } - - switch (req.transfer_encoding) { - .chunked => try w.writeAll("transfer-encoding: chunked\r\n"), - .content_length => |len| try w.print("content-length: {d}\r\n", .{len}), - .none => {}, - } - - if (try emitOverridableHeader("content-type: ", req.headers.content_type, w)) { - // The default is to omit content-type if not provided because - // "application/octet-stream" is redundant. - } - - for (req.extra_headers) |header| { - assert(header.name.len != 0); - - try w.writeAll(header.name); - try w.writeAll(": "); - try w.writeAll(header.value); - try w.writeAll("\r\n"); - } - - if (connection.proxied) proxy: { - const proxy = switch (connection.protocol) { - .plain => req.client.http_proxy, - .tls => req.client.https_proxy, - } orelse break :proxy; - - const authorization = proxy.authorization orelse break :proxy; - try w.writeAll("proxy-authorization: "); - try w.writeAll(authorization); - try w.writeAll("\r\n"); - } - - try w.writeAll("\r\n"); - - try connection.flush(); - } - - /// Returns true if the default behavior is required, otherwise handles - /// writing (or not writing) the header. 
- fn emitOverridableHeader(prefix: []const u8, v: Headers.Value, w: anytype) !bool { - switch (v) { - .default => return true, - .omit => return false, - .override => |x| { - try w.writeAll(prefix); - try w.writeAll(x); - try w.writeAll("\r\n"); - return false; - }, - } - } - - const TransferReadError = Connection.ReadError || proto.HeadersParser.ReadError; - - const TransferReader = std.io.Reader(*Request, TransferReadError, transferRead); - - fn transferReader(req: *Request) TransferReader { - return .{ .context = req }; - } - - fn transferRead(req: *Request, buf: []u8) TransferReadError!usize { - if (req.response.parser.done) return 0; - - var index: usize = 0; - while (index == 0) { - const amt = try req.response.parser.read(req.connection.?, buf[index..], req.response.skip); - if (amt == 0 and req.response.parser.done) break; - index += amt; - } - - return index; - } - - pub const WaitError = RequestError || SendError || TransferReadError || - proto.HeadersParser.CheckCompleteHeadError || Response.ParseError || - error{ // TODO: file zig fmt issue for this bad indentation - TooManyHttpRedirects, - RedirectRequiresResend, - HttpRedirectLocationMissing, - HttpRedirectLocationInvalid, - CompressionInitializationFailed, - CompressionUnsupported, - }; - - /// Waits for a response from the server and parses any headers that are sent. - /// This function will block until the final response is received. - /// - /// If handling redirects and the request has no payload, then this - /// function will automatically follow redirects. If a request payload is - /// present, then this function will error with - /// error.RedirectRequiresResend. - /// - /// Must be called after `send` and, if any data was written to the request - /// body, then also after `finish`. - pub fn wait(req: *Request) WaitError!void { - while (true) { - // This while loop is for handling redirects, which means the request's - // connection may be different than the previous iteration. However, it - // is still guaranteed to be non-null with each iteration of this loop. - const connection = req.connection.?; - - while (true) { // read headers - try connection.fill(); - - const nchecked = try req.response.parser.checkCompleteHead(connection.peek()); - connection.drop(@intCast(nchecked)); - - if (req.response.parser.state.isContent()) break; - } - - try req.response.parse(req.response.parser.get()); - - if (req.response.status == .@"continue") { - // We're done parsing the continue response; reset to prepare - // for the real response. - req.response.parser.done = true; - req.response.parser.reset(); - - if (req.handle_continue) - continue; - - return; // we're not handling the 100-continue - } - - // we're switching protocols, so this connection is no longer doing http - if (req.method == .CONNECT and req.response.status.class() == .success) { - connection.closing = false; - req.response.parser.done = true; - return; // the connection is not HTTP past this point - } - - connection.closing = !req.response.keep_alive or !req.keep_alive; - - // Any response to a HEAD request and any response with a 1xx - // (Informational), 204 (No Content), or 304 (Not Modified) status - // code is always terminated by the first empty line after the - // header fields, regardless of the header fields present in the - // message. 
- if (req.method == .HEAD or req.response.status.class() == .informational or - req.response.status == .no_content or req.response.status == .not_modified) - { - req.response.parser.done = true; - return; // The response is empty; no further setup or redirection is necessary. - } - - switch (req.response.transfer_encoding) { - .none => { - if (req.response.content_length) |cl| { - req.response.parser.next_chunk_length = cl; - - if (cl == 0) req.response.parser.done = true; - } else { - // read until the connection is closed - req.response.parser.next_chunk_length = std.math.maxInt(u64); - } - }, - .chunked => { - req.response.parser.next_chunk_length = 0; - req.response.parser.state = .chunk_head_size; - }, - } - - if (req.response.status.class() == .redirect and req.redirect_behavior != .unhandled) { - // skip the body of the redirect response, this will at least - // leave the connection in a known good state. - req.response.skip = true; - assert(try req.transferRead(&.{}) == 0); // we're skipping, no buffer is necessary - - if (req.redirect_behavior == .not_allowed) return error.TooManyHttpRedirects; - - const location = req.response.location orelse - return error.HttpRedirectLocationMissing; - - // This mutates the beginning of header_bytes_buffer and uses that - // for the backing memory of the returned Uri. - try req.redirect(req.uri.resolve_inplace( - location, - &req.response.parser.header_bytes_buffer, - ) catch |err| switch (err) { - error.UnexpectedCharacter, - error.InvalidFormat, - error.InvalidPort, - => return error.HttpRedirectLocationInvalid, - error.NoSpaceLeft => return error.HttpHeadersOversize, - }); - try req.send(); - } else { - req.response.skip = false; - if (!req.response.parser.done) { - switch (req.response.transfer_compression) { - .identity => req.response.compression = .none, - .compress, .@"x-compress" => return error.CompressionUnsupported, - .deflate => req.response.compression = .{ - .deflate = std.compress.zlib.decompressor(req.transferReader()), - }, - .gzip, .@"x-gzip" => req.response.compression = .{ - .gzip = std.compress.gzip.decompressor(req.transferReader()), - }, - // https://github.com/ziglang/zig/issues/18937 - //.zstd => req.response.compression = .{ - // .zstd = std.compress.zstd.decompressStream(req.client.allocator, req.transferReader()), - //}, - .zstd => return error.CompressionUnsupported, - } - } - - break; - } - } - } - - pub const ReadError = TransferReadError || proto.HeadersParser.CheckCompleteHeadError || - error{ DecompressionFailure, InvalidTrailers }; - - pub const Reader = std.io.Reader(*Request, ReadError, read); - - pub fn reader(req: *Request) Reader { - return .{ .context = req }; - } - - /// Reads data from the response body. Must be called after `wait`. 
- pub fn read(req: *Request, buffer: []u8) ReadError!usize { - const out_index = switch (req.response.compression) { - .deflate => |*deflate| deflate.read(buffer) catch return error.DecompressionFailure, - .gzip => |*gzip| gzip.read(buffer) catch return error.DecompressionFailure, - // https://github.com/ziglang/zig/issues/18937 - //.zstd => |*zstd| zstd.read(buffer) catch return error.DecompressionFailure, - else => try req.transferRead(buffer), - }; - if (out_index > 0) return out_index; - - while (!req.response.parser.state.isContent()) { // read trailing headers - try req.connection.?.fill(); - - const nchecked = try req.response.parser.checkCompleteHead(req.connection.?.peek()); - req.connection.?.drop(@intCast(nchecked)); - } - - return 0; - } - - /// Reads data from the response body. Must be called after `wait`. - pub fn readAll(req: *Request, buffer: []u8) !usize { - var index: usize = 0; - while (index < buffer.len) { - const amt = try read(req, buffer[index..]); - if (amt == 0) break; - index += amt; - } - return index; - } - - pub const WriteError = Connection.WriteError || error{ NotWriteable, MessageTooLong }; - - pub const Writer = std.io.Writer(*Request, WriteError, write); - - pub fn writer(req: *Request) Writer { - return .{ .context = req }; - } - - /// Write `bytes` to the server. The `transfer_encoding` field determines how data will be sent. - /// Must be called after `send` and before `finish`. - pub fn write(req: *Request, bytes: []const u8) WriteError!usize { - switch (req.transfer_encoding) { - .chunked => { - if (bytes.len > 0) { - try req.connection.?.writer().print("{x}\r\n", .{bytes.len}); - try req.connection.?.writer().writeAll(bytes); - try req.connection.?.writer().writeAll("\r\n"); - } - - return bytes.len; - }, - .content_length => |*len| { - if (len.* < bytes.len) return error.MessageTooLong; - - const amt = try req.connection.?.write(bytes); - len.* -= amt; - return amt; - }, - .none => return error.NotWriteable, - } - } - - /// Write `bytes` to the server. The `transfer_encoding` field determines how data will be sent. - /// Must be called after `send` and before `finish`. - pub fn writeAll(req: *Request, bytes: []const u8) WriteError!void { - var index: usize = 0; - while (index < bytes.len) { - index += try write(req, bytes[index..]); - } - } - - pub const FinishError = WriteError || error{MessageNotCompleted}; - - /// Finish the body of a request. This notifies the server that you have no more data to send. - /// Must be called after `send`. - pub fn finish(req: *Request) FinishError!void { - switch (req.transfer_encoding) { - .chunked => try req.connection.?.writer().writeAll("0\r\n\r\n"), - .content_length => |len| if (len != 0) return error.MessageNotCompleted, - .none => {}, - } - - try req.connection.?.flush(); - } -}; - -pub const Proxy = struct { - protocol: Connection.Protocol, - host: []const u8, - authorization: ?[]const u8, - port: u16, - supports_connect: bool, -}; - -/// Release all associated resources with the client. -/// -/// All pending requests must be de-initialized and all active connections released -/// before calling this function. -pub fn deinit(client: *Client) void { - assert(client.connection_pool.used.first == null); // There are still active requests. - - client.connection_pool.deinit(client.allocator); - - if (!disable_tls) - client.ca_bundle.deinit(client.allocator); - - client.* = undefined; -} - -/// Populates `http_proxy` and `https_proxy` via standard proxy environment variables. 
-/// Asserts the client has no active connections. -/// Uses `arena` for a few small allocations that must outlive the client, or -/// at least until those fields are set to different values. -pub fn initDefaultProxies(client: *Client, arena: Allocator) !void { - // Prevent any new connections from being created. - client.connection_pool.mutex.lock(); - defer client.connection_pool.mutex.unlock(); - - assert(client.connection_pool.used.first == null); // There are active requests. - - if (client.http_proxy == null) { - client.http_proxy = try createProxyFromEnvVar(arena, &.{ - "http_proxy", "HTTP_PROXY", "all_proxy", "ALL_PROXY", - }); - } - - if (client.https_proxy == null) { - client.https_proxy = try createProxyFromEnvVar(arena, &.{ - "https_proxy", "HTTPS_PROXY", "all_proxy", "ALL_PROXY", - }); - } -} - -fn createProxyFromEnvVar(arena: Allocator, env_var_names: []const []const u8) !?*Proxy { - const content = for (env_var_names) |name| { - break std.process.getEnvVarOwned(arena, name) catch |err| switch (err) { - error.EnvironmentVariableNotFound => continue, - else => |e| return e, - }; - } else return null; - - const uri = Uri.parse(content) catch try Uri.parseAfterScheme("http", content); - const protocol, const valid_uri = validateUri(uri, arena) catch |err| switch (err) { - error.UnsupportedUriScheme => return null, - error.UriMissingHost => return error.HttpProxyMissingHost, - error.OutOfMemory => |e| return e, - }; - - const authorization: ?[]const u8 = if (valid_uri.user != null or valid_uri.password != null) a: { - const authorization = try arena.alloc(u8, basic_authorization.valueLengthFromUri(valid_uri)); - assert(basic_authorization.value(valid_uri, authorization).len == authorization.len); - break :a authorization; - } else null; - - const proxy = try arena.create(Proxy); - proxy.* = .{ - .protocol = protocol, - .host = valid_uri.host.?.raw, - .authorization = authorization, - .port = uriPort(valid_uri, protocol), - .supports_connect = true, - }; - return proxy; -} - -pub const basic_authorization = struct { - pub const max_user_len = 255; - pub const max_password_len = 255; - pub const max_value_len = valueLength(max_user_len, max_password_len); - - const prefix = "Basic "; - - pub fn valueLength(user_len: usize, password_len: usize) usize { - return prefix.len + std.base64.standard.Encoder.calcSize(user_len + 1 + password_len); - } - - pub fn valueLengthFromUri(uri: Uri) usize { - var stream = std.io.countingWriter(std.io.null_writer); - try stream.writer().print("{user}", .{uri.user orelse Uri.Component.empty}); - const user_len = stream.bytes_written; - stream.bytes_written = 0; - try stream.writer().print("{password}", .{uri.password orelse Uri.Component.empty}); - const password_len = stream.bytes_written; - return valueLength(@intCast(user_len), @intCast(password_len)); - } - - pub fn value(uri: Uri, out: []u8) []u8 { - var buf: [max_user_len + ":".len + max_password_len]u8 = undefined; - var stream = std.io.fixedBufferStream(&buf); - stream.writer().print("{user}", .{uri.user orelse Uri.Component.empty}) catch - unreachable; - assert(stream.pos <= max_user_len); - stream.writer().print(":{password}", .{uri.password orelse Uri.Component.empty}) catch - unreachable; - - @memcpy(out[0..prefix.len], prefix); - const base64 = std.base64.standard.Encoder.encode(out[prefix.len..], stream.getWritten()); - return out[0 .. 
prefix.len + base64.len]; - } -}; - -pub const ConnectTcpError = Allocator.Error || error{ ConnectionRefused, NetworkUnreachable, ConnectionTimedOut, ConnectionResetByPeer, TemporaryNameServerFailure, NameServerFailure, UnknownHostName, HostLacksNetworkAddresses, UnexpectedConnectFailure, TlsInitializationFailed }; - -/// Connect to `host:port` using the specified protocol. This will reuse a connection if one is already open. -/// -/// This function is threadsafe. -pub fn connectTcp(client: *Client, host: []const u8, port: u16, protocol: Connection.Protocol) ConnectTcpError!*Connection { - if (client.connection_pool.findConnection(.{ - .host = host, - .port = port, - .protocol = protocol, - })) |node| return node; - - if (disable_tls and protocol == .tls) - return error.TlsInitializationFailed; - - const conn = try client.allocator.create(ConnectionPool.Node); - errdefer client.allocator.destroy(conn); - conn.* = .{ .data = undefined }; - - const stream = net.tcpConnectToHost(client.allocator, host, port) catch |err| switch (err) { - error.ConnectionRefused => return error.ConnectionRefused, - error.NetworkUnreachable => return error.NetworkUnreachable, - error.ConnectionTimedOut => return error.ConnectionTimedOut, - error.ConnectionResetByPeer => return error.ConnectionResetByPeer, - error.TemporaryNameServerFailure => return error.TemporaryNameServerFailure, - error.NameServerFailure => return error.NameServerFailure, - error.UnknownHostName => return error.UnknownHostName, - error.HostLacksNetworkAddresses => return error.HostLacksNetworkAddresses, - else => return error.UnexpectedConnectFailure, - }; - errdefer stream.close(); - - conn.data = .{ - .stream = stream, - .tls_client = undefined, - - .protocol = protocol, - .host = try client.allocator.dupe(u8, host), - .port = port, - }; - errdefer client.allocator.free(conn.data.host); - - if (protocol == .tls) { - if (disable_tls) unreachable; - - conn.data.tls_client = try client.allocator.create(tls23.Connection(net.Stream)); - errdefer client.allocator.destroy(conn.data.tls_client); - - conn.data.tls_client.* = tls23.client(stream, .{ - .host = host, - .root_ca = .{ .bundle = client.ca_bundle }, - }) catch return error.TlsInitializationFailed; - } - - client.connection_pool.addUsed(conn); - - return &conn.data; -} - -pub const ConnectUnixError = Allocator.Error || std.posix.SocketError || error{NameTooLong} || std.posix.ConnectError; - -/// Connect to `path` as a unix domain socket. This will reuse a connection if one is already open. -/// -/// This function is threadsafe. -pub fn connectUnix(client: *Client, path: []const u8) ConnectUnixError!*Connection { - if (client.connection_pool.findConnection(.{ - .host = path, - .port = 0, - .protocol = .plain, - })) |node| - return node; - - const conn = try client.allocator.create(ConnectionPool.Node); - errdefer client.allocator.destroy(conn); - conn.* = .{ .data = undefined }; - - const stream = try std.net.connectUnixSocket(path); - errdefer stream.close(); - - conn.data = .{ - .stream = stream, - .tls_client = undefined, - .protocol = .plain, - - .host = try client.allocator.dupe(u8, path), - .port = 0, - }; - errdefer client.allocator.free(conn.data.host); - - client.connection_pool.addUsed(conn); - - return &conn.data; -} - -/// Connect to `tunnel_host:tunnel_port` using the specified proxy with HTTP -/// CONNECT. This will reuse a connection if one is already open. -/// -/// This function is threadsafe. 
-pub fn connectTunnel( - client: *Client, - proxy: *Proxy, - tunnel_host: []const u8, - tunnel_port: u16, -) !*Connection { - if (!proxy.supports_connect) return error.TunnelNotSupported; - - if (client.connection_pool.findConnection(.{ - .host = tunnel_host, - .port = tunnel_port, - .protocol = proxy.protocol, - })) |node| - return node; - - var maybe_valid = false; - (tunnel: { - const conn = try client.connectTcp(proxy.host, proxy.port, proxy.protocol); - errdefer { - conn.closing = true; - client.connection_pool.release(client.allocator, conn); - } - - var buffer: [8096]u8 = undefined; - var req = client.open(.CONNECT, .{ - .scheme = "http", - .host = .{ .raw = tunnel_host }, - .port = tunnel_port, - }, .{ - .redirect_behavior = .unhandled, - .connection = conn, - .server_header_buffer = &buffer, - }) catch |err| { - std.log.debug("err {}", .{err}); - break :tunnel err; - }; - defer req.deinit(); - - req.send() catch |err| break :tunnel err; - req.wait() catch |err| break :tunnel err; - - if (req.response.status.class() == .server_error) { - maybe_valid = true; - break :tunnel error.ServerError; - } - - if (req.response.status != .ok) break :tunnel error.ConnectionRefused; - - // this connection is now a tunnel, so we can't use it for anything else, it will only be released when the client is de-initialized. - req.connection = null; - - client.allocator.free(conn.host); - conn.host = try client.allocator.dupe(u8, tunnel_host); - errdefer client.allocator.free(conn.host); - - conn.port = tunnel_port; - conn.closing = false; - - return conn; - }) catch { - // something went wrong with the tunnel - proxy.supports_connect = maybe_valid; - return error.TunnelNotSupported; - }; -} - -// Prevents a dependency loop in open() -const ConnectErrorPartial = ConnectTcpError || error{ UnsupportedUriScheme, ConnectionRefused }; -pub const ConnectError = ConnectErrorPartial || RequestError; - -/// Connect to `host:port` using the specified protocol. This will reuse a -/// connection if one is already open. -/// If a proxy is configured for the client, then the proxy will be used to -/// connect to the host. -/// -/// This function is threadsafe. -pub fn connect( - client: *Client, - host: []const u8, - port: u16, - protocol: Connection.Protocol, -) ConnectError!*Connection { - const proxy = switch (protocol) { - .plain => client.http_proxy, - .tls => client.https_proxy, - } orelse return client.connectTcp(host, port, protocol); - - // Prevent proxying through itself. 
- if (std.ascii.eqlIgnoreCase(proxy.host, host) and - proxy.port == port and proxy.protocol == protocol) - { - return client.connectTcp(host, port, protocol); - } - - if (proxy.supports_connect) tunnel: { - return connectTunnel(client, proxy, host, port) catch |err| switch (err) { - error.TunnelNotSupported => break :tunnel, - else => |e| return e, - }; - } - - // fall back to using the proxy as a normal http proxy - const conn = try client.connectTcp(proxy.host, proxy.port, proxy.protocol); - errdefer { - conn.closing = true; - client.connection_pool.release(conn); - } - - conn.proxied = true; - return conn; -} - -pub const RequestError = ConnectTcpError || ConnectErrorPartial || Request.SendError || - std.fmt.ParseIntError || Connection.WriteError || - error{ // TODO: file a zig fmt issue for this bad indentation - UnsupportedUriScheme, - UriMissingHost, - - CertificateBundleLoadFailure, - UnsupportedTransferEncoding, -}; - -pub const RequestOptions = struct { - version: http.Version = .@"HTTP/1.1", - - /// Automatically ignore 100 Continue responses. This assumes you don't - /// care, and will have sent the body before you wait for the response. - /// - /// If this is not the case AND you know the server will send a 100 - /// Continue, set this to false and wait for a response before sending the - /// body. If you wait AND the server does not send a 100 Continue before - /// you finish the request, then the request *will* deadlock. - handle_continue: bool = true, - - /// If false, close the connection after the one request. If true, - /// participate in the client connection pool. - keep_alive: bool = true, - - /// This field specifies whether to automatically follow redirects, and if - /// so, how many redirects to follow before returning an error. - /// - /// This will only follow redirects for repeatable requests (ie. with no - /// payload or the server has acknowledged the payload). - redirect_behavior: Request.RedirectBehavior = @enumFromInt(3), - - /// Externally-owned memory used to store the server's entire HTTP header. - /// `error.HttpHeadersOversize` is returned from read() when a - /// client sends too many bytes of HTTP headers. - server_header_buffer: []u8, - - /// Must be an already acquired connection. - connection: ?*Connection = null, - - /// Standard headers that have default, but overridable, behavior. - headers: Request.Headers = .{}, - /// These headers are kept including when following a redirect to a - /// different domain. - /// Externally-owned; must outlive the Request. - extra_headers: []const http.Header = &.{}, - /// These headers are stripped when following a redirect to a different - /// domain. - /// Externally-owned; must outlive the Request. - privileged_headers: []const http.Header = &.{}, -}; - -fn validateUri(uri: Uri, arena: Allocator) !struct { Connection.Protocol, Uri } { - const protocol_map = std.StaticStringMap(Connection.Protocol).initComptime(.{ - .{ "http", .plain }, - .{ "ws", .plain }, - .{ "https", .tls }, - .{ "wss", .tls }, - }); - const protocol = protocol_map.get(uri.scheme) orelse return error.UnsupportedUriScheme; - var valid_uri = uri; - // The host is always going to be needed as a raw string for hostname resolution anyway. 
- valid_uri.host = .{ - .raw = try (uri.host orelse return error.UriMissingHost).toRawMaybeAlloc(arena), - }; - return .{ protocol, valid_uri }; -} - -fn uriPort(uri: Uri, protocol: Connection.Protocol) u16 { - return uri.port orelse switch (protocol) { - .plain => 80, - .tls => 443, - }; -} - -/// Open a connection to the host specified by `uri` and prepare to send a HTTP request. -/// -/// `uri` must remain alive during the entire request. -/// -/// The caller is responsible for calling `deinit()` on the `Request`. -/// This function is threadsafe. -/// -/// Asserts that "\r\n" does not occur in any header name or value. -pub fn open( - client: *Client, - method: http.Method, - uri: Uri, - options: RequestOptions, -) RequestError!Request { - if (std.debug.runtime_safety) { - for (options.extra_headers) |header| { - assert(header.name.len != 0); - assert(std.mem.indexOfScalar(u8, header.name, ':') == null); - assert(std.mem.indexOfPosLinear(u8, header.name, 0, "\r\n") == null); - assert(std.mem.indexOfPosLinear(u8, header.value, 0, "\r\n") == null); - } - for (options.privileged_headers) |header| { - assert(header.name.len != 0); - assert(std.mem.indexOfPosLinear(u8, header.name, 0, "\r\n") == null); - assert(std.mem.indexOfPosLinear(u8, header.value, 0, "\r\n") == null); - } - } - - var server_header = std.heap.FixedBufferAllocator.init(options.server_header_buffer); - const protocol, const valid_uri = try validateUri(uri, server_header.allocator()); - - if (protocol == .tls and @atomicLoad(bool, &client.next_https_rescan_certs, .acquire)) { - if (disable_tls) unreachable; - - client.ca_bundle_mutex.lock(); - defer client.ca_bundle_mutex.unlock(); - - if (client.next_https_rescan_certs) { - client.ca_bundle.rescan(client.allocator) catch - return error.CertificateBundleLoadFailure; - @atomicStore(bool, &client.next_https_rescan_certs, false, .release); - } - } - - const conn = options.connection orelse - try client.connect(valid_uri.host.?.raw, uriPort(valid_uri, protocol), protocol); - - var req: Request = .{ - .uri = valid_uri, - .client = client, - .connection = conn, - .keep_alive = options.keep_alive, - .method = method, - .version = options.version, - .transfer_encoding = .none, - .redirect_behavior = options.redirect_behavior, - .handle_continue = options.handle_continue, - .response = .{ - .version = undefined, - .status = undefined, - .reason = undefined, - .keep_alive = undefined, - .parser = proto.HeadersParser.init(server_header.buffer[server_header.end_index..]), - }, - .headers = options.headers, - .extra_headers = options.extra_headers, - .privileged_headers = options.privileged_headers, - }; - errdefer req.deinit(); - - return req; -} - -pub const FetchOptions = struct { - server_header_buffer: ?[]u8 = null, - redirect_behavior: ?Request.RedirectBehavior = null, - - /// If the server sends a body, it will be appended to this ArrayList. - /// `max_append_size` provides an upper limit for how much they can grow. - response_storage: ResponseStorage = .ignore, - max_append_size: ?usize = null, - - location: Location, - method: ?http.Method = null, - payload: ?[]const u8 = null, - raw_uri: bool = false, - keep_alive: bool = true, - - /// Standard headers that have default, but overridable, behavior. - headers: Request.Headers = .{}, - /// These headers are kept including when following a redirect to a - /// different domain. - /// Externally-owned; must outlive the Request. 
- extra_headers: []const http.Header = &.{}, - /// These headers are stripped when following a redirect to a different - /// domain. - /// Externally-owned; must outlive the Request. - privileged_headers: []const http.Header = &.{}, - - pub const Location = union(enum) { - url: []const u8, - uri: Uri, - }; - - pub const ResponseStorage = union(enum) { - ignore, - /// Only the existing capacity will be used. - static: *std.ArrayListUnmanaged(u8), - dynamic: *std.ArrayList(u8), - }; -}; - -pub const FetchResult = struct { - status: http.Status, -}; - -/// Perform a one-shot HTTP request with the provided options. -/// -/// This function is threadsafe. -pub fn fetch(client: *Client, options: FetchOptions) !FetchResult { - const uri = switch (options.location) { - .url => |u| try Uri.parse(u), - .uri => |u| u, - }; - var server_header_buffer: [16 * 1024]u8 = undefined; - - const method: http.Method = options.method orelse - if (options.payload != null) .POST else .GET; - - var req = try open(client, method, uri, .{ - .server_header_buffer = options.server_header_buffer orelse &server_header_buffer, - .redirect_behavior = options.redirect_behavior orelse - if (options.payload == null) @enumFromInt(3) else .unhandled, - .headers = options.headers, - .extra_headers = options.extra_headers, - .privileged_headers = options.privileged_headers, - .keep_alive = options.keep_alive, - }); - defer req.deinit(); - - if (options.payload) |payload| req.transfer_encoding = .{ .content_length = payload.len }; - - try req.send(); - - if (options.payload) |payload| try req.writeAll(payload); - - try req.finish(); - try req.wait(); - - switch (options.response_storage) { - .ignore => { - // Take advantage of request internals to discard the response body - // and make the connection available for another request. - req.response.skip = true; - assert(try req.transferRead(&.{}) == 0); // No buffer is necessary when skipping. 
- }, - .dynamic => |list| { - const max_append_size = options.max_append_size orelse 2 * 1024 * 1024; - try req.reader().readAllArrayList(list, max_append_size); - }, - .static => |list| { - const buf = b: { - const buf = list.unusedCapacitySlice(); - if (options.max_append_size) |len| { - if (len < buf.len) break :b buf[0..len]; - } - break :b buf; - }; - list.items.len += try req.reader().readAll(buf); - }, - } - - return .{ - .status = req.response.status, - }; -} - -test { - _ = &initDefaultProxies; -} diff --git a/src/http/client.zig b/src/http/client.zig new file mode 100644 index 00000000..63e36f76 --- /dev/null +++ b/src/http/client.zig @@ -0,0 +1,1350 @@ +const std = @import("std"); +const builtin = @import("builtin"); + +const os = std.os; +const posix = std.posix; +const Thread = std.Thread; +const Allocator = std.mem.Allocator; +const MemoryPool = std.heap.MemoryPool; +const ArenaAllocator = std.heap.ArenaAllocator; + +const tls = @import("tls"); +const jsruntime = @import("jsruntime"); +const IO = jsruntime.IO; +const Loop = jsruntime.Loop; + +const log = std.log.scoped(.http_client); + +// The longest individual header line that we support +const MAX_HEADER_LINE_LEN = 4096; + +// tls.max_ciphertext_record_len which isn't exposed +const BUFFER_LEN = (1 << 14) + 256 + 5; + +const TLSConnection = tls.Connection(std.net.Stream); +const HeaderList = std.ArrayListUnmanaged(std.http.Header); + +pub const Client = struct { + allocator: Allocator, + state_pool: StatePool, + root_ca: tls.config.CertBundle, + + pub fn init(allocator: Allocator, max_concurrent: usize) !Client { + var root_ca = try tls.config.CertBundle.fromSystem(allocator); + errdefer root_ca.deinit(allocator); + + const state_pool = try StatePool.init(allocator, max_concurrent); + errdefer state_pool.deinit(allocator); + + return .{ + .root_ca = root_ca, + .allocator = allocator, + .state_pool = state_pool, + }; + } + + pub fn deinit(self: *Client) void { + const allocator = self.allocator; + self.root_ca.deinit(allocator); + self.state_pool.deinit(allocator); + } + + pub fn request(self: *Client, method: Request.Method, url: anytype) !Request { + const state = self.state_pool.acquire(); + + errdefer { + state.reset(); + self.state_pool.release(state); + } + + return Request.init(self, state, method, url); + } +}; + +pub const Request = struct { + secure: bool, + method: Method, + uri: std.Uri, + body: ?[]const u8, + arena: Allocator, + headers: HeaderList, + _buf: []u8, + _socket: ?posix.socket_t, + _state: *State, + _client: *Client, + _has_host_header: bool, + + pub const Method = enum { + GET, + PUT, + HEAD, + POST, + DELETE, + OPTIONS, + + pub fn format(self: Method, comptime _: []const u8, _: std.fmt.FormatOptions, writer: anytype) !void { + return writer.writeAll(@tagName(self)); + } + }; + + fn init(client: *Client, state: *State, method: Method, url: anytype) !Request { + var arena = state.arena.allocator(); + + var uri: std.Uri = undefined; + + if (@TypeOf(url) == std.Uri) { + uri = url; + } else { + const owned = try arena.dupe(u8, url); + uri = try std.Uri.parse(owned); + } + + if (uri.host == null) { + return error.UriMissingHost; + } + + var secure: bool = false; + if (std.ascii.eqlIgnoreCase(uri.scheme, "https")) { + secure = true; + } else if (std.ascii.eqlIgnoreCase(uri.scheme, "http") == false) { + return error.UnsupportedUriScheme; + } + + return .{ + .secure = secure, + .uri = uri, + .method = method, + .body = null, + .headers = .{}, + .arena = arena, + ._buf = state.buf, + ._socket = null, + 
._state = state, + ._client = client, + ._has_host_header = false, + }; + } + + pub fn deinit(self: *Request) void { + if (self._socket) |socket| { + posix.close(socket); + self._socket = null; + } + _ = self._state.reset(); + self._client.state_pool.release(self._state); + } + + const AddHeaderOpts = struct { + dupe_name: bool = false, + dupe_value: bool = false, + }; + pub fn addHeader(self: *Request, name: []const u8, value: []const u8, opts: AddHeaderOpts) !void { + const arena = self.arena; + + var owned_name = name; + var owned_value = value; + + if (opts.dupe_name) { + owned_name = try arena.dupe(u8, name); + } + if (opts.dupe_value) { + owned_value = try arena.dupe(u8, value); + } + + if (self._has_host_header == false and std.ascii.eqlIgnoreCase(name, "host")) { + self._has_host_header = true; + } + + try self.headers.append(arena, .{ .name = owned_name, .value = owned_value }); + } + + // TODO timeout + const SendSyncOpts = struct {}; + pub fn sendSync(self: *Request, _: SendSyncOpts) !Response { + try self.prepareToSend(); + const socket, const address = try self.createSocket(true); + try posix.connect(socket, &address.any, address.getOsSockLen()); + + const header = try self.buildHeader(); + var stream = std.net.Stream{ .handle = socket }; + + var tls_conn: ?TLSConnection = null; + if (self.secure) { + var conn = try tls.client(stream, .{ + .host = self.host(), + .root_ca = self._client.root_ca, + }); + + try conn.writeAll(header); + if (self.body) |body| { + try conn.writeAll(body); + } + tls_conn = conn; + } else if (self.body) |body| { + var vec = [2]posix.iovec_const{ + .{ .len = header.len, .base = header.ptr }, + .{ .len = body.len, .base = body.ptr }, + }; + try writeAllIOVec(socket, &vec); + } else { + try stream.writeAll(header); + } + + var buf = self._state.buf; + var reader = Reader.init(self._state); + + while (true) { + var n: usize = 0; + if (tls_conn) |*conn| { + n = try conn.read(buf); + } else { + n = try stream.read(buf); + } + + if (n == 0) { + return error.ConnectionResetByPeer; + } + const result = try reader.process(buf[0..n]); + if (result.header) { + std.debug.assert(result.done or reader.body_reader != null); + return .{ + ._buf = buf, + ._request = self, + ._reader = reader, + ._done = result.done, + ._tls_conn = tls_conn, + ._data = result.data, + ._socket = self._socket.?, + .header = reader.response, + }; + } + } + } + + const SendAsyncOpts = struct {}; + pub fn sendAsync(self: *Request, loop: anytype, handler: anytype, _: SendAsyncOpts) !void { + try self.prepareToSend(); + // TODO: change this to nonblocking (false) when we have promise resolution + const socket, const address = try self.createSocket(true); + + const AsyncHandlerT = AsyncHandler(@TypeOf(handler), @TypeOf(loop)); + const async_handler = try self.arena.create(AsyncHandlerT); + async_handler.* = .{ + .loop = loop, + .socket = socket, + .request = self, + .handler = handler, + .tls_conn = null, + .read_buf = self._state.buf, + .reader = Reader.init(self._state), + }; + if (self.secure) { + async_handler.tls_conn = try tls.asyn.Client(AsyncHandlerT.TLSHandler).init(self.arena, .{ .handler = async_handler }, .{ + .host = self.host(), + .root_ca = self._client.root_ca, + }); + } + + loop.connect(AsyncHandlerT, async_handler, &async_handler.read_completion, AsyncHandlerT.connected, socket, address); + } + + fn prepareToSend(self: *Request) !void { + const arena = self.arena; + + if (self.body) |body| { + const cl = try std.fmt.allocPrint(arena, "{d}", .{body.len}); + try 
self.headers.append(arena, .{ .name = "Content-Length", .value = cl }); + } + + if (!self._has_host_header) { + try self.headers.append(arena, .{ .name = "Host", .value = self.uri.host.?.percent_encoded }); + } + } + + fn createSocket(self: *Request, blocking: bool) !struct { posix.socket_t, std.net.Address } { + const host_ = self.host(); + const port: u16 = self.uri.port orelse if (self.secure) 443 else 80; + + const addresses = try std.net.getAddressList(self.arena, host_, port); + if (addresses.addrs.len == 0) { + return error.UnknownHostName; + } + + // TODO: rotate? + const address = addresses.addrs[0]; + + const sock_flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | if (blocking) @as(u32, 0) else posix.SOCK.NONBLOCK; + const socket = try posix.socket(address.any.family, sock_flags, posix.IPPROTO.TCP); + errdefer posix.close(socket); + + if (@hasDecl(posix.TCP, "NODELAY")) { + try posix.setsockopt(socket, posix.IPPROTO.TCP, posix.TCP.NODELAY, &std.mem.toBytes(@as(c_int, 1))); + } + self._socket = socket; + return .{ socket, address }; + } + + fn buildHeader(self: *Request) ![]const u8 { + const buf = self._state.header_buf; + var fbs = std.io.fixedBufferStream(buf); + var writer = fbs.writer(); + + try writer.writeAll(@tagName(self.method)); + try writer.writeByte(' '); + try self.uri.writeToStream(.{ .path = true, .query = true }, writer); + try writer.writeAll(" HTTP/1.1\r\n"); + for (self.headers.items) |header| { + try writer.writeAll(header.name); + try writer.writeAll(": "); + try writer.writeAll(header.value); + try writer.writeAll("\r\n"); + } + // TODO: remove this once we have a connection pool + try writer.writeAll("Connection: Close\r\n"); + try writer.writeAll("\r\n"); + return buf[0..fbs.pos]; + } + + fn host(self: *const Request) []const u8 { + return self.uri.host.?.percent_encoded; + } +}; + +fn AsyncHandler(comptime H: type, comptime L: type) type { + return struct { + loop: L, + handler: H, + request: *Request, + read_buf: []u8, + socket: posix.socket_t, + read_completion: IO.Completion = undefined, + send_completion: IO.Completion = undefined, + + // used for parsing the response + reader: Reader, + + // Can only ever have 1 inflight write to a socket (multiple could + // get interleaved). 
You'd think this isn't normally a problem: send
+        // the header, send the body (or maybe send them together!), but with TLS
+        // we have no guarantee from the library whether or not it'll want us
+        // to make multiple writes
+        send_queue: SendQueue = .{},
+
+        // Used to help us know if we're writing the header or the body.
+        state: SendState = .handshake,
+
+        tls_conn: ?tls.asyn.Client(TLSHandler) = null,
+
+        const Self = @This();
+        const SendQueue = std.DoublyLinkedList([]const u8);
+
+        const SendState = enum {
+            handshake,
+            header,
+            body,
+        };
+
+        fn deinit(self: *Self) void {
+            if (self.tls_conn) |*tls_conn| {
+                tls_conn.deinit();
+            }
+            self.request.deinit();
+        }
+
+        fn connected(self: *Self, _: *IO.Completion, result: IO.ConnectError!void) void {
+            result catch |err| return self.handleError("Connection failed", err);
+
+            if (self.tls_conn) |*tls_conn| {
+                tls_conn.onConnect() catch |err| {
+                    self.handleError("TLS handshake error", err);
+                };
+                self.receive();
+                return;
+            }
+
+            self.state = .header;
+            const header = self.request.buildHeader() catch |err| {
+                return self.handleError("out of memory", err);
+            };
+            self.send(header);
+        }
+
+        fn send(self: *Self, data: []const u8) void {
+            const node = self.request.arena.create(SendQueue.Node) catch |err| {
+                self.handleError("out of memory", err);
+                return;
+            };
+
+            node.data = data;
+            self.send_queue.append(node);
+
+            if (self.send_queue.len > 1) {
+                // if we already had a message in the queue, then our send loop
+                // is already set up.
+                return;
+            }
+
+            self.loop.send(
+                Self,
+                self,
+                &self.send_completion,
+                sent,
+                self.socket,
+                node.data,
+            );
+        }
+
+        fn sent(self: *Self, _: *IO.Completion, n_: IO.SendError!usize) void {
+            const n = n_ catch |err| {
+                return self.handleError("Write error", err);
+            };
+
+            const node = self.send_queue.popFirst().?;
+            const data = node.data;
+            if (n < data.len) {
+                // didn't send all the data, we prematurely popped this off
+                // (because, in most cases, it _will_ send all the data)
+                node.data = data[n..];
+                self.send_queue.prepend(node);
+            }
+
+            if (self.send_queue.first) |next| {
+                // we still have data to send
+                self.loop.send(
+                    Self,
+                    self,
+                    &self.send_completion,
+                    sent,
+                    self.socket,
+                    next.data,
+                );
+                return;
+            }
+
+            switch (self.state) {
+                .handshake => {
+                    // We're still doing our handshake. We need to wait until
+                    // that's finished before sending the header. We might have
+                    // more to send until then, but it'll be triggered by the
+                    // TLS layer.
+                    std.debug.assert(self.tls_conn != null);
+                },
+                .body => {
+                    // We've finished sending the body.
+                    if (self.tls_conn == null) {
+                        // if we aren't using TLS, then we need to start the receive loop
+                        self.receive();
+                    }
+                },
+                .header => {
+                    // We've sent the header, we should send the body.
+                    self.state = .body;
+                    if (self.request.body) |body| {
+                        if (self.tls_conn) |*tls_conn| {
+                            tls_conn.send(body) catch |err| {
+                                self.handleError("TLS send", err);
+                            };
+                        } else {
+                            self.send(body);
+                        }
+                    } else if (self.tls_conn == null) {
+                        // There is no body, and we aren't using TLS. That means
+                        // our receive loop hasn't been started. Time to start.
+                        self.receive();
+                    }
+                },
+            }
+        }
+
+        // Normally, you'd think of HTTP as a straightforward request-response:
+        // we send, and then we receive. But with TLS, we need to receive
+        // while handshaking and potentially while sending data. So we're always
+        // receiving.
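+        // The receive loop stays armed until the Reader reports that the
+        // response is complete (or the peer closes the connection); at that
+        // point deinit() releases the pooled state back to the client.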
+        fn receive(self: *Self) void {
+            return self.loop.recv(
+                Self,
+                self,
+                &self.read_completion,
+                Self.received,
+                self.socket,
+                self.read_buf,
+            );
+        }
+
+        fn received(self: *Self, _: *IO.Completion, n_: IO.RecvError!usize) void {
+            const n = n_ catch |err| {
+                return self.handleError("Read error", err);
+            };
+
+            if (n == 0) {
+                return self.handleError("Connection closed", error.ConnectionResetByPeer);
+            }
+
+            if (self.tls_conn) |*tls_conn| {
+                _ = tls_conn.onRecv(self.read_buf[0..n]) catch |err| {
+                    self.handleError("TLS decrypt", err);
+                    return;
+                };
+                return;
+            }
+
+            self.processData(self.read_buf[0..n]);
+        }
+
+        fn processData(self: *Self, data: []u8) void {
+            // If result.header is true and would_be_first is true, then this is
+            // the first time we're emitting a progress result.
+            const would_be_first = self.reader.response.status == 0;
+
+            const result = self.reader.process(data) catch |err| {
+                return self.handleError("Invalid server response", err);
+            };
+
+            const done = result.done;
+            if (result.header) {
+                // if we have a header, then we always emit an event, even if
+                // there's no data
+                self.handler.onHttpResponse(.{
+                    .first = would_be_first,
+                    .done = done,
+                    .data = result.data,
+                    .header = self.reader.response,
+                }) catch return self.deinit();
+            }
+
+            if (done == true) {
+                return self.deinit();
+            }
+
+            self.receive();
+        }
+
+        fn handleError(self: *Self, comptime msg: []const u8, err: anyerror) void {
+            log.warn(msg ++ ": {any} ({any})", .{ err, self.request.uri });
+            self.handler.onHttpResponse(error.Failed) catch {};
+            self.deinit();
+        }
+
+        // Separate struct just to keep it a bit cleaner. tls.zig requires
+        // callbacks like "onConnect" and "send", which are a bit generic and
+        // easy to confuse with the AsyncHandler, which has similar concepts.
+        const TLSHandler = struct {
+            // reference back to the AsyncHandler
+            handler: *Self,
+
+            // Callback from tls.zig indicating that the handshake is complete
+            pub fn onConnect(self: TLSHandler) void {
+                var handler = self.handler;
+                const header = handler.request.buildHeader() catch |err| {
+                    return handler.handleError("out of memory", err);
+                };
+                handler.state = .header;
+                handler.tls_conn.?.send(header) catch |err| {
+                    return handler.handleError("TLS send", err);
+                };
+            }
+
+            // tls.zig wants us to send this data
+            pub fn send(self: TLSHandler, data: []const u8) !void {
+                return self.handler.send(data);
+            }
+
+            // tls.zig received data, it's giving it to us in plaintext
+            pub fn onRecv(self: TLSHandler, data: []u8) !void {
+                if (self.handler.state != .body) {
+                    // We should not receive application-level data (which is the
+                    // only data tls.zig will give us) if our handler hasn't sent
+                    // the body.
+                    self.handler.handleError("Premature server response", error.InvalidServerResponse);
+                    return error.InvalidServerResponse;
+                }
+
+                self.handler.processData(data);
+            }
+        };
+    };
+}
+
+// Used for reading the response (both the header and the body)
+const Reader = struct {
+    // always references state.header_buf
+    header_buf: []u8,
+
+    // position in header_buf that we have valid data up until
+    pos: usize,
+
+    // for populating the response headers list
+    arena: Allocator,
+
+    response: ResponseHeader,
+
+    body_reader: ?BodyReader,
+
+    fn init(state: *State) Reader {
+        return .{
+            .pos = 0,
+            .response = .{},
+            .body_reader = null,
+            .header_buf = state.header_buf,
+            .arena = state.arena.allocator(),
+        };
+    }
+
+    fn process(self: *Reader, data: []u8) ProcessError!Result {
+        if (self.body_reader) |*br| {
+            const ok, const result = try br.process(data);
+            if (ok == false) {
+                // There's something that our body reader didn't like. It wants
+                // us to emit whatever data we have, but it isn't safe to keep
+                // the connection alive.
+                std.debug.assert(result.done == true);
+                self.response.keepalive = false;
+            }
+            return result;
+        }
+
+        // Still parsing the header
+
+        // What data is left over in `data`?
+        // When header_done == true, then this is part (or all) of the body
+        // When header_done == false, then this is a header line that we didn't
+        // have enough data for.
+        var unprocessed = data;
+
+        // Data from a previous call to process that we weren't able to parse
+        const pos = self.pos;
+        const header_buf = self.header_buf;
+
+        const unparsed = header_buf[0..pos];
+        if (unparsed.len > 0) {
+            // This can get complicated, but we'll try to keep it simple, even
+            // if that means we'll copy a bit more than we have to. At most,
+            // unparsed can represent 1 header line. To have 1 complete line, we
+            // need to find a \n in data.
+            const line_end = (std.mem.indexOfScalarPos(u8, data, 0, '\n') orelse {
+                // data doesn't represent a complete header line. We need more data.
+                const end = pos + data.len;
+                if (end > header_buf.len) {
+                    return error.HeaderTooLarge;
+                }
+                self.pos = end;
+                @memcpy(self.header_buf[pos..end], data);
+                return .{ .done = false, .data = null, .header = false };
+            }) + 1;
+
+            const end = pos + line_end;
+            if (end > header_buf.len) {
+                return error.HeaderTooLarge;
+            }
+
+            @memcpy(header_buf[pos..end], data[0..line_end]);
+            const done, unprocessed = try self.parseHeader(header_buf[0..end]);
+
+            // we gave parseHeader exactly 1 header line, there should be no leftovers
+            std.debug.assert(unprocessed.len == 0);
+
+            // we currently have no unprocessed header data
+            self.pos = 0;
+
+            // We still [probably] have data to process which was not part of
+            // the previously unparsed header line
+            unprocessed = data[line_end..];
+
+            if (done) {
+                return self.prepareForBody(unprocessed);
+            }
+        }
+
+        // If we're here it means that
+        // 1 - Had no unparsed data, and skipped the entire block above
+        // 2 - Had unparsed data, but we managed to "complete" it. AND, the
+        //     unparsed data didn't represent the end of the header
+        // We're now trying to parse the rest of `data`, which was not part of
+        // the unparsed header line (unprocessed.len could be 0 here).
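+        // parseHeader returns (done, leftover): done is true once the blank
+        // line that ends the header has been seen, and leftover is either the
+        // bytes after the header (the start of the body) or an incomplete
+        // header line that needs more data.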
+        const done, unprocessed = try self.parseHeader(unprocessed);
+        if (done == false) {
+            const p = self.pos; // don't use pos, self.pos might have been altered
+            const end = p + unprocessed.len;
+            if (end > header_buf.len) {
+                return error.HeaderTooLarge;
+            }
+            @memcpy(header_buf[p..end], unprocessed);
+            self.pos = end;
+            return .{ .done = false, .data = null, .header = false };
+        }
+
+        return self.prepareForBody(unprocessed);
+    }
+
+    // We're done parsing the header, and we need to (maybe) set up the
+    // BodyReader. `data` represents data that we have leftover after reading
+    // the header which, presumably, belongs to the body.
+    fn prepareForBody(self: *Reader, data: []u8) !Result {
+        const response = &self.response;
+        const content_length = blk: {
+            const cl = response.get("content-length") orelse break :blk 0;
+            break :blk std.fmt.parseInt(u32, cl, 10) catch {
+                return error.InvalidContentLength;
+            };
+        };
+
+        if (content_length == 0) {
+            if (data.len > 0) {
+                // If the content-length is 0, then we should not have extra data.
+                // If we do, this connection is in a weird state
+                response.keepalive = false;
+            }
+            return .{
+                .done = true,
+                .data = null,
+                .header = true,
+            };
+        }
+
+        self.body_reader = .{ .content_length = .{ .len = content_length, .read = 0 } };
+
+        // Recursive: we want to process whatever leftover data we have through
+        // our newly set up body_reader
+        return self.process(data);
+    }
+
+    fn parseHeader(self: *Reader, data: []u8) !struct { bool, []u8 } {
+        var pos: usize = 0;
+        const arena = self.arena;
+        if (self.response.status == 0) {
+            // still don't have a status line
+            pos = std.mem.indexOfScalarPos(u8, data, 0, '\n') orelse {
+                return .{ false, data };
+            };
+            if (pos < 14 or data[pos - 1] != '\r') {
+                return error.InvalidStatusLine;
+            }
+            const protocol = data[0..9];
+            if (std.mem.eql(u8, protocol, "HTTP/1.1 ")) {
+                self.response.keepalive = true;
+            } else if (std.mem.eql(u8, protocol, "HTTP/1.0 ") == false) {
+                return error.InvalidStatusLine;
+            }
+            self.response.status = std.fmt.parseInt(u16, data[9..12], 10) catch {
+                return error.InvalidStatusLine;
+            };
+
+            // skip over the \n
+            pos += 1;
+        }
+
+        while (pos < data.len) {
+            if (data[pos] == '\r') {
+                const next = pos + 1;
+                if (data.len > next and data[next] == '\n') {
+                    return .{ true, data[next + 1 ..] };
+                }
+            }
+            const value_end = std.mem.indexOfScalarPos(u8, data, pos, '\n') orelse {
+                return .{ false, data[pos..] };
+            };
+
+            const sep = std.mem.indexOfScalarPos(u8, data[pos..value_end], 0, ':') orelse {
+                return error.InvalidHeader;
+            };
+            const name_end = pos + sep;
+
+            const value_start = name_end + 1;
+
+            if (value_end == value_start or data[value_end - 1] != '\r') {
+                return error.InvalidHeader;
+            }
+
+            const name = data[pos..name_end];
+            const value = data[value_start ..
value_end - 1]; + + // there's a limit to what whitespace is valid here, but let's be flexible + var normalized_name = std.mem.trim(u8, name, &std.ascii.whitespace); + const normalized_value = std.mem.trim(u8, value, &std.ascii.whitespace); + + // constCast is safe here, and necessary because the std.mem.trim API is bad / broken; + normalized_name = std.ascii.lowerString(@constCast(normalized_name), normalized_name); + try self.response.headers.append(self.arena, .{ + .name = try arena.dupe(u8, normalized_name), + .value = try arena.dupe(u8, normalized_value), + }); + + // +1 to skip over the trailing \n + pos = value_end + 1; + } + return .{ false, "" }; + } + + const BodyReader = union(enum) { + content_length: ContentLength, + + fn process(self: *BodyReader, data: []u8) !struct { bool, Result } { + switch (self.*) { + inline else => |*br| return br.process(data), + } + } + + const ContentLength = struct { + len: usize, + read: usize, + + fn process(self: *ContentLength, d: []u8) !struct { bool, Result } { + const len = self.len; + var read = self.read; + const missing = len - read; + + var data = d; + var valid = true; + + if (d.len > missing) { + valid = false; + data = d[0..missing]; + } + read += data.len; + self.read = read; + + return .{ valid, .{ + .done = read == len, + .data = if (data.len == 0) null else data, + .header = true, + } }; + } + }; + }; + + const Result = struct { + done: bool, + data: ?[]u8, + header: bool, + + const NeedData = Result{ .done = true, .data = null }; + }; + + const ProcessError = error{ + HeaderTooLarge, + OutOfMemory, + InvalidHeader, + InvalidStatusLine, + InvalidContentLength, + }; +}; + +pub const ResponseHeader = struct { + status: u16 = 0, + keepalive: bool = false, + headers: HeaderList = .{}, + + // Stored header has already been lower-cased, we expect name to be lowercased + pub fn get(self: *const ResponseHeader, name: []const u8) ?[]const u8 { + for (self.headers.items) |h| { + if (std.mem.eql(u8, name, h.name)) { + return h.value; + } + } + return null; + } + + pub fn count(self: *const ResponseHeader) usize { + return self.headers.items.len; + } +}; + +// What we emit from the AsyncHandler +pub const Progress = struct { + first: bool, + + // whether or not more data should be expected + done: bool, + // A piece of data from the body + data: ?[]const u8, + + header: ResponseHeader, +}; + +// The value that we return from a synchronous requst. +pub const Response = struct { + _reader: Reader, + _request: *Request, + + _buf: []u8, + _socket: posix.socket_t, + _tls_conn: ?TLSConnection, + + _done: bool, + + // Any data we over-read while parsing the header. This will be returned on + // the first call to next(); + _data: ?[]u8 = null, + header: ResponseHeader, + + pub fn next(self: *Response) !?[]u8 { + if (self._data) |data| { + self._data = null; + return data; + } + + if (self._done) { + return null; + } + + var buf = self._buf; + var reader = &self._reader; + std.debug.assert(reader.body_reader != null); + + while (true) { + // Some content encoding might have data that doesn't result in a + // chunk of information meaningful for the application. 
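+            // (right now only Content-Length bodies are handled, but a future
+            // chunked or compressed reader may consume bytes without producing
+            // anything to return).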
+ // So we loop + + var n: usize = 0; + if (self._tls_conn) |*tls_conn| { + n = try tls_conn.read(buf); + } else { + n = try posix.read(self._socket, buf); + } + + if (n == 0) { + self._done = true; + return null; + } + const result = try reader.process(buf[0..n]); + self._done = result.done; + if (result.data) |d| { + return d; + } + if (self._done) { + return null; + } + } + } +}; + +// Pooled and re-used when creating a request +const State = struct { + // used for reading chunks of payload data. + buf: []u8, + + // Used for keeping any unparsed header line until more data is received + // At most, this represents 1 line in the header. + header_buf: []u8, + + // Used extensively bu the TLS library. Used to optionally clone request + // headers, and always used to clone response headers. + arena: ArenaAllocator, + + fn init(allocator: Allocator, header_size: usize, buf_size: usize) !State { + const buf = try allocator.alloc(u8, buf_size); + errdefer allocator.free(buf); + + const header_buf = try allocator.alloc(u8, header_size); + errdefer allocator.free(header_buf); + + return .{ + .buf = buf, + .header_buf = header_buf, + .arena = std.heap.ArenaAllocator.init(allocator), + }; + } + + fn reset(self: *State) void { + _ = self.arena.reset(.{ .retain_with_limit = 1024 * 1024 }); + } + + fn deinit(self: *State) void { + const allocator = self.arena.child_allocator; + allocator.free(self.buf); + allocator.free(self.header_buf); + self.arena.deinit(); + } +}; + +pub const Error = error{ + Failed, +}; + +const StatePool = struct { + states: []*State, + available: usize, + mutex: Thread.Mutex, + cond: Thread.Condition, + + pub fn init(allocator: Allocator, count: usize) !StatePool { + const states = try allocator.alloc(*State, count); + errdefer allocator.free(states); + + var started: usize = 0; + errdefer for (0..started) |i| { + states[i].deinit(); + allocator.destroy(states[i]); + }; + + for (0..count) |i| { + const state = try allocator.create(State); + errdefer allocator.destroy(state); + state.* = try State.init(allocator, MAX_HEADER_LINE_LEN, BUFFER_LEN); + states[i] = state; + started += 1; + } + + return .{ + .cond = .{}, + .mutex = .{}, + .states = states, + .available = count, + }; + } + + pub fn deinit(self: *StatePool, allocator: Allocator) void { + for (self.states) |state| { + state.deinit(); + allocator.destroy(state); + } + allocator.free(self.states); + } + + pub fn acquire(self: *StatePool) *State { + self.mutex.lock(); + while (true) { + const states = self.states; + const available = self.available; + if (available == 0) { + self.cond.wait(&self.mutex); + continue; + } + const index = available - 1; + const state = states[index]; + self.available = index; + self.mutex.unlock(); + return state; + } + } + + pub fn release(self: *StatePool, state: *State) void { + self.mutex.lock(); + var states = self.states; + const available = self.available; + states[available] = state; + self.available = available + 1; + self.mutex.unlock(); + self.cond.signal(); + } +}; + +pub fn writeAllIOVec(socket: posix.socket_t, vec: []posix.iovec_const) !void { + var i: usize = 0; + while (true) { + var n = try posix.writev(socket, vec[i..]); + while (n >= vec[i].len) { + n -= vec[i].len; + i += 1; + if (i >= vec.len) { + return; + } + } + vec[i].base += n; + vec[i].len -= n; + } +} + +const testing = @import("../testing.zig"); +test "HttpClient Reader: fuzz" { + var state = try State.init(testing.allocator, 1024, 1024); + defer state.deinit(); + + var res = TestResponse.init(); + defer 
res.deinit(); + + // testReader randomly fragments the incoming data, hence the loop. + for (0..1000) |_| { + try testing.expectError(error.InvalidStatusLine, testReader(&state, &res, "hello\r\n\r\n")); + try testing.expectError(error.InvalidStatusLine, testReader(&state, &res, "http/1.1 200 \r\n\r\n")); + try testing.expectError(error.InvalidStatusLine, testReader(&state, &res, "HTTP/0.9 200 \r\n\r\n")); + try testing.expectError(error.InvalidStatusLine, testReader(&state, &res, "HTTP/1.1 \r\n\r\n")); + try testing.expectError(error.InvalidStatusLine, testReader(&state, &res, "HTTP/1.1 20a \r\n\r\n")); + try testing.expectError(error.InvalidStatusLine, testReader(&state, &res, "HTTP/1.1 20A \n")); + try testing.expectError(error.InvalidHeader, testReader(&state, &res, "HTTP/1.1 200 \r\nA\r\nB:1\r\n")); + + { + res.reset(); + try testReader(&state, &res, "HTTP/1.1 200 \r\n\r\n"); + try testing.expectEqual(200, res.status); + try testing.expectEqual(true, res.keepalive); + try testing.expectEqual(0, res.body.items.len); + try testing.expectEqual(0, res.headers.items.len); + } + + { + res.reset(); + try testReader(&state, &res, "HTTP/1.0 404 \r\nError: Not-Found\r\n\r\n"); + try testing.expectEqual(404, res.status); + try testing.expectEqual(false, res.keepalive); + try testing.expectEqual(0, res.body.items.len); + try res.assertHeaders(&.{ "error", "Not-Found" }); + } + + { + res.reset(); + try testReader(&state, &res, "HTTP/1.1 200 \r\nSet-Cookie: a32;max-age=60\r\nContent-Length: 12\r\n\r\nOver 9000!!!"); + try testing.expectEqual(200, res.status); + try testing.expectEqual(true, res.keepalive); + try testing.expectEqual("Over 9000!!!", res.body.items); + try res.assertHeaders(&.{ "set-cookie", "a32;max-age=60", "content-length", "12" }); + } + } + + for (0..10) |_| { + { + // large body + const body = "abcdefghijklmnopqrstuvwxyz012345689ABCDEFGHIJKLMNOPQRSTUVWXYZ" ** 10000; + res.reset(); + try testReader(&state, &res, "HTTP/1.1 200 OK\r\n Content-Length : 610000 \r\nOther: 13391AbC93\r\n\r\n" ++ body); + try testing.expectEqual(200, res.status); + try testing.expectEqual(true, res.keepalive); + try testing.expectEqual(body, res.body.items); + try res.assertHeaders(&.{ "content-length", "610000", "other", "13391AbC93" }); + } + + { + // header too big + const data = "HTTP/1.1 200 OK\r\n" ++ ("a" ** 1500); + try testing.expectError(error.HeaderTooLarge, testReader(&state, &res, data)); + } + } +} + +test "HttpClient: invalid url" { + var client = try Client.init(testing.allocator, 1); + defer client.deinit(); + + try testing.expectError(error.UnsupportedUriScheme, client.request(.GET, "://localhost")); + try testing.expectError(error.UnsupportedUriScheme, client.request(.GET, "ftp://localhost")); + try testing.expectError(error.UriMissingHost, client.request(.GET, "http:///")); +} + +test "HttpClient: sync connect error" { + var client = try Client.init(testing.allocator, 2); + defer client.deinit(); + + var req = try client.request(.GET, "HTTP://localhost:9920"); + try testing.expectError(error.ConnectionRefused, req.sendSync(.{})); +} + +test "HttpClient: sync no body" { + var client = try Client.init(testing.allocator, 2); + defer client.deinit(); + + var req = try client.request(.GET, "http://locaLhost:9582/http_client/simple"); + var res = try req.sendSync(.{}); + + try testing.expectEqual(null, try res.next()); + try testing.expectEqual(200, res.header.status); + try testing.expectEqual(2, res.header.count()); + try testing.expectEqual("close", res.header.get("connection")); + try 
testing.expectEqual("0", res.header.get("content-length")); +} + +test "HttpClient: async connect error" { + var loop = try jsruntime.Loop.init(testing.allocator); + defer loop.deinit(); + + const Handler = struct { + reset: *Thread.ResetEvent, + fn onHttpResponse(self: *@This(), res: Error!Progress) !void { + _ = res catch |err| { + if (err == error.Failed) { + self.reset.set(); + return; + } + std.debug.print("Expected error.ConnectionRefused, got error: {any}", .{err}); + return; + }; + std.debug.print("Expected error.ConnectionRefused, got no error", .{}); + } + }; + + var reset: Thread.ResetEvent = .{}; + var client = try Client.init(testing.allocator, 2); + defer client.deinit(); + + var req = try client.request(.GET, "HTTP://localhost:9920"); + try req.sendAsync(&loop, Handler{ .reset = &reset }, .{}); + try loop.io.run_for_ns(std.time.ns_per_ms); + try reset.timedWait(std.time.ns_per_s); +} + +test "HttpClient: async no body" { + var client = try Client.init(testing.allocator, 2); + defer client.deinit(); + + var handler = try CaptureHandler.init(); + defer handler.deinit(); + + var loop = try jsruntime.Loop.init(testing.allocator); + defer loop.deinit(); + + var req = try client.request(.GET, "HTTP://localhost:9582/http_client/simple"); + try req.sendAsync(&handler.loop, &handler, .{}); + try handler.loop.io.run_for_ns(std.time.ns_per_ms); + try handler.reset.timedWait(std.time.ns_per_s); + + const res = handler.response; + try testing.expectEqual("", res.body.items); + try testing.expectEqual(200, res.status); + try res.assertHeaders(&.{ "connection", "close", "content-length", "0" }); +} + +test "HttpClient: async with body" { + var client = try Client.init(testing.allocator, 2); + defer client.deinit(); + + var handler = try CaptureHandler.init(); + defer handler.deinit(); + + var req = try client.request(.GET, "HTTP://localhost:9582/http_client/body"); + try req.sendAsync(&handler.loop, &handler, .{}); + try handler.loop.io.run_for_ns(std.time.ns_per_ms); + try handler.reset.timedWait(std.time.ns_per_s); + + const res = handler.response; + try testing.expectEqual("over 9000!", res.body.items); + try testing.expectEqual(201, res.status); + try res.assertHeaders(&.{ + "connection", "close", + "content-length", "10", + "_host", "localhost", + "_connection", "Close", + }); +} + +const TestResponse = struct { + status: u16, + keepalive: ?bool, + arena: std.heap.ArenaAllocator, + body: std.ArrayListUnmanaged(u8), + headers: std.ArrayListUnmanaged(std.http.Header), + + fn init() TestResponse { + return .{ + .status = 0, + .keepalive = null, + .body = .{}, + .headers = .{}, + .arena = ArenaAllocator.init(testing.allocator), + }; + } + + fn deinit(self: *TestResponse) void { + self.arena.deinit(); + } + + fn reset(self: *TestResponse) void { + _ = self.arena.reset(.{ .retain_capacity = {} }); + self.status = 0; + self.keepalive = null; + self.body = .{}; + self.headers = .{}; + } + + fn assertHeaders(self: *const TestResponse, expected: []const []const u8) !void { + const actual = self.headers.items; + errdefer { + std.debug.print("Actual headers:\n", .{}); + for (actual) |a| { + std.debug.print("{s}: {s}\n", .{ a.name, a.value }); + } + } + + try testing.expectEqual(expected.len / 2, actual.len); + + var i: usize = 0; + while (i < expected.len) : (i += 2) { + const a = actual[i / 2]; + try testing.expectEqual(expected[i], a.name); + try testing.expectEqual(expected[i + 1], a.value); + } + } +}; + +const CaptureHandler = struct { + loop: jsruntime.Loop, + reset: Thread.ResetEvent, + 
response: TestResponse, + + fn init() !CaptureHandler { + return .{ + .reset = .{}, + .response = TestResponse.init(), + .loop = try jsruntime.Loop.init(testing.allocator), + }; + } + + fn deinit(self: *CaptureHandler) void { + self.response.deinit(); + self.loop.deinit(); + } + + fn onHttpResponse(self: *CaptureHandler, progress_: Error!Progress) !void { + self.process(progress_) catch |err| { + std.debug.print("error: {}\n", .{err}); + }; + } + + fn process(self: *CaptureHandler, progress_: Error!Progress) !void { + const progress = try progress_; + const allocator = self.response.arena.allocator(); + try self.response.body.appendSlice(allocator, progress.data orelse ""); + if (progress.done) { + self.response.status = progress.header.status; + try self.response.headers.ensureTotalCapacity(allocator, progress.header.headers.items.len); + for (progress.header.headers.items) |header| { + self.response.headers.appendAssumeCapacity(.{ + .name = try allocator.dupe(u8, header.name), + .value = try allocator.dupe(u8, header.value), + }); + } + self.response.keepalive = progress.header.keepalive; + self.reset.set(); + } + } +}; + +fn testReader(state: *State, res: *TestResponse, data: []const u8) !void { + var status: u16 = 0; + var r = Reader.init(state); + + // dupe it so that we have a mutable copy + const owned = try testing.allocator.dupe(u8, data); + defer testing.allocator.free(owned); + + var unsent = owned; + while (unsent.len > 0) { + // send part of the response + const to_send = testing.Random.intRange(usize, 1, unsent.len); + const result = try r.process(unsent[0..to_send]); + + if (status == 0) { + if (result.header) { + status = r.response.status; + } + } else { + // once set, it should not change + try testing.expectEqual(status, r.response.status); + } + + if (result.data) |d| { + try testing.expectEqual(true, result.header); + try res.body.appendSlice(res.arena.allocator(), d); + } + + if (result.done) { + res.status = status; + res.headers = r.response.headers; + res.keepalive = r.response.keepalive; + return; + } + unsent = unsent[to_send..]; + } + return error.NeverDone; +} diff --git a/src/main_tests.zig b/src/main_tests.zig index e82d3f6d..3b82a68c 100644 --- a/src/main_tests.zig +++ b/src/main_tests.zig @@ -31,7 +31,6 @@ const storage = @import("storage/storage.zig"); const url = @import("url/url.zig"); const URL = url.URL; const urlquery = @import("url/query.zig"); -const Client = @import("asyncio").Client; const Location = @import("html/location.zig").Location; const documentTestExecFn = @import("dom/document.zig").testExecFn; @@ -89,12 +88,12 @@ fn testExecFn( std.debug.print("documentHTMLClose error: {s}\n", .{@errorName(err)}); }; - var cli = Client{ .allocator = alloc }; - defer cli.deinit(); + var http_client = try @import("http/client.zig").Client.init(alloc, 5); + defer http_client.deinit(); try js_env.setUserContext(.{ .document = doc, - .httpClient = &cli, + .http_client = &http_client, }); // alias global as self and window @@ -220,6 +219,11 @@ pub fn main() !void { if (run == .all or run == .unit) { std.debug.print("\n", .{}); for (builtin.test_functions) |test_fn| { + if (std.mem.startsWith(u8, test_fn.name, "http.client.test")) { + // covered by unit test, needs a dummy server started, which + // main_test doesn't do. 
+ continue; + } try parser.init(); defer parser.deinit(); diff --git a/src/netsurf/netsurf.zig b/src/netsurf/netsurf.zig index 7a402641..d57b4619 100644 --- a/src/netsurf/netsurf.zig +++ b/src/netsurf/netsurf.zig @@ -2181,21 +2181,28 @@ fn parseParams(enc: ?[:0]const u8) c.dom_hubbub_parser_params { fn parseData(parser: *c.dom_hubbub_parser, reader: anytype) !void { var err: c.hubbub_error = undefined; - var buffer: [1024]u8 = undefined; - var ln = buffer.len; - while (ln > 0) { - ln = try reader.read(&buffer); - err = c.dom_hubbub_parser_parse_chunk(parser, &buffer, ln); - // TODO handle encoding change error return. - // When the HTML contains a META tag with a different encoding than the - // original one, a c.DOM_HUBBUB_HUBBUB_ERR_ENCODINGCHANGE error is - // returned. - // In this case, we must restart the parsing with the new detected - // encoding. The detected encoding is stored in the document and we can - // get it with documentGetInputEncoding(). - try parserErr(err); + const TI = @typeInfo(@TypeOf(reader)); + if (TI == .pointer and @hasDecl(TI.pointer.child, "next")) { + while (try reader.next()) |data| { + err = c.dom_hubbub_parser_parse_chunk(parser, data.ptr, data.len); + try parserErr(err); + } + } else { + var buffer: [1024]u8 = undefined; + var ln = buffer.len; + while (ln > 0) { + ln = try reader.read(&buffer); + err = c.dom_hubbub_parser_parse_chunk(parser, &buffer, ln); + // TODO handle encoding change error return. + // When the HTML contains a META tag with a different encoding than the + // original one, a c.DOM_HUBBUB_HUBBUB_ERR_ENCODINGCHANGE error is + // returned. + // In this case, we must restart the parsing with the new detected + // encoding. The detected encoding is stored in the document and we can + // get it with documentGetInputEncoding(). 
+            try parserErr(err);
+        }
     }
-
     err = c.dom_hubbub_parser_completed(parser);
     try parserErr(err);
 }
diff --git a/src/server.zig b/src/server.zig
index e4a1df54..b3088c66 100644
--- a/src/server.zig
+++ b/src/server.zig
@@ -445,7 +445,7 @@ pub const Client = struct {
         };
 
         self.mode = .websocket;
-        self.cdp = CDP.init(self.server.app, self);
+        self.cdp = try CDP.init(self.server.app, self);
         return self.send(arena, response);
     }
diff --git a/src/testing.zig b/src/testing.zig
index ca232688..52370d69 100644
--- a/src/testing.zig
+++ b/src/testing.zig
@@ -30,8 +30,8 @@ pub fn expectEqual(expected: anytype, actual: anytype) !void {
             return;
         },
         .optional => {
-            if (actual == null) {
-                return std.testing.expectEqual(null, expected);
+            if (@typeInfo(@TypeOf(expected)) == .null) {
+                return std.testing.expectEqual(null, actual);
             }
             return expectEqual(expected, actual.?);
         },
@@ -141,3 +141,35 @@ pub fn print(comptime fmt: []const u8, args: anytype) void {
 pub fn app(_: anytype) *App {
     return App.init(allocator, .serve) catch unreachable;
 }
+
+pub const Random = struct {
+    var instance: ?std.Random.DefaultPrng = null;
+
+    pub fn fill(buf: []u8) void {
+        var r = random();
+        r.bytes(buf);
+    }
+
+    pub fn fillAtLeast(buf: []u8, min: usize) []u8 {
+        var r = random();
+        const l = r.intRangeAtMost(usize, min, buf.len);
+        r.bytes(buf[0..l]);
+        return buf;
+    }
+
+    pub fn intRange(comptime T: type, min: T, max: T) T {
+        var r = random();
+        return r.intRangeAtMost(T, min, max);
+    }
+
+    pub fn random() std.Random {
+        if (instance == null) {
+            var seed: u64 = undefined;
+            std.posix.getrandom(std.mem.asBytes(&seed)) catch unreachable;
+            instance = std.Random.DefaultPrng.init(seed);
+            // instance = std.Random.DefaultPrng.init(0);
+
+        }
+        return instance.?.random();
+    }
+};
diff --git a/src/unit_tests.zig b/src/unit_tests.zig
index 56fdacad..88b3af5a 100644
--- a/src/unit_tests.zig
+++ b/src/unit_tests.zig
@@ -60,7 +60,7 @@ pub fn main() !void {
 
     const http_thread = blk: {
         const address = try std.net.Address.parseIp("127.0.0.1", 9582);
-        const thread = try std.Thread.spawn(.{}, serveHTTP, .{address});
+        const thread = try std.Thread.spawn(.{}, serveHTTP, .{ allocator, address });
         break :blk thread;
     };
     defer http_thread.join();
@@ -323,12 +323,18 @@ fn isUnnamed(t: std.builtin.TestFn) bool {
     return true;
 }
 
-fn serveHTTP(address: std.net.Address) !void {
+fn serveHTTP(allocator: Allocator, address: std.net.Address) !void {
+    var arena = std.heap.ArenaAllocator.init(allocator);
+    defer arena.deinit();
+
     var listener = try address.listen(.{ .reuse_address = true });
     defer listener.deinit();
 
     var read_buffer: [1024]u8 = undefined;
     ACCEPT: while (true) {
+        defer _ = arena.reset(.{ .retain_with_limit = 1024 });
+        const aa = arena.allocator();
+
         var conn = try listener.accept();
         defer conn.stream.close();
         var server = std.http.Server.init(conn, &read_buffer);
@@ -344,8 +350,23 @@ fn serveHTTP(address: std.net.Address) !void {
         const path = request.head.target;
 
         if (std.mem.eql(u8, path, "/loader")) {
-            try writeResponse(&request, .{
-                .body = "Hello!",
+            try request.respond("Hello!", .{});
+        } else if (std.mem.eql(u8, path, "/http_client/simple")) {
+            try request.respond("", .{});
+        } else if (std.mem.eql(u8, path, "/http_client/body")) {
+            var headers: std.ArrayListUnmanaged(std.http.Header) = .{};
+
+            var it = request.iterateHeaders();
+            while (it.next()) |hdr| {
+                try headers.append(aa, .{
+                    .name = try std.fmt.allocPrint(aa, "_{s}", .{hdr.name}),
+                    
.value = hdr.value, + }); + } + + try request.respond("over 9000!", .{ + .status = .created, + .extra_headers = headers.items, }); } } @@ -360,26 +381,16 @@ fn serveCDP(app: *App, address: std.net.Address) !void { }; } -const Response = struct { - body: []const u8 = "", - status: std.http.Status = .ok, -}; - -fn writeResponse(req: *std.http.Server.Request, res: Response) !void { - try req.respond(res.body, .{ .status = res.status }); -} - test { std.testing.refAllDecls(@import("url/query.zig")); std.testing.refAllDecls(@import("browser/dump.zig")); - std.testing.refAllDecls(@import("browser/loader.zig")); std.testing.refAllDecls(@import("browser/mime.zig")); std.testing.refAllDecls(@import("css/css.zig")); std.testing.refAllDecls(@import("css/libdom_test.zig")); std.testing.refAllDecls(@import("css/match_test.zig")); std.testing.refAllDecls(@import("css/parser.zig")); std.testing.refAllDecls(@import("generate.zig")); - std.testing.refAllDecls(@import("http/Client.zig")); + std.testing.refAllDecls(@import("http/client.zig")); std.testing.refAllDecls(@import("storage/storage.zig")); std.testing.refAllDecls(@import("storage/cookie.zig")); std.testing.refAllDecls(@import("iterator/iterator.zig")); @@ -388,4 +399,5 @@ test { std.testing.refAllDecls(@import("log.zig")); std.testing.refAllDecls(@import("datetime.zig")); std.testing.refAllDecls(@import("telemetry/telemetry.zig")); + std.testing.refAllDecls(@import("http/client.zig")); } diff --git a/src/user_context.zig b/src/user_context.zig index 3bed0108..c2ca5971 100644 --- a/src/user_context.zig +++ b/src/user_context.zig @@ -1,8 +1,8 @@ const std = @import("std"); const parser = @import("netsurf"); -const Client = @import("asyncio").Client; +const Client = @import("http/client.zig").Client; pub const UserContext = struct { document: *parser.DocumentHTML, - httpClient: *Client, + http_client: *Client, }; diff --git a/src/xhr/xhr.zig b/src/xhr/xhr.zig index 84ae6602..2434dd59 100644 --- a/src/xhr/xhr.zig +++ b/src/xhr/xhr.zig @@ -31,7 +31,7 @@ const XMLHttpRequestEventTarget = @import("event_target.zig").XMLHttpRequestEven const Mime = @import("../browser/mime.zig").Mime; const Loop = jsruntime.Loop; -const Client = @import("asyncio").Client; +const http = @import("../http/client.zig"); const parser = @import("netsurf"); @@ -95,14 +95,12 @@ pub const XMLHttpRequestBodyInit = union(XMLHttpRequestBodyInitTag) { pub const XMLHttpRequest = struct { proto: XMLHttpRequestEventTarget = XMLHttpRequestEventTarget{}, alloc: std.mem.Allocator, - cli: *Client, - io: Client.IO, + client: *http.Client, + request: ?http.Request = null, priv_state: PrivState = .new, - req: ?Client.Request = null, - ctx: ?Client.Ctx = null, - method: std.http.Method, + method: http.Request.Method, state: State, url: ?[]const u8, uri: std.Uri, @@ -125,7 +123,7 @@ pub const XMLHttpRequest = struct { withCredentials: bool = false, // TODO: response readonly attribute any response; - response_bytes: ?[]const u8 = null, + response_bytes: std.ArrayListUnmanaged(u8) = .{}, response_type: ResponseType = .Empty, response_headers: Headers, @@ -133,7 +131,7 @@ pub const XMLHttpRequest = struct { // use 16KB for headers buffer size. response_header_buffer: [1024 * 16]u8 = undefined, - response_status: u10 = 0, + response_status: u16 = 0, // TODO uncomment this field causes casting issue with // XMLHttpRequestEventTarget. 
diff --git a/src/user_context.zig b/src/user_context.zig
index 3bed0108..c2ca5971 100644
--- a/src/user_context.zig
+++ b/src/user_context.zig
@@ -1,8 +1,8 @@
 const std = @import("std");
 const parser = @import("netsurf");
-const Client = @import("asyncio").Client;
+const Client = @import("http/client.zig").Client;
 
 pub const UserContext = struct {
     document: *parser.DocumentHTML,
-    httpClient: *Client,
+    http_client: *Client,
 };
diff --git a/src/xhr/xhr.zig b/src/xhr/xhr.zig
index 84ae6602..2434dd59 100644
--- a/src/xhr/xhr.zig
+++ b/src/xhr/xhr.zig
@@ -31,7 +31,7 @@ const XMLHttpRequestEventTarget = @import("event_target.zig").XMLHttpRequestEven
 const Mime = @import("../browser/mime.zig").Mime;
 
 const Loop = jsruntime.Loop;
-const Client = @import("asyncio").Client;
+const http = @import("../http/client.zig");
 
 const parser = @import("netsurf");
 
@@ -95,14 +95,12 @@ pub const XMLHttpRequestBodyInit = union(XMLHttpRequestBodyInitTag) {
 pub const XMLHttpRequest = struct {
     proto: XMLHttpRequestEventTarget = XMLHttpRequestEventTarget{},
     alloc: std.mem.Allocator,
-    cli: *Client,
-    io: Client.IO,
+    client: *http.Client,
+    request: ?http.Request = null,
 
     priv_state: PrivState = .new,
-    req: ?Client.Request = null,
-    ctx: ?Client.Ctx = null,
 
-    method: std.http.Method,
+    method: http.Request.Method,
     state: State,
     url: ?[]const u8,
     uri: std.Uri,
@@ -125,7 +123,7 @@
     withCredentials: bool = false,
 
     // TODO: response readonly attribute any response;
-    response_bytes: ?[]const u8 = null,
+    response_bytes: std.ArrayListUnmanaged(u8) = .{},
     response_type: ResponseType = .Empty,
 
     response_headers: Headers,
@@ -133,7 +131,7 @@
     // use 16KB for headers buffer size.
     response_header_buffer: [1024 * 16]u8 = undefined,
 
-    response_status: u10 = 0,
+    response_status: u16 = 0,
 
     // TODO uncomment this field causes casting issue with
     // XMLHttpRequestEventTarget. I think it's dueto an alignement issue, but
@@ -246,13 +244,6 @@ pub const XMLHttpRequest = struct {
         fn all(self: Headers) []std.http.Header {
            return self.list.items;
         }
-
-        fn load(self: *Headers, it: *std.http.HeaderIterator) !void {
-            while (true) {
-                const h = it.next() orelse break;
-                _ = try self.append(h.name, h.value);
-            }
-        }
     };
 
     const Response = union(ResponseType) {
@@ -290,17 +281,16 @@ pub const XMLHttpRequest = struct {
 
     const min_delay: u64 = 50000000; // 50ms
 
-    pub fn constructor(alloc: std.mem.Allocator, loop: *Loop, userctx: UserContext) !XMLHttpRequest {
+    pub fn constructor(alloc: std.mem.Allocator, userctx: UserContext) !XMLHttpRequest {
         return .{
             .alloc = alloc,
             .headers = Headers.init(alloc),
             .response_headers = Headers.init(alloc),
-            .io = Client.IO.init(loop),
             .method = undefined,
             .url = null,
             .uri = undefined,
             .state = .unsent,
-            .cli = userctx.httpClient,
+            .client = userctx.http_client,
         };
     }
 
@@ -311,7 +301,6 @@
 
         if (self.payload) |v| alloc.free(v);
         self.payload = null;
 
-        if (self.response_bytes) |v| alloc.free(v);
         if (self.response_obj) |v| v.deinit();
         self.response_obj = null;
@@ -329,12 +318,6 @@
 
         self.send_flag = false;
         self.priv_state = .new;
-
-        if (self.ctx) |*c| c.deinit();
-        self.ctx = null;
-
-        if (self.req) |*r| r.deinit();
-        self.req = null;
     }
 
     pub fn deinit(self: *XMLHttpRequest, alloc: std.mem.Allocator) void {
@@ -449,7 +432,7 @@
     }
 
     const methods = [_]struct {
-        tag: std.http.Method,
+        tag: http.Request.Method,
         name: []const u8,
     }{
         .{ .tag = .DELETE, .name = "DELETE" },
@@ -461,7 +444,7 @@
     };
     const methods_forbidden = [_][]const u8{ "CONNECT", "TRACE", "TRACK" };
 
-    pub fn validMethod(m: []const u8) DOMError!std.http.Method {
+    pub fn validMethod(m: []const u8) DOMError!http.Request.Method {
         for (methods) |method| {
             if (std.ascii.eqlIgnoreCase(method.name, m)) {
                 return method.tag;
@@ -485,7 +468,7 @@
     }
 
     // TODO body can be either a XMLHttpRequestBodyInit or a document
-    pub fn _send(self: *XMLHttpRequest, alloc: std.mem.Allocator, body: ?[]const u8) !void {
+    pub fn _send(self: *XMLHttpRequest, loop: *Loop, alloc: std.mem.Allocator, body: ?[]const u8) !void {
         if (self.state != .opened) return DOMError.InvalidState;
         if (self.send_flag) return DOMError.InvalidState;
 
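To clarify the method mapping introduced above, a rough sketch of the intended behavior (the import path and test wrapper are assumptions; the exact error returned for forbidden methods is defined further down in the file and not shown in this hunk):

```zig
const std = @import("std");
const XMLHttpRequest = @import("xhr.zig").XMLHttpRequest; // assumed import path

test "validMethod: illustrative mapping" {
    // Names are matched case-insensitively and mapped to http.Request.Method tags.
    try std.testing.expectEqual(.GET, try XMLHttpRequest.validMethod("get"));
    try std.testing.expectEqual(.DELETE, try XMLHttpRequest.validMethod("delete"));

    // CONNECT, TRACE and TRACK are forbidden and rejected with a DOMError.
    try std.testing.expect(std.meta.isError(XMLHttpRequest.validMethod("TRACE")));
}
```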
@@ -515,153 +498,77 @@
         self.priv_state = .open;
 
-        self.req = try self.cli.create(self.method, self.uri, .{
-            .server_header_buffer = &self.response_header_buffer,
-            .extra_headers = self.headers.all(),
-        });
-        errdefer {
-            self.req.?.deinit();
-            self.req = null;
-        }
+        self.request = try self.client.request(self.method, self.uri);
 
-        self.ctx = try Client.Ctx.init(&self.io, &self.req.?);
-        errdefer {
-            self.ctx.?.deinit();
-            self.ctx = null;
-        }
-        self.ctx.?.userData = self;
+        var request = &self.request.?;
+        errdefer request.deinit();
 
-        try self.cli.async_open(
-            self.method,
-            self.uri,
-            .{ .server_header_buffer = &self.response_header_buffer },
-            &self.ctx.?,
-            onRequestConnect,
-        );
+        for (self.headers.list.items) |hdr| {
+            try request.addHeader(hdr.name, hdr.value, .{});
+        }
+        request.body = self.payload;
+        try request.sendAsync(loop, self, .{});
     }
 
-    fn onRequestWait(ctx: *Client.Ctx, res: anyerror!void) !void {
-        var self = selfCtx(ctx);
-        res catch |err| return self.onErr(err);
+    pub fn onHttpResponse(self: *XMLHttpRequest, progress_: http.Error!http.Progress) !void {
+        const progress = progress_ catch |err| {
+            self.onErr(err);
+            return err;
+        };
 
-        log.info("{any} {any} {d}", .{ self.method, self.uri, self.req.?.response.status });
+        if (progress.first) {
+            const header = progress.header;
+            log.info("{any} {any} {d}", .{ self.method, self.uri, header.status });
 
-        self.priv_state = .done;
-        var it = self.req.?.response.iterateHeaders();
-        self.response_headers.load(&it) catch |e| return self.onErr(e);
+            self.priv_state = .done;
 
-        // extract a mime type from headers.
-        const ct = self.response_headers.getFirstValue("Content-Type") orelse "text/xml";
-        self.response_mime = Mime.parse(self.alloc, ct) catch |e| return self.onErr(e);
+            for (header.headers.items) |hdr| {
+                try self.response_headers.append(hdr.name, hdr.value);
+            }
 
-        // TODO handle override mime type
+            // extract a mime type from headers.
+            const ct = header.get("Content-Type") orelse "text/xml";
+            self.response_mime = Mime.parse(self.alloc, ct) catch |e| return self.onErr(e);
 
-        self.state = .headers_received;
-        self.dispatchEvt("readystatechange");
-
-        self.response_status = @intFromEnum(self.req.?.response.status);
-
-        var buf: std.ArrayListUnmanaged(u8) = .{};
-
-        // TODO set correct length
-        const total = 0;
-        var loaded: u64 = 0;
-
-        // dispatch a progress event loadstart.
-        self.dispatchProgressEvent("loadstart", .{ .loaded = loaded, .total = total });
-
-        // TODO read async
-        const reader = self.req.?.reader();
-        var buffer: [1024]u8 = undefined;
-        var ln = buffer.len;
-        var prev_dispatch: ?std.time.Instant = null;
-        while (ln > 0) {
-            ln = reader.read(&buffer) catch |e| {
-                buf.deinit(self.alloc);
-                return self.onErr(e);
-            };
-            buf.appendSlice(self.alloc, buffer[0..ln]) catch |e| {
-                buf.deinit(self.alloc);
-                return self.onErr(e);
-            };
-            loaded = loaded + ln;
-
-            // Dispatch only if 50ms have passed.
-            const now = std.time.Instant.now() catch |e| {
-                buf.deinit(self.alloc);
-                return self.onErr(e);
-            };
-            if (prev_dispatch != null and now.since(prev_dispatch.?) < min_delay) continue;
-            defer prev_dispatch = now;
-
-            self.state = .loading;
+            // TODO handle override mime type
+            self.state = .headers_received;
             self.dispatchEvt("readystatechange");
 
-            // dispatch a progress event progress.
-            self.dispatchProgressEvent("progress", .{
-                .loaded = loaded,
-                .total = total,
-            });
-        }
-        self.response_bytes = buf.items;
-        self.send_flag = false;
+            self.response_status = header.status;
+            // TODO correct total
+            self.dispatchProgressEvent("loadstart", .{ .loaded = 0, .total = 0 });
+
+            self.state = .loading;
+        }
+
+        const data = progress.data orelse return;
+        const buf = &self.response_bytes;
+
+        try buf.appendSlice(self.alloc, data);
+        const total_len = buf.items.len;
+
+        // TODO: don't dispatch this more than once every 50ms
+        // dispatch a progress event progress.
+        self.dispatchEvt("readystatechange");
+
+        self.dispatchProgressEvent("progress", .{
+            .total = buf.items.len,
+            .loaded = buf.items.len,
+        });
+
+        if (progress.done == false) {
+            return;
+        }
+
+        self.send_flag = false;
         self.state = .done;
         self.dispatchEvt("readystatechange");
 
         // dispatch a progress event load.
-        self.dispatchProgressEvent("load", .{ .loaded = loaded, .total = total });
+        self.dispatchProgressEvent("load", .{ .loaded = total_len, .total = total_len });
 
         // dispatch a progress event loadend.
-        self.dispatchProgressEvent("loadend", .{ .loaded = loaded, .total = total });
-
-        if (self.ctx) |*c| c.deinit();
-        self.ctx = null;
-
-        if (self.req) |*r| r.deinit();
-        self.req = null;
-    }
-
-    fn onRequestFinish(ctx: *Client.Ctx, res: anyerror!void) !void {
-        var self = selfCtx(ctx);
-        res catch |err| return self.onErr(err);
-
-        self.priv_state = .wait;
-        return ctx.req.async_wait(ctx, onRequestWait) catch |e| return self.onErr(e);
-    }
-
-    fn onRequestSend(ctx: *Client.Ctx, res: anyerror!void) !void {
-        var self = selfCtx(ctx);
-        res catch |err| return self.onErr(err);
-
-        if (self.payload) |payload| {
-            self.priv_state = .write;
-            return ctx.req.async_writeAll(payload, ctx, onRequestWrite) catch |e| return self.onErr(e);
-        }
-
-        self.priv_state = .finish;
-        return ctx.req.async_finish(ctx, onRequestFinish) catch |e| return self.onErr(e);
-    }
-
-    fn onRequestWrite(ctx: *Client.Ctx, res: anyerror!void) !void {
-        var self = selfCtx(ctx);
-        res catch |err| return self.onErr(err);
-        self.priv_state = .finish;
-        return ctx.req.async_finish(ctx, onRequestFinish) catch |e| return self.onErr(e);
-    }
-
-    fn onRequestConnect(ctx: *Client.Ctx, res: anyerror!void) anyerror!void {
-        var self = selfCtx(ctx);
-        res catch |err| return self.onErr(err);
-
-        // prepare payload transfert.
-        if (self.payload) |v| self.req.?.transfer_encoding = .{ .content_length = v.len };
-
-        self.priv_state = .send;
-        return ctx.req.async_send(ctx, onRequestSend) catch |err| return self.onErr(err);
-    }
-
-    fn selfCtx(ctx: *Client.Ctx) *XMLHttpRequest {
-        return @ptrCast(@alignCast(ctx.userData));
+        self.dispatchProgressEvent("loadend", .{ .loaded = total_len, .total = total_len });
     }
 
     fn onErr(self: *XMLHttpRequest, err: anyerror) void {
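The hunk above replaces the old chain of `onRequest*` callbacks with a single streaming callback: `sendAsync` fires the request, and the client then calls `onHttpResponse` with a `Progress` value carrying the header on the first invocation, an optional `data` chunk while the body arrives, and `done` once the transfer completes. A stripped-down consumer mirroring what `XMLHttpRequest` does is sketched below; it assumes `sendAsync` accepts any handler exposing `onHttpResponse`, and the `jsruntime` import name is an assumption:

```zig
const std = @import("std");
const http = @import("../http/client.zig");
const Loop = @import("jsruntime").Loop; // assumed module name

const Fetcher = struct {
    allocator: std.mem.Allocator,
    request: ?http.Request = null,
    body: std.ArrayListUnmanaged(u8) = .{},

    // The request is stored on the struct so it outlives this call,
    // mirroring how XMLHttpRequest keeps `self.request`.
    fn start(self: *Fetcher, client: *http.Client, loop: *Loop, uri: std.Uri) !void {
        self.request = try client.request(.GET, uri);
        const request = &self.request.?;
        errdefer request.deinit();

        try request.addHeader("Accept", "text/html", .{});
        try request.sendAsync(loop, self, .{});
    }

    // Same callback shape the client drives while the transfer progresses.
    pub fn onHttpResponse(self: *Fetcher, progress_: http.Error!http.Progress) !void {
        const progress = progress_ catch |err| return err;

        if (progress.first) {
            std.debug.print("status: {d}\n", .{progress.header.status});
        }
        if (progress.data) |chunk| {
            try self.body.appendSlice(self.allocator, chunk);
        }
        if (progress.done) {
            std.debug.print("done, {d} bytes\n", .{self.body.items.len});
        }
    }
};
```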
- self.dispatchProgressEvent("loadend", .{ .loaded = loaded, .total = total }); - - if (self.ctx) |*c| c.deinit(); - self.ctx = null; - - if (self.req) |*r| r.deinit(); - self.req = null; - } - - fn onRequestFinish(ctx: *Client.Ctx, res: anyerror!void) !void { - var self = selfCtx(ctx); - res catch |err| return self.onErr(err); - - self.priv_state = .wait; - return ctx.req.async_wait(ctx, onRequestWait) catch |e| return self.onErr(e); - } - - fn onRequestSend(ctx: *Client.Ctx, res: anyerror!void) !void { - var self = selfCtx(ctx); - res catch |err| return self.onErr(err); - - if (self.payload) |payload| { - self.priv_state = .write; - return ctx.req.async_writeAll(payload, ctx, onRequestWrite) catch |e| return self.onErr(e); - } - - self.priv_state = .finish; - return ctx.req.async_finish(ctx, onRequestFinish) catch |e| return self.onErr(e); - } - - fn onRequestWrite(ctx: *Client.Ctx, res: anyerror!void) !void { - var self = selfCtx(ctx); - res catch |err| return self.onErr(err); - self.priv_state = .finish; - return ctx.req.async_finish(ctx, onRequestFinish) catch |e| return self.onErr(e); - } - - fn onRequestConnect(ctx: *Client.Ctx, res: anyerror!void) anyerror!void { - var self = selfCtx(ctx); - res catch |err| return self.onErr(err); - - // prepare payload transfert. - if (self.payload) |v| self.req.?.transfer_encoding = .{ .content_length = v.len }; - - self.priv_state = .send; - return ctx.req.async_send(ctx, onRequestSend) catch |err| return self.onErr(err); - } - - fn selfCtx(ctx: *Client.Ctx) *XMLHttpRequest { - return @ptrCast(@alignCast(ctx.userData)); + self.dispatchProgressEvent("loadend", .{ .loaded = total_len, .total = total_len }); } fn onErr(self: *XMLHttpRequest, err: anyerror) void { @@ -675,12 +582,6 @@ pub const XMLHttpRequest = struct { self.dispatchProgressEvent("loadend", .{}); log.debug("{any} {any} {any}", .{ self.method, self.uri, self.err }); - - if (self.ctx) |*c| c.deinit(); - self.ctx = null; - - if (self.req) |*r| r.deinit(); - self.req = null; } pub fn _abort(self: *XMLHttpRequest) void { @@ -803,7 +704,7 @@ pub const XMLHttpRequest = struct { } if (self.response_type == .JSON) { - if (self.response_bytes == null) return null; + if (self.response_bytes.items.len == 0) return null; // TODO Let jsonObject be the result of running parse JSON from bytes // on this’s received bytes. If that threw an exception, then return @@ -841,7 +742,7 @@ pub const XMLHttpRequest = struct { }; defer alloc.free(ccharset); - var fbs = std.io.fixedBufferStream(self.response_bytes.?); + var fbs = std.io.fixedBufferStream(self.response_bytes.items); const doc = parser.documentHTMLParse(fbs.reader(), ccharset) catch { self.response_obj = .{ .Failure = true }; return; @@ -862,7 +763,7 @@ pub const XMLHttpRequest = struct { const p = std.json.parseFromSlice( JSONValue, alloc, - self.response_bytes.?, + self.response_bytes.items, .{}, ) catch |e| { log.err("parse JSON: {}", .{e}); @@ -875,8 +776,7 @@ pub const XMLHttpRequest = struct { pub fn get_responseText(self: *XMLHttpRequest) ![]const u8 { if (self.response_type != .Empty and self.response_type != .Text) return DOMError.InvalidState; - - return if (self.response_bytes) |v| v else ""; + return self.response_bytes.items; } pub fn _getResponseHeader(self: *XMLHttpRequest, name: []const u8) ?[]const u8 { diff --git a/vendor/tls.zig b/vendor/tls.zig deleted file mode 160000 index 7eb35dab..00000000 --- a/vendor/tls.zig +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 7eb35dabf8798a88e15ddb7cd409e4f0b15912c4