Merge pull request #788 from lightpanda-io/dont_keepalive_unprocess_request
Some checks failed
e2e-test / zig build release (push) Has been cancelled
e2e-test / puppeteer-perf (push) Has been cancelled
e2e-test / demo-scripts (push) Has been cancelled
e2e-test / cdp-and-hyperfine-bench (push) Has been cancelled
e2e-test / perf-fmt (push) Has been cancelled
zig-test / zig build dev (push) Has been cancelled
zig-test / browser fetch (push) Has been cancelled
zig-test / zig test (push) Has been cancelled
zig-test / perf-fmt (push) Has been cancelled

Delay setting the requests' keepalive flag until the request is fully…
This commit is contained in:
Karl Seguin
2025-06-18 16:46:45 +08:00
committed by GitHub

View File

@@ -716,7 +716,7 @@ pub const Request = struct {
} }
fn newReader(self: *Request) Reader { fn newReader(self: *Request) Reader {
return Reader.init(self._state, &self._keepalive); return Reader.init(self._state);
} }
// Does additional setup of the request for the firsts (i.e. non-redirect) call. // Does additional setup of the request for the firsts (i.e. non-redirect) call.
@@ -879,12 +879,14 @@ pub const Request = struct {
}); });
} }
fn requestCompleted(self: *Request, response: ResponseHeader) void { fn requestCompleted(self: *Request, response: ResponseHeader, can_keepalive: bool) void {
const notification = self.notification orelse return; const notification = self.notification orelse return;
if (self._notified_complete) { if (self._notified_complete) {
return; return;
} }
self._notified_complete = true; self._notified_complete = true;
self._keepalive = can_keepalive;
notification.dispatch(.http_request_complete, &.{ notification.dispatch(.http_request_complete, &.{
.id = self.id, .id = self.id,
.url = self.request_uri, .url = self.request_uri,
@@ -1111,14 +1113,14 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
.handler_error => { .handler_error => {
// handler should never have been called if we're redirecting // handler should never have been called if we're redirecting
std.debug.assert(self.redirect == null); std.debug.assert(self.redirect == null);
self.request.requestCompleted(self.reader.response); self.request.requestCompleted(self.reader.response, self.reader.keepalive);
self.deinit(); self.deinit();
return; return;
}, },
.done => { .done => {
const redirect = self.redirect orelse { const redirect = self.redirect orelse {
var handler = self.handler; var handler = self.handler;
self.request.requestCompleted(self.reader.response); self.request.requestCompleted(self.reader.response, self.reader.keepalive);
self.deinit(); self.deinit();
// Emit the done chunk. We expect the caller to do // Emit the done chunk. We expect the caller to do
@@ -1560,8 +1562,6 @@ const SyncHandler = struct {
var decompressor = std.compress.gzip.decompressor(compress_reader.reader()); var decompressor = std.compress.gzip.decompressor(compress_reader.reader());
try decompressor.decompress(body.writer(request.arena)); try decompressor.decompress(body.writer(request.arena));
self.request.requestCompleted(reader.response);
return .{ return .{
.header = reader.response, .header = reader.response,
._done = true, ._done = true,
@@ -1781,8 +1781,8 @@ const SyncHandler = struct {
// Used for reading the response (both the header and the body) // Used for reading the response (both the header and the body)
const Reader = struct { const Reader = struct {
// ref request.keepalive // Wether, from the reader's point of view, this connection could be kept-alive
keepalive: *bool, keepalive: bool,
// always references state.header_buf // always references state.header_buf
header_buf: []u8, header_buf: []u8,
@@ -1802,13 +1802,13 @@ const Reader = struct {
// Whether or not the current header has to be skipped [because it's too long]. // Whether or not the current header has to be skipped [because it's too long].
skip_current_header: bool, skip_current_header: bool,
fn init(state: *State, keepalive: *bool) Reader { fn init(state: *State) Reader {
return .{ return .{
.pos = 0, .pos = 0,
.response = .{}, .response = .{},
.body_reader = null, .body_reader = null,
.header_done = false, .header_done = false,
.keepalive = keepalive, .keepalive = false,
.skip_current_header = false, .skip_current_header = false,
.header_buf = state.header_buf, .header_buf = state.header_buf,
.arena = state.arena.allocator(), .arena = state.arena.allocator(),
@@ -1835,7 +1835,6 @@ const Reader = struct {
// us to emit whatever data we have, but it isn't safe to keep // us to emit whatever data we have, but it isn't safe to keep
// the connection alive. // the connection alive.
std.debug.assert(result.done == true); std.debug.assert(result.done == true);
self.keepalive.* = false;
} }
return result; return result;
} }
@@ -1930,7 +1929,7 @@ const Reader = struct {
// We think we're done reading the body, but we still have data // We think we're done reading the body, but we still have data
// We'll return what we have as-is, but close the connection // We'll return what we have as-is, but close the connection
// because we don't know what state it's in. // because we don't know what state it's in.
self.keepalive.* = false; self.keepalive = false;
} else { } else {
result.unprocessed = unprocessed; result.unprocessed = unprocessed;
} }
@@ -1945,7 +1944,7 @@ const Reader = struct {
if (response.get("connection")) |connection| { if (response.get("connection")) |connection| {
if (std.ascii.eqlIgnoreCase(connection, "close")) { if (std.ascii.eqlIgnoreCase(connection, "close")) {
self.keepalive.* = false; self.keepalive = false;
} }
} }
@@ -2005,7 +2004,7 @@ const Reader = struct {
} }
const protocol = data[0..9]; const protocol = data[0..9];
if (std.mem.eql(u8, protocol, "HTTP/1.1 ")) { if (std.mem.eql(u8, protocol, "HTTP/1.1 ")) {
self.keepalive.* = true; self.keepalive = true;
} else if (std.mem.eql(u8, protocol, "HTTP/1.0 ") == false) { } else if (std.mem.eql(u8, protocol, "HTTP/1.0 ") == false) {
return error.InvalidStatusLine; return error.InvalidStatusLine;
} }
@@ -2387,7 +2386,7 @@ pub const Response = struct {
return data; return data;
} }
if (self._done) { if (self._done) {
self._request.requestCompleted(self.header); self._request.requestCompleted(self.header, self._reader.keepalive);
return null; return null;
} }
@@ -3396,8 +3395,7 @@ const CaptureHandler = struct {
fn testReader(state: *State, res: *TestResponse, data: []const u8) !void { fn testReader(state: *State, res: *TestResponse, data: []const u8) !void {
var status: u16 = 0; var status: u16 = 0;
var keepalive = false; var r = Reader.init(state);
var r = Reader.init(state, &keepalive);
// dupe it so that we have a mutable copy // dupe it so that we have a mutable copy
const owned = try testing.allocator.dupe(u8, data); const owned = try testing.allocator.dupe(u8, data);