diff --git a/.gitmodules b/.gitmodules index 2ceea970..229d1a16 100644 --- a/.gitmodules +++ b/.gitmodules @@ -25,3 +25,6 @@ [submodule "vendor/tls.zig"] path = vendor/tls.zig url = git@github.com:ianic/tls.zig.git +[submodule "vendor/zig-async-io"] + path = vendor/zig-async-io + url = git@github.com:lightpanda-io/zig-async-io.git diff --git a/build.zig b/build.zig index 86ad4ef9..8c83d648 100644 --- a/build.zig +++ b/build.zig @@ -159,6 +159,11 @@ fn common( netsurf.addImport("jsruntime", jsruntimemod); step.root_module.addImport("netsurf", netsurf); + const asyncio = b.addModule("asyncio", .{ + .root_source_file = b.path("vendor/zig-async-io/src/lib.zig"), + }); + step.root_module.addImport("asyncio", asyncio); + const tlsmod = b.addModule("tls", .{ .root_source_file = b.path("vendor/tls.zig/src/main.zig"), }); diff --git a/src/async/Client.zig b/src/async/Client.zig deleted file mode 100644 index 91748b9a..00000000 --- a/src/async/Client.zig +++ /dev/null @@ -1,1766 +0,0 @@ -// Copyright (C) 2023-2024 Lightpanda (Selecy SAS) -// -// Francis Bouvier -// Pierre Tachoire -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -//! HTTP(S) Client implementation. -//! -//! Connections are opened in a thread-safe manner, but individual Requests are not. -//! -//! TLS support may be disabled via `std.options.http_disable_tls`. - -const std = @import("std"); -const builtin = @import("builtin"); -const Stream = @import("stream.zig").Stream; -const testing = std.testing; -const http = std.http; -const mem = std.mem; -const net = std.net; -const Uri = std.Uri; -const Allocator = mem.Allocator; -const assert = std.debug.assert; -const use_vectors = builtin.zig_backend != .stage2_x86_64; - -const Client = @This(); -const proto = std.http.protocol; - -const tls23 = @import("tls"); - -const Loop = @import("jsruntime").Loop; -const tcp = @import("tcp.zig"); - -pub const disable_tls = std.options.http_disable_tls; - -/// Used for all client allocations. Must be thread-safe. -allocator: Allocator, - -// std.net.Stream implementation using jsruntime Loop -loop: *Loop, - -ca_bundle: if (disable_tls) void else std.crypto.Certificate.Bundle = if (disable_tls) {} else .{}, -ca_bundle_mutex: std.Thread.Mutex = .{}, - -/// When this is `true`, the next time this client performs an HTTPS request, -/// it will first rescan the system for root certificates. -next_https_rescan_certs: bool = true, - -/// The pool of connections that can be reused (and currently in use). -connection_pool: ConnectionPool = .{}, - -/// If populated, all http traffic travels through this third party. -/// This field cannot be modified while the client has active connections. -/// Pointer to externally-owned memory. -http_proxy: ?*Proxy = null, -/// If populated, all https traffic travels through this third party. -/// This field cannot be modified while the client has active connections. -/// Pointer to externally-owned memory. -https_proxy: ?*Proxy = null, - -/// A set of linked lists of connections that can be reused. -pub const ConnectionPool = struct { - mutex: std.Thread.Mutex = .{}, - /// Open connections that are currently in use. - used: Queue = .{}, - /// Open connections that are not currently in use. - free: Queue = .{}, - free_len: usize = 0, - free_size: usize = 32, - - /// The criteria for a connection to be considered a match. - pub const Criteria = struct { - host: []const u8, - port: u16, - protocol: Connection.Protocol, - }; - - const Queue = std.DoublyLinkedList(Connection); - pub const Node = Queue.Node; - - /// Finds and acquires a connection from the connection pool matching the criteria. This function is threadsafe. - /// If no connection is found, null is returned. - pub fn findConnection(pool: *ConnectionPool, criteria: Criteria) ?*Connection { - pool.mutex.lock(); - defer pool.mutex.unlock(); - - var next = pool.free.last; - while (next) |node| : (next = node.prev) { - if (node.data.protocol != criteria.protocol) continue; - if (node.data.port != criteria.port) continue; - - // Domain names are case-insensitive (RFC 5890, Section 2.3.2.4) - if (!std.ascii.eqlIgnoreCase(node.data.host, criteria.host)) continue; - - pool.acquireUnsafe(node); - return &node.data; - } - - return null; - } - - /// Acquires an existing connection from the connection pool. This function is not threadsafe. - pub fn acquireUnsafe(pool: *ConnectionPool, node: *Node) void { - pool.free.remove(node); - pool.free_len -= 1; - - pool.used.append(node); - } - - /// Acquires an existing connection from the connection pool. This function is threadsafe. - pub fn acquire(pool: *ConnectionPool, node: *Node) void { - pool.mutex.lock(); - defer pool.mutex.unlock(); - - return pool.acquireUnsafe(node); - } - - /// Tries to release a connection back to the connection pool. This function is threadsafe. - /// If the connection is marked as closing, it will be closed instead. - /// - /// The allocator must be the owner of all nodes in this pool. - /// The allocator must be the owner of all resources associated with the connection. - pub fn release(pool: *ConnectionPool, allocator: Allocator, connection: *Connection) void { - pool.mutex.lock(); - defer pool.mutex.unlock(); - - const node: *Node = @fieldParentPtr("data", connection); - - pool.used.remove(node); - - if (node.data.closing or pool.free_size == 0) { - node.data.close(allocator); - return allocator.destroy(node); - } - - if (pool.free_len >= pool.free_size) { - const popped = pool.free.popFirst() orelse unreachable; - pool.free_len -= 1; - - popped.data.close(allocator); - allocator.destroy(popped); - } - - if (node.data.proxied) { - pool.free.prepend(node); // proxied connections go to the end of the queue, always try direct connections first - } else { - pool.free.append(node); - } - - pool.free_len += 1; - } - - /// Adds a newly created node to the pool of used connections. This function is threadsafe. - pub fn addUsed(pool: *ConnectionPool, node: *Node) void { - pool.mutex.lock(); - defer pool.mutex.unlock(); - - pool.used.append(node); - } - - /// Resizes the connection pool. This function is threadsafe. - /// - /// If the new size is smaller than the current size, then idle connections will be closed until the pool is the new size. - pub fn resize(pool: *ConnectionPool, allocator: Allocator, new_size: usize) void { - pool.mutex.lock(); - defer pool.mutex.unlock(); - - const next = pool.free.first; - _ = next; - while (pool.free_len > new_size) { - const popped = pool.free.popFirst() orelse unreachable; - pool.free_len -= 1; - - popped.data.close(allocator); - allocator.destroy(popped); - } - - pool.free_size = new_size; - } - - /// Frees the connection pool and closes all connections within. This function is threadsafe. - /// - /// All future operations on the connection pool will deadlock. - pub fn deinit(pool: *ConnectionPool, allocator: Allocator) void { - pool.mutex.lock(); - - var next = pool.free.first; - while (next) |node| { - defer allocator.destroy(node); - next = node.next; - - node.data.close(allocator); - } - - next = pool.used.first; - while (next) |node| { - defer allocator.destroy(node); - next = node.next; - - node.data.close(allocator); - } - - pool.* = undefined; - } -}; - -/// An interface to either a plain or TLS connection. -pub const Connection = struct { - stream: Stream, - /// undefined unless protocol is tls. - tls_client: if (!disable_tls) *tls23.Connection(Stream) else void, - - /// The protocol that this connection is using. - protocol: Protocol, - - /// The host that this connection is connected to. - host: []u8, - - /// The port that this connection is connected to. - port: u16, - - /// Whether this connection is proxied and is not directly connected. - proxied: bool = false, - - /// Whether this connection is closing when we're done with it. - closing: bool = false, - - read_start: BufferSize = 0, - read_end: BufferSize = 0, - write_end: BufferSize = 0, - read_buf: [buffer_size]u8 = undefined, - write_buf: [buffer_size]u8 = undefined, - - pub const buffer_size = std.crypto.tls.max_ciphertext_record_len; - const BufferSize = std.math.IntFittingRange(0, buffer_size); - - pub const Protocol = enum { plain, tls }; - - pub fn readvDirectTls(conn: *Connection, buffers: []std.posix.iovec) ReadError!usize { - return conn.tls_client.readv(buffers) catch |err| { - // https://github.com/ziglang/zig/issues/2473 - if (mem.startsWith(u8, @errorName(err), "TlsAlert")) return error.TlsAlert; - - switch (err) { - error.TlsRecordOverflow, error.TlsBadRecordMac, error.TlsUnexpectedMessage => return error.TlsFailure, - error.ConnectionTimedOut => return error.ConnectionTimedOut, - error.ConnectionResetByPeer, error.BrokenPipe => return error.ConnectionResetByPeer, - else => return error.UnexpectedReadFailure, - } - }; - } - - pub fn readvDirect(conn: *Connection, buffers: []std.posix.iovec) ReadError!usize { - if (conn.protocol == .tls) { - if (disable_tls) unreachable; - - return conn.readvDirectTls(buffers); - } - - return conn.stream.readv(buffers) catch |err| switch (err) { - error.ConnectionTimedOut => return error.ConnectionTimedOut, - error.ConnectionResetByPeer, error.BrokenPipe => return error.ConnectionResetByPeer, - else => return error.UnexpectedReadFailure, - }; - } - - /// Refills the read buffer with data from the connection. - pub fn fill(conn: *Connection) ReadError!void { - if (conn.read_end != conn.read_start) return; - - var iovecs = [1]std.posix.iovec{ - .{ .base = &conn.read_buf, .len = conn.read_buf.len }, - }; - const nread = try conn.readvDirect(&iovecs); - if (nread == 0) return error.EndOfStream; - conn.read_start = 0; - conn.read_end = @intCast(nread); - } - - /// Returns the current slice of buffered data. - pub fn peek(conn: *Connection) []const u8 { - return conn.read_buf[conn.read_start..conn.read_end]; - } - - /// Discards the given number of bytes from the read buffer. - pub fn drop(conn: *Connection, num: BufferSize) void { - conn.read_start += num; - } - - /// Reads data from the connection into the given buffer. - pub fn read(conn: *Connection, buffer: []u8) ReadError!usize { - const available_read = conn.read_end - conn.read_start; - const available_buffer = buffer.len; - - if (available_read > available_buffer) { // partially read buffered data - @memcpy(buffer[0..available_buffer], conn.read_buf[conn.read_start..conn.read_end][0..available_buffer]); - conn.read_start += @intCast(available_buffer); - - return available_buffer; - } else if (available_read > 0) { // fully read buffered data - @memcpy(buffer[0..available_read], conn.read_buf[conn.read_start..conn.read_end]); - conn.read_start += available_read; - - return available_read; - } - - var iovecs = [2]std.posix.iovec{ - .{ .base = buffer.ptr, .len = buffer.len }, - .{ .base = &conn.read_buf, .len = conn.read_buf.len }, - }; - const nread = try conn.readvDirect(&iovecs); - - if (nread > buffer.len) { - conn.read_start = 0; - conn.read_end = @intCast(nread - buffer.len); - return buffer.len; - } - - return nread; - } - - pub const ReadError = error{ - TlsFailure, - TlsAlert, - ConnectionTimedOut, - ConnectionResetByPeer, - UnexpectedReadFailure, - EndOfStream, - }; - - pub const Reader = std.io.Reader(*Connection, ReadError, read); - - pub fn reader(conn: *Connection) Reader { - return Reader{ .context = conn }; - } - - pub fn writeAllDirectTls(conn: *Connection, buffer: []const u8) WriteError!void { - return conn.tls_client.writeAll(buffer) catch |err| switch (err) { - error.BrokenPipe, error.ConnectionResetByPeer => return error.ConnectionResetByPeer, - else => return error.UnexpectedWriteFailure, - }; - } - - pub fn writeAllDirect(conn: *Connection, buffer: []const u8) WriteError!void { - if (conn.protocol == .tls) { - if (disable_tls) unreachable; - - return conn.writeAllDirectTls(buffer); - } - - return conn.stream.writeAll(buffer) catch |err| switch (err) { - error.BrokenPipe, error.ConnectionResetByPeer => return error.ConnectionResetByPeer, - else => return error.UnexpectedWriteFailure, - }; - } - - /// Writes the given buffer to the connection. - pub fn write(conn: *Connection, buffer: []const u8) WriteError!usize { - if (conn.write_buf.len - conn.write_end < buffer.len) { - try conn.flush(); - - if (buffer.len > conn.write_buf.len) { - try conn.writeAllDirect(buffer); - return buffer.len; - } - } - - @memcpy(conn.write_buf[conn.write_end..][0..buffer.len], buffer); - conn.write_end += @intCast(buffer.len); - - return buffer.len; - } - - /// Returns a buffer to be filled with exactly len bytes to write to the connection. - pub fn allocWriteBuffer(conn: *Connection, len: BufferSize) WriteError![]u8 { - if (conn.write_buf.len - conn.write_end < len) try conn.flush(); - defer conn.write_end += len; - return conn.write_buf[conn.write_end..][0..len]; - } - - /// Flushes the write buffer to the connection. - pub fn flush(conn: *Connection) WriteError!void { - if (conn.write_end == 0) return; - - try conn.writeAllDirect(conn.write_buf[0..conn.write_end]); - conn.write_end = 0; - } - - pub const WriteError = error{ - ConnectionResetByPeer, - UnexpectedWriteFailure, - }; - - pub const Writer = std.io.Writer(*Connection, WriteError, write); - - pub fn writer(conn: *Connection) Writer { - return Writer{ .context = conn }; - } - - /// Closes the connection. - pub fn close(conn: *Connection, allocator: Allocator) void { - if (conn.protocol == .tls) { - if (disable_tls) unreachable; - - // try to cleanly close the TLS connection, for any server that cares. - conn.tls_client.close() catch {}; - allocator.destroy(conn.tls_client); - } - - conn.stream.close(); - allocator.free(conn.host); - } -}; - -/// The mode of transport for requests. -pub const RequestTransfer = union(enum) { - content_length: u64, - chunked: void, - none: void, -}; - -/// The decompressor for response messages. -pub const Compression = union(enum) { - pub const DeflateDecompressor = std.compress.zlib.Decompressor(Request.TransferReader); - pub const GzipDecompressor = std.compress.gzip.Decompressor(Request.TransferReader); - // https://github.com/ziglang/zig/issues/18937 - //pub const ZstdDecompressor = std.compress.zstd.DecompressStream(Request.TransferReader, .{}); - - deflate: DeflateDecompressor, - gzip: GzipDecompressor, - // https://github.com/ziglang/zig/issues/18937 - //zstd: ZstdDecompressor, - none: void, -}; - -/// A HTTP response originating from a server. -pub const Response = struct { - version: http.Version, - status: http.Status, - reason: []const u8, - - /// Points into the user-provided `server_header_buffer`. - location: ?[]const u8 = null, - /// Points into the user-provided `server_header_buffer`. - content_type: ?[]const u8 = null, - /// Points into the user-provided `server_header_buffer`. - content_disposition: ?[]const u8 = null, - - keep_alive: bool, - - /// If present, the number of bytes in the response body. - content_length: ?u64 = null, - - /// If present, the transfer encoding of the response body, otherwise none. - transfer_encoding: http.TransferEncoding = .none, - - /// If present, the compression of the response body, otherwise identity (no compression). - transfer_compression: http.ContentEncoding = .identity, - - parser: proto.HeadersParser, - compression: Compression = .none, - - /// Whether the response body should be skipped. Any data read from the - /// response body will be discarded. - skip: bool = false, - - pub const ParseError = error{ - HttpHeadersInvalid, - HttpHeaderContinuationsUnsupported, - HttpTransferEncodingUnsupported, - HttpConnectionHeaderUnsupported, - InvalidContentLength, - CompressionUnsupported, - }; - - pub fn parse(res: *Response, bytes: []const u8) ParseError!void { - var it = mem.splitSequence(u8, bytes, "\r\n"); - - const first_line = it.next().?; - if (first_line.len < 12) { - return error.HttpHeadersInvalid; - } - - const version: http.Version = switch (int64(first_line[0..8])) { - int64("HTTP/1.0") => .@"HTTP/1.0", - int64("HTTP/1.1") => .@"HTTP/1.1", - else => return error.HttpHeadersInvalid, - }; - if (first_line[8] != ' ') return error.HttpHeadersInvalid; - const status: http.Status = @enumFromInt(parseInt3(first_line[9..12])); - const reason = mem.trimLeft(u8, first_line[12..], " "); - - res.version = version; - res.status = status; - res.reason = reason; - res.keep_alive = switch (version) { - .@"HTTP/1.0" => false, - .@"HTTP/1.1" => true, - }; - - while (it.next()) |line| { - if (line.len == 0) return; - switch (line[0]) { - ' ', '\t' => return error.HttpHeaderContinuationsUnsupported, - else => {}, - } - - var line_it = mem.splitScalar(u8, line, ':'); - const header_name = line_it.next().?; - const header_value = mem.trim(u8, line_it.rest(), " \t"); - if (header_name.len == 0) return error.HttpHeadersInvalid; - - if (std.ascii.eqlIgnoreCase(header_name, "connection")) { - res.keep_alive = !std.ascii.eqlIgnoreCase(header_value, "close"); - } else if (std.ascii.eqlIgnoreCase(header_name, "content-type")) { - res.content_type = header_value; - } else if (std.ascii.eqlIgnoreCase(header_name, "location")) { - res.location = header_value; - } else if (std.ascii.eqlIgnoreCase(header_name, "content-disposition")) { - res.content_disposition = header_value; - } else if (std.ascii.eqlIgnoreCase(header_name, "transfer-encoding")) { - // Transfer-Encoding: second, first - // Transfer-Encoding: deflate, chunked - var iter = mem.splitBackwardsScalar(u8, header_value, ','); - - const first = iter.first(); - const trimmed_first = mem.trim(u8, first, " "); - - var next: ?[]const u8 = first; - if (std.meta.stringToEnum(http.TransferEncoding, trimmed_first)) |transfer| { - if (res.transfer_encoding != .none) return error.HttpHeadersInvalid; // we already have a transfer encoding - res.transfer_encoding = transfer; - - next = iter.next(); - } - - if (next) |second| { - const trimmed_second = mem.trim(u8, second, " "); - - if (std.meta.stringToEnum(http.ContentEncoding, trimmed_second)) |transfer| { - if (res.transfer_compression != .identity) return error.HttpHeadersInvalid; // double compression is not supported - res.transfer_compression = transfer; - } else { - return error.HttpTransferEncodingUnsupported; - } - } - - if (iter.next()) |_| return error.HttpTransferEncodingUnsupported; - } else if (std.ascii.eqlIgnoreCase(header_name, "content-length")) { - const content_length = std.fmt.parseInt(u64, header_value, 10) catch return error.InvalidContentLength; - - if (res.content_length != null and res.content_length != content_length) return error.HttpHeadersInvalid; - - res.content_length = content_length; - } else if (std.ascii.eqlIgnoreCase(header_name, "content-encoding")) { - if (res.transfer_compression != .identity) return error.HttpHeadersInvalid; - - const trimmed = mem.trim(u8, header_value, " "); - - if (std.meta.stringToEnum(http.ContentEncoding, trimmed)) |ce| { - res.transfer_compression = ce; - } else { - return error.HttpTransferEncodingUnsupported; - } - } - } - return error.HttpHeadersInvalid; // missing empty line - } - - test parse { - const response_bytes = "HTTP/1.1 200 OK\r\n" ++ - "LOcation:url\r\n" ++ - "content-tYpe: text/plain\r\n" ++ - "content-disposition:attachment; filename=example.txt \r\n" ++ - "content-Length:10\r\n" ++ - "TRansfer-encoding:\tdeflate, chunked \r\n" ++ - "connectioN:\t keep-alive \r\n\r\n"; - - var header_buffer: [1024]u8 = undefined; - var res = Response{ - .status = undefined, - .reason = undefined, - .version = undefined, - .keep_alive = false, - .parser = proto.HeadersParser.init(&header_buffer), - }; - - @memcpy(header_buffer[0..response_bytes.len], response_bytes); - res.parser.header_bytes_len = response_bytes.len; - - try res.parse(response_bytes); - - try testing.expectEqual(.@"HTTP/1.1", res.version); - try testing.expectEqualStrings("OK", res.reason); - try testing.expectEqual(.ok, res.status); - - try testing.expectEqualStrings("url", res.location.?); - try testing.expectEqualStrings("text/plain", res.content_type.?); - try testing.expectEqualStrings("attachment; filename=example.txt", res.content_disposition.?); - - try testing.expectEqual(true, res.keep_alive); - try testing.expectEqual(10, res.content_length.?); - try testing.expectEqual(.chunked, res.transfer_encoding); - try testing.expectEqual(.deflate, res.transfer_compression); - } - - inline fn int64(array: *const [8]u8) u64 { - return @bitCast(array.*); - } - - fn parseInt3(text: *const [3]u8) u10 { - if (use_vectors) { - const nnn: @Vector(3, u8) = text.*; - const zero: @Vector(3, u8) = .{ '0', '0', '0' }; - const mmm: @Vector(3, u10) = .{ 100, 10, 1 }; - return @reduce(.Add, @as(@Vector(3, u10), nnn -% zero) *% mmm); - } - return std.fmt.parseInt(u10, text, 10) catch unreachable; - } - - test parseInt3 { - const expectEqual = testing.expectEqual; - try expectEqual(@as(u10, 0), parseInt3("000")); - try expectEqual(@as(u10, 418), parseInt3("418")); - try expectEqual(@as(u10, 999), parseInt3("999")); - } - - pub fn iterateHeaders(r: Response) http.HeaderIterator { - return http.HeaderIterator.init(r.parser.get()); - } - - test iterateHeaders { - const response_bytes = "HTTP/1.1 200 OK\r\n" ++ - "LOcation:url\r\n" ++ - "content-tYpe: text/plain\r\n" ++ - "content-disposition:attachment; filename=example.txt \r\n" ++ - "content-Length:10\r\n" ++ - "TRansfer-encoding:\tdeflate, chunked \r\n" ++ - "connectioN:\t keep-alive \r\n\r\n"; - - var header_buffer: [1024]u8 = undefined; - var res = Response{ - .status = undefined, - .reason = undefined, - .version = undefined, - .keep_alive = false, - .parser = proto.HeadersParser.init(&header_buffer), - }; - - @memcpy(header_buffer[0..response_bytes.len], response_bytes); - res.parser.header_bytes_len = response_bytes.len; - - var it = res.iterateHeaders(); - { - const header = it.next().?; - try testing.expectEqualStrings("LOcation", header.name); - try testing.expectEqualStrings("url", header.value); - try testing.expect(!it.is_trailer); - } - { - const header = it.next().?; - try testing.expectEqualStrings("content-tYpe", header.name); - try testing.expectEqualStrings("text/plain", header.value); - try testing.expect(!it.is_trailer); - } - { - const header = it.next().?; - try testing.expectEqualStrings("content-disposition", header.name); - try testing.expectEqualStrings("attachment; filename=example.txt", header.value); - try testing.expect(!it.is_trailer); - } - { - const header = it.next().?; - try testing.expectEqualStrings("content-Length", header.name); - try testing.expectEqualStrings("10", header.value); - try testing.expect(!it.is_trailer); - } - { - const header = it.next().?; - try testing.expectEqualStrings("TRansfer-encoding", header.name); - try testing.expectEqualStrings("deflate, chunked", header.value); - try testing.expect(!it.is_trailer); - } - { - const header = it.next().?; - try testing.expectEqualStrings("connectioN", header.name); - try testing.expectEqualStrings("keep-alive", header.value); - try testing.expect(!it.is_trailer); - } - try testing.expectEqual(null, it.next()); - } -}; - -/// A HTTP request that has been sent. -/// -/// Order of operations: open -> send[ -> write -> finish] -> wait -> read -pub const Request = struct { - uri: Uri, - client: *Client, - /// This is null when the connection is released. - connection: ?*Connection, - keep_alive: bool, - - method: http.Method, - version: http.Version = .@"HTTP/1.1", - transfer_encoding: RequestTransfer, - redirect_behavior: RedirectBehavior, - - /// Whether the request should handle a 100-continue response before sending the request body. - handle_continue: bool, - - /// The response associated with this request. - /// - /// This field is undefined until `wait` is called. - response: Response, - - /// Standard headers that have default, but overridable, behavior. - headers: Headers, - - /// These headers are kept including when following a redirect to a - /// different domain. - /// Externally-owned; must outlive the Request. - extra_headers: []const http.Header, - - /// These headers are stripped when following a redirect to a different - /// domain. - /// Externally-owned; must outlive the Request. - privileged_headers: []const http.Header, - - pub const Headers = struct { - host: Value = .default, - authorization: Value = .default, - user_agent: Value = .default, - connection: Value = .default, - accept_encoding: Value = .default, - content_type: Value = .default, - - pub const Value = union(enum) { - default, - omit, - override: []const u8, - }; - }; - - /// Any value other than `not_allowed` or `unhandled` means that integer represents - /// how many remaining redirects are allowed. - pub const RedirectBehavior = enum(u16) { - /// The next redirect will cause an error. - not_allowed = 0, - /// Redirects are passed to the client to analyze the redirect response - /// directly. - unhandled = std.math.maxInt(u16), - _, - - pub fn subtractOne(rb: *RedirectBehavior) void { - switch (rb.*) { - .not_allowed => unreachable, - .unhandled => unreachable, - _ => rb.* = @enumFromInt(@intFromEnum(rb.*) - 1), - } - } - - pub fn remaining(rb: RedirectBehavior) u16 { - assert(rb != .unhandled); - return @intFromEnum(rb); - } - }; - - /// Frees all resources associated with the request. - pub fn deinit(req: *Request) void { - if (req.connection) |connection| { - if (!req.response.parser.done) { - // If the response wasn't fully read, then we need to close the connection. - connection.closing = true; - } - req.client.connection_pool.release(req.client.allocator, connection); - } - req.* = undefined; - } - - // This function must deallocate all resources associated with the request, - // or keep those which will be used. - // This needs to be kept in sync with deinit and request. - fn redirect(req: *Request, uri: Uri) !void { - assert(req.response.parser.done); - - req.client.connection_pool.release(req.client.allocator, req.connection.?); - req.connection = null; - - var server_header = std.heap.FixedBufferAllocator.init(req.response.parser.header_bytes_buffer); - defer req.response.parser.header_bytes_buffer = server_header.buffer[server_header.end_index..]; - const protocol, const valid_uri = try validateUri(uri, server_header.allocator()); - - const new_host = valid_uri.host.?.raw; - const prev_host = req.uri.host.?.raw; - const keep_privileged_headers = - std.ascii.eqlIgnoreCase(valid_uri.scheme, req.uri.scheme) and - std.ascii.endsWithIgnoreCase(new_host, prev_host) and - (new_host.len == prev_host.len or new_host[new_host.len - prev_host.len - 1] == '.'); - if (!keep_privileged_headers) { - // When redirecting to a different domain, strip privileged headers. - req.privileged_headers = &.{}; - } - - if (switch (req.response.status) { - .see_other => true, - .moved_permanently, .found => req.method == .POST, - else => false, - }) { - // A redirect to a GET must change the method and remove the body. - req.method = .GET; - req.transfer_encoding = .none; - req.headers.content_type = .omit; - } - - if (req.transfer_encoding != .none) { - // The request body has already been sent. The request is - // still in a valid state, but the redirect must be handled - // manually. - return error.RedirectRequiresResend; - } - - req.uri = valid_uri; - req.connection = try req.client.connect(new_host, uriPort(valid_uri, protocol), protocol); - req.redirect_behavior.subtractOne(); - req.response.parser.reset(); - - req.response = .{ - .version = undefined, - .status = undefined, - .reason = undefined, - .keep_alive = undefined, - .parser = req.response.parser, - }; - } - - pub const SendError = Connection.WriteError || error{ InvalidContentLength, UnsupportedTransferEncoding }; - - /// Send the HTTP request headers to the server. - pub fn send(req: *Request) SendError!void { - if (!req.method.requestHasBody() and req.transfer_encoding != .none) - return error.UnsupportedTransferEncoding; - - const connection = req.connection.?; - const w = connection.writer(); - - try req.method.write(w); - try w.writeByte(' '); - - if (req.method == .CONNECT) { - try req.uri.writeToStream(.{ .authority = true }, w); - } else { - try req.uri.writeToStream(.{ - .scheme = connection.proxied, - .authentication = connection.proxied, - .authority = connection.proxied, - .path = true, - .query = true, - }, w); - } - try w.writeByte(' '); - try w.writeAll(@tagName(req.version)); - try w.writeAll("\r\n"); - - if (try emitOverridableHeader("host: ", req.headers.host, w)) { - try w.writeAll("host: "); - try req.uri.writeToStream(.{ .authority = true }, w); - try w.writeAll("\r\n"); - } - - if (try emitOverridableHeader("authorization: ", req.headers.authorization, w)) { - if (req.uri.user != null or req.uri.password != null) { - try w.writeAll("authorization: "); - const authorization = try connection.allocWriteBuffer( - @intCast(basic_authorization.valueLengthFromUri(req.uri)), - ); - assert(basic_authorization.value(req.uri, authorization).len == authorization.len); - try w.writeAll("\r\n"); - } - } - - if (try emitOverridableHeader("user-agent: ", req.headers.user_agent, w)) { - try w.writeAll("user-agent: zig/"); - try w.writeAll(builtin.zig_version_string); - try w.writeAll(" (std.http)\r\n"); - } - - if (try emitOverridableHeader("connection: ", req.headers.connection, w)) { - if (req.keep_alive) { - try w.writeAll("connection: keep-alive\r\n"); - } else { - try w.writeAll("connection: close\r\n"); - } - } - - if (try emitOverridableHeader("accept-encoding: ", req.headers.accept_encoding, w)) { - // https://github.com/ziglang/zig/issues/18937 - //try w.writeAll("accept-encoding: gzip, deflate, zstd\r\n"); - try w.writeAll("accept-encoding: gzip, deflate\r\n"); - } - - switch (req.transfer_encoding) { - .chunked => try w.writeAll("transfer-encoding: chunked\r\n"), - .content_length => |len| try w.print("content-length: {d}\r\n", .{len}), - .none => {}, - } - - if (try emitOverridableHeader("content-type: ", req.headers.content_type, w)) { - // The default is to omit content-type if not provided because - // "application/octet-stream" is redundant. - } - - for (req.extra_headers) |header| { - assert(header.name.len != 0); - - try w.writeAll(header.name); - try w.writeAll(": "); - try w.writeAll(header.value); - try w.writeAll("\r\n"); - } - - if (connection.proxied) proxy: { - const proxy = switch (connection.protocol) { - .plain => req.client.http_proxy, - .tls => req.client.https_proxy, - } orelse break :proxy; - - const authorization = proxy.authorization orelse break :proxy; - try w.writeAll("proxy-authorization: "); - try w.writeAll(authorization); - try w.writeAll("\r\n"); - } - - try w.writeAll("\r\n"); - - try connection.flush(); - } - - /// Returns true if the default behavior is required, otherwise handles - /// writing (or not writing) the header. - fn emitOverridableHeader(prefix: []const u8, v: Headers.Value, w: anytype) !bool { - switch (v) { - .default => return true, - .omit => return false, - .override => |x| { - try w.writeAll(prefix); - try w.writeAll(x); - try w.writeAll("\r\n"); - return false; - }, - } - } - - const TransferReadError = Connection.ReadError || proto.HeadersParser.ReadError; - - const TransferReader = std.io.Reader(*Request, TransferReadError, transferRead); - - fn transferReader(req: *Request) TransferReader { - return .{ .context = req }; - } - - fn transferRead(req: *Request, buf: []u8) TransferReadError!usize { - if (req.response.parser.done) return 0; - - var index: usize = 0; - while (index == 0) { - const amt = try req.response.parser.read(req.connection.?, buf[index..], req.response.skip); - if (amt == 0 and req.response.parser.done) break; - index += amt; - } - - return index; - } - - pub const WaitError = RequestError || SendError || TransferReadError || - proto.HeadersParser.CheckCompleteHeadError || Response.ParseError || - error{ // TODO: file zig fmt issue for this bad indentation - TooManyHttpRedirects, - RedirectRequiresResend, - HttpRedirectLocationMissing, - HttpRedirectLocationInvalid, - CompressionInitializationFailed, - CompressionUnsupported, - }; - - /// Waits for a response from the server and parses any headers that are sent. - /// This function will block until the final response is received. - /// - /// If handling redirects and the request has no payload, then this - /// function will automatically follow redirects. If a request payload is - /// present, then this function will error with - /// error.RedirectRequiresResend. - /// - /// Must be called after `send` and, if any data was written to the request - /// body, then also after `finish`. - pub fn wait(req: *Request) WaitError!void { - while (true) { - // This while loop is for handling redirects, which means the request's - // connection may be different than the previous iteration. However, it - // is still guaranteed to be non-null with each iteration of this loop. - const connection = req.connection.?; - - while (true) { // read headers - try connection.fill(); - - const nchecked = try req.response.parser.checkCompleteHead(connection.peek()); - connection.drop(@intCast(nchecked)); - - if (req.response.parser.state.isContent()) break; - } - - try req.response.parse(req.response.parser.get()); - - if (req.response.status == .@"continue") { - // We're done parsing the continue response; reset to prepare - // for the real response. - req.response.parser.done = true; - req.response.parser.reset(); - - if (req.handle_continue) - continue; - - return; // we're not handling the 100-continue - } - - // we're switching protocols, so this connection is no longer doing http - if (req.method == .CONNECT and req.response.status.class() == .success) { - connection.closing = false; - req.response.parser.done = true; - return; // the connection is not HTTP past this point - } - - connection.closing = !req.response.keep_alive or !req.keep_alive; - - // Any response to a HEAD request and any response with a 1xx - // (Informational), 204 (No Content), or 304 (Not Modified) status - // code is always terminated by the first empty line after the - // header fields, regardless of the header fields present in the - // message. - if (req.method == .HEAD or req.response.status.class() == .informational or - req.response.status == .no_content or req.response.status == .not_modified) - { - req.response.parser.done = true; - return; // The response is empty; no further setup or redirection is necessary. - } - - switch (req.response.transfer_encoding) { - .none => { - if (req.response.content_length) |cl| { - req.response.parser.next_chunk_length = cl; - - if (cl == 0) req.response.parser.done = true; - } else { - // read until the connection is closed - req.response.parser.next_chunk_length = std.math.maxInt(u64); - } - }, - .chunked => { - req.response.parser.next_chunk_length = 0; - req.response.parser.state = .chunk_head_size; - }, - } - - if (req.response.status.class() == .redirect and req.redirect_behavior != .unhandled) { - // skip the body of the redirect response, this will at least - // leave the connection in a known good state. - req.response.skip = true; - assert(try req.transferRead(&.{}) == 0); // we're skipping, no buffer is necessary - - if (req.redirect_behavior == .not_allowed) return error.TooManyHttpRedirects; - - const location = req.response.location orelse - return error.HttpRedirectLocationMissing; - - // This mutates the beginning of header_bytes_buffer and uses that - // for the backing memory of the returned Uri. - try req.redirect(req.uri.resolve_inplace( - location, - &req.response.parser.header_bytes_buffer, - ) catch |err| switch (err) { - error.UnexpectedCharacter, - error.InvalidFormat, - error.InvalidPort, - => return error.HttpRedirectLocationInvalid, - error.NoSpaceLeft => return error.HttpHeadersOversize, - }); - try req.send(); - } else { - req.response.skip = false; - if (!req.response.parser.done) { - switch (req.response.transfer_compression) { - .identity => req.response.compression = .none, - .compress, .@"x-compress" => return error.CompressionUnsupported, - .deflate => req.response.compression = .{ - .deflate = std.compress.zlib.decompressor(req.transferReader()), - }, - .gzip, .@"x-gzip" => req.response.compression = .{ - .gzip = std.compress.gzip.decompressor(req.transferReader()), - }, - // https://github.com/ziglang/zig/issues/18937 - //.zstd => req.response.compression = .{ - // .zstd = std.compress.zstd.decompressStream(req.client.allocator, req.transferReader()), - //}, - .zstd => return error.CompressionUnsupported, - } - } - - break; - } - } - } - - pub const ReadError = TransferReadError || proto.HeadersParser.CheckCompleteHeadError || - error{ DecompressionFailure, InvalidTrailers }; - - pub const Reader = std.io.Reader(*Request, ReadError, read); - - pub fn reader(req: *Request) Reader { - return .{ .context = req }; - } - - /// Reads data from the response body. Must be called after `wait`. - pub fn read(req: *Request, buffer: []u8) ReadError!usize { - const out_index = switch (req.response.compression) { - .deflate => |*deflate| deflate.read(buffer) catch return error.DecompressionFailure, - .gzip => |*gzip| gzip.read(buffer) catch return error.DecompressionFailure, - // https://github.com/ziglang/zig/issues/18937 - //.zstd => |*zstd| zstd.read(buffer) catch return error.DecompressionFailure, - else => try req.transferRead(buffer), - }; - if (out_index > 0) return out_index; - - while (!req.response.parser.state.isContent()) { // read trailing headers - try req.connection.?.fill(); - - const nchecked = try req.response.parser.checkCompleteHead(req.connection.?.peek()); - req.connection.?.drop(@intCast(nchecked)); - } - - return 0; - } - - /// Reads data from the response body. Must be called after `wait`. - pub fn readAll(req: *Request, buffer: []u8) !usize { - var index: usize = 0; - while (index < buffer.len) { - const amt = try read(req, buffer[index..]); - if (amt == 0) break; - index += amt; - } - return index; - } - - pub const WriteError = Connection.WriteError || error{ NotWriteable, MessageTooLong }; - - pub const Writer = std.io.Writer(*Request, WriteError, write); - - pub fn writer(req: *Request) Writer { - return .{ .context = req }; - } - - /// Write `bytes` to the server. The `transfer_encoding` field determines how data will be sent. - /// Must be called after `send` and before `finish`. - pub fn write(req: *Request, bytes: []const u8) WriteError!usize { - switch (req.transfer_encoding) { - .chunked => { - if (bytes.len > 0) { - try req.connection.?.writer().print("{x}\r\n", .{bytes.len}); - try req.connection.?.writer().writeAll(bytes); - try req.connection.?.writer().writeAll("\r\n"); - } - - return bytes.len; - }, - .content_length => |*len| { - if (len.* < bytes.len) return error.MessageTooLong; - - const amt = try req.connection.?.write(bytes); - len.* -= amt; - return amt; - }, - .none => return error.NotWriteable, - } - } - - /// Write `bytes` to the server. The `transfer_encoding` field determines how data will be sent. - /// Must be called after `send` and before `finish`. - pub fn writeAll(req: *Request, bytes: []const u8) WriteError!void { - var index: usize = 0; - while (index < bytes.len) { - index += try write(req, bytes[index..]); - } - } - - pub const FinishError = WriteError || error{MessageNotCompleted}; - - /// Finish the body of a request. This notifies the server that you have no more data to send. - /// Must be called after `send`. - pub fn finish(req: *Request) FinishError!void { - switch (req.transfer_encoding) { - .chunked => try req.connection.?.writer().writeAll("0\r\n\r\n"), - .content_length => |len| if (len != 0) return error.MessageNotCompleted, - .none => {}, - } - - try req.connection.?.flush(); - } -}; - -pub const Proxy = struct { - protocol: Connection.Protocol, - host: []const u8, - authorization: ?[]const u8, - port: u16, - supports_connect: bool, -}; - -/// Release all associated resources with the client. -/// -/// All pending requests must be de-initialized and all active connections released -/// before calling this function. -pub fn deinit(client: *Client) void { - assert(client.connection_pool.used.first == null); // There are still active requests. - - client.connection_pool.deinit(client.allocator); - - if (!disable_tls) - client.ca_bundle.deinit(client.allocator); - - client.* = undefined; -} - -/// Populates `http_proxy` and `https_proxy` via standard proxy environment variables. -/// Asserts the client has no active connections. -/// Uses `arena` for a few small allocations that must outlive the client, or -/// at least until those fields are set to different values. -pub fn initDefaultProxies(client: *Client, arena: Allocator) !void { - // Prevent any new connections from being created. - client.connection_pool.mutex.lock(); - defer client.connection_pool.mutex.unlock(); - - assert(client.connection_pool.used.first == null); // There are active requests. - - if (client.http_proxy == null) { - client.http_proxy = try createProxyFromEnvVar(arena, &.{ - "http_proxy", "HTTP_PROXY", "all_proxy", "ALL_PROXY", - }); - } - - if (client.https_proxy == null) { - client.https_proxy = try createProxyFromEnvVar(arena, &.{ - "https_proxy", "HTTPS_PROXY", "all_proxy", "ALL_PROXY", - }); - } -} - -fn createProxyFromEnvVar(arena: Allocator, env_var_names: []const []const u8) !?*Proxy { - const content = for (env_var_names) |name| { - break std.process.getEnvVarOwned(arena, name) catch |err| switch (err) { - error.EnvironmentVariableNotFound => continue, - else => |e| return e, - }; - } else return null; - - const uri = Uri.parse(content) catch try Uri.parseAfterScheme("http", content); - const protocol, const valid_uri = validateUri(uri, arena) catch |err| switch (err) { - error.UnsupportedUriScheme => return null, - error.UriMissingHost => return error.HttpProxyMissingHost, - error.OutOfMemory => |e| return e, - }; - - const authorization: ?[]const u8 = if (valid_uri.user != null or valid_uri.password != null) a: { - const authorization = try arena.alloc(u8, basic_authorization.valueLengthFromUri(valid_uri)); - assert(basic_authorization.value(valid_uri, authorization).len == authorization.len); - break :a authorization; - } else null; - - const proxy = try arena.create(Proxy); - proxy.* = .{ - .protocol = protocol, - .host = valid_uri.host.?.raw, - .authorization = authorization, - .port = uriPort(valid_uri, protocol), - .supports_connect = true, - }; - return proxy; -} - -pub const basic_authorization = struct { - pub const max_user_len = 255; - pub const max_password_len = 255; - pub const max_value_len = valueLength(max_user_len, max_password_len); - - const prefix = "Basic "; - - pub fn valueLength(user_len: usize, password_len: usize) usize { - return prefix.len + std.base64.standard.Encoder.calcSize(user_len + 1 + password_len); - } - - pub fn valueLengthFromUri(uri: Uri) usize { - var stream = std.io.countingWriter(std.io.null_writer); - try stream.writer().print("{user}", .{uri.user orelse Uri.Component.empty}); - const user_len = stream.bytes_written; - stream.bytes_written = 0; - try stream.writer().print("{password}", .{uri.password orelse Uri.Component.empty}); - const password_len = stream.bytes_written; - return valueLength(@intCast(user_len), @intCast(password_len)); - } - - pub fn value(uri: Uri, out: []u8) []u8 { - var buf: [max_user_len + ":".len + max_password_len]u8 = undefined; - var stream = std.io.fixedBufferStream(&buf); - stream.writer().print("{user}", .{uri.user orelse Uri.Component.empty}) catch - unreachable; - assert(stream.pos <= max_user_len); - stream.writer().print(":{password}", .{uri.password orelse Uri.Component.empty}) catch - unreachable; - - @memcpy(out[0..prefix.len], prefix); - const base64 = std.base64.standard.Encoder.encode(out[prefix.len..], stream.getWritten()); - return out[0 .. prefix.len + base64.len]; - } -}; - -pub const ConnectTcpError = Allocator.Error || error{ ConnectionRefused, NetworkUnreachable, ConnectionTimedOut, ConnectionResetByPeer, TemporaryNameServerFailure, NameServerFailure, UnknownHostName, HostLacksNetworkAddresses, UnexpectedConnectFailure, TlsInitializationFailed }; - -/// Connect to `host:port` using the specified protocol. This will reuse a connection if one is already open. -/// -/// This function is threadsafe. -pub fn connectTcp(client: *Client, host: []const u8, port: u16, protocol: Connection.Protocol) ConnectTcpError!*Connection { - if (client.connection_pool.findConnection(.{ - .host = host, - .port = port, - .protocol = protocol, - })) |node| return node; - - if (disable_tls and protocol == .tls) - return error.TlsInitializationFailed; - - const conn = try client.allocator.create(ConnectionPool.Node); - errdefer client.allocator.destroy(conn); - conn.* = .{ .data = undefined }; - - const stream = tcp.tcpConnectToHost(client.allocator, client.loop, host, port) catch |err| switch (err) { - error.ConnectionRefused => return error.ConnectionRefused, - error.NetworkUnreachable => return error.NetworkUnreachable, - error.ConnectionTimedOut => return error.ConnectionTimedOut, - error.ConnectionResetByPeer => return error.ConnectionResetByPeer, - error.TemporaryNameServerFailure => return error.TemporaryNameServerFailure, - error.NameServerFailure => return error.NameServerFailure, - error.UnknownHostName => return error.UnknownHostName, - error.HostLacksNetworkAddresses => return error.HostLacksNetworkAddresses, - else => return error.UnexpectedConnectFailure, - }; - errdefer stream.close(); - - conn.data = .{ - .stream = stream, - .tls_client = undefined, - - .protocol = protocol, - .host = try client.allocator.dupe(u8, host), - .port = port, - }; - errdefer client.allocator.free(conn.data.host); - - if (protocol == .tls) { - if (disable_tls) unreachable; - - conn.data.tls_client = try client.allocator.create(tls23.Connection(Stream)); - errdefer client.allocator.destroy(conn.data.tls_client); - - conn.data.tls_client.* = tls23.client(stream, .{ - .host = host, - .root_ca = client.ca_bundle, - }) catch return error.TlsInitializationFailed; - } - - client.connection_pool.addUsed(conn); - - return &conn.data; -} - -/// Connect to `tunnel_host:tunnel_port` using the specified proxy with HTTP -/// CONNECT. This will reuse a connection if one is already open. -/// -/// This function is threadsafe. -pub fn connectTunnel( - client: *Client, - proxy: *Proxy, - tunnel_host: []const u8, - tunnel_port: u16, -) !*Connection { - if (!proxy.supports_connect) return error.TunnelNotSupported; - - if (client.connection_pool.findConnection(.{ - .host = tunnel_host, - .port = tunnel_port, - .protocol = proxy.protocol, - })) |node| - return node; - - var maybe_valid = false; - (tunnel: { - const conn = try client.connectTcp(proxy.host, proxy.port, proxy.protocol); - errdefer { - conn.closing = true; - client.connection_pool.release(client.allocator, conn); - } - - var buffer: [8096]u8 = undefined; - var req = client.open(.CONNECT, .{ - .scheme = "http", - .host = .{ .raw = tunnel_host }, - .port = tunnel_port, - }, .{ - .redirect_behavior = .unhandled, - .connection = conn, - .server_header_buffer = &buffer, - }) catch |err| { - std.log.debug("err {}", .{err}); - break :tunnel err; - }; - defer req.deinit(); - - req.send() catch |err| break :tunnel err; - req.wait() catch |err| break :tunnel err; - - if (req.response.status.class() == .server_error) { - maybe_valid = true; - break :tunnel error.ServerError; - } - - if (req.response.status != .ok) break :tunnel error.ConnectionRefused; - - // this connection is now a tunnel, so we can't use it for anything else, it will only be released when the client is de-initialized. - req.connection = null; - - client.allocator.free(conn.host); - conn.host = try client.allocator.dupe(u8, tunnel_host); - errdefer client.allocator.free(conn.host); - - conn.port = tunnel_port; - conn.closing = false; - - return conn; - }) catch { - // something went wrong with the tunnel - proxy.supports_connect = maybe_valid; - return error.TunnelNotSupported; - }; -} - -// Prevents a dependency loop in open() -const ConnectErrorPartial = ConnectTcpError || error{ UnsupportedUriScheme, ConnectionRefused }; -pub const ConnectError = ConnectErrorPartial || RequestError; - -/// Connect to `host:port` using the specified protocol. This will reuse a -/// connection if one is already open. -/// If a proxy is configured for the client, then the proxy will be used to -/// connect to the host. -/// -/// This function is threadsafe. -pub fn connect( - client: *Client, - host: []const u8, - port: u16, - protocol: Connection.Protocol, -) ConnectError!*Connection { - const proxy = switch (protocol) { - .plain => client.http_proxy, - .tls => client.https_proxy, - } orelse return client.connectTcp(host, port, protocol); - - // Prevent proxying through itself. - if (std.ascii.eqlIgnoreCase(proxy.host, host) and - proxy.port == port and proxy.protocol == protocol) - { - return client.connectTcp(host, port, protocol); - } - - if (proxy.supports_connect) tunnel: { - return connectTunnel(client, proxy, host, port) catch |err| switch (err) { - error.TunnelNotSupported => break :tunnel, - else => |e| return e, - }; - } - - // fall back to using the proxy as a normal http proxy - const conn = try client.connectTcp(proxy.host, proxy.port, proxy.protocol); - errdefer { - conn.closing = true; - client.connection_pool.release(conn); - } - - conn.proxied = true; - return conn; -} - -pub const RequestError = ConnectTcpError || ConnectErrorPartial || Request.SendError || - std.fmt.ParseIntError || Connection.WriteError || - error{ // TODO: file a zig fmt issue for this bad indentation - UnsupportedUriScheme, - UriMissingHost, - - CertificateBundleLoadFailure, - UnsupportedTransferEncoding, -}; - -pub const RequestOptions = struct { - version: http.Version = .@"HTTP/1.1", - - /// Automatically ignore 100 Continue responses. This assumes you don't - /// care, and will have sent the body before you wait for the response. - /// - /// If this is not the case AND you know the server will send a 100 - /// Continue, set this to false and wait for a response before sending the - /// body. If you wait AND the server does not send a 100 Continue before - /// you finish the request, then the request *will* deadlock. - handle_continue: bool = true, - - /// If false, close the connection after the one request. If true, - /// participate in the client connection pool. - keep_alive: bool = true, - - /// This field specifies whether to automatically follow redirects, and if - /// so, how many redirects to follow before returning an error. - /// - /// This will only follow redirects for repeatable requests (ie. with no - /// payload or the server has acknowledged the payload). - redirect_behavior: Request.RedirectBehavior = @enumFromInt(3), - - /// Externally-owned memory used to store the server's entire HTTP header. - /// `error.HttpHeadersOversize` is returned from read() when a - /// client sends too many bytes of HTTP headers. - server_header_buffer: []u8, - - /// Must be an already acquired connection. - connection: ?*Connection = null, - - /// Standard headers that have default, but overridable, behavior. - headers: Request.Headers = .{}, - /// These headers are kept including when following a redirect to a - /// different domain. - /// Externally-owned; must outlive the Request. - extra_headers: []const http.Header = &.{}, - /// These headers are stripped when following a redirect to a different - /// domain. - /// Externally-owned; must outlive the Request. - privileged_headers: []const http.Header = &.{}, -}; - -fn validateUri(uri: Uri, arena: Allocator) !struct { Connection.Protocol, Uri } { - const protocol_map = std.StaticStringMap(Connection.Protocol).initComptime(.{ - .{ "http", .plain }, - .{ "ws", .plain }, - .{ "https", .tls }, - .{ "wss", .tls }, - }); - const protocol = protocol_map.get(uri.scheme) orelse return error.UnsupportedUriScheme; - var valid_uri = uri; - // The host is always going to be needed as a raw string for hostname resolution anyway. - valid_uri.host = .{ - .raw = try (uri.host orelse return error.UriMissingHost).toRawMaybeAlloc(arena), - }; - return .{ protocol, valid_uri }; -} - -fn uriPort(uri: Uri, protocol: Connection.Protocol) u16 { - return uri.port orelse switch (protocol) { - .plain => 80, - .tls => 443, - }; -} - -/// Open a connection to the host specified by `uri` and prepare to send a HTTP request. -/// -/// `uri` must remain alive during the entire request. -/// -/// The caller is responsible for calling `deinit()` on the `Request`. -/// This function is threadsafe. -/// -/// Asserts that "\r\n" does not occur in any header name or value. -pub fn open( - client: *Client, - method: http.Method, - uri: Uri, - options: RequestOptions, -) RequestError!Request { - if (std.debug.runtime_safety) { - for (options.extra_headers) |header| { - assert(header.name.len != 0); - assert(std.mem.indexOfScalar(u8, header.name, ':') == null); - assert(std.mem.indexOfPosLinear(u8, header.name, 0, "\r\n") == null); - assert(std.mem.indexOfPosLinear(u8, header.value, 0, "\r\n") == null); - } - for (options.privileged_headers) |header| { - assert(header.name.len != 0); - assert(std.mem.indexOfPosLinear(u8, header.name, 0, "\r\n") == null); - assert(std.mem.indexOfPosLinear(u8, header.value, 0, "\r\n") == null); - } - } - - var server_header = std.heap.FixedBufferAllocator.init(options.server_header_buffer); - const protocol, const valid_uri = try validateUri(uri, server_header.allocator()); - - if (protocol == .tls and @atomicLoad(bool, &client.next_https_rescan_certs, .acquire)) { - if (disable_tls) unreachable; - - client.ca_bundle_mutex.lock(); - defer client.ca_bundle_mutex.unlock(); - - if (client.next_https_rescan_certs) { - client.ca_bundle.rescan(client.allocator) catch - return error.CertificateBundleLoadFailure; - @atomicStore(bool, &client.next_https_rescan_certs, false, .release); - } - } - - const conn = options.connection orelse - try client.connect(valid_uri.host.?.raw, uriPort(valid_uri, protocol), protocol); - - var req: Request = .{ - .uri = valid_uri, - .client = client, - .connection = conn, - .keep_alive = options.keep_alive, - .method = method, - .version = options.version, - .transfer_encoding = .none, - .redirect_behavior = options.redirect_behavior, - .handle_continue = options.handle_continue, - .response = .{ - .version = undefined, - .status = undefined, - .reason = undefined, - .keep_alive = undefined, - .parser = proto.HeadersParser.init(server_header.buffer[server_header.end_index..]), - }, - .headers = options.headers, - .extra_headers = options.extra_headers, - .privileged_headers = options.privileged_headers, - }; - errdefer req.deinit(); - - return req; -} - -pub const FetchOptions = struct { - server_header_buffer: ?[]u8 = null, - redirect_behavior: ?Request.RedirectBehavior = null, - - /// If the server sends a body, it will be appended to this ArrayList. - /// `max_append_size` provides an upper limit for how much they can grow. - response_storage: ResponseStorage = .ignore, - max_append_size: ?usize = null, - - location: Location, - method: ?http.Method = null, - payload: ?[]const u8 = null, - raw_uri: bool = false, - keep_alive: bool = true, - - /// Standard headers that have default, but overridable, behavior. - headers: Request.Headers = .{}, - /// These headers are kept including when following a redirect to a - /// different domain. - /// Externally-owned; must outlive the Request. - extra_headers: []const http.Header = &.{}, - /// These headers are stripped when following a redirect to a different - /// domain. - /// Externally-owned; must outlive the Request. - privileged_headers: []const http.Header = &.{}, - - pub const Location = union(enum) { - url: []const u8, - uri: Uri, - }; - - pub const ResponseStorage = union(enum) { - ignore, - /// Only the existing capacity will be used. - static: *std.ArrayListUnmanaged(u8), - dynamic: *std.ArrayList(u8), - }; -}; - -pub const FetchResult = struct { - status: http.Status, -}; - -/// Perform a one-shot HTTP request with the provided options. -/// -/// This function is threadsafe. -pub fn fetch(client: *Client, options: FetchOptions) !FetchResult { - const uri = switch (options.location) { - .url => |u| try Uri.parse(u), - .uri => |u| u, - }; - var server_header_buffer: [16 * 1024]u8 = undefined; - - const method: http.Method = options.method orelse - if (options.payload != null) .POST else .GET; - - var req = try open(client, method, uri, .{ - .server_header_buffer = options.server_header_buffer orelse &server_header_buffer, - .redirect_behavior = options.redirect_behavior orelse - if (options.payload == null) @enumFromInt(3) else .unhandled, - .headers = options.headers, - .extra_headers = options.extra_headers, - .privileged_headers = options.privileged_headers, - .keep_alive = options.keep_alive, - }); - defer req.deinit(); - - if (options.payload) |payload| req.transfer_encoding = .{ .content_length = payload.len }; - - try req.send(); - - if (options.payload) |payload| try req.writeAll(payload); - - try req.finish(); - try req.wait(); - - switch (options.response_storage) { - .ignore => { - // Take advantage of request internals to discard the response body - // and make the connection available for another request. - req.response.skip = true; - assert(try req.transferRead(&.{}) == 0); // No buffer is necessary when skipping. - }, - .dynamic => |list| { - const max_append_size = options.max_append_size orelse 2 * 1024 * 1024; - try req.reader().readAllArrayList(list, max_append_size); - }, - .static => |list| { - const buf = b: { - const buf = list.unusedCapacitySlice(); - if (options.max_append_size) |len| { - if (len < buf.len) break :b buf[0..len]; - } - break :b buf; - }; - list.items.len += try req.reader().readAll(buf); - }, - } - - return .{ - .status = req.response.status, - }; -} - -test { - _ = &initDefaultProxies; -} diff --git a/src/async/stream.zig b/src/async/stream.zig deleted file mode 100644 index 85b6cbb2..00000000 --- a/src/async/stream.zig +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright (C) 2023-2024 Lightpanda (Selecy SAS) -// -// Francis Bouvier -// Pierre Tachoire -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -const std = @import("std"); -const builtin = @import("builtin"); -const posix = std.posix; -const io = std.io; -const assert = std.debug.assert; - -const tcp = @import("tcp.zig"); - -pub const Stream = struct { - alloc: std.mem.Allocator, - conn: *tcp.Conn, - - handle: posix.socket_t, - - pub fn close(self: Stream) void { - posix.close(self.handle); - self.alloc.destroy(self.conn); - } - - pub const ReadError = posix.ReadError; - pub const WriteError = posix.WriteError; - - pub const Reader = io.Reader(Stream, ReadError, read); - pub const Writer = io.Writer(Stream, WriteError, write); - - pub fn reader(self: Stream) Reader { - return .{ .context = self }; - } - - pub fn writer(self: Stream) Writer { - return .{ .context = self }; - } - - pub fn read(self: Stream, buffer: []u8) ReadError!usize { - return self.conn.receive(self.handle, buffer) catch |err| switch (err) { - else => return error.Unexpected, - }; - } - - pub fn readv(s: Stream, iovecs: []const posix.iovec) ReadError!usize { - return posix.readv(s.handle, iovecs); - } - - /// Returns the number of bytes read. If the number read is smaller than - /// `buffer.len`, it means the stream reached the end. Reaching the end of - /// a stream is not an error condition. - pub fn readAll(s: Stream, buffer: []u8) ReadError!usize { - return readAtLeast(s, buffer, buffer.len); - } - - /// Returns the number of bytes read, calling the underlying read function - /// the minimal number of times until the buffer has at least `len` bytes - /// filled. If the number read is less than `len` it means the stream - /// reached the end. Reaching the end of the stream is not an error - /// condition. - pub fn readAtLeast(s: Stream, buffer: []u8, len: usize) ReadError!usize { - assert(len <= buffer.len); - var index: usize = 0; - while (index < len) { - const amt = try s.read(buffer[index..]); - if (amt == 0) break; - index += amt; - } - return index; - } - - /// TODO in evented I/O mode, this implementation incorrectly uses the event loop's - /// file system thread instead of non-blocking. It needs to be reworked to properly - /// use non-blocking I/O. - pub fn write(self: Stream, buffer: []const u8) WriteError!usize { - return self.conn.send(self.handle, buffer) catch |err| switch (err) { - error.AccessDenied => error.AccessDenied, - error.WouldBlock => error.WouldBlock, - error.ConnectionResetByPeer => error.ConnectionResetByPeer, - error.MessageTooBig => error.FileTooBig, - error.BrokenPipe => error.BrokenPipe, - else => return error.Unexpected, - }; - } - - pub fn writeAll(self: Stream, bytes: []const u8) WriteError!void { - var index: usize = 0; - while (index < bytes.len) { - index += try self.write(bytes[index..]); - } - } - - /// See https://github.com/ziglang/zig/issues/7699 - /// See equivalent function: `std.fs.File.writev`. - pub fn writev(self: Stream, iovecs: []const posix.iovec_const) WriteError!usize { - if (iovecs.len == 0) return 0; - const first_buffer = iovecs[0].base[0..iovecs[0].len]; - return try self.write(first_buffer); - } - - /// The `iovecs` parameter is mutable because this function needs to mutate the fields in - /// order to handle partial writes from the underlying OS layer. - /// See https://github.com/ziglang/zig/issues/7699 - /// See equivalent function: `std.fs.File.writevAll`. - pub fn writevAll(self: Stream, iovecs: []posix.iovec_const) WriteError!void { - if (iovecs.len == 0) return; - - var i: usize = 0; - while (true) { - var amt = try self.writev(iovecs[i..]); - while (amt >= iovecs[i].len) { - amt -= iovecs[i].len; - i += 1; - if (i >= iovecs.len) return; - } - iovecs[i].base += amt; - iovecs[i].len -= amt; - } - } -}; diff --git a/src/async/tcp.zig b/src/async/tcp.zig deleted file mode 100644 index 61a49548..00000000 --- a/src/async/tcp.zig +++ /dev/null @@ -1,112 +0,0 @@ -// Copyright (C) 2023-2024 Lightpanda (Selecy SAS) -// -// Francis Bouvier -// Pierre Tachoire -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -const std = @import("std"); -const net = std.net; -const Stream = @import("stream.zig").Stream; -const Loop = @import("jsruntime").Loop; -const NetworkImpl = Loop.Network(Conn.Command); - -// Conn is a TCP connection using jsruntime Loop async I/O. -// connect, send and receive are blocking, but use async I/O in the background. -// Client doesn't own the socket used for the connection, the caller is -// responsible for closing it. -pub const Conn = struct { - const Command = struct { - impl: NetworkImpl, - - done: bool = false, - err: ?anyerror = null, - ln: usize = 0, - - fn ok(self: *Command, err: ?anyerror, ln: usize) void { - self.err = err; - self.ln = ln; - self.done = true; - } - - fn wait(self: *Command) !usize { - while (!self.done) try self.impl.tick(); - - if (self.err) |err| return err; - return self.ln; - } - pub fn onConnect(self: *Command, err: ?anyerror) void { - self.ok(err, 0); - } - pub fn onSend(self: *Command, ln: usize, err: ?anyerror) void { - self.ok(err, ln); - } - pub fn onReceive(self: *Command, ln: usize, err: ?anyerror) void { - self.ok(err, ln); - } - }; - - loop: *Loop, - - pub fn connect(self: *Conn, socket: std.posix.socket_t, address: std.net.Address) !void { - var cmd = Command{ .impl = NetworkImpl.init(self.loop) }; - cmd.impl.connect(&cmd, socket, address); - _ = try cmd.wait(); - } - - pub fn send(self: *Conn, socket: std.posix.socket_t, buffer: []const u8) !usize { - var cmd = Command{ .impl = NetworkImpl.init(self.loop) }; - cmd.impl.send(&cmd, socket, buffer); - return try cmd.wait(); - } - - pub fn receive(self: *Conn, socket: std.posix.socket_t, buffer: []u8) !usize { - var cmd = Command{ .impl = NetworkImpl.init(self.loop) }; - cmd.impl.receive(&cmd, socket, buffer); - return try cmd.wait(); - } -}; - -pub fn tcpConnectToHost(alloc: std.mem.Allocator, loop: *Loop, name: []const u8, port: u16) !Stream { - // TODO async resolve - const list = try net.getAddressList(alloc, name, port); - defer list.deinit(); - - if (list.addrs.len == 0) return error.UnknownHostName; - - for (list.addrs) |addr| { - return tcpConnectToAddress(alloc, loop, addr) catch |err| switch (err) { - error.ConnectionRefused => { - continue; - }, - else => return err, - }; - } - return std.posix.ConnectError.ConnectionRefused; -} - -pub fn tcpConnectToAddress(alloc: std.mem.Allocator, loop: *Loop, addr: net.Address) !Stream { - const sockfd = try std.posix.socket(addr.any.family, std.posix.SOCK.STREAM, std.posix.IPPROTO.TCP); - errdefer std.posix.close(sockfd); - - var conn = try alloc.create(Conn); - conn.* = Conn{ .loop = loop }; - try conn.connect(sockfd, addr); - - return Stream{ - .alloc = alloc, - .conn = conn, - .handle = sockfd, - }; -} diff --git a/src/async/test.zig b/src/async/test.zig deleted file mode 100644 index 27f86c6a..00000000 --- a/src/async/test.zig +++ /dev/null @@ -1,189 +0,0 @@ -// Copyright (C) 2023-2024 Lightpanda (Selecy SAS) -// -// Francis Bouvier -// Pierre Tachoire -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -const std = @import("std"); -const http = std.http; -const Client = @import("Client.zig"); -const Request = @import("Client.zig").Request; - -pub const Loop = @import("jsruntime").Loop; - -const url = "https://w3.org"; - -test "blocking mode fetch API" { - const alloc = std.testing.allocator; - - var loop = try Loop.init(alloc); - defer loop.deinit(); - - var client: Client = .{ - .allocator = alloc, - .loop = &loop, - }; - defer client.deinit(); - - // force client's CA cert scan from system. - try client.ca_bundle.rescan(client.allocator); - - const res = try client.fetch(.{ - .location = .{ .uri = try std.Uri.parse(url) }, - }); - - try std.testing.expect(res.status == .ok); -} - -test "blocking mode open/send/wait API" { - const alloc = std.testing.allocator; - - var loop = try Loop.init(alloc); - defer loop.deinit(); - - var client: Client = .{ - .allocator = alloc, - .loop = &loop, - }; - defer client.deinit(); - - // force client's CA cert scan from system. - try client.ca_bundle.rescan(client.allocator); - - var buf: [2014]u8 = undefined; - var req = try client.open(.GET, try std.Uri.parse(url), .{ - .server_header_buffer = &buf, - }); - defer req.deinit(); - - try req.send(); - try req.finish(); - try req.wait(); - - try std.testing.expect(req.response.status == .ok); -} - -// Example how to write an async http client using the modified standard client. -const AsyncClient = struct { - cli: Client, - - const YieldImpl = Loop.Yield(AsyncRequest); - const AsyncRequest = struct { - const State = enum { new, open, send, finish, wait, done }; - - cli: *Client, - uri: std.Uri, - - req: ?Request = undefined, - state: State = .new, - - impl: YieldImpl, - err: ?anyerror = null, - - buf: [2014]u8 = undefined, - - pub fn deinit(self: *AsyncRequest) void { - if (self.req) |*r| r.deinit(); - } - - pub fn fetch(self: *AsyncRequest) void { - self.state = .new; - return self.impl.yield(self); - } - - fn onerr(self: *AsyncRequest, err: anyerror) void { - self.state = .done; - self.err = err; - } - - pub fn onYield(self: *AsyncRequest, err: ?anyerror) void { - if (err) |e| return self.onerr(e); - - switch (self.state) { - .new => { - self.state = .open; - self.req = self.cli.open(.GET, self.uri, .{ - .server_header_buffer = &self.buf, - }) catch |e| return self.onerr(e); - }, - .open => { - self.state = .send; - self.req.?.send() catch |e| return self.onerr(e); - }, - .send => { - self.state = .finish; - self.req.?.finish() catch |e| return self.onerr(e); - }, - .finish => { - self.state = .wait; - self.req.?.wait() catch |e| return self.onerr(e); - }, - .wait => { - self.state = .done; - return; - }, - .done => return, - } - - return self.impl.yield(self); - } - - pub fn wait(self: *AsyncRequest) !void { - while (self.state != .done) try self.impl.tick(); - if (self.err) |err| return err; - } - }; - - pub fn init(alloc: std.mem.Allocator, loop: *Loop) AsyncClient { - return .{ - .cli = .{ - .allocator = alloc, - .loop = loop, - }, - }; - } - - pub fn deinit(self: *AsyncClient) void { - self.cli.deinit(); - } - - pub fn createRequest(self: *AsyncClient, uri: std.Uri) !AsyncRequest { - return .{ - .impl = YieldImpl.init(self.cli.loop), - .cli = &self.cli, - .uri = uri, - }; - } -}; - -test "non blocking client" { - const alloc = std.testing.allocator; - - var loop = try Loop.init(alloc); - defer loop.deinit(); - - var client = AsyncClient.init(alloc, &loop); - defer client.deinit(); - - var reqs: [3]AsyncClient.AsyncRequest = undefined; - for (0..reqs.len) |i| { - reqs[i] = try client.createRequest(try std.Uri.parse(url)); - reqs[i].fetch(); - } - for (0..reqs.len) |i| { - try reqs[i].wait(); - reqs[i].deinit(); - } -} diff --git a/src/browser/browser.zig b/src/browser/browser.zig index 30bfdcc4..b399e755 100644 --- a/src/browser/browser.zig +++ b/src/browser/browser.zig @@ -40,7 +40,7 @@ const storage = @import("../storage/storage.zig"); const FetchResult = @import("../http/Client.zig").Client.FetchResult; const UserContext = @import("../user_context.zig").UserContext; -const HttpClient = @import("../async/Client.zig"); +const HttpClient = @import("asyncio").Client; const log = std.log.scoped(.browser); @@ -116,7 +116,7 @@ pub const Session = struct { }; Env.init(&self.env, self.arena.allocator(), loop, null); - self.httpClient = .{ .allocator = alloc, .loop = loop }; + self.httpClient = .{ .allocator = alloc }; try self.env.load(&self.jstypes); } diff --git a/src/main.zig b/src/main.zig index 7e9b8dbb..c2106f73 100644 --- a/src/main.zig +++ b/src/main.zig @@ -30,6 +30,7 @@ const apiweb = @import("apiweb.zig"); pub const Types = jsruntime.reflect(apiweb.Interfaces); pub const UserContext = apiweb.UserContext; +pub const IO = @import("asyncio").Wrapper(jsruntime.Loop); const log = std.log.scoped(.cli); diff --git a/src/main_shell.zig b/src/main_shell.zig index fbb23660..eb88ab50 100644 --- a/src/main_shell.zig +++ b/src/main_shell.zig @@ -24,12 +24,13 @@ const parser = @import("netsurf"); const apiweb = @import("apiweb.zig"); const Window = @import("html/window.zig").Window; const storage = @import("storage/storage.zig"); +const Client = @import("asyncio").Client; const html_test = @import("html_test.zig").html; pub const Types = jsruntime.reflect(apiweb.Interfaces); pub const UserContext = apiweb.UserContext; -const Client = @import("async/Client.zig"); +pub const IO = @import("asyncio").Wrapper(jsruntime.Loop); var doc: *parser.DocumentHTML = undefined; @@ -41,7 +42,7 @@ fn execJS( try js_env.start(); defer js_env.stop(); - var cli = Client{ .allocator = alloc, .loop = js_env.nat_ctx.loop }; + var cli = Client{ .allocator = alloc }; defer cli.deinit(); try js_env.setUserContext(UserContext{ diff --git a/src/main_wpt.zig b/src/main_wpt.zig index 49b7ba23..7cf2f077 100644 --- a/src/main_wpt.zig +++ b/src/main_wpt.zig @@ -50,6 +50,7 @@ const Out = enum { pub const Types = jsruntime.reflect(apiweb.Interfaces); pub const GlobalType = apiweb.GlobalType; pub const UserContext = apiweb.UserContext; +pub const IO = @import("asyncio").Wrapper(jsruntime.Loop); // TODO For now the WPT tests run is specific to WPT. // It manually load js framwork libs, and run the first script w/ js content in diff --git a/src/run_tests.zig b/src/run_tests.zig index 8a5be4a5..a9073d4e 100644 --- a/src/run_tests.zig +++ b/src/run_tests.zig @@ -30,7 +30,7 @@ const xhr = @import("xhr/xhr.zig"); const storage = @import("storage/storage.zig"); const url = @import("url/url.zig"); const urlquery = @import("url/query.zig"); -const Client = @import("async/Client.zig"); +const Client = @import("asyncio").Client; const documentTestExecFn = @import("dom/document.zig").testExecFn; const HTMLDocumentTestExecFn = @import("html/document.zig").testExecFn; @@ -59,6 +59,7 @@ const MutationObserverTestExecFn = @import("dom/mutation_observer.zig").testExec pub const Types = jsruntime.reflect(apiweb.Interfaces); pub const UserContext = @import("user_context.zig").UserContext; +pub const IO = @import("asyncio").Wrapper(jsruntime.Loop); var doc: *parser.DocumentHTML = undefined; @@ -86,7 +87,7 @@ fn testExecFn( std.debug.print("documentHTMLClose error: {s}\n", .{@errorName(err)}); }; - var cli = Client{ .allocator = alloc, .loop = js_env.nat_ctx.loop }; + var cli = Client{ .allocator = alloc }; defer cli.deinit(); try js_env.setUserContext(.{ @@ -298,9 +299,6 @@ test { const msgTest = @import("msg.zig"); std.testing.refAllDecls(msgTest); - const asyncTest = @import("async/test.zig"); - std.testing.refAllDecls(asyncTest); - const dumpTest = @import("browser/dump.zig"); std.testing.refAllDecls(dumpTest); diff --git a/src/server.zig b/src/server.zig index 505de7b3..9b34c602 100644 --- a/src/server.zig +++ b/src/server.zig @@ -482,7 +482,7 @@ pub fn listen( // - cmd from incoming connection on server socket // - JS callbacks events from scripts while (true) { - try loop.io.tick(); + try loop.io.run_for_ns(10 * std.time.ns_per_ms); if (loop.cbk_error) { log.err("JS error", .{}); // if (try try_catch.exception(alloc, js_env.*)) |msg| { diff --git a/src/test_runner.zig b/src/test_runner.zig index 8b138d0b..8358b66c 100644 --- a/src/test_runner.zig +++ b/src/test_runner.zig @@ -22,6 +22,7 @@ const tests = @import("run_tests.zig"); pub const Types = tests.Types; pub const UserContext = tests.UserContext; +pub const IO = tests.IO; pub fn main() !void { try tests.main(); diff --git a/src/user_context.zig b/src/user_context.zig index 23d85955..3bed0108 100644 --- a/src/user_context.zig +++ b/src/user_context.zig @@ -1,6 +1,6 @@ const std = @import("std"); const parser = @import("netsurf"); -const Client = @import("async/Client.zig"); +const Client = @import("asyncio").Client; pub const UserContext = struct { document: *parser.DocumentHTML, diff --git a/src/wpt/run.zig b/src/wpt/run.zig index e2b58470..a44b12d7 100644 --- a/src/wpt/run.zig +++ b/src/wpt/run.zig @@ -28,10 +28,10 @@ const Loop = jsruntime.Loop; const Env = jsruntime.Env; const Window = @import("../html/window.zig").Window; const storage = @import("../storage/storage.zig"); +const Client = @import("asyncio").Client; const Types = @import("../main_wpt.zig").Types; const UserContext = @import("../main_wpt.zig").UserContext; -const Client = @import("../async/Client.zig"); // runWPT parses the given HTML file, starts a js env and run the first script // tags containing javascript sources. @@ -53,7 +53,7 @@ pub fn run(arena: *std.heap.ArenaAllocator, comptime dir: []const u8, f: []const var loop = try Loop.init(alloc); defer loop.deinit(); - var cli = Client{ .allocator = alloc, .loop = &loop }; + var cli = Client{ .allocator = alloc }; defer cli.deinit(); var js_env: Env = undefined; diff --git a/src/xhr/xhr.zig b/src/xhr/xhr.zig index 0549a3e2..2e3e4a16 100644 --- a/src/xhr/xhr.zig +++ b/src/xhr/xhr.zig @@ -32,8 +32,7 @@ const XMLHttpRequestEventTarget = @import("event_target.zig").XMLHttpRequestEven const Mime = @import("../browser/mime.zig"); const Loop = jsruntime.Loop; -const YieldImpl = Loop.Yield(XMLHttpRequest); -const Client = @import("../async/Client.zig"); +const Client = @import("asyncio").Client; const parser = @import("netsurf"); @@ -98,10 +97,11 @@ pub const XMLHttpRequest = struct { proto: XMLHttpRequestEventTarget = XMLHttpRequestEventTarget{}, alloc: std.mem.Allocator, cli: *Client, - impl: YieldImpl, + io: Client.IO, priv_state: PrivState = .new, req: ?Client.Request = null, + ctx: ?Client.Ctx = null, method: std.http.Method, state: u16, @@ -135,7 +135,13 @@ pub const XMLHttpRequest = struct { response_header_buffer: [1024 * 16]u8 = undefined, response_status: u10 = 0, - response_override_mime_type: ?[]const u8 = null, + + // TODO uncomment this field causes casting issue with + // XMLHttpRequestEventTarget. I think it's dueto an alignement issue, but + // not sure. see + // https://lightpanda.slack.com/archives/C05TRU6RBM1/p1707819010681019 + // response_override_mime_type: ?[]const u8 = null, + response_mime: Mime = undefined, response_obj: ?ResponseObj = null, send_flag: bool = false, @@ -288,7 +294,7 @@ pub const XMLHttpRequest = struct { .alloc = alloc, .headers = Headers.init(alloc), .response_headers = Headers.init(alloc), - .impl = YieldImpl.init(loop), + .io = Client.IO.init(loop), .method = undefined, .url = null, .uri = undefined, @@ -320,10 +326,11 @@ pub const XMLHttpRequest = struct { self.priv_state = .new; - if (self.req) |*r| { - r.deinit(); - self.req = null; - } + if (self.ctx) |*c| c.deinit(); + self.ctx = null; + + if (self.req) |*r| r.deinit(); + self.req = null; } pub fn deinit(self: *XMLHttpRequest, alloc: std.mem.Allocator) void { @@ -498,138 +505,160 @@ pub const XMLHttpRequest = struct { log.debug("{any} {any}", .{ self.method, self.uri }); self.send_flag = true; - self.impl.yield(self); - } - // onYield is a callback called between each request's steps. - // Between each step, the code is blocking. - // Yielding allows pseudo-async and gives a chance to other async process - // to be called. - pub fn onYield(self: *XMLHttpRequest, err: ?anyerror) void { - if (err) |e| return self.onErr(e); + self.priv_state = .open; - switch (self.priv_state) { - .new => { - self.priv_state = .open; - self.req = self.cli.open(self.method, self.uri, .{ - .server_header_buffer = &self.response_header_buffer, - .extra_headers = self.headers.all(), - }) catch |e| return self.onErr(e); - }, - .open => { - // prepare payload transfert. - if (self.payload) |v| self.req.?.transfer_encoding = .{ .content_length = v.len }; - - self.priv_state = .send; - self.req.?.send() catch |e| return self.onErr(e); - }, - .send => { - if (self.payload) |payload| { - self.priv_state = .write; - self.req.?.writeAll(payload) catch |e| return self.onErr(e); - } else { - self.priv_state = .finish; - self.req.?.finish() catch |e| return self.onErr(e); - } - }, - .write => { - self.priv_state = .finish; - self.req.?.finish() catch |e| return self.onErr(e); - }, - .finish => { - self.priv_state = .wait; - self.req.?.wait() catch |e| return self.onErr(e); - }, - .wait => { - log.info("{any} {any} {d}", .{ self.method, self.uri, self.req.?.response.status }); - - self.priv_state = .done; - var it = self.req.?.response.iterateHeaders(); - self.response_headers.load(&it) catch |e| return self.onErr(e); - - // extract a mime type from headers. - const ct = self.response_headers.getFirstValue("Content-Type") orelse "text/xml"; - self.response_mime = Mime.parse(ct) catch |e| return self.onErr(e); - - // TODO handle override mime type - - self.state = HEADERS_RECEIVED; - self.dispatchEvt("readystatechange"); - - self.response_status = @intFromEnum(self.req.?.response.status); - - var buf: std.ArrayListUnmanaged(u8) = .{}; - - // TODO set correct length - const total = 0; - var loaded: u64 = 0; - - // dispatch a progress event loadstart. - self.dispatchProgressEvent("loadstart", .{ .loaded = loaded, .total = total }); - - const reader = self.req.?.reader(); - var buffer: [1024]u8 = undefined; - var ln = buffer.len; - var prev_dispatch: ?std.time.Instant = null; - while (ln > 0) { - ln = reader.read(&buffer) catch |e| { - buf.deinit(self.alloc); - return self.onErr(e); - }; - buf.appendSlice(self.alloc, buffer[0..ln]) catch |e| { - buf.deinit(self.alloc); - return self.onErr(e); - }; - loaded = loaded + ln; - - // Dispatch only if 50ms have passed. - const now = std.time.Instant.now() catch |e| { - buf.deinit(self.alloc); - return self.onErr(e); - }; - if (prev_dispatch != null and now.since(prev_dispatch.?) < min_delay) continue; - defer prev_dispatch = now; - - self.state = LOADING; - self.dispatchEvt("readystatechange"); - - // dispatch a progress event progress. - self.dispatchProgressEvent("progress", .{ - .loaded = loaded, - .total = total, - }); - } - self.response_bytes = buf.items; - self.send_flag = false; - - self.state = DONE; - self.dispatchEvt("readystatechange"); - - // dispatch a progress event load. - self.dispatchProgressEvent("load", .{ .loaded = loaded, .total = total }); - // dispatch a progress event loadend. - self.dispatchProgressEvent("loadend", .{ .loaded = loaded, .total = total }); - }, - .done => { - if (self.req) |*r| { - r.deinit(); - self.req = null; - } - - // finalize fetch process. - return; - }, + self.req = try self.cli.create(self.method, self.uri, .{ + .server_header_buffer = &self.response_header_buffer, + .extra_headers = self.headers.all(), + }); + errdefer { + self.req.?.deinit(); + self.req = null; } - self.impl.yield(self); + self.ctx = try Client.Ctx.init(&self.io, &self.req.?); + errdefer { + self.ctx.?.deinit(); + self.ctx = null; + } + self.ctx.?.userData = self; + + try self.cli.async_open( + self.method, + self.uri, + .{ .server_header_buffer = &self.response_header_buffer }, + &self.ctx.?, + onRequestConnect, + ); + } + + fn onRequestWait(ctx: *Client.Ctx, res: anyerror!void) !void { + var self = selfCtx(ctx); + res catch |err| return self.onErr(err); + + log.info("{any} {any} {d}", .{ self.method, self.uri, self.req.?.response.status }); + + self.priv_state = .done; + var it = self.req.?.response.iterateHeaders(); + self.response_headers.load(&it) catch |e| return self.onErr(e); + + // extract a mime type from headers. + const ct = self.response_headers.getFirstValue("Content-Type") orelse "text/xml"; + self.response_mime = Mime.parse(ct) catch |e| return self.onErr(e); + + // TODO handle override mime type + + self.state = HEADERS_RECEIVED; + self.dispatchEvt("readystatechange"); + + self.response_status = @intFromEnum(self.req.?.response.status); + + var buf: std.ArrayListUnmanaged(u8) = .{}; + + // TODO set correct length + const total = 0; + var loaded: u64 = 0; + + // dispatch a progress event loadstart. + self.dispatchProgressEvent("loadstart", .{ .loaded = loaded, .total = total }); + + // TODO read async + const reader = self.req.?.reader(); + var buffer: [1024]u8 = undefined; + var ln = buffer.len; + var prev_dispatch: ?std.time.Instant = null; + while (ln > 0) { + ln = reader.read(&buffer) catch |e| { + buf.deinit(self.alloc); + return self.onErr(e); + }; + buf.appendSlice(self.alloc, buffer[0..ln]) catch |e| { + buf.deinit(self.alloc); + return self.onErr(e); + }; + loaded = loaded + ln; + + // Dispatch only if 50ms have passed. + const now = std.time.Instant.now() catch |e| { + buf.deinit(self.alloc); + return self.onErr(e); + }; + if (prev_dispatch != null and now.since(prev_dispatch.?) < min_delay) continue; + defer prev_dispatch = now; + + self.state = LOADING; + self.dispatchEvt("readystatechange"); + + // dispatch a progress event progress. + self.dispatchProgressEvent("progress", .{ + .loaded = loaded, + .total = total, + }); + } + self.response_bytes = buf.items; + self.send_flag = false; + + self.state = DONE; + self.dispatchEvt("readystatechange"); + + // dispatch a progress event load. + self.dispatchProgressEvent("load", .{ .loaded = loaded, .total = total }); + // dispatch a progress event loadend. + self.dispatchProgressEvent("loadend", .{ .loaded = loaded, .total = total }); + + if (self.ctx) |*c| c.deinit(); + self.ctx = null; + + if (self.req) |*r| r.deinit(); + self.req = null; + } + + fn onRequestFinish(ctx: *Client.Ctx, res: anyerror!void) !void { + var self = selfCtx(ctx); + res catch |err| return self.onErr(err); + + self.priv_state = .wait; + return ctx.req.async_wait(ctx, onRequestWait) catch |e| return self.onErr(e); + } + + fn onRequestSend(ctx: *Client.Ctx, res: anyerror!void) !void { + var self = selfCtx(ctx); + res catch |err| return self.onErr(err); + + if (self.payload) |payload| { + self.priv_state = .write; + return ctx.req.async_writeAll(payload, ctx, onRequestWrite) catch |e| return self.onErr(e); + } + + self.priv_state = .finish; + return ctx.req.async_finish(ctx, onRequestFinish) catch |e| return self.onErr(e); + } + + fn onRequestWrite(ctx: *Client.Ctx, res: anyerror!void) !void { + var self = selfCtx(ctx); + res catch |err| return self.onErr(err); + self.priv_state = .finish; + return ctx.req.async_finish(ctx, onRequestFinish) catch |e| return self.onErr(e); + } + + fn onRequestConnect(ctx: *Client.Ctx, res: anyerror!void) anyerror!void { + var self = selfCtx(ctx); + res catch |err| return self.onErr(err); + + // prepare payload transfert. + if (self.payload) |v| self.req.?.transfer_encoding = .{ .content_length = v.len }; + + self.priv_state = .send; + return ctx.req.async_send(ctx, onRequestSend) catch |err| return self.onErr(err); + } + + fn selfCtx(ctx: *Client.Ctx) *XMLHttpRequest { + return @ptrCast(@alignCast(ctx.userData)); } fn onErr(self: *XMLHttpRequest, err: anyerror) void { self.priv_state = .done; - if (self.req) |*r| { - r.deinit(); - self.req = null; - } self.err = err; self.state = DONE; @@ -639,6 +668,12 @@ pub const XMLHttpRequest = struct { self.dispatchProgressEvent("loadend", .{}); log.debug("{any} {any} {any}", .{ self.method, self.uri, self.err }); + + if (self.ctx) |*c| c.deinit(); + self.ctx = null; + + if (self.req) |*r| r.deinit(); + self.req = null; } pub fn _abort(self: *XMLHttpRequest) void { @@ -886,7 +921,7 @@ pub fn testExecFn( // .{ .src = "req.onload", .ex = "function cbk(event) { nb ++; evt = event; }" }, //.{ .src = "req.onload = cbk", .ex = "function cbk(event) { nb ++; evt = event; }" }, - .{ .src = "req.open('GET', 'http://httpbin.io/html')", .ex = "undefined" }, + .{ .src = "req.open('GET', 'https://httpbin.io/html')", .ex = "undefined" }, .{ .src = "req.setRequestHeader('User-Agent', 'lightpanda/1.0')", .ex = "undefined" }, // ensure open resets values @@ -916,7 +951,7 @@ pub fn testExecFn( var document = [_]Case{ .{ .src = "const req2 = new XMLHttpRequest()", .ex = "undefined" }, - .{ .src = "req2.open('GET', 'http://httpbin.io/html')", .ex = "undefined" }, + .{ .src = "req2.open('GET', 'https://httpbin.io/html')", .ex = "undefined" }, .{ .src = "req2.responseType = 'document'", .ex = "document" }, .{ .src = "req2.send()", .ex = "undefined" }, @@ -932,7 +967,7 @@ pub fn testExecFn( var json = [_]Case{ .{ .src = "const req3 = new XMLHttpRequest()", .ex = "undefined" }, - .{ .src = "req3.open('GET', 'http://httpbin.io/json')", .ex = "undefined" }, + .{ .src = "req3.open('GET', 'https://httpbin.io/json')", .ex = "undefined" }, .{ .src = "req3.responseType = 'json'", .ex = "json" }, .{ .src = "req3.send()", .ex = "undefined" }, @@ -947,7 +982,7 @@ pub fn testExecFn( var post = [_]Case{ .{ .src = "const req4 = new XMLHttpRequest()", .ex = "undefined" }, - .{ .src = "req4.open('POST', 'http://httpbin.io/post')", .ex = "undefined" }, + .{ .src = "req4.open('POST', 'https://httpbin.io/post')", .ex = "undefined" }, .{ .src = "req4.send('foo')", .ex = "undefined" }, // Each case executed waits for all loop callaback calls. @@ -960,7 +995,7 @@ pub fn testExecFn( var cbk = [_]Case{ .{ .src = "const req5 = new XMLHttpRequest()", .ex = "undefined" }, - .{ .src = "req5.open('GET', 'http://httpbin.io/json')", .ex = "undefined" }, + .{ .src = "req5.open('GET', 'https://httpbin.io/json')", .ex = "undefined" }, .{ .src = "var status = 0; req5.onload = function () { status = this.status };", .ex = "function () { status = this.status }" }, .{ .src = "req5.send()", .ex = "undefined" }, diff --git a/vendor/zig-async-io b/vendor/zig-async-io new file mode 160000 index 00000000..ed7ae07d --- /dev/null +++ b/vendor/zig-async-io @@ -0,0 +1 @@ +Subproject commit ed7ae07d1c39ca073a6eacb741c8b56bc3e57f9f diff --git a/vendor/zig-js-runtime b/vendor/zig-js-runtime index f434b3cf..d1152619 160000 --- a/vendor/zig-js-runtime +++ b/vendor/zig-js-runtime @@ -1 +1 @@ -Subproject commit f434b3cfa1938277a6cd2e225974bb8d33d578c2 +Subproject commit d11526195cd8f417901533145c42355fe39ff24e