Move cdp callbacks from dataCallback to processMessages

This commit is contained in:
Nikolay Govorov
2026-03-18 22:23:26 +00:00
parent 8963aeb965
commit 9ad446b67a

View File

@@ -803,7 +803,7 @@ fn processMessages(self: *Client) !bool {
defer transfer.deinit(); defer transfer.deinit();
if (msg.err) |err| { if (msg.err) |err| {
transfer.requestFailed(err, true); transfer.requestFailed(transfer._callback_error orelse err, true);
} else blk: { } else blk: {
// make sure the transfer can't be immediately aborted from a callback // make sure the transfer can't be immediately aborted from a callback
// since we still need it here. // since we still need it here.
@@ -814,7 +814,7 @@ fn processMessages(self: *Client) !bool {
// In case of request w/o data, we need to call the header done // In case of request w/o data, we need to call the header done
// callback now. // callback now.
const proceed = transfer.headerDoneCallback(&msg.conn) catch |err| { const proceed = transfer.headerDoneCallback(&msg.conn) catch |err| {
lp.log.err(.http, "header_done_callback2", .{ .err = err }); lp.log.err(.http, "header_done_callback", .{ .err = err, .req = transfer });
transfer.requestFailed(err, true); transfer.requestFailed(err, true);
continue; continue;
}; };
@@ -824,6 +824,26 @@ fn processMessages(self: *Client) !bool {
} }
} }
// Replay buffered body through user's data_callback.
if (transfer._stream_buffer.items.len > 0) {
const body = transfer._stream_buffer.items;
transfer.req.data_callback(transfer, body) catch |err| {
lp.log.err(.http, "data_callback", .{ .err = err, .req = transfer });
transfer.requestFailed(err, true);
break :blk;
};
transfer.req.notification.dispatch(.http_response_data, &.{
.data = body,
.transfer = transfer,
});
if (transfer.aborted) {
transfer.requestFailed(error.Abort, true);
break :blk;
}
}
// release conn ASAP so that it's available; some done_callbacks // release conn ASAP so that it's available; some done_callbacks
// will load more resources. // will load more resources.
transfer.releaseConn(); transfer.releaseConn();
@@ -949,6 +969,12 @@ pub const Transfer = struct {
_performing: bool = false, _performing: bool = false,
_redirect_count: u8 = 0, _redirect_count: u8 = 0,
// Buffered response body. Filled by bufferDataCallback, consumed in processMessages.
_stream_buffer: std.ArrayListUnmanaged(u8) = .{},
// Error captured in bufferDataCallback to be reported in processMessages.
_callback_error: ?anyerror = null,
// for when a Transfer is queued in the client.queue // for when a Transfer is queued in the client.queue
_node: std.DoublyLinkedList.Node = .{}, _node: std.DoublyLinkedList.Node = .{},
_intercept_state: InterceptState = .not_intercepted, _intercept_state: InterceptState = .not_intercepted,
@@ -1088,6 +1114,8 @@ pub const Transfer = struct {
self.response_header = null; self.response_header = null;
self.bytes_received = 0; self.bytes_received = 0;
self._tries += 1; self._tries += 1;
self._stream_buffer.clearRetainingCapacity();
self._callback_error = null;
} }
fn buildResponseHeader(self: *Transfer, conn: *const http.Connection) !void { fn buildResponseHeader(self: *Transfer, conn: *const http.Connection) !void {
@@ -1291,36 +1319,20 @@ pub const Transfer = struct {
return @intCast(chunk_len); return @intCast(chunk_len);
} }
if (!transfer._header_done_called) {
const proceed = transfer.headerDoneCallback(conn) catch |err| {
lp.log.err(.http, "header_done_callback", .{ .err = err, .req = transfer });
return http.writefunc_error;
};
if (!proceed) {
// signal abort to libcurl
return http.writefunc_error;
}
}
transfer.bytes_received += chunk_len; transfer.bytes_received += chunk_len;
if (transfer.max_response_size) |max_size| { if (transfer.max_response_size) |max_size| {
if (transfer.bytes_received > max_size) { if (transfer.bytes_received > max_size) {
transfer.requestFailed(error.ResponseTooLarge, true); transfer._callback_error = error.ResponseTooLarge;
return http.writefunc_error; return http.writefunc_error;
} }
} }
const chunk = buffer[0..chunk_len]; const chunk = buffer[0..chunk_len];
transfer.req.data_callback(transfer, chunk) catch |err| { transfer._stream_buffer.appendSlice(transfer.arena.allocator(), chunk) catch |err| {
lp.log.err(.http, "data_callback", .{ .err = err, .req = transfer }); transfer._callback_error = err;
return http.writefunc_error; return http.writefunc_error;
}; };
transfer.req.notification.dispatch(.http_response_data, &.{
.data = chunk,
.transfer = transfer,
});
if (transfer.aborted) { if (transfer.aborted) {
return http.writefunc_error; return http.writefunc_error;
} }