From a6ac7d9c4e3607cb67f3aaa30ce7dadfa6aebf99 Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Tue, 17 Jun 2025 19:55:36 +0800 Subject: [PATCH] Delay setting the requests' keepalive flag until the request is fully processed We currently set request._keepalive prematurely. There are [error cases] where the request could be abandoned before being fully drained. While we do try to drain in some cases, it isn't always possible. For this reason, request.keepalive is only set at the end of the request lifecycle, at which point we know the connection is ready to be re-used. --- src/http/client.zig | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/src/http/client.zig b/src/http/client.zig index 385e8e0a..052ce228 100644 --- a/src/http/client.zig +++ b/src/http/client.zig @@ -716,7 +716,7 @@ pub const Request = struct { } fn newReader(self: *Request) Reader { - return Reader.init(self._state, &self._keepalive); + return Reader.init(self._state); } // Does additional setup of the request for the firsts (i.e. non-redirect) call. @@ -879,12 +879,14 @@ pub const Request = struct { }); } - fn requestCompleted(self: *Request, response: ResponseHeader) void { + fn requestCompleted(self: *Request, response: ResponseHeader, can_keepalive: bool) void { const notification = self.notification orelse return; if (self._notified_complete) { return; } + self._notified_complete = true; + self._keepalive = can_keepalive; notification.dispatch(.http_request_complete, &.{ .id = self.id, .url = self.request_uri, @@ -1111,14 +1113,14 @@ fn AsyncHandler(comptime H: type, comptime L: type) type { .handler_error => { // handler should never have been called if we're redirecting std.debug.assert(self.redirect == null); - self.request.requestCompleted(self.reader.response); + self.request.requestCompleted(self.reader.response, self.reader.keepalive); self.deinit(); return; }, .done => { const redirect = self.redirect orelse { var handler = self.handler; - self.request.requestCompleted(self.reader.response); + self.request.requestCompleted(self.reader.response, self.reader.keepalive); self.deinit(); // Emit the done chunk. We expect the caller to do @@ -1560,8 +1562,6 @@ const SyncHandler = struct { var decompressor = std.compress.gzip.decompressor(compress_reader.reader()); try decompressor.decompress(body.writer(request.arena)); - self.request.requestCompleted(reader.response); - return .{ .header = reader.response, ._done = true, @@ -1781,8 +1781,8 @@ const SyncHandler = struct { // Used for reading the response (both the header and the body) const Reader = struct { - // ref request.keepalive - keepalive: *bool, + // Wether, from the reader's point of view, this connection could be kept-alive + keepalive: bool, // always references state.header_buf header_buf: []u8, @@ -1802,13 +1802,13 @@ const Reader = struct { // Whether or not the current header has to be skipped [because it's too long]. skip_current_header: bool, - fn init(state: *State, keepalive: *bool) Reader { + fn init(state: *State) Reader { return .{ .pos = 0, .response = .{}, .body_reader = null, .header_done = false, - .keepalive = keepalive, + .keepalive = false, .skip_current_header = false, .header_buf = state.header_buf, .arena = state.arena.allocator(), @@ -1835,7 +1835,6 @@ const Reader = struct { // us to emit whatever data we have, but it isn't safe to keep // the connection alive. std.debug.assert(result.done == true); - self.keepalive.* = false; } return result; } @@ -1930,7 +1929,7 @@ const Reader = struct { // We think we're done reading the body, but we still have data // We'll return what we have as-is, but close the connection // because we don't know what state it's in. - self.keepalive.* = false; + self.keepalive = false; } else { result.unprocessed = unprocessed; } @@ -1945,7 +1944,7 @@ const Reader = struct { if (response.get("connection")) |connection| { if (std.ascii.eqlIgnoreCase(connection, "close")) { - self.keepalive.* = false; + self.keepalive = false; } } @@ -2005,7 +2004,7 @@ const Reader = struct { } const protocol = data[0..9]; if (std.mem.eql(u8, protocol, "HTTP/1.1 ")) { - self.keepalive.* = true; + self.keepalive = true; } else if (std.mem.eql(u8, protocol, "HTTP/1.0 ") == false) { return error.InvalidStatusLine; } @@ -2387,7 +2386,7 @@ pub const Response = struct { return data; } if (self._done) { - self._request.requestCompleted(self.header); + self._request.requestCompleted(self.header, self._reader.keepalive); return null; } @@ -3396,8 +3395,7 @@ const CaptureHandler = struct { fn testReader(state: *State, res: *TestResponse, data: []const u8) !void { var status: u16 = 0; - var keepalive = false; - var r = Reader.init(state, &keepalive); + var r = Reader.init(state); // dupe it so that we have a mutable copy const owned = try testing.allocator.dupe(u8, data);