// browser/src/http/client.zig
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const builtin = @import("builtin");
const os = std.os;
const posix = std.posix;
const Uri = std.Uri;
const Thread = std.Thread;
const Allocator = std.mem.Allocator;
const MemoryPool = std.heap.MemoryPool;
const ArenaAllocator = std.heap.ArenaAllocator;
const tls = @import("tls");
const IO = @import("../runtime/loop.zig").IO;
const Loop = @import("../runtime/loop.zig").Loop;
const Notification = @import("../notification.zig").Notification;
const log = std.log.scoped(.http_client);
// We might need to peek at the body to try and sniff the content-type.
// While we only need a few bytes, in most cases we need to ignore leading
// whitespace, so we want to get a reasonable-sized chunk.
const PEEK_BUF_LEN = 1024;
const BUFFER_LEN = 32 * 1024;
const MAX_HEADER_LINE_LEN = 4096;
// Thread-safe. Holds our root certificates, connection pool and state pool.
// Used to create Requests.
pub const Client = struct {
req_id: usize,
allocator: Allocator,
state_pool: StatePool,
http_proxy: ?Uri,
root_ca: tls.config.CertBundle,
tls_verify_host: bool = true,
connection_manager: ConnectionManager,
const Opts = struct {
tls_verify_host: bool = true,
http_proxy: ?std.Uri = null,
max_idle_connection: usize = 10,
};
pub fn init(allocator: Allocator, max_concurrent: usize, opts: Opts) !Client {
var root_ca: tls.config.CertBundle = if (builtin.is_test) .{} else try tls.config.CertBundle.fromSystem(allocator);
errdefer root_ca.deinit(allocator);
const state_pool = try StatePool.init(allocator, max_concurrent);
errdefer state_pool.deinit(allocator);
const connection_manager = ConnectionManager.init(allocator, opts.max_idle_connection);
errdefer connection_manager.deinit();
return .{
.req_id = 0,
.root_ca = root_ca,
.allocator = allocator,
.state_pool = state_pool,
.http_proxy = opts.http_proxy,
.tls_verify_host = opts.tls_verify_host,
.connection_manager = connection_manager,
};
}
pub fn deinit(self: *Client) void {
const allocator = self.allocator;
if (builtin.is_test == false) {
self.root_ca.deinit(allocator);
}
self.state_pool.deinit(allocator);
self.connection_manager.deinit();
}
pub fn request(self: *Client, method: Request.Method, uri: *const Uri) !Request {
const state = self.state_pool.acquire();
errdefer {
state.reset();
self.state_pool.release(state);
}
return Request.init(self, state, method, uri);
}
pub fn requestFactory(self: *Client, notification: ?*Notification) RequestFactory {
return .{
.client = self,
.notification = notification,
};
}
};
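// A minimal usage sketch (illustrative only; the allocator and URL are
// assumptions, not part of this file):
//
//   var client = try Client.init(allocator, 5, .{});
//   defer client.deinit();
//
//   const uri = try std.Uri.parse("http://example.com/");
//   var req = try client.request(.GET, &uri);
//   defer req.deinit();
//
//   var res = try req.sendSync(.{});
//   while (try res.next()) |chunk| {
//       _ = chunk; // part (or all) of the body
//   }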
// A factory for creating requests with a given set of options.
pub const RequestFactory = struct {
client: *Client,
notification: ?*Notification,
pub fn create(self: RequestFactory, method: Request.Method, uri: *const Uri) !Request {
var req = try self.client.request(method, uri);
req.notification = self.notification;
return req;
}
};
// We assume most connections are going to end up in the IdleConnection pool,
// so this always ends up on the heap (as a *Connection), allocated via the
// client's connection MemoryPool.
// You'll notice that we have both this "Connection" and that both the SyncHandler
// and the AsyncHandler have a "Conn". Each "Conn" is a specialized version
// of this "Connection": the SyncHandler.Conn provides a synchronous API over
// the socket/tls, and the AsyncHandler.Conn provides an asynchronous one.
//
// The Request and IdleConnections are the only ones that deal directly with this
// "Connection" - and the variable name is "connection".
//
// The Sync/Async handlers deal only with their respective "Conn" - and the
// variable name is "conn".
const Connection = struct {
port: u16,
blocking: bool,
tls: ?TLSClient,
host: []const u8,
socket: posix.socket_t,
const TLSClient = union(enum) {
blocking: tls.Connection(std.net.Stream),
nonblocking: tls.nb.Client(),
fn close(self: *TLSClient) void {
switch (self.*) {
.blocking => |*tls_client| tls_client.close() catch {},
.nonblocking => |*tls_client| tls_client.deinit(),
}
}
};
fn deinit(self: *Connection, allocator: Allocator) void {
allocator.free(self.host);
if (self.tls) |*tls_client| {
tls_client.close();
}
posix.close(self.socket);
}
};
// Represents a request. Can be used to make either a synchronous or an
// asynchronous request. When a synchronous request is made, `request.deinit()`
// should be called once the response is no longer needed.
// When an asynchronous request is made, the request is automatically cleaned
// up (but request.deinit() should still be called if the request is
// discarded before `sendAsync` is ever called).
pub const Request = struct {
id: usize,
// The HTTP Method to use
method: Method,
// The URI we requested
request_uri: *const Uri,
// The URI that we're connecting to. Can be different than request_uri when
// proxying is enabled
connect_uri: *const Uri,
// If we're redirecting, this is where we're redirecting to. The only reason
// we really have this is so that we can set self.request_uri = &self.redirect_uri.?
redirect_uri: ?Uri = null,
// Optional body
body: ?[]const u8,
// Arena used for the lifetime of the request. Most large allocations are
// either done through the state (pre-allocated on startup + pooled) or
// by the TLS library.
arena: Allocator,
// List of request headers
headers: std.ArrayListUnmanaged(std.http.Header),
// whether or not we expect this connection to be secure
_secure: bool,
// whether or not we should keep the underlying socket open and usable
// for other requests
_keepalive: bool,
// extracted from request_uri
_request_host: []const u8,
// extracted from connect_uri
_connect_port: u16,
_connect_host: []const u8,
// whether or not the socket comes from the connection pool. If it does,
// and we get an error sending the header, we might retry on a new connection
// because it's possible the other side closed the connection, and that's no
// reason to fail the request.
_connection_from_keepalive: bool,
// Used to limit the # of redirects we'll follow
_redirect_count: u16,
// The actual connection, including the socket and, optionally, a TLS client
_connection: ?*Connection,
// Pooled buffers and arena
_state: *State,
// The parent client. Used to get the root certificates, to interact
// with the connection pool, and to return _state to the state pool when done
_client: *Client,
// Whether the Host header has been set via `request.addHeader()`. If not
// we'll set it based on `uri` before issuing the request.
_has_host_header: bool,
// Whether or not we should verify that the host matches the certificate CN
_tls_verify_host: bool,
// We only want to emit start / complete notifications once per request.
// Because of things like redirects and error handling, it is possible for
// the notification functions to be called multiple times, so we guard them
// with these booleans
_notified_start: bool,
_notified_complete: bool,
// The notifier that we emit request notifications to, if any.
notification: ?*Notification,
pub const Method = enum {
GET,
PUT,
HEAD,
POST,
DELETE,
OPTIONS,
pub fn format(self: Method, comptime _: []const u8, _: std.fmt.FormatOptions, writer: anytype) !void {
return writer.writeAll(@tagName(self));
}
fn safeToRetry(self: Method) bool {
return self == .GET or self == .HEAD or self == .OPTIONS;
}
};
fn init(client: *Client, state: *State, method: Method, uri: *const Uri) !Request {
const decomposed = try decomposeURL(client, uri);
const id = client.req_id + 1;
client.req_id = id;
return .{
.id = id,
.request_uri = uri,
.connect_uri = decomposed.connect_uri,
.body = null,
.headers = .{},
.method = method,
.notification = null,
.arena = state.arena.allocator(),
._secure = decomposed.secure,
._connect_host = decomposed.connect_host,
._connect_port = decomposed.connect_port,
._request_host = decomposed.request_host,
._state = state,
._client = client,
._connection = null,
._keepalive = false,
._redirect_count = 0,
._has_host_header = false,
._notified_start = false,
._notified_complete = false,
._connection_from_keepalive = false,
._tls_verify_host = client.tls_verify_host,
};
}
pub fn deinit(self: *Request) void {
self.releaseConnection();
_ = self._state.reset();
self._client.state_pool.release(self._state);
}
const DecomposedURL = struct {
secure: bool,
connect_port: u16,
connect_host: []const u8,
connect_uri: *const std.Uri,
request_host: []const u8,
};
fn decomposeURL(client: *const Client, uri: *const Uri) !DecomposedURL {
if (uri.host == null) {
return error.UriMissingHost;
}
const request_host = uri.host.?.percent_encoded;
var connect_uri = uri;
var connect_host = request_host;
if (client.http_proxy) |*proxy| {
connect_uri = proxy;
connect_host = proxy.host.?.percent_encoded;
}
var secure: bool = undefined;
const scheme = connect_uri.scheme;
if (std.ascii.eqlIgnoreCase(scheme, "https")) {
secure = true;
} else if (std.ascii.eqlIgnoreCase(scheme, "http")) {
secure = false;
} else {
return error.UnsupportedUriScheme;
}
const connect_port: u16 = connect_uri.port orelse if (secure) 443 else 80;
return .{
.secure = secure,
.connect_port = connect_port,
.connect_host = connect_host,
.connect_uri = connect_uri,
.request_host = request_host,
};
}
// Called in deinit, but also called when we're redirecting to another page
fn releaseConnection(self: *Request) void {
const connection = self._connection orelse return;
self._connection = null;
if (self._keepalive == false) {
self._client.connection_manager.destroy(connection);
return;
}
self._client.connection_manager.keepIdle(connection) catch |err| {
self.destroyConnection(connection);
log.err("failed to release connection to pool: {}", .{err});
};
}
fn createConnection(self: *Request, socket: posix.socket_t, blocking: bool) !*Connection {
const client = self._client;
const connection, const owned_host = try client.connection_manager.create(self._connect_host);
connection.* = .{
.tls = null,
.socket = socket,
.blocking = blocking,
.host = owned_host,
.port = self._connect_port,
};
return connection;
}
fn destroyConnection(self: *Request, connection: *Connection) void {
self._client.connection_manager.destroy(connection);
}
const AddHeaderOpts = struct {
dupe_name: bool = false,
dupe_value: bool = false,
};
pub fn addHeader(self: *Request, name: []const u8, value: []const u8, opts: AddHeaderOpts) !void {
const arena = self.arena;
var owned_name = name;
var owned_value = value;
if (opts.dupe_name) {
owned_name = try arena.dupe(u8, name);
}
if (opts.dupe_value) {
owned_value = try arena.dupe(u8, value);
}
if (self._has_host_header == false and std.ascii.eqlIgnoreCase(name, "host")) {
self._has_host_header = true;
}
try self.headers.append(arena, .{ .name = owned_name, .value = owned_value });
}
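// For example (the header name and value here are illustrative), when the
// passed-in value doesn't outlive the request, ask for a copy:
//
//   try req.addHeader("X-Example", temp_value, .{ .dupe_value = true });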
// TODO timeout
const SendSyncOpts = struct {
tls_verify_host: ?bool = null,
};
// Makes a synchronous request
pub fn sendSync(self: *Request, opts: SendSyncOpts) anyerror!Response {
if (opts.tls_verify_host) |override| {
self._tls_verify_host = override;
}
try self.prepareInitialSend();
return self.doSendSync(true);
}
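// For example (illustrative), to skip certificate host verification for a
// single request:
//
//   var res = try req.sendSync(.{ .tls_verify_host = false });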
// Called internally, follows a redirect.
fn redirectSync(self: *Request, redirect: Reader.Redirect) anyerror!Response {
try self.prepareToRedirect(redirect);
return self.doSendSync(true);
}
fn doSendSync(self: *Request, use_pool: bool) anyerror!Response {
if (use_pool) {
if (self.findExistingConnection(true)) |connection| {
self._connection = connection;
self._connection_from_keepalive = true;
}
}
if (self._connection == null) {
const socket, const address = try self.createSocket(true);
posix.connect(socket, &address.any, address.getOsSockLen()) catch |err| {
posix.close(socket);
return err;
};
const connection = self.createConnection(socket, true) catch |err| {
posix.close(socket);
return err;
};
self._connection = connection;
if (self._secure) {
self._connection.?.tls = .{
.blocking = try tls.client(std.net.Stream{ .handle = socket }, .{
.host = self._connect_host,
.root_ca = self._client.root_ca,
.insecure_skip_verify = self._tls_verify_host == false,
// .key_log_callback = tls.config.key_log.callback,
}),
};
}
self._connection_from_keepalive = false;
}
var handler = SyncHandler{ .request = self };
return handler.send() catch |err| {
log.warn("HTTP error: {any} ({any} {any} {d})", .{ err, self.method, self.request_uri, self._redirect_count });
return err;
};
}
const SendAsyncOpts = struct {
tls_verify_host: ?bool = null,
};
// Makes an asynchronous request
pub fn sendAsync(self: *Request, loop: anytype, handler: anytype, opts: SendAsyncOpts) !void {
if (opts.tls_verify_host) |override| {
self._tls_verify_host = override;
}
try self.prepareInitialSend();
return self.doSendAsync(loop, handler, true);
}
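// The `handler` is duck-typed: it must expose an `onHttpResponse` method
// that accepts an `anyerror!Progress`. A minimal sketch of such a handler
// (the name is illustrative):
//
//   const MyHandler = struct {
//       fn onHttpResponse(self: *MyHandler, progress_: anyerror!Progress) !void {
//           const progress = progress_ catch |err| {
//               // the request failed and has already cleaned itself up
//               return err;
//           };
//           if (progress.data) |chunk| {
//               _ = chunk; // part of the body
//           }
//           _ = self;
//       }
//   };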
pub fn redirectAsync(self: *Request, redirect: Reader.Redirect, loop: anytype, handler: anytype) !void {
try self.prepareToRedirect(redirect);
return self.doSendAsync(loop, handler, true);
}
fn doSendAsync(self: *Request, loop: anytype, handler: anytype, use_pool: bool) !void {
if (use_pool) {
if (self.findExistingConnection(false)) |connection| {
self._connection = connection;
self._connection_from_keepalive = true;
}
}
var address: std.net.Address = undefined;
if (self._connection == null) {
const socket, address = try self.createSocket(false);
errdefer posix.close(socket);
// It seems wrong to set self._connection here. While we have a
// connection, it isn't yet connected. PLUS, if this is a secure
// connection, we also don't have a handshake.
// But, request._connection only ever gets released to the idle pool
// when request._keepalive == true. And this can only be true _after_
// we've processed the request - at which point, we'd obviously be
// connected (and, for TLS, handshaked).
self._connection = try self.createConnection(socket, false);
self._connection_from_keepalive = false;
}
const connection = self._connection.?;
errdefer self.destroyConnection(connection);
const AsyncHandlerT = AsyncHandler(@TypeOf(handler), @TypeOf(loop));
const async_handler = try self.arena.create(AsyncHandlerT);
const state = self._state;
async_handler.* = .{
.loop = loop,
.request = self,
.handler = handler,
.read_buf = state.read_buf,
.write_buf = state.write_buf,
.reader = self.newReader(),
.socket = connection.socket,
.conn = .{ .handler = async_handler, .protocol = .{ .plain = {} } },
};
if (self._secure) {
connection.tls = .{
.nonblocking = try tls.nb.Client().init(self._client.allocator, .{
.host = self._connect_host,
.root_ca = self._client.root_ca,
.insecure_skip_verify = self._tls_verify_host == false,
.key_log_callback = tls.config.key_log.callback,
}),
};
async_handler.conn.protocol = .{
.secure = &connection.tls.?.nonblocking,
};
}
if (self._connection_from_keepalive) {
// we're already connected
return async_handler.conn.connected();
}
return loop.connect(
AsyncHandlerT,
async_handler,
&async_handler.read_completion,
AsyncHandlerT.connected,
connection.socket,
address,
);
}
fn newReader(self: *Request) Reader {
return Reader.init(self._state, &self._keepalive);
}
// Does additional setup of the request for the first (i.e. non-redirect) send.
fn prepareInitialSend(self: *Request) !void {
const arena = self.arena;
if (self.body) |body| {
const cl = try std.fmt.allocPrint(arena, "{d}", .{body.len});
try self.headers.append(arena, .{ .name = "Content-Length", .value = cl });
}
if (!self._has_host_header) {
try self.headers.append(arena, .{ .name = "Host", .value = self._request_host });
}
try self.headers.append(arena, .{ .name = "User-Agent", .value = "Lightpanda/1.0" });
self.requestStarting();
}
// Sets up the request for redirecting.
fn prepareToRedirect(self: *Request, redirect: Reader.Redirect) !void {
self.releaseConnection();
// CANNOT reset the arena (╥﹏╥)
// We need it for self.request_uri (which we're about to use to resolve
// redirect.location, and it might own some/all headers)
const redirect_count = self._redirect_count;
if (redirect_count == 10) {
return error.TooManyRedirects;
}
var buf = try self.arena.alloc(u8, 2048);
const previous_request_host = self._request_host;
self.redirect_uri = try self.request_uri.resolve_inplace(redirect.location, &buf);
self.request_uri = &self.redirect_uri.?;
const decomposed = try decomposeURL(self._client, self.request_uri);
self.connect_uri = decomposed.connect_uri;
self._request_host = decomposed.request_host;
self._connect_host = decomposed.connect_host;
self._connect_port = decomposed.connect_port;
self._secure = decomposed.secure;
self._keepalive = false;
self._redirect_count = redirect_count + 1;
if (redirect.use_get) {
// Some redirect status codes _require_ that we switch the method
// to a GET.
self.method = .GET;
}
log.info("redirecting to: {any} {any}", .{ self.method, self.request_uri });
if (self.body != null and self.method == .GET) {
// If we have a body and the method is a GET, then we must be following
// a redirect which switched the method. Remove the body.
// Reset the Content-Length
self.body = null;
for (self.headers.items) |*hdr| {
if (std.mem.eql(u8, hdr.name, "Content-Length")) {
hdr.value = "0";
break;
}
}
}
if (std.mem.eql(u8, previous_request_host, self._request_host) == false) {
for (self.headers.items) |*hdr| {
if (std.mem.eql(u8, hdr.name, "Host")) {
hdr.value = self._request_host;
break;
}
}
}
}
fn findExistingConnection(self: *Request, blocking: bool) ?*Connection {
// This is being overly cautious, but it's a bit risky to re-use
// connections for other methods. It isn't so much re-using the
// connection that's the issue, it's dealing with a write error
// when trying to send the request and deciding whether or not we
// should retry the request.
if (self.method.safeToRetry() == false) {
return null;
}
if (self.body != null) {
return null;
}
return self._client.connection_manager.get(self._secure, self._connect_host, self._connect_port, blocking);
}
fn createSocket(self: *Request, blocking: bool) !struct { posix.socket_t, std.net.Address } {
const addresses = try std.net.getAddressList(self.arena, self._connect_host, self._connect_port);
if (addresses.addrs.len == 0) {
return error.UnknownHostName;
}
// TODO: rotate?
const address = addresses.addrs[0];
const sock_flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | if (blocking) @as(u32, 0) else posix.SOCK.NONBLOCK;
const socket = try posix.socket(address.any.family, sock_flags, posix.IPPROTO.TCP);
errdefer posix.close(socket);
if (@hasDecl(posix.TCP, "NODELAY")) {
try posix.setsockopt(socket, posix.IPPROTO.TCP, posix.TCP.NODELAY, &std.mem.toBytes(@as(c_int, 1)));
}
return .{ socket, address };
}
fn buildHeader(self: *Request) ![]const u8 {
const proxied = self.connect_uri != self.request_uri;
const buf = self._state.header_buf;
var fbs = std.io.fixedBufferStream(buf);
var writer = fbs.writer();
try writer.writeAll(@tagName(self.method));
try writer.writeByte(' ');
try self.request_uri.writeToStream(.{ .scheme = proxied, .authority = proxied, .path = true, .query = true }, writer);
try writer.writeAll(" HTTP/1.1\r\n");
for (self.headers.items) |header| {
try writer.writeAll(header.name);
try writer.writeAll(": ");
try writer.writeAll(header.value);
try writer.writeAll("\r\n");
}
try writer.writeAll("\r\n");
return buf[0..fbs.pos];
}
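// For a simple GET of http://example.com/index.html (illustrative), the
// built header looks like:
//
//   GET /index.html HTTP/1.1\r\n
//   Host: example.com\r\n
//   User-Agent: Lightpanda/1.0\r\n
//   \r\n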
fn requestStarting(self: *Request) void {
const notification = self.notification orelse return;
if (self._notified_start) {
return;
}
self._notified_start = true;
notification.dispatch(.http_request_start, &.{
.id = self.id,
.url = self.request_uri,
.method = self.method,
.headers = self.headers.items,
.has_body = self.body != null,
});
}
fn requestCompleted(self: *Request, response: ResponseHeader) void {
const notification = self.notification orelse return;
if (self._notified_complete) {
return;
}
self._notified_complete = true;
notification.dispatch(.http_request_complete, &.{
.id = self.id,
.url = self.request_uri,
.status = response.status,
.headers = response.headers.items,
});
}
};
// Handles asynchronous requests
fn AsyncHandler(comptime H: type, comptime L: type) type {
return struct {
loop: L,
handler: H,
request: *Request,
read_buf: []u8,
// When we're using TLS, we'll probably need to keep read_buf intact
// until we get a full TLS record. `read_pos` is the position into `read_buf`
// that we have valid, but unprocessed, data up to.
read_pos: usize = 0,
// Depending on the TLS version, there are different places during
// the handshake where we want to start receiving. We can't have
// overlapping receives (that works fine on macOS (kqueue), but not on
// Linux (io_uring)). Using this boolean as a guard to make sure we only
// have 1 in-flight receive is easier than trying to understand TLS.
is_receiving: bool = false,
// need a separate read and write buf because, with TLS, messages are
// not strictly req->resp.
write_buf: []u8,
socket: posix.socket_t,
read_completion: IO.Completion = undefined,
send_completion: IO.Completion = undefined,
// used for parsing the response
reader: Reader,
// Can only ever have 1 inflight write to a socket (multiple could
// get interleaved). You'd think this isn't normally a problem: send
// the header, send the body (or maybe send them together!), but with TLS
// we have no guarantee from the library whether or not it'll want us
// to make multiple writes
send_queue: SendQueue = .{},
// Used to help us know if we're writing the header or the body;
state: SendState = .handshake,
// Abstraction over TLS and plain text socket, this is a version of
// the request._connection (which is a *Connection) that is async-specific.
conn: Conn,
// This will be != null when we're supposed to redirect AND we've
// drained the response body. We need this as a field, because we'll
// detect this inside our TLS onRecv callback (which is executed
// inside the TLS client, and so we can't deinitialize the tls_client)
redirect: ?Reader.Redirect = null,
// There can be cases where we're forced to read the whole body into
// memory in order to process it (*cough* CloudFront incorrectly sending
// gzipped responses *cough*)
full_body: ?std.ArrayListUnmanaged(u8) = null,
const Self = @This();
const SendQueue = std.DoublyLinkedList([]const u8);
const SendState = enum {
handshake,
header,
body,
};
const ProcessStatus = enum {
wait,
done,
need_more,
};
fn deinit(self: *Self) void {
self.request.deinit();
}
fn connected(self: *Self, _: *IO.Completion, result: IO.ConnectError!void) void {
result catch |err| return self.handleError("Connection failed", err);
self.conn.connected() catch |err| {
self.handleError("connected handler error", err);
};
}
fn send(self: *Self, data: []const u8) void {
const node = self.request.arena.create(SendQueue.Node) catch |err| {
self.handleError("out of memory", err);
return;
};
node.data = data;
self.send_queue.append(node);
if (self.send_queue.len > 1) {
// if we already had a message in the queue, then our send loop
// is already setup.
return;
}
self.loop.send(
Self,
self,
&self.send_completion,
sent,
self.socket,
node.data,
) catch |err| {
self.handleError("loop send error", err);
};
}
fn sent(self: *Self, _: *IO.Completion, n_: IO.SendError!usize) void {
const n = n_ catch |err| {
return self.handleError("Write error", err);
};
const node = self.send_queue.first.?;
const data = node.data;
var next: ?*SendQueue.Node = node;
if (n == data.len) {
_ = self.send_queue.popFirst();
next = node.next;
} else {
// didn't send all the data; leave this node at the front of the
// queue, pointing at the unsent remainder, and send it again
node.data = data[n..];
}
if (next) |next_| {
// we still have data to send
self.loop.send(
Self,
self,
&self.send_completion,
sent,
self.socket,
next_.data,
) catch |err| {
self.handleError("loop send error", err);
};
return;
}
self.conn.sent() catch |err| {
self.handleError("send handling", err);
};
}
// Normally, you'd think of HTTP as being a straight up request-response
// and that we can send, and then receive. But with TLS, we need to receive
// while handshaking and potentially while sending data. So we're always
// receiving.
fn receive(self: *Self) void {
if (self.is_receiving) {
return;
}
self.is_receiving = true;
self.loop.recv(
Self,
self,
&self.read_completion,
Self.received,
self.socket,
self.read_buf[self.read_pos..],
) catch |err| {
self.handleError("loop recv error", err);
};
}
fn received(self: *Self, _: *IO.Completion, n_: IO.RecvError!usize) void {
self.is_receiving = false;
const n = n_ catch |err| {
return self.handleError("Read error", err);
};
if (n == 0) {
if (self.maybeRetryRequest()) {
return;
}
return self.handleError("Connection closed", error.ConnectionResetByPeer);
}
const status = self.conn.received(self.read_buf[0 .. self.read_pos + n]) catch |err| {
if (err == error.TlsAlertCloseNotify and self.state == .handshake and self.maybeRetryRequest()) {
return;
}
self.handleError("data processing", err);
return;
};
switch (status) {
.wait => {},
.need_more => self.receive(),
.done => {
const redirect = self.redirect orelse {
self.request.requestCompleted(self.reader.response);
self.deinit();
return;
};
self.request.redirectAsync(redirect, self.loop, self.handler) catch |err| {
self.handleError("Setup async redirect", err);
return;
};
// redirectAsync has given up any claim to the request,
// including the socket.
},
}
}
// If our socket came from the connection pool, it's possible that we're
// failing because it has since timed out. If so, we retry the request
// once on a fresh connection.
fn maybeRetryRequest(self: *Self) bool {
const request = self.request;
// We only retry if the connection came from the keepalive pool.
// We only use a keepalive connection for specific methods and if
// there's no body.
if (request._connection_from_keepalive == false) {
return false;
}
// We only retry before a response has been processed, so it should be
// impossible to be here and have this be true. This is an important
// check, because we're about to release a connection that we know
// is bad, and we don't want it to go back into the pool.
std.debug.assert(request._keepalive == false);
request.releaseConnection();
request.doSendAsync(self.loop, self.handler, false) catch |conn_err| {
// You probably think it's weird that we fall through to the:
//    return true;
// The caller will take the `true` and just exit. This is what
// we want in this error case, because handleError deals with the
// error. We'd rather emit a "connection error" at this point
// than whatever error we got using the pooled connection.
self.handleError("connection error", conn_err);
};
return true;
}
fn processData(self: *Self, d: []u8) ProcessStatus {
const reader = &self.reader;
var data = d;
while (true) {
const would_be_first = reader.header_done == false;
const result = reader.process(data) catch |err| {
self.handleError("Invalid server response", err);
return .done;
};
if (reader.header_done == false) {
// need more data
return .need_more;
}
// at this point, if `would_be_first == true`, then
// `would_be_first` should be thought of as `is_first` because
// we now have a complete header for the first time.
if (reader.redirect()) |redirect| {
// We don't redirect until we've drained the body (to be
// able to re-use the connection for keepalive).
// Calling `reader.redirect()` over and over again might not
// be the most efficient (it's a very simple function though),
// but for a redirect response, chances are we slurped up
// the header and body in a single go.
if (result.done == false) {
return .need_more;
}
self.redirect = redirect;
return .done;
}
if (would_be_first) {
if (reader.response.get("content-encoding")) |ce| {
if (std.ascii.eqlIgnoreCase(ce, "gzip") == false) {
self.handleError("unsupported content encoding", error.UnsupportedContentEncoding);
return .done;
}
// Our requests _do not_ include an Accept-Encoding header
// but some servers (e.g. CloudFront) can send gzipped
// responses nonetheless. Zig's compression libraries
// do not work well with our async flow - they expect
// to be able to read(buf) more data as needed, instead
// of having us yield new data as it becomes available.
// If async ever becomes a first class citizen, we could
// expect this problem to go away. But, for now, we're
// going to read the _whole_ body into memory. It makes
// our life a lot easier, but it's still a mess.
self.full_body = .empty;
}
}
const done = result.done;
// see a few lines up, if this isn't null, something decided
// we should buffer the entire body into memory.
if (self.full_body) |*full_body| {
if (result.data) |chunk| {
full_body.appendSlice(self.request.arena, chunk) catch |err| {
self.handleError("response buffering error", err);
return .done;
};
}
// when buffering the body into memory, we only emit it once
// everything is done (because we need to process the body
// as a whole)
if (done) {
// We should probably keep track of _why_ we're buffering
// the body into memory. But, for now, the only possible
// reason is that the response was gzipped. That means
// we need to decompress it.
var fbs = std.io.fixedBufferStream(full_body.items);
var decompressor = std.compress.gzip.decompressor(fbs.reader());
var next = decompressor.next() catch |err| {
self.handleError("decompression error", err);
return .done;
};
var first = true;
while (next) |chunk| {
// we need to know if there's another chunk so that
// we know if done should be true or false
next = decompressor.next() catch |err| {
self.handleError("decompression error", err);
return .done;
};
self.handler.onHttpResponse(.{
.data = chunk,
.first = first,
.done = next == null,
.header = reader.response,
}) catch return .done;
first = false;
}
}
} else if (result.data != null or done or would_be_first) {
// If we have data. Or if the request is done. Or if this is the
// first time we have a complete header. Emit the chunk.
self.handler.onHttpResponse(.{
.done = done,
.data = result.data,
.first = would_be_first,
.header = reader.response,
}) catch return .done;
}
if (done == true) {
return .done;
}
// With chunked-encoding, it's possible that we've only
// partially processed the data. So we need to keep processing
// any unprocessed data. It would be nice if we could just glue
// this all together, but that would require copying bytes around
data = result.unprocessed orelse return .need_more;
}
}
fn handleError(self: *Self, comptime msg: []const u8, err: anyerror) void {
log.err(msg ++ ": {any} ({any} {any})", .{ err, self.request.method, self.request.request_uri });
self.handler.onHttpResponse(err) catch {};
// just to be safe
self.request._keepalive = false;
self.request.deinit();
}
const Conn = struct {
handler: *Self,
protocol: Protocol,
const Protocol = union(enum) {
plain: void,
secure: *tls.nb.Client(),
};
fn connected(self: *Conn) !void {
const handler = self.handler;
switch (self.protocol) {
.plain => {
handler.state = .header;
const header = try handler.request.buildHeader();
handler.send(header);
},
.secure => |tls_client| {
std.debug.assert(handler.state == .handshake);
// initiate the handshake
_, const i = try tls_client.handshake(handler.read_buf[0..0], handler.write_buf);
handler.send(handler.write_buf[0..i]);
handler.receive();
},
}
}
fn received(self: *Conn, data: []u8) !ProcessStatus {
const handler = self.handler;
switch (self.protocol) {
.plain => return handler.processData(data),
.secure => |tls_client| {
var used: usize = 0;
var closed = false;
var cleartext_pos: usize = 0;
var status = ProcessStatus.need_more;
if (tls_client.isConnected()) {
used, cleartext_pos, closed = try tls_client.decrypt(data);
} else {
std.debug.assert(handler.state == .handshake);
// process handshake data
used, const i = try tls_client.handshake(data, handler.write_buf);
if (i > 0) {
handler.send(handler.write_buf[0..i]);
} else if (tls_client.isConnected()) {
// if we're done our handshake, there should be
// no unused data
handler.read_pos = 0;
std.debug.assert(used == data.len);
try self.sendSecureHeader(tls_client);
return .wait;
}
}
if (used == 0) {
// if nothing was used, there should have been
// no cleartext data to process;
std.debug.assert(cleartext_pos == 0);
// if we need more data, then it needs to be
// appended to the end of our existing data to
// build up a complete record
handler.read_pos = data.len;
return if (closed) .done else .need_more;
}
if (cleartext_pos > 0) {
status = handler.processData(data[0..cleartext_pos]);
}
if (closed) {
return .done;
}
if (used == data.len) {
// We used up all the data that we were given. We must
// reset read_pos to 0 because (a) that's more
// efficient and (b) we need all the available space
// to make sure we get a full TLS record next time
handler.read_pos = 0;
return status;
}
// We used some of the data, but have some leftover
// (i.e. there was 1+ full records AND an incomplete
// record). We need to maintain the "leftover" data
// for subsequent reads.
// Remember that our read_buf is the MAX possible TLS
// record size. So as long as we make sure that the start
// of a record is at read_buf[0], we know that we'll
// always have enough space for 1 record.
const unused = data.len - used;
std.mem.copyForwards(u8, handler.read_buf, data[unused..]);
handler.read_pos = unused;
// an incomplete record means there must be more data
return .need_more;
},
}
}
fn sent(self: *Conn) !void {
const handler = self.handler;
switch (self.protocol) {
.plain => switch (handler.state) {
.handshake => unreachable,
.header => {
handler.state = .body;
if (handler.request.body) |body| {
handler.send(body);
}
handler.receive();
},
.body => {},
},
.secure => |tls_client| {
if (tls_client.isConnected() == false) {
std.debug.assert(handler.state == .handshake);
// still handshaking, nothing to do
return;
}
switch (handler.state) {
.handshake => return self.sendSecureHeader(tls_client),
.header => {
handler.state = .body;
const body = handler.request.body orelse {
// We've sent the header, and there's no body;
// start receiving the response
handler.receive();
return;
};
const used, const i = try tls_client.encrypt(body, handler.write_buf);
std.debug.assert(body.len == used);
handler.send(handler.write_buf[0..i]);
},
.body => {
// We've sent the body, start receiving the
// response
handler.receive();
},
}
},
}
}
// This can be called from two places because, I think, of differences
// between TLS 1.2 and 1.3. TLS 1.3 requires 1 fewer round trip, and
// as soon as we've written our handshake, we consider the connection
// "connected". TLS 1.2 requires a extra round trip, and thus is
// only connected after we receive response from the server.
fn sendSecureHeader(self: *Conn, tls_client: *tls.nb.Client()) !void {
const handler = self.handler;
handler.state = .header;
const header = try handler.request.buildHeader();
const used, const i = try tls_client.encrypt(header, handler.write_buf);
std.debug.assert(header.len == used);
handler.send(handler.write_buf[0..i]);
}
};
};
}
// Handles synchronous requests
const SyncHandler = struct {
request: *Request,
fn send(self: *SyncHandler) !Response {
var request = self.request;
// Take the request._connection (a *Connection), and turn it into
// something specific to our SyncHandler, a Conn.
var conn: Conn = blk: {
const c = request._connection.?;
if (c.tls) |*tls_client| {
break :blk .{ .tls = &tls_client.blocking };
}
break :blk .{ .plain = c.socket };
};
const header = try request.buildHeader();
try conn.sendRequest(header, request.body);
var reader = request.newReader();
var read_buf = request._state.read_buf;
while (true) {
const n = conn.read(read_buf) catch |err| {
return self.maybeRetryOrErr(err);
};
const result = try reader.process(read_buf[0..n]);
if (reader.header_done == false) {
continue;
}
if (reader.redirect()) |redirect| {
if (result.done == false) {
try self.drain(&reader, &conn, result.unprocessed);
}
return request.redirectSync(redirect);
}
// we have a header, and it isn't a redirect, so we return our Response
// object which can be iterated to get the body.
std.debug.assert(result.done or reader.body_reader != null);
std.debug.assert(result.data == null);
// See CompressedReader for an explanation. This isn't great code. Sorry.
if (reader.response.get("content-encoding")) |ce| {
if (std.ascii.eqlIgnoreCase(ce, "gzip") == false) {
log.warn("unsupported content encoding '{s}' for: {}", .{ ce, request.request_uri });
return error.UnsupportedContentEncoding;
}
var compress_reader = CompressedReader{
.over = "",
.inner = &reader,
.done = result.done,
.buffer = read_buf,
.data = result.unprocessed,
.conn = conn,
};
var body: std.ArrayListUnmanaged(u8) = .{};
var decompressor = std.compress.gzip.decompressor(compress_reader.reader());
try decompressor.decompress(body.writer(request.arena));
self.request.requestCompleted(reader.response);
return .{
.header = reader.response,
._done = true,
._request = request,
._peek_buf = body.items,
._peek_len = body.items.len,
._buf = undefined,
._conn = undefined,
._reader = undefined,
};
}
return .{
._conn = conn,
._buf = read_buf,
._request = request,
._reader = reader,
._done = result.done,
._data = result.unprocessed,
._peek_len = 0,
._peek_buf = request._state.peek_buf,
.header = reader.response,
};
}
}
fn maybeRetryOrErr(self: *SyncHandler, err: anyerror) !Response {
var request = self.request;
// we'll only retry if the connection came from the idle pool, because
// these connections might have been closed while idling, so an error
// isn't exactly surprising.
if (request._connection_from_keepalive == false) {
return err;
}
if (err != error.ConnectionResetByPeer) {
return err;
}
// this should be our default, and this function should never have been
// called at a point where this could have been set to true. This is
// important because we're about to release a bad connection, and
// we don't want it to go back into the idle pool.
std.debug.assert(request._keepalive == false);
request.releaseConnection();
// Don't change this false to true. It ensures that we get a new
// connection. This prevents an endless loop because, if this new
// connection also fails, connection_from_keepalive will be false, and our
// above guard clause will abort the retry.
return request.doSendSync(false);
}
fn drain(self: SyncHandler, reader: *Reader, conn: *Conn, unprocessed: ?[]u8) !void {
if (unprocessed) |data| {
const result = try reader.process(data);
if (result.done) {
return;
}
}
var buf = self.request._state.read_buf;
while (true) {
const n = try conn.read(buf);
const result = try reader.process(buf[0..n]);
if (result.done) {
return;
}
}
}
const Conn = union(enum) {
tls: *tls.Connection(std.net.Stream),
plain: posix.socket_t,
fn sendRequest(self: *Conn, header: []const u8, body: ?[]const u8) !void {
switch (self.*) {
.tls => |tls_client| {
try tls_client.writeAll(header);
if (body) |b| {
try tls_client.writeAll(b);
}
},
.plain => |socket| {
if (body) |b| {
var vec = [2]posix.iovec_const{
.{ .len = header.len, .base = header.ptr },
.{ .len = b.len, .base = b.ptr },
};
return writeAllIOVec(socket, &vec);
}
return writeAll(socket, header);
},
}
}
fn read(self: *Conn, buf: []u8) !usize {
const n = switch (self.*) {
.tls => |tls_client| try tls_client.read(buf),
.plain => |socket| try posix.read(socket, buf),
};
if (n == 0) {
return error.ConnectionResetByPeer;
}
return n;
}
fn writeAllIOVec(socket: posix.socket_t, vec: []posix.iovec_const) !void {
var i: usize = 0;
while (true) {
var n = try posix.writev(socket, vec[i..]);
while (n >= vec[i].len) {
n -= vec[i].len;
i += 1;
if (i >= vec.len) {
return;
}
}
vec[i].base += n;
vec[i].len -= n;
}
}
fn writeAll(socket: posix.socket_t, data: []const u8) !void {
var i: usize = 0;
while (i < data.len) {
i += try posix.write(socket, data[i..]);
}
}
};
// We don't ask for encoding, but some providers (CloudFront!!)
// encode anyways. This is an issue for our async-path because Zig's
// decompressors aren't async-friendly - they want to pull data in
// rather than being given data when it's available. Unfortunately
// this is a problem for our own Reader, which is shared by both our
// sync and async handlers, but has an async-ish API. It's hard to
// use our Reader with Zig's decompressors. Given the way our Reader
// is written, this is a problem even for our sync requests. For now, we
// just read the entire body into memory, which makes things manageable.
// Finally, we leverage the existing `peek` logic in the Response to make
// this fully-read content available.
// If you think about it, this CompressedReader is just a fancy "peek" over
// the entire body.
const CompressedReader = struct {
done: bool,
conn: Conn,
buffer: []u8,
inner: *Reader,
// Represents data directly from the socket. It hasn't been processed
// by the body reader. It could, for example, have chunk information in it.
// Needed to be processed by `inner` before it can be returned
data: ?[]u8,
// Represents data that _was_ processed by the body reader, but couldn't
// fit in the destination buffer given to read.
// This adds complexity, but the reality is that we can read more data
// from the socket than space we have in the given `dest`. Think of
// this as doing something like a BufferedReader. We _could_ limit
// our reads to dest.len, but we can overread when initially reading
// the header/response, and at that point, we don't know anything about
// this Compression stuff.
over: []const u8,
const IOReader = std.io.Reader(*CompressedReader, anyerror, read);
pub fn reader(self: *CompressedReader) IOReader {
return .{ .context = self };
}
fn read(self: *CompressedReader, dest: []u8) anyerror!usize {
if (self.over.len > 0) {
// data from a previous `read` which is ready to go as-is. i.e.
// it's already been processed by inner (the body reader).
const l = @min(self.over.len, dest.len);
@memcpy(dest[0..l], self.over[0..l]);
self.over = self.over[l..];
return l;
}
var buffer = self.buffer;
buffer = buffer[0..@min(dest.len, buffer.len)];
while (true) {
if (try self.processData()) |data| {
const l = @min(data.len, dest.len);
@memcpy(dest[0..l], data[0..l]);
// if we processed more data than fits into dest, we store
// it in `over` for the next call to `read`
self.over = data[l..];
return l;
}
if (self.done) {
return 0;
}
const n = try self.conn.read(self.buffer);
self.data = self.buffer[0..n];
}
}
fn processData(self: *CompressedReader) !?[]u8 {
const data = self.data orelse return null;
const result = try self.inner.process(data);
self.done = result.done;
self.data = result.unprocessed; // for the next call
return result.data;
}
};
};
// Used for reading the response (both the header and the body)
const Reader = struct {
// references request._keepalive
keepalive: *bool,
// always references state.header_buf
header_buf: []u8,
// position in header_buf that we have valid data up until
pos: usize,
// for populating the response headers list
arena: Allocator,
response: ResponseHeader,
body_reader: ?BodyReader,
header_done: bool,
fn init(state: *State, keepalive: *bool) Reader {
return .{
.pos = 0,
.response = .{},
.body_reader = null,
.header_done = false,
.keepalive = keepalive,
.header_buf = state.header_buf,
.arena = state.arena.allocator(),
};
}
// Determines if we need to redirect
fn redirect(self: *const Reader) ?Redirect {
const use_get = switch (self.response.status) {
201, 301, 302, 303 => true,
307, 308 => false,
else => return null,
};
const location = self.response.get("location") orelse return null;
return .{ .use_get = use_get, .location = location };
}
fn process(self: *Reader, data: []u8) ProcessError!Result {
if (self.body_reader) |*br| {
const ok, const result = try br.process(data);
if (ok == false) {
// There's something that our body reader didn't like. It wants
// us to emit whatever data we have, but it isn't safe to keep
// the connection alive.
std.debug.assert(result.done == true);
self.keepalive.* = false;
}
return result;
}
// Still parsing the header
// What data do we have leftover in `data`?
// When header_done == true, then this is part (or all) of the body
// When header_done == false, then this is a header line that we didn't
// have enough data for.
var done = false;
var unprocessed = data;
// Data from a previous call to process that we weren't able to parse
const pos = self.pos;
const header_buf = self.header_buf;
const unparsed = header_buf[0..pos];
if (unparsed.len > 0) {
// This can get complicated, but we'll try to keep it simple, even
// if that means we'll copy a bit more than we have to. At most,
// unparsed can represent 1 header line. To have 1 complete line, we
// need to find a \n in data.
const line_end = (std.mem.indexOfScalarPos(u8, data, 0, '\n') orelse {
// data doesn't represent a complete header line. We need more data
const end = pos + data.len;
if (end > header_buf.len) {
return error.HeaderTooLarge;
}
self.pos = end;
@memcpy(self.header_buf[pos..end], data);
return .{ .done = false, .data = null, .unprocessed = null };
}) + 1;
const end = pos + line_end;
if (end > header_buf.len) {
return error.HeaderTooLarge;
}
@memcpy(header_buf[pos..end], data[0..line_end]);
done, unprocessed = try self.parseHeader(header_buf[0..end]);
// we gave parseHeader exactly 1 header line, there should be no leftovers
std.debug.assert(unprocessed.len == 0);
// we currently have no unprocessed header data
self.pos = 0;
// We still [probably] have data to process which was not part of
// the previously unparsed header line
unprocessed = data[line_end..];
}
if (done == false) {
// If we're here it means that
// 1 - Had no unparsed data, and skipped the entire block above
// 2 - Had unparsed data, but we managed to "complete" it. AND, the
// unparsed data didn't represent the end of the header
// We're now trying to parse the rest of `data`, i.e. whatever wasn't
// part of the previously unparsed header line (unprocessed.len could be 0 here).
done, unprocessed = try self.parseHeader(unprocessed);
if (done == false) {
const p = self.pos; // don't use pos, self.pos might have been altered
const end = p + unprocessed.len;
if (end > header_buf.len) {
return error.HeaderTooLarge;
}
@memcpy(header_buf[p..end], unprocessed);
self.pos = end;
return .{ .done = false, .data = null, .unprocessed = null };
}
}
var result = try self.prepareForBody();
if (unprocessed.len > 0) {
if (result.done == true) {
// We think we're done reading the body, but we still have data
// We'll return what we have as-is, but close the connection
// because we don't know what state it's in.
self.keepalive.* = false;
} else {
result.unprocessed = unprocessed;
}
}
return result;
}
// We're done parsing the header, and we need to (maybe) setup the BodyReader
fn prepareForBody(self: *Reader) !Result {
self.header_done = true;
const response = &self.response;
if (response.get("connection")) |connection| {
if (std.ascii.eqlIgnoreCase(connection, "close")) {
self.keepalive.* = false;
}
}
if (response.get("transfer-encoding")) |te| {
if (std.ascii.indexOfIgnoreCase(te, "chunked") != null) {
self.body_reader = .{ .chunked = .{
.size = null,
.missing = 0,
.scrap_len = 0,
.scrap = undefined,
} };
return .{ .done = false, .data = null, .unprocessed = null };
}
}
const content_length = blk: {
const cl = response.get("content-length") orelse break :blk 0;
break :blk std.fmt.parseInt(u32, cl, 10) catch {
return error.InvalidContentLength;
};
};
if (content_length == 0) {
return .{
.done = true,
.data = null,
.unprocessed = null,
};
}
self.body_reader = .{ .content_length = .{ .len = content_length, .read = 0 } };
return .{ .done = false, .data = null, .unprocessed = null };
}
// returns true when done
// returns any remaining unprocessed data
// When done == true, the remaining data must belong to the body
// When done == false, at least part of the remaining data must belong to
// the header.
fn parseHeader(self: *Reader, data: []u8) !struct { bool, []u8 } {
var pos: usize = 0;
const arena = self.arena;
if (self.response.status == 0) {
// still don't have a status line
pos = std.mem.indexOfScalarPos(u8, data, 0, '\n') orelse {
return .{ false, data };
};
if (pos < 14 or data[pos - 1] != '\r') {
return error.InvalidStatusLine;
}
const protocol = data[0..9];
if (std.mem.eql(u8, protocol, "HTTP/1.1 ")) {
self.keepalive.* = true;
} else if (std.mem.eql(u8, protocol, "HTTP/1.0 ") == false) {
return error.InvalidStatusLine;
}
self.response.status = std.fmt.parseInt(u16, data[9..12], 10) catch {
return error.InvalidStatusLine;
};
// skip over the \n
pos += 1;
}
while (pos < data.len) {
if (data[pos] == '\r') {
const next = pos + 1;
if (data.len > next and data[next] == '\n') {
return .{ true, data[next + 1 ..] };
}
}
const value_end = std.mem.indexOfScalarPos(u8, data, pos, '\n') orelse {
return .{ false, data[pos..] };
};
const sep = std.mem.indexOfScalarPos(u8, data[pos..value_end], 0, ':') orelse {
return error.InvalidHeader;
};
const name_end = pos + sep;
const value_start = name_end + 1;
if (value_end == value_start or data[value_end - 1] != '\r') {
return error.InvalidHeader;
}
const name = data[pos..name_end];
const value = data[value_start .. value_end - 1];
// there's a limit to what whitespace is valid here, but let's be flexible
var normalized_name = std.mem.trim(u8, name, &std.ascii.whitespace);
const normalized_value = std.mem.trim(u8, value, &std.ascii.whitespace);
// constCast is safe here, and necessary because the std.mem.trim API is bad / broken;
normalized_name = std.ascii.lowerString(@constCast(normalized_name), normalized_name);
try self.response.headers.append(self.arena, .{
.name = try arena.dupe(u8, normalized_name),
.value = try arena.dupe(u8, normalized_value),
});
// +1 to skip over the trailing \n
pos = value_end + 1;
}
return .{ false, "" };
}
const BodyReader = union(enum) {
chunked: Chunked,
content_length: ContentLength,
fn process(self: *BodyReader, data: []u8) !struct { bool, Result } {
std.debug.assert(data.len > 0);
switch (self.*) {
inline else => |*br| return br.process(data),
}
}
const ContentLength = struct {
len: usize,
read: usize,
fn process(self: *ContentLength, d: []u8) !struct { bool, Result } {
const len = self.len;
var read = self.read;
const missing = len - read;
var data = d;
var valid = true;
if (d.len > missing) {
valid = false;
data = d[0..missing];
}
read += data.len;
self.read = read;
return .{ valid, .{
.done = read == len,
.data = if (data.len == 0) null else data,
.unprocessed = null,
} };
}
};
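// As a worked example (the payload is illustrative): given the stream
//   "5\r\nhello\r\n0\r\n\r\n"
// we parse "5\r\n" as the chunk-size line, emit "hello" as data, consume
// the trailing "\r\n", then parse "0\r\n" as the final (empty) chunk,
// consume the final "\r\n", and the response is done.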
const Chunked = struct {
// size of the current chunk
size: ?u32,
// the amount of data we're missing in the current chunk, not
// including the trailing end-chunk marker (\r\n)
missing: usize,
// Our chunk reader will emit data as it becomes available, even
// if it isn't a complete chunk. So, ideally, we don't need much state
// But we might also get partial meta-data, like part of the chunk
// length. For example, imagine we get data that looks like:
// over 9000!\r\n32
//
// Now, if we assume that "over 9000!" completes the current chunk
// (which is to say that missing == 12), then the "32" would
// indicate _part_ of the length of the next chunk. But, is the next
// chunk 32, or is it 3293 or ??? So we need to keep the "32" around
// to figure it out.
scrap: [64]u8,
scrap_len: usize,
fn process(self: *Chunked, d: []u8) !struct { bool, Result } {
var data = d;
const scrap = &self.scrap;
const scrap_len = self.scrap_len;
const free_scrap = scrap.len - scrap_len;
if (self.size == null) {
// we don't know the size of the next chunk
const data_header_end = std.mem.indexOfScalarPos(u8, data, 0, '\n') orelse {
// the data that we were given doesn't have a complete header
if (data.len > free_scrap) {
// How big can a chunk reasonably be?
return error.InvalidChunk;
}
const end = scrap_len + data.len;
// we still don't have the end of the chunk header
@memcpy(scrap[scrap_len..end], data);
self.scrap_len = end;
return .{ true, .{ .done = false, .data = null, .unprocessed = null } };
};
var header = data[0..data_header_end];
if (scrap_len > 0) {
const end = scrap_len + data_header_end;
@memcpy(scrap[scrap_len..end], data[0..data_header_end]);
self.scrap_len = 0;
header = scrap[0..end];
}
const next_size = try readChunkSize(header);
self.scrap_len = 0;
self.size = next_size;
self.missing = next_size + 2; // include the footer
data = data[data_header_end + 1 ..];
}
if (data.len == 0) {
return .{ true, .{ .data = null, .done = false, .unprocessed = null } };
}
const size = self.size.?;
const missing = self.missing;
if (data.len >= missing) {
self.size = null;
self.missing = 0;
if (missing == 1) {
if (data[0] != '\n') {
return error.InvalidChunk;
}
if (data.len == 1) {
return .{ true, .{ .data = null, .done = size == 0, .unprocessed = null } };
}
return self.process(data[1..]);
}
if (missing == 2) {
if (data[0] != '\r' or data[1] != '\n') {
return error.InvalidChunk;
}
if (data.len == 2) {
return .{ true, .{ .data = null, .done = size == 0, .unprocessed = null } };
}
return self.process(data[2..]);
}
// we have a complete chunk;
var chunk: ?[]u8 = data;
const last = missing - 2;
if (data[last] != '\r' or data[missing - 1] != '\n') {
return error.InvalidChunk;
}
chunk = if (last == 0) null else data[0..last];
const unprocessed = data[missing..];
return .{ true, .{
.data = chunk,
.done = size == 0,
.unprocessed = if (unprocessed.len == 0) null else unprocessed,
} };
}
const still_missing = missing - data.len;
if (still_missing == 1) {
const last = data.len - 1;
if (data[last] != '\r') {
return error.InvalidChunk;
}
data = data[0..last];
}
self.missing = still_missing;
return .{ true, .{
.data = data,
.done = false,
.unprocessed = null,
} };
}
fn readChunkSize(data: []const u8) !u32 {
std.debug.assert(data.len > 1);
if (data[data.len - 1] != '\r') {
return error.InvalidChunk;
}
// ignore chunk extensions for now
const str_len = std.mem.indexOfScalarPos(u8, data, 0, ';') orelse data.len - 1;
return std.fmt.parseInt(u32, data[0..str_len], 16) catch return error.InvalidChunk;
}
};
};
const Redirect = struct {
use_get: bool,
location: []const u8,
};
const Result = struct {
done: bool,
data: ?[]u8,
// Any unprocessed data we have from the last call to "process".
// We can have unprocessed data when transitioning from parsing the
// header to parsing the body. When using Chunked encoding, we'll also
// have unprocessed data between chunks.
unprocessed: ?[]u8 = null,
};
const ProcessError = error{
HeaderTooLarge,
OutOfMemory,
InvalidHeader,
InvalidStatusLine,
InvalidContentLength,
InvalidChunk,
};
};
pub const ResponseHeader = struct {
status: u16 = 0,
headers: std.ArrayListUnmanaged(Header) = .{},
// Stored header names have already been lower-cased;
// the `name` parameter must be passed in lower-cased
pub fn get(self: *const ResponseHeader, name: []const u8) ?[]u8 {
for (self.headers.items) |h| {
if (std.mem.eql(u8, name, h.name)) {
return h.value;
}
}
return null;
}
pub fn count(self: *const ResponseHeader) usize {
return self.headers.items.len;
}
pub fn iterate(self: *const ResponseHeader, name: []const u8) HeaderIterator {
return .{
.index = 0,
.name = name,
.headers = self.headers,
};
}
};
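// Usage sketch (illustrative; remember that lookups must use lower-cased
// names):
//
//   if (res.header.get("content-type")) |ct| {
//       _ = ct;
//   }
//   var it = res.header.iterate("set-cookie");
//   while (it.next()) |cookie| {
//       _ = cookie;
//   }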
// We don't want to use std.http.Header, because the value is `[]const u8`.
// We _could_ use it and @constCast, but this gives us more safety.
// The main reason we want to do this is that a caller could lower-case the
// value in-place.
// The value (and key) are both safe to mutate because they're cloned from
// the byte stream by our arena.
pub const Header = struct {
name: []const u8,
value: []u8,
};
const HeaderIterator = struct {
index: usize,
name: []const u8,
headers: std.ArrayListUnmanaged(Header),
pub fn next(self: *HeaderIterator) ?[]u8 {
const name = self.name;
const index = self.index;
for (self.headers.items[index..], index..) |h, i| {
if (std.mem.eql(u8, name, h.name)) {
self.index = i + 1;
return h.value;
}
}
self.index = self.headers.items.len;
return null;
}
};
// What we emit from the AsyncHandler
pub const Progress = struct {
first: bool,
// whether or not we're done, i.e. no more data is expected
done: bool,
// part of the body
data: ?[]const u8,
header: ResponseHeader,
};
// The value that we return from a synchronous request.
pub const Response = struct {
_reader: Reader,
_request: *Request,
_conn: SyncHandler.Conn,
// the buffer to read the peeked data into
_peek_buf: []u8,
// the length of data we've peeked. The peeked data is _peek_buf[0.._peek_len].
// It's possible for _peek_len > 0 and _done == true, in which case the
// peeked data should be emitted once and subsequent calls to `next` should
// return null.
_peek_len: usize,
// What we'll read from the socket into. This is the State's read_buf
_buf: []u8,
// Whether or not we're done reading the response. When true, next will
// return null.
_done: bool,
// Data that we've read. This can be set when the Response is first created
// from extra data received while parsing the body. Or, it can be set
// when `next` is called and we read more data from the socket.
_data: ?[]u8 = null,
header: ResponseHeader,
pub fn next(self: *Response) !?[]u8 {
// It's possible for _peek_len > 0 and _done == true. This would happen
// when, while peeking, we reached the end of the data. In that case,
// we return the peeked data once and, on subsequent calls, we'll return
// null normally, because _done == true.
const pl = self._peek_len;
if (pl > 0) {
self._peek_len = 0;
return self._peek_buf[0..pl];
}
return self._nextIgnorePeek(self._buf);
}
fn _nextIgnorePeek(self: *Response, buf: []u8) !?[]u8 {
while (true) {
if (try self.processData()) |data| {
return data;
}
if (self._done) {
self._request.requestCompleted(self.header);
return null;
}
const n = try self._conn.read(buf);
self._data = buf[0..n];
}
}
fn processData(self: *Response) !?[]u8 {
const data = self._data orelse return null;
const result = try self._reader.process(data);
self._done = result.done;
self._data = result.unprocessed; // for the next call
return result.data;
}
pub fn peek(self: *Response) ![]u8 {
if (self._peek_len > 0) {
// Under normal usage, this is only possible when we're dealing
// with a compressed response (despite not asking for it). We handle
// these responses by essentially peeking the entire body.
return self._peek_buf[0..self._peek_len];
}
if (try self.processData()) |data| {
// We already have some or all of the body. This happens because
// we always read as much as we can, so getting the header and
// part/all of the body is normal.
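// ~100 bytes is treated as enough of a sample to sniff the content
// type; beyond that we hand back the slice as-is rather than copying
// it into peek_buf.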
if (data.len > 100) {
self._peek_buf = data;
self._peek_len = data.len;
return data;
}
@memcpy(self._peek_buf[0..data.len], data);
self._peek_len = data.len;
}
while (true) {
var peek_buf = self._peek_buf;
const peek_len = self._peek_len;
const data = (try self._nextIgnorePeek(peek_buf[peek_len..])) orelse {
return peek_buf[0..peek_len];
};
const peek_end = peek_len + data.len;
@memcpy(peek_buf[peek_len..peek_end], data);
self._peek_len = peek_end;
if (peek_end > 100) {
// return everything peeked so far, not just this read's portion
return peek_buf[0..peek_end];
}
}
}
};
// Pooled and re-used when creating a request
const State = struct {
// We might be asked to peek at the response, i.e. to sniff the mime type.
// This will require storing any peeked data so that, later, if we stream
// the body, we can present a cohesive body.
peek_buf: []u8,
// Used for reading chunks of payload data.
read_buf: []u8,
// Used for writing data. If you're wondering why BOTH a read_buf and a
// write_buf, even though HTTP is req -> resp, it's for TLS, which has
// bidirectional data.
write_buf: []u8,
// Used for keeping any unparsed header line until more data is received
// At most, this represents 1 line in the header.
header_buf: []u8,
// Used to optionally clone request headers, and always used to clone
// response headers.
arena: ArenaAllocator,
fn init(allocator: Allocator, header_size: usize, peek_size: usize, buf_size: usize) !State {
const peek_buf = try allocator.alloc(u8, peek_size);
errdefer allocator.free(peek_buf);
const read_buf = try allocator.alloc(u8, buf_size);
errdefer allocator.free(read_buf);
const write_buf = try allocator.alloc(u8, buf_size);
errdefer allocator.free(write_buf);
const header_buf = try allocator.alloc(u8, header_size);
errdefer allocator.free(header_buf);
return .{
.peek_buf = peek_buf,
.read_buf = read_buf,
.write_buf = write_buf,
.header_buf = header_buf,
.arena = std.heap.ArenaAllocator.init(allocator),
};
}
fn reset(self: *State) void {
_ = self.arena.reset(.{ .retain_with_limit = 1024 * 1024 });
}
fn deinit(self: *State) void {
const allocator = self.arena.child_allocator;
allocator.free(self.peek_buf);
allocator.free(self.read_buf);
allocator.free(self.write_buf);
allocator.free(self.header_buf);
self.arena.deinit();
}
};
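// Thread-safe, fixed-size pool of States, one per concurrent request.
// acquire() blocks on a condition variable until a State is free; release()
// returns it and wakes one waiter.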
const StatePool = struct {
states: []*State,
available: usize,
mutex: Thread.Mutex,
cond: Thread.Condition,
pub fn init(allocator: Allocator, count: usize) !StatePool {
const states = try allocator.alloc(*State, count);
errdefer allocator.free(states);
var started: usize = 0;
errdefer for (0..started) |i| {
states[i].deinit();
allocator.destroy(states[i]);
};
for (0..count) |i| {
const state = try allocator.create(State);
errdefer allocator.destroy(state);
state.* = try State.init(allocator, MAX_HEADER_LINE_LEN, PEEK_BUF_LEN, BUFFER_LEN);
states[i] = state;
started += 1;
}
return .{
.cond = .{},
.mutex = .{},
.states = states,
.available = count,
};
}
pub fn deinit(self: *StatePool, allocator: Allocator) void {
for (self.states) |state| {
state.deinit();
allocator.destroy(state);
}
allocator.free(self.states);
}
pub fn acquire(self: *StatePool) *State {
self.mutex.lock();
while (true) {
const states = self.states;
const available = self.available;
if (available == 0) {
self.cond.wait(&self.mutex);
continue;
}
const index = available - 1;
const state = states[index];
self.available = index;
self.mutex.unlock();
return state;
}
}
pub fn release(self: *StatePool, state: *State) void {
self.mutex.lock();
var states = self.states;
const available = self.available;
states[available] = state;
self.available = available + 1;
self.mutex.unlock();
self.cond.signal();
}
};
// Ideally, a connection could be reused as long as the host:port matches.
// But we also have to match on blocking vs nonblocking sockets and on TLS
// vs non-TLS, which isn't the most efficient. For non-TLS, we could always
// re-use the connection (just toggle the socket's blocking status), but for
// TLS, we'd need to check whether the two different TLS objects (blocking
// and nonblocking) can be converted into each other.
const ConnectionManager = struct {
max: usize,
idle: List,
count: usize,
mutex: Thread.Mutex,
allocator: Allocator,
node_pool: std.heap.MemoryPool(Node),
connection_pool: std.heap.MemoryPool(Connection),
const List = std.DoublyLinkedList(*Connection);
const Node = List.Node;
fn init(allocator: Allocator, max: usize) ConnectionManager {
return .{
.max = max,
.count = 0,
.idle = .{},
.mutex = .{},
.allocator = allocator,
.node_pool = std.heap.MemoryPool(Node).init(allocator),
.connection_pool = std.heap.MemoryPool(Connection).init(allocator),
};
}
fn deinit(self: *ConnectionManager) void {
const allocator = self.allocator;
self.mutex.lock();
defer self.mutex.unlock();
var node = self.idle.first;
while (node) |n| {
const next = n.next;
n.data.deinit(allocator);
node = next;
}
self.node_pool.deinit();
self.connection_pool.deinit();
}
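// Returns an idle connection matching host, port, TLS-ness and blocking
// mode, or null if there's nothing suitable to reuse.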
fn get(self: *ConnectionManager, secure: bool, host: []const u8, port: u16, blocking: bool) ?*Connection {
self.mutex.lock();
defer self.mutex.unlock();
var node = self.idle.first;
while (node) |n| {
const connection = n.data;
if (std.ascii.eqlIgnoreCase(connection.host, host) and connection.port == port and connection.blocking == blocking and ((connection.tls == null) == !secure)) {
self.count -= 1;
self.idle.remove(n);
self.node_pool.destroy(n);
return connection;
}
node = n.next;
}
return null;
}
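// Makes a connection available for reuse. When at capacity, the oldest idle
// connection is evicted and its node re-used for this one.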
fn keepIdle(self: *ConnectionManager, connection: *Connection) !void {
self.mutex.lock();
defer self.mutex.unlock();
var node: *Node = undefined;
if (self.count == self.max) {
const oldest = self.idle.popFirst() orelse {
std.debug.assert(self.max == 0);
self.destroy(connection);
return;
};
self.destroy(oldest.data);
// re-use the node
node = oldest;
} else {
node = try self.node_pool.create();
self.count += 1;
}
node.data = connection;
self.idle.append(node);
}
fn create(self: *ConnectionManager, host: []const u8) !struct { *Connection, []const u8 } {
const connection = try self.connection_pool.create();
errdefer self.connection_pool.destroy(connection);
const owned_host = try self.allocator.dupe(u8, host);
return .{ connection, owned_host };
}
fn destroy(self: *ConnectionManager, connection: *Connection) void {
connection.deinit(self.allocator);
self.connection_pool.destroy(connection);
}
};
const testing = @import("../testing.zig");
test "HttpClient Reader: fuzz" {
var state = try State.init(testing.allocator, 1024, 1024, 100);
defer state.deinit();
var res = TestResponse.init();
defer res.deinit();
// testReader randomly fragments the incoming data, hence the loop.
for (0..1000) |_| {
try testing.expectError(error.InvalidStatusLine, testReader(&state, &res, "hello\r\n\r\n"));
try testing.expectError(error.InvalidStatusLine, testReader(&state, &res, "http/1.1 200 \r\n\r\n"));
try testing.expectError(error.InvalidStatusLine, testReader(&state, &res, "HTTP/0.9 200 \r\n\r\n"));
try testing.expectError(error.InvalidStatusLine, testReader(&state, &res, "HTTP/1.1 \r\n\r\n"));
try testing.expectError(error.InvalidStatusLine, testReader(&state, &res, "HTTP/1.1 20a \r\n\r\n"));
try testing.expectError(error.InvalidStatusLine, testReader(&state, &res, "HTTP/1.1 20A \n"));
try testing.expectError(error.InvalidHeader, testReader(&state, &res, "HTTP/1.1 200 \r\nA\r\nB:1\r\n"));
try testing.expectError(error.InvalidChunk, testReader(&state, &res, "HTTP/1.1 200 \r\nTransfer-Encoding: chunked\r\n\r\n abc\r\n"));
try testing.expectError(error.InvalidChunk, testReader(&state, &res, "HTTP/1.1 200 \r\nTransfer-Encoding: chunked\r\n\r\n 123\n"));
{
res.reset();
try testReader(&state, &res, "HTTP/1.1 200 \r\n\r\n");
try testing.expectEqual(200, res.status);
try testing.expectEqual(0, res.body.items.len);
try testing.expectEqual(0, res.headers.items.len);
}
{
res.reset();
try testReader(&state, &res, "HTTP/1.0 404 \r\nError: Not-Found\r\n\r\n");
try testing.expectEqual(404, res.status);
try testing.expectEqual(0, res.body.items.len);
try res.assertHeaders(&.{ "error", "Not-Found" });
}
{
res.reset();
try testReader(&state, &res, "HTTP/1.1 200 \r\nSet-Cookie: a32;max-age=60\r\nContent-Length: 12\r\n\r\nOver 9000!!!");
try testing.expectEqual(200, res.status);
try testing.expectEqual("Over 9000!!!", res.body.items);
try res.assertHeaders(&.{ "set-cookie", "a32;max-age=60", "content-length", "12" });
}
{
res.reset();
try testReader(&state, &res, "HTTP/1.1 200 \r\nTransFEr-ENcoding: chunked \r\n\r\n0\r\n\r\n");
try testing.expectEqual(200, res.status);
try testing.expectEqual("", res.body.items);
try res.assertHeaders(&.{ "transfer-encoding", "chunked" });
}
{
res.reset();
try testReader(&state, &res, "HTTP/1.1 200 \r\nTransFEr-ENcoding: chunked \r\n\r\nE\r\nHello World!!!\r\n2eE;opts\r\n" ++ ("abc" ** 250) ++ "\r\n0\r\n\r\n");
try testing.expectEqual(200, res.status);
try testing.expectEqual("Hello World!!!" ++ ("abc" ** 250), res.body.items);
try res.assertHeaders(&.{ "transfer-encoding", "chunked" });
}
}
for (0..10) |_| {
{
// large body
const body = "abcdefghijklmnopqrstuvwxyz012345689ABCDEFGHIJKLMNOPQRSTUVWXYZ" ** 10000;
res.reset();
try testReader(&state, &res, "HTTP/1.1 200 OK\r\n Content-Length : 610000 \r\nOther: 13391AbC93\r\n\r\n" ++ body);
try testing.expectEqual(200, res.status);
try testing.expectEqual(body, res.body.items);
try res.assertHeaders(&.{ "content-length", "610000", "other", "13391AbC93" });
}
{
// header too big
const data = "HTTP/1.1 200 OK\r\n" ++ ("a" ** 1500);
try testing.expectError(error.HeaderTooLarge, testReader(&state, &res, data));
}
}
}
test "HttpClient: invalid url" {
var client = try testClient();
defer client.deinit();
const uri = try Uri.parse("http:///");
try testing.expectError(error.UriMissingHost, client.request(.GET, &uri));
}
test "HttpClient: sync connect error" {
var client = try testClient();
defer client.deinit();
const uri = try Uri.parse("HTTP://127.0.0.1:9920");
var req = try client.request(.GET, &uri);
defer req.deinit();
try testing.expectError(error.ConnectionRefused, req.sendSync(.{}));
}
test "HttpClient: sync no body" {
for (0..2) |i| {
var client = try testClient();
defer client.deinit();
const uri = try Uri.parse("http://127.0.0.1:9582/http_client/simple");
var req = try client.request(.GET, &uri);
defer req.deinit();
var res = try req.sendSync(.{});
if (i == 0) {
try testing.expectEqual("", try res.peek());
}
try testing.expectEqual(null, try res.next());
try testing.expectEqual(200, res.header.status);
try testing.expectEqual(2, res.header.count());
try testing.expectEqual("close", res.header.get("connection"));
try testing.expectEqual("0", res.header.get("content-length"));
}
}
test "HttpClient: sync tls no body" {
for (0..1) |_| {
var client = try testClient();
defer client.deinit();
const uri = try Uri.parse("https://127.0.0.1:9581/http_client/simple");
var req = try client.request(.GET, &uri);
defer req.deinit();
var res = try req.sendSync(.{ .tls_verify_host = false });
try testing.expectEqual(null, try res.next());
try testing.expectEqual(200, res.header.status);
try testing.expectEqual(2, res.header.count());
try testing.expectEqual("0", res.header.get("content-length"));
try testing.expectEqual("Close", res.header.get("connection"));
}
}
test "HttpClient: sync with body" {
for (0..2) |i| {
var client = try testClient();
defer client.deinit();
const uri = try Uri.parse("http://127.0.0.1:9582/http_client/echo");
var req = try client.request(.GET, &uri);
defer req.deinit();
var res = try req.sendSync(.{});
if (i == 0) {
try testing.expectEqual("over 9000!", try res.peek());
}
try testing.expectEqual("over 9000!", try res.next());
try testing.expectEqual(201, res.header.status);
try testing.expectEqual(4, res.header.count());
try testing.expectEqual("Close", res.header.get("connection"));
try testing.expectEqual("10", res.header.get("content-length"));
try testing.expectEqual("127.0.0.1", res.header.get("_host"));
try testing.expectEqual("Lightpanda/1.0", res.header.get("_user-agent"));
}
}
test "HttpClient: sync with gzip body" {
for (0..2) |i| {
var client = try testClient();
defer client.deinit();
const uri = try Uri.parse("http://127.0.0.1:9582/http_client/gzip");
var req = try client.request(.GET, &uri);
defer req.deinit();
var res = try req.sendSync(.{});
if (i == 0) {
try testing.expectEqual("A new browser built for machines\n", try res.peek());
}
try testing.expectEqual("A new browser built for machines\n", try res.next());
try testing.expectEqual("gzip", res.header.get("content-encoding"));
}
}
test "HttpClient: sync tls with body" {
var arr: std.ArrayListUnmanaged(u8) = .{};
defer arr.deinit(testing.allocator);
try arr.ensureTotalCapacity(testing.allocator, 20);
var client = try testClient();
defer client.deinit();
for (0..5) |_| {
defer arr.clearRetainingCapacity();
const uri = try Uri.parse("https://127.0.0.1:9581/http_client/body");
var req = try client.request(.GET, &uri);
defer req.deinit();
var res = try req.sendSync(.{ .tls_verify_host = false });
while (try res.next()) |data| {
arr.appendSliceAssumeCapacity(data);
}
try testing.expectEqual("1234567890abcdefhijk", arr.items);
try testing.expectEqual(201, res.header.status);
try testing.expectEqual(3, res.header.count());
try testing.expectEqual("20", res.header.get("content-length"));
try testing.expectEqual("HEaDer", res.header.get("another"));
try testing.expectEqual("Close", res.header.get("connection"));
}
}
test "HttpClient: sync redirect from TLS to Plaintext" {
var arr: std.ArrayListUnmanaged(u8) = .{};
defer arr.deinit(testing.allocator);
try arr.ensureTotalCapacity(testing.allocator, 20);
for (0..5) |_| {
defer arr.clearRetainingCapacity();
var client = try testClient();
defer client.deinit();
const uri = try Uri.parse("https://127.0.0.1:9581/http_client/redirect/insecure");
var req = try client.request(.GET, &uri);
defer req.deinit();
var res = try req.sendSync(.{ .tls_verify_host = false });
while (try res.next()) |data| {
arr.appendSliceAssumeCapacity(data);
}
try testing.expectEqual(201, res.header.status);
try testing.expectEqual("over 9000!", arr.items);
try testing.expectEqual(4, res.header.count());
try testing.expectEqual("Close", res.header.get("connection"));
try testing.expectEqual("10", res.header.get("content-length"));
try testing.expectEqual("127.0.0.1", res.header.get("_host"));
try testing.expectEqual("Lightpanda/1.0", res.header.get("_user-agent"));
}
}
test "HttpClient: sync redirect plaintext to TLS" {
var arr: std.ArrayListUnmanaged(u8) = .{};
defer arr.deinit(testing.allocator);
try arr.ensureTotalCapacity(testing.allocator, 20);
for (0..5) |_| {
defer arr.clearRetainingCapacity();
var client = try testClient();
defer client.deinit();
const uri = try Uri.parse("http://127.0.0.1:9582/http_client/redirect/secure");
var req = try client.request(.GET, &uri);
defer req.deinit();
var res = try req.sendSync(.{ .tls_verify_host = false });
while (try res.next()) |data| {
arr.appendSliceAssumeCapacity(data);
}
try testing.expectEqual(201, res.header.status);
try testing.expectEqual("1234567890abcdefhijk", arr.items);
try testing.expectEqual(3, res.header.count());
try testing.expectEqual("20", res.header.get("content-length"));
try testing.expectEqual("HEaDer", res.header.get("another"));
try testing.expectEqual("Close", res.header.get("connection"));
}
}
test "HttpClient: sync GET redirect" {
var client = try testClient();
defer client.deinit();
const uri = try Uri.parse("http://127.0.0.1:9582/http_client/redirect");
var req = try client.request(.GET, &uri);
defer req.deinit();
var res = try req.sendSync(.{ .tls_verify_host = false });
try testing.expectEqual("over 9000!", try res.next());
try testing.expectEqual(201, res.header.status);
try testing.expectEqual(4, res.header.count());
try testing.expectEqual("Close", res.header.get("connection"));
try testing.expectEqual("10", res.header.get("content-length"));
try testing.expectEqual("127.0.0.1", res.header.get("_host"));
try testing.expectEqual("Lightpanda/1.0", res.header.get("_user-agent"));
}
test "HttpClient: async connect error" {
var loop = try Loop.init(testing.allocator);
defer loop.deinit();
const Handler = struct {
reset: *Thread.ResetEvent,
fn onHttpResponse(self: *@This(), res: anyerror!Progress) !void {
_ = res catch |err| {
if (err == error.ConnectionRefused) {
self.reset.set();
return;
}
std.debug.print("Expected error.ConnectionRefused, got error: {any}", .{err});
return;
};
std.debug.print("Expected error.ConnectionRefused, got no error", .{});
}
};
var reset: Thread.ResetEvent = .{};
var client = try testClient();
defer client.deinit();
const uri = try Uri.parse("HTTP://127.0.0.1:9920");
var req = try client.request(.GET, &uri);
try req.sendAsync(&loop, Handler{ .reset = &reset }, .{});
try loop.io.run_for_ns(std.time.ns_per_ms);
try reset.timedWait(std.time.ns_per_s);
}
test "HttpClient: async no body" {
var client = try testClient();
defer client.deinit();
var handler = try CaptureHandler.init();
defer handler.deinit();
const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/simple");
var req = try client.request(.GET, &uri);
try req.sendAsync(&handler.loop, &handler, .{});
try handler.waitUntilDone();
const res = handler.response;
try testing.expectEqual("", res.body.items);
try testing.expectEqual(200, res.status);
try res.assertHeaders(&.{ "content-length", "0", "connection", "close" });
}
test "HttpClient: async with body" {
var client = try testClient();
defer client.deinit();
var handler = try CaptureHandler.init();
defer handler.deinit();
const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/echo");
var req = try client.request(.GET, &uri);
try req.sendAsync(&handler.loop, &handler, .{});
try handler.waitUntilDone();
const res = handler.response;
try testing.expectEqual("over 9000!", res.body.items);
try testing.expectEqual(201, res.status);
try res.assertHeaders(&.{
"content-length", "10",
"_host", "127.0.0.1",
"_user-agent", "Lightpanda/1.0",
"connection", "Close",
});
}
test "HttpClient: async with gzip body" {
var client = try testClient();
defer client.deinit();
var handler = try CaptureHandler.init();
defer handler.deinit();
const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/gzip");
var req = try client.request(.GET, &uri);
try req.sendAsync(&handler.loop, &handler, .{});
try handler.waitUntilDone();
const res = handler.response;
try testing.expectEqual("A new browser built for machines\n", res.body.items);
try testing.expectEqual(200, res.status);
try res.assertHeaders(&.{
"content-length", "63",
"connection", "close",
"content-encoding", "gzip",
});
}
test "HttpClient: async redirect" {
var client = try testClient();
defer client.deinit();
var handler = try CaptureHandler.init();
defer handler.deinit();
const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/redirect");
var req = try client.request(.GET, &uri);
try req.sendAsync(&handler.loop, &handler, .{});
// Called twice on purpose. The initial GET results in the # of pending
// events reaching 0, which causes our `run_for_ns` to return. But we then
// start to requeue events (from the redirected request), so we need the
// loop to process those as well.
try handler.loop.io.run_for_ns(std.time.ns_per_ms);
try handler.waitUntilDone();
const res = handler.response;
try testing.expectEqual("over 9000!", res.body.items);
try testing.expectEqual(201, res.status);
try res.assertHeaders(&.{
"content-length", "10",
"_host", "127.0.0.1",
"_user-agent", "Lightpanda/1.0",
"connection", "Close",
});
}
test "HttpClient: async tls no body" {
var client = try testClient();
defer client.deinit();
for (0..5) |_| {
var handler = try CaptureHandler.init();
defer handler.deinit();
const uri = try Uri.parse("HTTPs://127.0.0.1:9581/http_client/simple");
var req = try client.request(.GET, &uri);
try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false });
try handler.waitUntilDone();
const res = handler.response;
try testing.expectEqual("", res.body.items);
try testing.expectEqual(200, res.status);
try res.assertHeaders(&.{
"content-length",
"0",
"connection",
"Close",
});
}
}
test "HttpClient: async tls with body x" {
for (0..5) |_| {
var client = try testClient();
defer client.deinit();
var handler = try CaptureHandler.init();
defer handler.deinit();
const uri = try Uri.parse("HTTPs://127.0.0.1:9581/http_client/body");
var req = try client.request(.GET, &uri);
try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false });
try handler.waitUntilDone();
const res = handler.response;
try testing.expectEqual("1234567890abcdefhijk", res.body.items);
try testing.expectEqual(201, res.status);
try res.assertHeaders(&.{
"content-length", "20",
"connection", "Close",
"another", "HEaDer",
});
}
}
test "HttpClient: async redirect from TLS to Plaintext" {
for (0..1) |_| {
var client = try testClient();
defer client.deinit();
var handler = try CaptureHandler.init();
defer handler.deinit();
const uri = try Uri.parse("https://127.0.0.1:9581/http_client/redirect/insecure");
var req = try client.request(.GET, &uri);
try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false });
try handler.waitUntilDone();
const res = handler.response;
try testing.expectEqual(201, res.status);
try testing.expectEqual("over 9000!", res.body.items);
try res.assertHeaders(&.{
"content-length", "10",
"_host", "127.0.0.1",
"_user-agent", "Lightpanda/1.0",
"connection", "Close",
});
}
}
test "HttpClient: async redirect plaintext to TLS" {
for (0..5) |_| {
var client = try testClient();
defer client.deinit();
var handler = try CaptureHandler.init();
defer handler.deinit();
const uri = try Uri.parse("http://127.0.0.1:9582/http_client/redirect/secure");
var req = try client.request(.GET, &uri);
try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false });
try handler.waitUntilDone();
const res = handler.response;
try testing.expectEqual(201, res.status);
try testing.expectEqual("1234567890abcdefhijk", res.body.items);
try res.assertHeaders(&.{ "content-length", "20", "connection", "Close", "another", "HEaDer" });
}
}
test "HttpClient: HeaderIterator" {
var header = ResponseHeader{};
defer header.headers.deinit(testing.allocator);
{
var it = header.iterate("nope");
try testing.expectEqual(null, it.next());
try testing.expectEqual(null, it.next());
}
// @constCast is totally unsafe here, but it's just a test, and we know
// nothing is going to write to it, so it works.
try header.headers.append(testing.allocator, .{ .name = "h1", .value = @constCast("value1") });
try header.headers.append(testing.allocator, .{ .name = "h2", .value = @constCast("value2") });
try header.headers.append(testing.allocator, .{ .name = "h3", .value = @constCast("value3") });
try header.headers.append(testing.allocator, .{ .name = "h1", .value = @constCast("value4") });
try header.headers.append(testing.allocator, .{ .name = "h1", .value = @constCast("value5") });
{
var it = header.iterate("nope");
try testing.expectEqual(null, it.next());
try testing.expectEqual(null, it.next());
}
{
var it = header.iterate("h2");
try testing.expectEqual("value2", it.next());
try testing.expectEqual(null, it.next());
try testing.expectEqual(null, it.next());
}
{
var it = header.iterate("h3");
try testing.expectEqual("value3", it.next());
try testing.expectEqual(null, it.next());
try testing.expectEqual(null, it.next());
}
{
var it = header.iterate("h1");
try testing.expectEqual("value1", it.next());
try testing.expectEqual("value4", it.next());
try testing.expectEqual("value5", it.next());
try testing.expectEqual(null, it.next());
try testing.expectEqual(null, it.next());
}
}
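// A minimal sketch exercising ResponseHeader.get directly. Names are stored
// lower-cased by the reader, so lookups must use lower-case names.
test "HttpClient: ResponseHeader get" {
var header = ResponseHeader{};
defer header.headers.deinit(testing.allocator);
// @constCast is safe here for the same reason as in the HeaderIterator test.
try header.headers.append(testing.allocator, .{ .name = "content-type", .value = @constCast("text/html") });
try testing.expectEqual("text/html", header.get("content-type"));
try testing.expectEqual(null, header.get("content-length"));
try testing.expectEqual(1, header.count());
}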
const TestResponse = struct {
status: u16,
arena: std.heap.ArenaAllocator,
body: std.ArrayListUnmanaged(u8),
headers: std.ArrayListUnmanaged(Header),
fn init() TestResponse {
return .{
.status = 0,
.body = .{},
.headers = .{},
.arena = ArenaAllocator.init(testing.allocator),
};
}
fn deinit(self: *TestResponse) void {
self.arena.deinit();
}
fn reset(self: *TestResponse) void {
_ = self.arena.reset(.{ .retain_capacity = {} });
self.status = 0;
self.body = .{};
self.headers = .{};
}
fn assertHeaders(self: *const TestResponse, expected: []const []const u8) !void {
const actual = self.headers.items;
errdefer {
std.debug.print("Actual headers:\n", .{});
for (actual) |a| {
std.debug.print("{s}: {s}\n", .{ a.name, a.value });
}
}
try testing.expectEqual(expected.len / 2, actual.len);
var i: usize = 0;
while (i < expected.len) : (i += 2) {
const a = actual[i / 2];
try testing.expectEqual(expected[i], a.name);
try testing.expectEqual(expected[i + 1], a.value);
}
}
};
const CaptureHandler = struct {
loop: Loop,
reset: Thread.ResetEvent,
response: TestResponse,
fn init() !CaptureHandler {
return .{
.reset = .{},
.response = TestResponse.init(),
.loop = try Loop.init(testing.allocator),
};
}
fn deinit(self: *CaptureHandler) void {
self.response.deinit();
self.loop.deinit();
}
fn onHttpResponse(self: *CaptureHandler, progress_: anyerror!Progress) !void {
self.process(progress_) catch |err| {
std.debug.print("capture handler error: {}\n", .{err});
};
}
fn process(self: *CaptureHandler, progress_: anyerror!Progress) !void {
const progress = try progress_;
const allocator = self.response.arena.allocator();
try self.response.body.appendSlice(allocator, progress.data orelse "");
if (progress.done) {
self.response.status = progress.header.status;
try self.response.headers.ensureTotalCapacity(allocator, progress.header.headers.items.len);
for (progress.header.headers.items) |header| {
self.response.headers.appendAssumeCapacity(.{
.name = try allocator.dupe(u8, header.name),
.value = try allocator.dupe(u8, header.value),
});
}
self.reset.set();
}
}
fn waitUntilDone(self: *CaptureHandler) !void {
for (0..20) |_| {
try self.loop.io.run_for_ns(std.time.ns_per_ms * 25);
if (self.reset.isSet()) {
return;
}
}
return error.TimeoutWaitingForRequestToComplete;
}
};
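// Feeds `data` to a Reader in randomly-sized fragments, accumulating status,
// headers and body into `res` until the reader reports done.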
fn testReader(state: *State, res: *TestResponse, data: []const u8) !void {
var status: u16 = 0;
var keepalive = false;
var r = Reader.init(state, &keepalive);
// dupe it so that we have a mutable copy
const owned = try testing.allocator.dupe(u8, data);
defer testing.allocator.free(owned);
var unsent = owned;
while (unsent.len > 0) {
// send part of the response
const to_send = testing.Random.intRange(usize, 1, unsent.len);
var to_process = unsent[0..to_send];
while (true) {
const result = try r.process(to_process);
if (status == 0) {
if (r.response.status > 0) {
status = r.response.status;
}
} else {
// once set, it should not change
try testing.expectEqual(status, r.response.status);
}
if (result.data) |d| {
try res.body.appendSlice(res.arena.allocator(), d);
}
if (result.done) {
res.status = status;
res.headers = r.response.headers;
return;
}
to_process = result.unprocessed orelse break;
}
unsent = unsent[to_send..];
}
return error.NeverDone;
}
fn testClient() !Client {
return try Client.init(testing.allocator, 1, .{});
}