Fixup error handling in HttpClient process messages

This commit is contained in:
Nikolay Govorov
2026-03-25 17:24:56 +00:00
parent c4b837b598
commit 15d60d845a

View File

@@ -724,11 +724,7 @@ fn perform(self: *Client, timeout_ms: c_int) anyerror!PerformStatus {
return status; return status;
} }
fn processMessages(self: *Client) !bool { fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *Transfer) !bool {
var processed = false;
while (self.handles.readMessage()) |msg| {
const transfer = try Transfer.fromConnection(&msg.conn);
// Detect auth challenge from response headers. // Detect auth challenge from response headers.
if (msg.err == null) { if (msg.err == null) {
transfer.detectAuthChallenge(&msg.conn); transfer.detectAuthChallenge(&msg.conn);
@@ -755,7 +751,7 @@ fn processMessages(self: *Client) !bool {
// In the case of an async request, we can just "forget" // In the case of an async request, we can just "forget"
// about this transfer until it gets updated asynchronously // about this transfer until it gets updated asynchronously
// from some CDP command. // from some CDP command.
continue; return false;
} }
// In the case of a sync request, we need to block until we // In the case of a sync request, we need to block until we
@@ -772,7 +768,7 @@ fn processMessages(self: *Client) !bool {
// aborted, already cleaned up // aborted, already cleaned up
} }
continue; return false;
} }
} }
@@ -780,26 +776,28 @@ fn processMessages(self: *Client) !bool {
if (msg.err == null) { if (msg.err == null) {
const status = try msg.conn.getResponseCode(); const status = try msg.conn.getResponseCode();
if (status >= 300 and status <= 399) { if (status >= 300 and status <= 399) {
transfer.handleRedirect() catch |err| { try transfer.handleRedirect();
transfer.requestFailed(err, true);
transfer.deinit();
continue;
};
const conn = transfer._conn.?; const conn = transfer._conn.?;
try self.handles.remove(conn); try self.handles.remove(conn);
transfer._conn = null;
transfer._detached_conn = conn; // signal orphan for processMessages cleanup
transfer.reset(); transfer.reset();
try transfer.configureConn(conn); try transfer.configureConn(conn);
try self.handles.add(conn); try self.handles.add(conn);
transfer._detached_conn = null;
transfer._conn = conn; // reattach after successful re-add
_ = try self.perform(0); _ = try self.perform(0);
continue; return false;
} }
} }
defer transfer.deinit(); // Transfer is done (success or error). Caller (processMessages) owns deinit.
// Return true = done (caller will deinit), false = continues (redirect/auth).
// When the server sends "Connection: close" and closes the TLS // When the server sends "Connection: close" and closes the TLS
// connection without a close_notify alert, BoringSSL reports // connection without a close_notify alert, BoringSSL reports
@@ -817,7 +815,9 @@ fn processMessages(self: *Client) !bool {
if (msg.err != null and !is_conn_close_recv) { if (msg.err != null and !is_conn_close_recv) {
transfer.requestFailed(transfer._callback_error orelse msg.err.?, true); transfer.requestFailed(transfer._callback_error orelse msg.err.?, true);
} else blk: { return true;
}
// make sure the transfer can't be immediately aborted from a callback // make sure the transfer can't be immediately aborted from a callback
// since we still need it here. // since we still need it here.
transfer._performing = true; transfer._performing = true;
@@ -826,25 +826,17 @@ fn processMessages(self: *Client) !bool {
if (!transfer._header_done_called) { if (!transfer._header_done_called) {
// In case of request w/o data, we need to call the header done // In case of request w/o data, we need to call the header done
// callback now. // callback now.
const proceed = transfer.headerDoneCallback(&msg.conn) catch |err| { const proceed = try transfer.headerDoneCallback(&msg.conn);
log.err(.http, "header_done_callback", .{ .err = err, .req = transfer });
transfer.requestFailed(err, true);
continue;
};
if (!proceed) { if (!proceed) {
transfer.requestFailed(error.Abort, true); transfer.requestFailed(error.Abort, true);
break :blk; return true;
} }
} }
// Replay buffered body through user's data_callback. // Replay buffered body through user's data_callback.
if (transfer._stream_buffer.items.len > 0) { if (transfer._stream_buffer.items.len > 0) {
const body = transfer._stream_buffer.items; const body = transfer._stream_buffer.items;
transfer.req.data_callback(transfer, body) catch |err| { try transfer.req.data_callback(transfer, body);
log.err(.http, "data_callback", .{ .err = err, .req = transfer });
transfer.requestFailed(err, true);
break :blk;
};
transfer.req.notification.dispatch(.http_response_data, &.{ transfer.req.notification.dispatch(.http_response_data, &.{
.data = body, .data = body,
@@ -853,7 +845,7 @@ fn processMessages(self: *Client) !bool {
if (transfer.aborted) { if (transfer.aborted) {
transfer.requestFailed(error.Abort, true); transfer.requestFailed(error.Abort, true);
break :blk; return true;
} }
} }
@@ -861,16 +853,34 @@ fn processMessages(self: *Client) !bool {
// will load more resources. // will load more resources.
transfer.releaseConn(); transfer.releaseConn();
transfer.req.done_callback(transfer.ctx) catch |err| { try transfer.req.done_callback(transfer.ctx);
// transfer isn't valid at this point, don't use it.
log.err(.http, "done_callback", .{ .err = err });
transfer.requestFailed(err, true);
continue;
};
transfer.req.notification.dispatch(.http_request_done, &.{ transfer.req.notification.dispatch(.http_request_done, &.{
.transfer = transfer, .transfer = transfer,
}); });
return true;
}
fn processMessages(self: *Client) !bool {
var processed = false;
while (self.handles.readMessage()) |msg| {
const transfer = try Transfer.fromConnection(&msg.conn);
const done = self.processOneMessage(msg, transfer) catch |err| blk: {
log.err(.http, "process_messages", .{ .err = err, .req = transfer });
transfer.requestFailed(err, true);
if (transfer._detached_conn) |c| {
// Conn was removed from handles during redirect reconfiguration
// but not re-added. Release it directly to avoid double-remove.
self.in_use.remove(&c.node);
self.active -= 1;
self.releaseConn(c);
transfer._detached_conn = null;
}
break :blk true;
};
if (done) {
transfer.deinit();
processed = true; processed = true;
} }
} }
@@ -973,6 +983,10 @@ pub const Transfer = struct {
_notified_fail: bool = false, _notified_fail: bool = false,
_conn: ?*http.Connection = null, _conn: ?*http.Connection = null,
// Set when conn is temporarily detached from transfer during redirect
// reconfiguration. Used by processMessages to release the orphaned conn
// if reconfiguration fails.
_detached_conn: ?*http.Connection = null,
_auth_challenge: ?http.AuthChallenge = null, _auth_challenge: ?http.AuthChallenge = null,
@@ -981,11 +995,13 @@ pub const Transfer = struct {
_tries: u8 = 0, _tries: u8 = 0,
_performing: bool = false, _performing: bool = false,
_redirect_count: u8 = 0, _redirect_count: u8 = 0,
_skip_body: bool = false,
_first_data_received: bool = false,
// Buffered response body. Filled by bufferDataCallback, consumed in processMessages. // Buffered response body. Filled by dataCallback, consumed in processMessages.
_stream_buffer: std.ArrayList(u8) = .{}, _stream_buffer: std.ArrayList(u8) = .{},
// Error captured in bufferDataCallback to be reported in processMessages. // Error captured in dataCallback to be reported in processMessages.
_callback_error: ?anyerror = null, _callback_error: ?anyerror = null,
// for when a Transfer is queued in the client.queue // for when a Transfer is queued in the client.queue
@@ -1129,6 +1145,8 @@ pub const Transfer = struct {
self._tries += 1; self._tries += 1;
self._stream_buffer.clearRetainingCapacity(); self._stream_buffer.clearRetainingCapacity();
self._callback_error = null; self._callback_error = null;
self._skip_body = false;
self._first_data_received = false;
} }
fn buildResponseHeader(self: *Transfer, conn: *const http.Connection) !void { fn buildResponseHeader(self: *Transfer, conn: *const http.Connection) !void {
@@ -1272,14 +1290,6 @@ pub const Transfer = struct {
try transfer.buildResponseHeader(conn); try transfer.buildResponseHeader(conn);
if (conn.getResponseHeader("content-type", 0)) |ct| {
var hdr = &transfer.response_header.?;
const value = ct.value;
const len = @min(value.len, ResponseHead.MAX_CONTENT_TYPE_LEN);
hdr._content_type_len = len;
@memcpy(hdr._content_type[0..len], value[0..len]);
}
if (transfer.req.cookie_jar) |jar| { if (transfer.req.cookie_jar) |jar| {
var i: usize = 0; var i: usize = 0;
while (true) { while (true) {
@@ -1326,12 +1336,33 @@ pub const Transfer = struct {
return http.writefunc_error; return http.writefunc_error;
}; };
if (!transfer._first_data_received) {
transfer._first_data_received = true;
// Skip body for responses that will be retried (redirects, auth challenges). // Skip body for responses that will be retried (redirects, auth challenges).
const status = conn.getResponseCode() catch return http.writefunc_error; const status = conn.getResponseCode() catch |err| {
log.err(.http, "getResponseCode", .{ .err = err, .source = "body callback" });
return http.writefunc_error;
};
if ((status >= 300 and status <= 399) or status == 401 or status == 407) { if ((status >= 300 and status <= 399) or status == 401 or status == 407) {
transfer._skip_body = true;
return @intCast(chunk_len); return @intCast(chunk_len);
} }
// Pre-size buffer from Content-Length.
if (transfer.getContentLength()) |cl| {
if (transfer.max_response_size) |max_size| {
if (cl > max_size) {
transfer._callback_error = error.ResponseTooLarge;
return http.writefunc_error;
}
}
transfer._stream_buffer.ensureTotalCapacity(transfer.arena.allocator(), cl) catch {};
}
}
if (transfer._skip_body) return @intCast(chunk_len);
transfer.bytes_received += chunk_len; transfer.bytes_received += chunk_len;
if (transfer.max_response_size) |max_size| { if (transfer.max_response_size) |max_size| {
if (transfer.bytes_received > max_size) { if (transfer.bytes_received > max_size) {