Delay setting the requests' keepalive flag until the request is fully processed

We currently set request._keepalive prematurely. There are [error cases] where
the request could be abandoned before being fully drained. While we do try to
drain in some cases, it isn't always possible. For this reason,
request.keepalive is only set at the end of the request lifecycle, at which
point we know the connection is ready to be re-used.
This commit is contained in:
Karl Seguin
2025-06-17 19:55:36 +08:00
parent 9b35736be3
commit a6ac7d9c4e

View File

@@ -716,7 +716,7 @@ pub const Request = struct {
} }
fn newReader(self: *Request) Reader { fn newReader(self: *Request) Reader {
return Reader.init(self._state, &self._keepalive); return Reader.init(self._state);
} }
// Does additional setup of the request for the firsts (i.e. non-redirect) call. // Does additional setup of the request for the firsts (i.e. non-redirect) call.
@@ -879,12 +879,14 @@ pub const Request = struct {
}); });
} }
fn requestCompleted(self: *Request, response: ResponseHeader) void { fn requestCompleted(self: *Request, response: ResponseHeader, can_keepalive: bool) void {
const notification = self.notification orelse return; const notification = self.notification orelse return;
if (self._notified_complete) { if (self._notified_complete) {
return; return;
} }
self._notified_complete = true; self._notified_complete = true;
self._keepalive = can_keepalive;
notification.dispatch(.http_request_complete, &.{ notification.dispatch(.http_request_complete, &.{
.id = self.id, .id = self.id,
.url = self.request_uri, .url = self.request_uri,
@@ -1111,14 +1113,14 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
.handler_error => { .handler_error => {
// handler should never have been called if we're redirecting // handler should never have been called if we're redirecting
std.debug.assert(self.redirect == null); std.debug.assert(self.redirect == null);
self.request.requestCompleted(self.reader.response); self.request.requestCompleted(self.reader.response, self.reader.keepalive);
self.deinit(); self.deinit();
return; return;
}, },
.done => { .done => {
const redirect = self.redirect orelse { const redirect = self.redirect orelse {
var handler = self.handler; var handler = self.handler;
self.request.requestCompleted(self.reader.response); self.request.requestCompleted(self.reader.response, self.reader.keepalive);
self.deinit(); self.deinit();
// Emit the done chunk. We expect the caller to do // Emit the done chunk. We expect the caller to do
@@ -1560,8 +1562,6 @@ const SyncHandler = struct {
var decompressor = std.compress.gzip.decompressor(compress_reader.reader()); var decompressor = std.compress.gzip.decompressor(compress_reader.reader());
try decompressor.decompress(body.writer(request.arena)); try decompressor.decompress(body.writer(request.arena));
self.request.requestCompleted(reader.response);
return .{ return .{
.header = reader.response, .header = reader.response,
._done = true, ._done = true,
@@ -1781,8 +1781,8 @@ const SyncHandler = struct {
// Used for reading the response (both the header and the body) // Used for reading the response (both the header and the body)
const Reader = struct { const Reader = struct {
// ref request.keepalive // Wether, from the reader's point of view, this connection could be kept-alive
keepalive: *bool, keepalive: bool,
// always references state.header_buf // always references state.header_buf
header_buf: []u8, header_buf: []u8,
@@ -1802,13 +1802,13 @@ const Reader = struct {
// Whether or not the current header has to be skipped [because it's too long]. // Whether or not the current header has to be skipped [because it's too long].
skip_current_header: bool, skip_current_header: bool,
fn init(state: *State, keepalive: *bool) Reader { fn init(state: *State) Reader {
return .{ return .{
.pos = 0, .pos = 0,
.response = .{}, .response = .{},
.body_reader = null, .body_reader = null,
.header_done = false, .header_done = false,
.keepalive = keepalive, .keepalive = false,
.skip_current_header = false, .skip_current_header = false,
.header_buf = state.header_buf, .header_buf = state.header_buf,
.arena = state.arena.allocator(), .arena = state.arena.allocator(),
@@ -1835,7 +1835,6 @@ const Reader = struct {
// us to emit whatever data we have, but it isn't safe to keep // us to emit whatever data we have, but it isn't safe to keep
// the connection alive. // the connection alive.
std.debug.assert(result.done == true); std.debug.assert(result.done == true);
self.keepalive.* = false;
} }
return result; return result;
} }
@@ -1930,7 +1929,7 @@ const Reader = struct {
// We think we're done reading the body, but we still have data // We think we're done reading the body, but we still have data
// We'll return what we have as-is, but close the connection // We'll return what we have as-is, but close the connection
// because we don't know what state it's in. // because we don't know what state it's in.
self.keepalive.* = false; self.keepalive = false;
} else { } else {
result.unprocessed = unprocessed; result.unprocessed = unprocessed;
} }
@@ -1945,7 +1944,7 @@ const Reader = struct {
if (response.get("connection")) |connection| { if (response.get("connection")) |connection| {
if (std.ascii.eqlIgnoreCase(connection, "close")) { if (std.ascii.eqlIgnoreCase(connection, "close")) {
self.keepalive.* = false; self.keepalive = false;
} }
} }
@@ -2005,7 +2004,7 @@ const Reader = struct {
} }
const protocol = data[0..9]; const protocol = data[0..9];
if (std.mem.eql(u8, protocol, "HTTP/1.1 ")) { if (std.mem.eql(u8, protocol, "HTTP/1.1 ")) {
self.keepalive.* = true; self.keepalive = true;
} else if (std.mem.eql(u8, protocol, "HTTP/1.0 ") == false) { } else if (std.mem.eql(u8, protocol, "HTTP/1.0 ") == false) {
return error.InvalidStatusLine; return error.InvalidStatusLine;
} }
@@ -2387,7 +2386,7 @@ pub const Response = struct {
return data; return data;
} }
if (self._done) { if (self._done) {
self._request.requestCompleted(self.header); self._request.requestCompleted(self.header, self._reader.keepalive);
return null; return null;
} }
@@ -3396,8 +3395,7 @@ const CaptureHandler = struct {
fn testReader(state: *State, res: *TestResponse, data: []const u8) !void { fn testReader(state: *State, res: *TestResponse, data: []const u8) !void {
var status: u16 = 0; var status: u16 = 0;
var keepalive = false; var r = Reader.init(state);
var r = Reader.init(state, &keepalive);
// dupe it so that we have a mutable copy // dupe it so that we have a mutable copy
const owned = try testing.allocator.dupe(u8, data); const owned = try testing.allocator.dupe(u8, data);