Merge pull request #816 from lightpanda-io/connect_proxy

Connect proxy
This commit is contained in:
Pierre Tachoire
2025-06-25 17:31:27 -07:00
committed by GitHub
3 changed files with 291 additions and 80 deletions

View File

@@ -3,7 +3,8 @@ const Allocator = std.mem.Allocator;
const log = @import("log.zig"); const log = @import("log.zig");
const Loop = @import("runtime/loop.zig").Loop; const Loop = @import("runtime/loop.zig").Loop;
const HttpClient = @import("http/client.zig").Client; const http = @import("http/client.zig");
const Telemetry = @import("telemetry/telemetry.zig").Telemetry; const Telemetry = @import("telemetry/telemetry.zig").Telemetry;
const Notification = @import("notification.zig").Notification; const Notification = @import("notification.zig").Notification;
@@ -14,7 +15,7 @@ pub const App = struct {
config: Config, config: Config,
allocator: Allocator, allocator: Allocator,
telemetry: Telemetry, telemetry: Telemetry,
http_client: HttpClient, http_client: http.Client,
app_dir_path: ?[]const u8, app_dir_path: ?[]const u8,
notification: *Notification, notification: *Notification,
@@ -29,6 +30,7 @@ pub const App = struct {
run_mode: RunMode, run_mode: RunMode,
tls_verify_host: bool = true, tls_verify_host: bool = true,
http_proxy: ?std.Uri = null, http_proxy: ?std.Uri = null,
proxy_type: ?http.ProxyType = null,
}; };
pub fn init(allocator: Allocator, config: Config) !*App { pub fn init(allocator: Allocator, config: Config) !*App {
@@ -52,9 +54,10 @@ pub const App = struct {
.telemetry = undefined, .telemetry = undefined,
.app_dir_path = app_dir_path, .app_dir_path = app_dir_path,
.notification = notification, .notification = notification,
.http_client = try HttpClient.init(allocator, .{ .http_client = try http.Client.init(allocator, .{
.max_concurrent = 3, .max_concurrent = 3,
.http_proxy = config.http_proxy, .http_proxy = config.http_proxy,
.proxy_type = config.proxy_type,
.tls_verify_host = config.tls_verify_host, .tls_verify_host = config.tls_verify_host,
}), }),
.config = config, .config = config,

View File

@@ -41,6 +41,11 @@ const BUFFER_LEN = 32 * 1024;
const MAX_HEADER_LINE_LEN = 4096; const MAX_HEADER_LINE_LEN = 4096;
pub const ProxyType = enum {
forward,
connect,
};
// Thread-safe. Holds our root certificate, connection pool and state pool // Thread-safe. Holds our root certificate, connection pool and state pool
// Used to create Requests. // Used to create Requests.
pub const Client = struct { pub const Client = struct {
@@ -48,6 +53,7 @@ pub const Client = struct {
allocator: Allocator, allocator: Allocator,
state_pool: StatePool, state_pool: StatePool,
http_proxy: ?Uri, http_proxy: ?Uri,
proxy_type: ?ProxyType,
root_ca: tls.config.CertBundle, root_ca: tls.config.CertBundle,
tls_verify_host: bool = true, tls_verify_host: bool = true,
connection_manager: ConnectionManager, connection_manager: ConnectionManager,
@@ -56,6 +62,7 @@ pub const Client = struct {
const Opts = struct { const Opts = struct {
max_concurrent: usize = 3, max_concurrent: usize = 3,
http_proxy: ?std.Uri = null, http_proxy: ?std.Uri = null,
proxy_type: ?ProxyType = null,
tls_verify_host: bool = true, tls_verify_host: bool = true,
max_idle_connection: usize = 10, max_idle_connection: usize = 10,
}; };
@@ -76,6 +83,7 @@ pub const Client = struct {
.allocator = allocator, .allocator = allocator,
.state_pool = state_pool, .state_pool = state_pool,
.http_proxy = opts.http_proxy, .http_proxy = opts.http_proxy,
.proxy_type = if (opts.http_proxy == null) null else (opts.proxy_type orelse .connect),
.tls_verify_host = opts.tls_verify_host, .tls_verify_host = opts.tls_verify_host,
.connection_manager = connection_manager, .connection_manager = connection_manager,
.request_pool = std.heap.MemoryPool(Request).init(allocator), .request_pool = std.heap.MemoryPool(Request).init(allocator),
@@ -186,6 +194,16 @@ pub const Client = struct {
pub fn freeSlotCount(self: *Client) usize { pub fn freeSlotCount(self: *Client) usize {
return self.state_pool.freeSlotCount(); return self.state_pool.freeSlotCount();
} }
fn isConnectProxy(self: *const Client) bool {
const proxy_type = self.proxy_type orelse return false;
return proxy_type == .connect;
}
fn isSimpleProxy(self: *const Client) bool {
const proxy_type = self.proxy_type orelse return false;
return proxy_type == .forward;
}
}; };
const RequestOpts = struct { const RequestOpts = struct {
@@ -330,6 +348,7 @@ pub const Request = struct {
_keepalive: bool, _keepalive: bool,
// extracted from request_uri // extracted from request_uri
_request_port: u16,
_request_host: []const u8, _request_host: []const u8,
// extracted from connect_uri // extracted from connect_uri
@@ -420,6 +439,7 @@ pub const Request = struct {
._connect_host = decomposed.connect_host, ._connect_host = decomposed.connect_host,
._connect_port = decomposed.connect_port, ._connect_port = decomposed.connect_port,
._request_host = decomposed.request_host, ._request_host = decomposed.request_host,
._request_port = decomposed.request_port,
._state = state, ._state = state,
._client = client, ._client = client,
._aborter = null, ._aborter = null,
@@ -455,6 +475,7 @@ pub const Request = struct {
connect_port: u16, connect_port: u16,
connect_host: []const u8, connect_host: []const u8,
connect_uri: *const std.Uri, connect_uri: *const std.Uri,
request_port: u16,
request_host: []const u8, request_host: []const u8,
}; };
fn decomposeURL(client: *const Client, uri: *const Uri) !DecomposedURL { fn decomposeURL(client: *const Client, uri: *const Uri) !DecomposedURL {
@@ -470,8 +491,10 @@ pub const Request = struct {
connect_host = proxy.host.?.percent_encoded; connect_host = proxy.host.?.percent_encoded;
} }
const is_connect_proxy = client.isConnectProxy();
var secure: bool = undefined; var secure: bool = undefined;
const scheme = connect_uri.scheme; const scheme = if (is_connect_proxy) uri.scheme else connect_uri.scheme;
if (std.ascii.eqlIgnoreCase(scheme, "https")) { if (std.ascii.eqlIgnoreCase(scheme, "https")) {
secure = true; secure = true;
} else if (std.ascii.eqlIgnoreCase(scheme, "http")) { } else if (std.ascii.eqlIgnoreCase(scheme, "http")) {
@@ -479,13 +502,15 @@ pub const Request = struct {
} else { } else {
return error.UnsupportedUriScheme; return error.UnsupportedUriScheme;
} }
const connect_port: u16 = connect_uri.port orelse if (secure) 443 else 80; const request_port: u16 = uri.port orelse if (secure) 443 else 80;
const connect_port: u16 = connect_uri.port orelse (if (is_connect_proxy) 80 else request_port);
return .{ return .{
.secure = secure, .secure = secure,
.connect_port = connect_port, .connect_port = connect_port,
.connect_host = connect_host, .connect_host = connect_host,
.connect_uri = connect_uri, .connect_uri = connect_uri,
.request_port = request_port,
.request_host = request_host, .request_host = request_host,
}; };
} }
@@ -595,13 +620,18 @@ pub const Request = struct {
}; };
self._connection = connection; self._connection = connection;
const is_connect_proxy = self._client.isConnectProxy();
if (is_connect_proxy) {
try SyncHandler.connect(self);
}
if (self._secure) { if (self._secure) {
self._connection.?.tls = .{ self._connection.?.tls = .{
.blocking = try tls.client(std.net.Stream{ .handle = socket }, .{ .blocking = try tls.client(std.net.Stream{ .handle = socket }, .{
.host = self._connect_host, .host = if (is_connect_proxy) self._request_host else self._connect_host,
.root_ca = self._client.root_ca, .root_ca = self._client.root_ca,
.insecure_skip_verify = self._tls_verify_host == false, .insecure_skip_verify = self._tls_verify_host == false,
// .key_log_callback = tls.config.key_log.callback, .key_log_callback = tls.config.key_log.callback,
}), }),
}; };
} }
@@ -682,7 +712,7 @@ pub const Request = struct {
if (self._secure) { if (self._secure) {
connection.tls = .{ connection.tls = .{
.nonblocking = try tls.nb.Client().init(self._client.allocator, .{ .nonblocking = try tls.nb.Client().init(self._client.allocator, .{
.host = self._connect_host, .host = if (self._client.isConnectProxy()) self._request_host else self._connect_host,
.root_ca = self._client.root_ca, .root_ca = self._client.root_ca,
.insecure_skip_verify = self._tls_verify_host == false, .insecure_skip_verify = self._tls_verify_host == false,
// .key_log_callback = tls.config.key_log.callback, // .key_log_callback = tls.config.key_log.callback,
@@ -831,7 +861,7 @@ pub const Request = struct {
} }
fn buildHeader(self: *Request) ![]const u8 { fn buildHeader(self: *Request) ![]const u8 {
const proxied = self.connect_uri != self.request_uri; const proxied = self._client.isSimpleProxy();
const buf = self._state.header_buf; const buf = self._state.header_buf;
var fbs = std.io.fixedBufferStream(buf); var fbs = std.io.fixedBufferStream(buf);
@@ -851,6 +881,16 @@ pub const Request = struct {
return buf[0..fbs.pos]; return buf[0..fbs.pos];
} }
fn buildConnectHeader(self: *Request) ![]const u8 {
const buf = self._state.header_buf;
var fbs = std.io.fixedBufferStream(buf);
var writer = fbs.writer();
try writer.print("CONNECT {s}:{d} HTTP/1.1\r\n", .{ self._request_host, self._request_port });
try writer.print("Host: {s}:{d}\r\n\r\n", .{ self._request_host, self._request_port });
return buf[0..fbs.pos];
}
fn requestStarting(self: *Request) void { fn requestStarting(self: *Request) void {
const notification = self.notification orelse return; const notification = self.notification orelse return;
if (self._notified_start) { if (self._notified_start) {
@@ -895,6 +935,15 @@ pub const Request = struct {
.headers = response.headers.items, .headers = response.headers.items,
}); });
} }
fn shouldProxyConnect(self: *const Request) bool {
// if the connection comes from a keepalive pool, than we already
// made a CONNECT request
if (self._connection_from_keepalive) {
return false;
}
return self._client.isConnectProxy();
}
}; };
// Handles asynchronous requests // Handles asynchronous requests
@@ -958,6 +1007,7 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
const SendQueue = std.DoublyLinkedList([]const u8); const SendQueue = std.DoublyLinkedList([]const u8);
const SendState = enum { const SendState = enum {
connect,
handshake, handshake,
header, header,
body, body,
@@ -986,7 +1036,19 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
if (self.shutdown) { if (self.shutdown) {
return self.maybeShutdown(); return self.maybeShutdown();
} }
result catch |err| return self.handleError("Connection failed", err); result catch |err| return self.handleError("Connection failed", err);
if (self.request.shouldProxyConnect()) {
self.state = .connect;
const header = self.request.buildConnectHeader() catch |err| {
return self.handleError("Failed to build CONNECT header", err);
};
self.send(header);
self.receive();
return;
}
self.conn.connected() catch |err| { self.conn.connected() catch |err| {
self.handleError("connected handler error", err); self.handleError("connected handler error", err);
}; };
@@ -1056,6 +1118,12 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
return; return;
} }
if (self.state == .connect) {
// We're in a proxy CONNECT flow. There's nothing for us to
// do except for wait for the response.
return;
}
self.conn.sent() catch |err| { self.conn.sent() catch |err| {
self.handleError("send handling", err); self.handleError("send handling", err);
}; };
@@ -1099,7 +1167,27 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
return self.handleError("Connection closed", error.ConnectionResetByPeer); return self.handleError("Connection closed", error.ConnectionResetByPeer);
} }
const status = self.conn.received(self.read_buf[0 .. self.read_pos + n]) catch |err| { const data = self.read_buf[0 .. self.read_pos + n];
if (self.state == .connect) {
const success = self.reader.connectResponse(data) catch |err| {
return self.handleError("Invalid CONNECT response", err);
};
if (!success) {
self.receive();
} else {
// CONNECT was successful, resume our normal flow
self.state = .handshake;
self.reader = self.request.newReader();
self.conn.connected() catch |err| {
self.handleError("connected handler error", err);
};
}
return;
}
const status = self.conn.received(data) catch |err| {
if (err == error.TlsAlertCloseNotify and self.state == .handshake and self.maybeRetryRequest()) { if (err == error.TlsAlertCloseNotify and self.state == .handshake and self.maybeRetryRequest()) {
return; return;
} }
@@ -1438,7 +1526,7 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
const handler = self.handler; const handler = self.handler;
switch (self.protocol) { switch (self.protocol) {
.plain => switch (handler.state) { .plain => switch (handler.state) {
.handshake => unreachable, .handshake, .connect => unreachable,
.header => { .header => {
handler.state = .body; handler.state = .body;
if (handler.request.body) |body| { if (handler.request.body) |body| {
@@ -1455,6 +1543,7 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
return; return;
} }
switch (handler.state) { switch (handler.state) {
.connect => unreachable,
.handshake => return self.sendSecureHeader(tls_client), .handshake => return self.sendSecureHeader(tls_client),
.header => { .header => {
handler.state = .body; handler.state = .body;
@@ -1589,6 +1678,37 @@ const SyncHandler = struct {
} }
} }
// Unfortunately, this is called from the Request doSendSync since we need
// to do this before setting up our TLS connection.
fn connect(request: *Request) !void {
const socket = request._connection.?.socket;
const header = try request.buildConnectHeader();
try Conn.writeAll(socket, header);
var pos: usize = 0;
var reader = request.newReader();
var read_buf = request._state.read_buf;
while (true) {
// we would never 'maybeRetryOrErr' on a CONNECT request, because
// we only send CONNECT requests on newly established connections
// and maybeRetryOrErr is only for connections that might have been
// closed while being kept-alive
const n = try posix.read(socket, read_buf[pos..]);
if (n == 0) {
return error.ConnectionResetByPeer;
}
pos += n;
if (try reader.connectResponse(read_buf[0..pos])) {
// returns true if we have a successful connect response
return;
}
// we don't have enough data yet.
}
}
fn maybeRetryOrErr(self: *SyncHandler, err: anyerror) !Response { fn maybeRetryOrErr(self: *SyncHandler, err: anyerror) !Response {
var request = self.request; var request = self.request;
@@ -1828,6 +1948,26 @@ const Reader = struct {
return .{ .use_get = use_get, .location = location }; return .{ .use_get = use_get, .location = location };
} }
fn connectResponse(self: *Reader, data: []u8) !bool {
const result = try self.process(data);
if (self.header_done == false) {
return false;
}
if (result.done == false) {
// CONNECT responses should not have a body. If the header is
// done, then the entire response should be done.
return error.InvalidConnectResponse;
}
const status = self.response.status;
if (status < 200 or status > 299) {
return error.InvalidConnectResponseStatus;
}
return true;
}
fn process(self: *Reader, data: []u8) ProcessError!Result { fn process(self: *Reader, data: []u8) ProcessError!Result {
if (self.body_reader) |*br| { if (self.body_reader) |*br| {
const ok, const result = try br.process(data); const ok, const result = try br.process(data);
@@ -2790,14 +2930,14 @@ test "HttpClient Reader: fuzz" {
} }
test "HttpClient: invalid url" { test "HttpClient: invalid url" {
var client = try testClient(); var client = try testClient(.{});
defer client.deinit(); defer client.deinit();
const uri = try Uri.parse("http:///"); const uri = try Uri.parse("http:///");
try testing.expectError(error.UriMissingHost, client.request(.GET, &uri)); try testing.expectError(error.UriMissingHost, client.request(.GET, &uri));
} }
test "HttpClient: sync connect error" { test "HttpClient: sync connect error" {
var client = try testClient(); var client = try testClient(.{});
defer client.deinit(); defer client.deinit();
const uri = try Uri.parse("HTTP://127.0.0.1:9920"); const uri = try Uri.parse("HTTP://127.0.0.1:9920");
@@ -2809,7 +2949,7 @@ test "HttpClient: sync connect error" {
test "HttpClient: sync no body" { test "HttpClient: sync no body" {
for (0..2) |i| { for (0..2) |i| {
var client = try testClient(); var client = try testClient(.{});
defer client.deinit(); defer client.deinit();
const uri = try Uri.parse("http://127.0.0.1:9582/http_client/simple"); const uri = try Uri.parse("http://127.0.0.1:9582/http_client/simple");
@@ -2831,7 +2971,7 @@ test "HttpClient: sync no body" {
test "HttpClient: sync tls no body" { test "HttpClient: sync tls no body" {
for (0..1) |_| { for (0..1) |_| {
var client = try testClient(); var client = try testClient(.{});
defer client.deinit(); defer client.deinit();
const uri = try Uri.parse("https://127.0.0.1:9581/http_client/simple"); const uri = try Uri.parse("https://127.0.0.1:9581/http_client/simple");
@@ -2850,7 +2990,33 @@ test "HttpClient: sync tls no body" {
test "HttpClient: sync with body" { test "HttpClient: sync with body" {
for (0..2) |i| { for (0..2) |i| {
var client = try testClient(); var client = try testClient(.{});
defer client.deinit();
const uri = try Uri.parse("http://127.0.0.1:9582/http_client/echo");
var req = try client.request(.GET, &uri);
defer req.deinit();
var res = try req.sendSync(.{});
if (i == 0) {
try testing.expectEqual("over 9000!", try res.peek());
}
try testing.expectEqual("over 9000!", try res.next());
try testing.expectEqual(201, res.header.status);
try testing.expectEqual(5, res.header.count());
try testing.expectEqual("Close", res.header.get("connection"));
try testing.expectEqual("10", res.header.get("content-length"));
try testing.expectEqual("127.0.0.1", res.header.get("_host"));
try testing.expectEqual("Lightpanda/1.0", res.header.get("_user-agent"));
try testing.expectEqual("*/*", res.header.get("_accept"));
}
}
test "HttpClient: sync with body proxy CONNECT" {
for (0..2) |i| {
const proxy_uri = try Uri.parse("http://127.0.0.1:9582/");
var client = try testClient(.{ .proxy_type = .connect, .http_proxy = proxy_uri });
defer client.deinit(); defer client.deinit();
const uri = try Uri.parse("http://127.0.0.1:9582/http_client/echo"); const uri = try Uri.parse("http://127.0.0.1:9582/http_client/echo");
@@ -2875,7 +3041,7 @@ test "HttpClient: sync with body" {
test "HttpClient: sync with gzip body" { test "HttpClient: sync with gzip body" {
for (0..2) |i| { for (0..2) |i| {
var client = try testClient(); var client = try testClient(.{});
defer client.deinit(); defer client.deinit();
const uri = try Uri.parse("http://127.0.0.1:9582/http_client/gzip"); const uri = try Uri.parse("http://127.0.0.1:9582/http_client/gzip");
@@ -2897,7 +3063,7 @@ test "HttpClient: sync tls with body" {
defer arr.deinit(testing.allocator); defer arr.deinit(testing.allocator);
try arr.ensureTotalCapacity(testing.allocator, 20); try arr.ensureTotalCapacity(testing.allocator, 20);
var client = try testClient(); var client = try testClient(.{});
defer client.deinit(); defer client.deinit();
for (0..5) |_| { for (0..5) |_| {
defer arr.clearRetainingCapacity(); defer arr.clearRetainingCapacity();
@@ -2927,7 +3093,7 @@ test "HttpClient: sync redirect from TLS to Plaintext" {
for (0..5) |_| { for (0..5) |_| {
defer arr.clearRetainingCapacity(); defer arr.clearRetainingCapacity();
var client = try testClient(); var client = try testClient(.{});
defer client.deinit(); defer client.deinit();
const uri = try Uri.parse("https://127.0.0.1:9581/http_client/redirect/insecure"); const uri = try Uri.parse("https://127.0.0.1:9581/http_client/redirect/insecure");
@@ -2957,7 +3123,7 @@ test "HttpClient: sync redirect plaintext to TLS" {
for (0..5) |_| { for (0..5) |_| {
defer arr.clearRetainingCapacity(); defer arr.clearRetainingCapacity();
var client = try testClient(); var client = try testClient(.{});
defer client.deinit(); defer client.deinit();
const uri = try Uri.parse("http://127.0.0.1:9582/http_client/redirect/secure"); const uri = try Uri.parse("http://127.0.0.1:9582/http_client/redirect/secure");
@@ -2978,7 +3144,7 @@ test "HttpClient: sync redirect plaintext to TLS" {
} }
test "HttpClient: sync GET redirect" { test "HttpClient: sync GET redirect" {
var client = try testClient(); var client = try testClient(.{});
defer client.deinit(); defer client.deinit();
const uri = try Uri.parse("http://127.0.0.1:9582/http_client/redirect"); const uri = try Uri.parse("http://127.0.0.1:9582/http_client/redirect");
@@ -3024,7 +3190,7 @@ test "HttpClient: async connect error" {
}; };
var reset: Thread.ResetEvent = .{}; var reset: Thread.ResetEvent = .{};
var client = try testClient(); var client = try testClient(.{});
defer client.deinit(); defer client.deinit();
var handler = Handler{ var handler = Handler{
@@ -3056,7 +3222,7 @@ test "HttpClient: async connect error" {
test "HttpClient: async no body" { test "HttpClient: async no body" {
defer testing.reset(); defer testing.reset();
var client = try testClient(); var client = try testClient(.{});
defer client.deinit(); defer client.deinit();
var handler = try CaptureHandler.init(); var handler = try CaptureHandler.init();
@@ -3075,7 +3241,7 @@ test "HttpClient: async no body" {
test "HttpClient: async with body" { test "HttpClient: async with body" {
defer testing.reset(); defer testing.reset();
var client = try testClient(); var client = try testClient(.{});
defer client.deinit(); defer client.deinit();
var handler = try CaptureHandler.init(); var handler = try CaptureHandler.init();
@@ -3100,7 +3266,7 @@ test "HttpClient: async with body" {
test "HttpClient: async with gzip body" { test "HttpClient: async with gzip body" {
defer testing.reset(); defer testing.reset();
var client = try testClient(); var client = try testClient(.{});
defer client.deinit(); defer client.deinit();
var handler = try CaptureHandler.init(); var handler = try CaptureHandler.init();
@@ -3123,7 +3289,7 @@ test "HttpClient: async with gzip body" {
test "HttpClient: async redirect" { test "HttpClient: async redirect" {
defer testing.reset(); defer testing.reset();
var client = try testClient(); var client = try testClient(.{});
defer client.deinit(); defer client.deinit();
var handler = try CaptureHandler.init(); var handler = try CaptureHandler.init();
@@ -3153,7 +3319,7 @@ test "HttpClient: async redirect" {
test "HttpClient: async tls no body" { test "HttpClient: async tls no body" {
defer testing.reset(); defer testing.reset();
var client = try testClient(); var client = try testClient(.{});
defer client.deinit(); defer client.deinit();
for (0..5) |_| { for (0..5) |_| {
var handler = try CaptureHandler.init(); var handler = try CaptureHandler.init();
@@ -3178,7 +3344,7 @@ test "HttpClient: async tls no body" {
test "HttpClient: async tls with body" { test "HttpClient: async tls with body" {
defer testing.reset(); defer testing.reset();
for (0..5) |_| { for (0..5) |_| {
var client = try testClient(); var client = try testClient(.{});
defer client.deinit(); defer client.deinit();
var handler = try CaptureHandler.init(); var handler = try CaptureHandler.init();
@@ -3202,7 +3368,7 @@ test "HttpClient: async tls with body" {
test "HttpClient: async redirect from TLS to Plaintext" { test "HttpClient: async redirect from TLS to Plaintext" {
defer testing.reset(); defer testing.reset();
for (0..1) |_| { for (0..1) |_| {
var client = try testClient(); var client = try testClient(.{});
defer client.deinit(); defer client.deinit();
var handler = try CaptureHandler.init(); var handler = try CaptureHandler.init();
@@ -3228,7 +3394,7 @@ test "HttpClient: async redirect from TLS to Plaintext" {
test "HttpClient: async redirect plaintext to TLS" { test "HttpClient: async redirect plaintext to TLS" {
defer testing.reset(); defer testing.reset();
for (0..5) |_| { for (0..5) |_| {
var client = try testClient(); var client = try testClient(.{});
defer client.deinit(); defer client.deinit();
var handler = try CaptureHandler.init(); var handler = try CaptureHandler.init();
@@ -3441,6 +3607,8 @@ fn testReader(state: *State, res: *TestResponse, data: []const u8) !void {
return error.NeverDone; return error.NeverDone;
} }
fn testClient() !Client { fn testClient(opts: Client.Opts) !Client {
return try Client.init(testing.allocator, .{ .max_concurrent = 1 }); var o = opts;
o.max_concurrent = 1;
return try Client.init(testing.allocator, o);
} }

View File

@@ -23,6 +23,7 @@ const Allocator = std.mem.Allocator;
const log = @import("log.zig"); const log = @import("log.zig");
const server = @import("server.zig"); const server = @import("server.zig");
const App = @import("app.zig").App; const App = @import("app.zig").App;
const http = @import("http/client.zig");
const Platform = @import("runtime/js.zig").Platform; const Platform = @import("runtime/js.zig").Platform;
const Browser = @import("browser/browser.zig").Browser; const Browser = @import("browser/browser.zig").Browser;
@@ -83,6 +84,7 @@ fn run(alloc: Allocator) !void {
var app = try App.init(alloc, .{ var app = try App.init(alloc, .{
.run_mode = args.mode, .run_mode = args.mode,
.http_proxy = args.httpProxy(), .http_proxy = args.httpProxy(),
.proxy_type = args.proxyType(),
.tls_verify_host = args.tlsVerifyHost(), .tls_verify_host = args.tlsVerifyHost(),
}); });
defer app.deinit(); defer app.deinit();
@@ -155,6 +157,13 @@ const Command = struct {
}; };
} }
fn proxyType(self: *const Command) ?http.ProxyType {
return switch (self.mode) {
inline .serve, .fetch => |opts| opts.common.proxy_type,
else => unreachable,
};
}
fn logLevel(self: *const Command) ?log.Level { fn logLevel(self: *const Command) ?log.Level {
return switch (self.mode) { return switch (self.mode) {
inline .serve, .fetch => |opts| opts.common.log_level, inline .serve, .fetch => |opts| opts.common.log_level,
@@ -198,6 +207,7 @@ const Command = struct {
const Common = struct { const Common = struct {
http_proxy: ?std.Uri = null, http_proxy: ?std.Uri = null,
proxy_type: ?http.ProxyType = null,
tls_verify_host: bool = true, tls_verify_host: bool = true,
log_level: ?log.Level = null, log_level: ?log.Level = null,
log_format: ?log.Format = null, log_format: ?log.Format = null,
@@ -216,6 +226,13 @@ const Command = struct {
\\--http_proxy The HTTP proxy to use for all HTTP requests. \\--http_proxy The HTTP proxy to use for all HTTP requests.
\\ Defaults to none. \\ Defaults to none.
\\ \\
\\--proxy_type The type of proxy: connect, forward.
\\ 'connect' creates a tunnel through the proxy via
\\ and initial CONNECT request.
\\ 'forward' sends the full URL in the request target
\\ and expects the proxy to MITM the request.
\\ Defaults to connect when --http_proxy is set.
\\
\\--log_level The log level: debug, info, warn, error or fatal. \\--log_level The log level: debug, info, warn, error or fatal.
\\ Defaults to \\ Defaults to
++ (if (builtin.mode == .Debug) " info." else "warn.") ++ ++ (if (builtin.mode == .Debug) " info." else "warn.") ++
@@ -456,6 +473,22 @@ fn parseCommonArg(
return error.InvalidArgument; return error.InvalidArgument;
}; };
common.http_proxy = try std.Uri.parse(try allocator.dupe(u8, str)); common.http_proxy = try std.Uri.parse(try allocator.dupe(u8, str));
if (common.http_proxy.?.host == null) {
log.fatal(.app, "invalid http proxy", .{ .arg = "--http_proxy", .hint = "missing scheme?" });
return error.InvalidArgument;
}
return true;
}
if (std.mem.eql(u8, "--proxy_type", opt)) {
const str = args.next() orelse {
log.fatal(.app, "missing argument value", .{ .arg = "--proxy_type" });
return error.InvalidArgument;
};
common.proxy_type = std.meta.stringToEnum(http.ProxyType, str) orelse {
log.fatal(.app, "invalid option choice", .{ .arg = "--proxy_type", .value = str });
return error.InvalidArgument;
};
return true; return true;
} }
@@ -573,58 +606,65 @@ fn serveHTTP(address: std.net.Address) !void {
var conn = try listener.accept(); var conn = try listener.accept();
defer conn.stream.close(); defer conn.stream.close();
var http_server = std.http.Server.init(conn, &read_buffer); var http_server = std.http.Server.init(conn, &read_buffer);
REQUEST: while (true) {
var request = http_server.receiveHead() catch |err| switch (err) { var request = http_server.receiveHead() catch |err| switch (err) {
error.HttpConnectionClosing => continue :ACCEPT, error.HttpConnectionClosing => continue :ACCEPT,
else => { else => {
std.debug.print("Test HTTP Server error: {}\n", .{err}); std.debug.print("Test HTTP Server error: {}\n", .{err});
return err; return err;
},
};
const path = request.head.target;
if (std.mem.eql(u8, path, "/loader")) {
try request.respond("Hello!", .{
.extra_headers = &.{.{ .name = "Connection", .value = "close" }},
});
} else if (std.mem.eql(u8, path, "/http_client/simple")) {
try request.respond("", .{
.extra_headers = &.{.{ .name = "Connection", .value = "close" }},
});
} else if (std.mem.eql(u8, path, "/http_client/redirect")) {
try request.respond("", .{
.status = .moved_permanently,
.extra_headers = &.{
.{ .name = "Connection", .value = "close" },
.{ .name = "LOCATION", .value = "../http_client/echo" },
}, },
}); };
} else if (std.mem.eql(u8, path, "/http_client/redirect/secure")) {
try request.respond("", .{
.status = .moved_permanently,
.extra_headers = &.{ .{ .name = "Connection", .value = "close" }, .{ .name = "LOCATION", .value = "https://127.0.0.1:9581/http_client/body" } },
});
} else if (std.mem.eql(u8, path, "/http_client/gzip")) {
const body = &.{ 0x1f, 0x8b, 0x08, 0x08, 0x01, 0xc6, 0x19, 0x68, 0x00, 0x03, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x68, 0x74, 0x6d, 0x6c, 0x00, 0x73, 0x54, 0xc8, 0x4b, 0x2d, 0x57, 0x48, 0x2a, 0xca, 0x2f, 0x2f, 0x4e, 0x2d, 0x52, 0x48, 0x2a, 0xcd, 0xcc, 0x29, 0x51, 0x48, 0xcb, 0x2f, 0x52, 0xc8, 0x4d, 0x4c, 0xce, 0xc8, 0xcc, 0x4b, 0x2d, 0xe6, 0x02, 0x00, 0xe7, 0xc3, 0x4b, 0x27, 0x21, 0x00, 0x00, 0x00 };
try request.respond(body, .{
.extra_headers = &.{ .{ .name = "Connection", .value = "close" }, .{ .name = "Content-Encoding", .value = "gzip" } },
});
} else if (std.mem.eql(u8, path, "/http_client/echo")) {
var headers: std.ArrayListUnmanaged(std.http.Header) = .{};
var it = request.iterateHeaders(); if (request.head.method == .CONNECT) {
while (it.next()) |hdr| { try request.respond("", .{ .status = .ok });
try headers.append(aa, .{ continue :REQUEST;
.name = try std.fmt.allocPrint(aa, "_{s}", .{hdr.name}), }
.value = hdr.value,
const path = request.head.target;
if (std.mem.eql(u8, path, "/loader")) {
try request.respond("Hello!", .{
.extra_headers = &.{.{ .name = "Connection", .value = "close" }},
});
} else if (std.mem.eql(u8, path, "/http_client/simple")) {
try request.respond("", .{
.extra_headers = &.{.{ .name = "Connection", .value = "close" }},
});
} else if (std.mem.eql(u8, path, "/http_client/redirect")) {
try request.respond("", .{
.status = .moved_permanently,
.extra_headers = &.{
.{ .name = "Connection", .value = "close" },
.{ .name = "LOCATION", .value = "../http_client/echo" },
},
});
} else if (std.mem.eql(u8, path, "/http_client/redirect/secure")) {
try request.respond("", .{
.status = .moved_permanently,
.extra_headers = &.{ .{ .name = "Connection", .value = "close" }, .{ .name = "LOCATION", .value = "https://127.0.0.1:9581/http_client/body" } },
});
} else if (std.mem.eql(u8, path, "/http_client/gzip")) {
const body = &.{ 0x1f, 0x8b, 0x08, 0x08, 0x01, 0xc6, 0x19, 0x68, 0x00, 0x03, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x68, 0x74, 0x6d, 0x6c, 0x00, 0x73, 0x54, 0xc8, 0x4b, 0x2d, 0x57, 0x48, 0x2a, 0xca, 0x2f, 0x2f, 0x4e, 0x2d, 0x52, 0x48, 0x2a, 0xcd, 0xcc, 0x29, 0x51, 0x48, 0xcb, 0x2f, 0x52, 0xc8, 0x4d, 0x4c, 0xce, 0xc8, 0xcc, 0x4b, 0x2d, 0xe6, 0x02, 0x00, 0xe7, 0xc3, 0x4b, 0x27, 0x21, 0x00, 0x00, 0x00 };
try request.respond(body, .{
.extra_headers = &.{ .{ .name = "Connection", .value = "close" }, .{ .name = "Content-Encoding", .value = "gzip" } },
});
} else if (std.mem.eql(u8, path, "/http_client/echo")) {
var headers: std.ArrayListUnmanaged(std.http.Header) = .{};
var it = request.iterateHeaders();
while (it.next()) |hdr| {
try headers.append(aa, .{
.name = try std.fmt.allocPrint(aa, "_{s}", .{hdr.name}),
.value = hdr.value,
});
}
try headers.append(aa, .{ .name = "Connection", .value = "Close" });
try request.respond("over 9000!", .{
.status = .created,
.extra_headers = headers.items,
}); });
} }
try headers.append(aa, .{ .name = "Connection", .value = "Close" }); continue :ACCEPT;
try request.respond("over 9000!", .{
.status = .created,
.extra_headers = headers.items,
});
} }
} }
} }