Merge pull request #688 from lightpanda-io/connection_cleanup

Fix connection memory leak
This commit is contained in:
Karl Seguin
2025-05-24 08:38:44 +08:00
committed by GitHub

View File

@@ -49,8 +49,7 @@ pub const Client = struct {
http_proxy: ?Uri, http_proxy: ?Uri,
root_ca: tls.config.CertBundle, root_ca: tls.config.CertBundle,
tls_verify_host: bool = true, tls_verify_host: bool = true,
idle_connections: IdleConnections, connection_manager: ConnectionManager,
connection_pool: std.heap.MemoryPool(Connection),
const Opts = struct { const Opts = struct {
tls_verify_host: bool = true, tls_verify_host: bool = true,
@@ -65,17 +64,16 @@ pub const Client = struct {
const state_pool = try StatePool.init(allocator, max_concurrent); const state_pool = try StatePool.init(allocator, max_concurrent);
errdefer state_pool.deinit(allocator); errdefer state_pool.deinit(allocator);
const idle_connections = IdleConnections.init(allocator, opts.max_idle_connection); const connection_manager = ConnectionManager.init(allocator, opts.max_idle_connection);
errdefer idle_connections.deinit(); errdefer connection_manager.deinit();
return .{ return .{
.root_ca = root_ca, .root_ca = root_ca,
.allocator = allocator, .allocator = allocator,
.state_pool = state_pool, .state_pool = state_pool,
.http_proxy = opts.http_proxy, .http_proxy = opts.http_proxy,
.idle_connections = idle_connections,
.tls_verify_host = opts.tls_verify_host, .tls_verify_host = opts.tls_verify_host,
.connection_pool = std.heap.MemoryPool(Connection).init(allocator), .connection_manager = connection_manager,
}; };
} }
@@ -85,8 +83,7 @@ pub const Client = struct {
self.root_ca.deinit(allocator); self.root_ca.deinit(allocator);
} }
self.state_pool.deinit(allocator); self.state_pool.deinit(allocator);
self.idle_connections.deinit(); self.connection_manager.deinit();
self.connection_pool.deinit();
} }
pub fn request(self: *Client, method: Request.Method, uri: *const Uri) !Request { pub fn request(self: *Client, method: Request.Method, uri: *const Uri) !Request {
@@ -307,11 +304,11 @@ pub const Request = struct {
self._connection = null; self._connection = null;
if (self._keepalive == false) { if (self._keepalive == false) {
self.destroyConnection(connection); self._client.connection_manager.destroy(connection);
return; return;
} }
self._client.idle_connections.put(connection) catch |err| { self._client.connection_manager.keepIdle(connection) catch |err| {
self.destroyConnection(connection); self.destroyConnection(connection);
log.err("failed to release connection to pool: {}", .{err}); log.err("failed to release connection to pool: {}", .{err});
}; };
@@ -319,24 +316,21 @@ pub const Request = struct {
fn createConnection(self: *Request, socket: posix.socket_t, blocking: bool) !*Connection { fn createConnection(self: *Request, socket: posix.socket_t, blocking: bool) !*Connection {
const client = self._client; const client = self._client;
const connection = try client.connection_pool.create(); const connection, const owned_host = try client.connection_manager.create(self._connect_host);
errdefer client.connection_pool.destroy(connection);
connection.* = .{ connection.* = .{
.tls = null, .tls = null,
.socket = socket, .socket = socket,
.blocking = blocking, .blocking = blocking,
.host = owned_host,
.port = self._connect_port, .port = self._connect_port,
.host = try client.allocator.dupe(u8, self._connect_host),
}; };
return connection; return connection;
} }
fn destroyConnection(self: *Request, connection: *Connection) void { fn destroyConnection(self: *Request, connection: *Connection) void {
const client = self._client; self._client.connection_manager.destroy(connection);
connection.deinit(client.allocator);
client.connection_pool.destroy(connection);
} }
const AddHeaderOpts = struct { const AddHeaderOpts = struct {
@@ -403,9 +397,10 @@ pub const Request = struct {
posix.close(socket); posix.close(socket);
return err; return err;
}; };
self._connection = connection;
if (self._secure) { if (self._secure) {
connection.tls = .{ self._connection.?.tls = .{
.blocking = try tls.client(std.net.Stream{ .handle = socket }, .{ .blocking = try tls.client(std.net.Stream{ .handle = socket }, .{
.host = self._connect_host, .host = self._connect_host,
.root_ca = self._client.root_ca, .root_ca = self._client.root_ca,
@@ -415,7 +410,6 @@ pub const Request = struct {
}; };
} }
self._connection = connection;
self._connection_from_keepalive = false; self._connection_from_keepalive = false;
} }
@@ -605,7 +599,7 @@ pub const Request = struct {
return null; return null;
} }
return self._client.idle_connections.get(self._secure, self._connect_host, self._connect_port, blocking); return self._client.connection_manager.get(self._secure, self._connect_host, self._connect_port, blocking);
} }
fn createSocket(self: *Request, blocking: bool) !struct { posix.socket_t, std.net.Address } { fn createSocket(self: *Request, blocking: bool) !struct { posix.socket_t, std.net.Address } {
@@ -2219,18 +2213,19 @@ const StatePool = struct {
// always re-use the connection (just toggle the socket's blocking status), but // always re-use the connection (just toggle the socket's blocking status), but
// for TLS, we'd need to see if the two different TLS objects (blocking and non // for TLS, we'd need to see if the two different TLS objects (blocking and non
// blocking) can be converted from each other. // blocking) can be converted from each other.
const IdleConnections = struct { const ConnectionManager = struct {
max: usize, max: usize,
idle: List, idle: List,
count: usize, count: usize,
mutex: Thread.Mutex, mutex: Thread.Mutex,
allocator: Allocator, allocator: Allocator,
node_pool: std.heap.MemoryPool(Node), node_pool: std.heap.MemoryPool(Node),
connection_pool: std.heap.MemoryPool(Connection),
const List = std.DoublyLinkedList(*Connection); const List = std.DoublyLinkedList(*Connection);
const Node = List.Node; const Node = List.Node;
fn init(allocator: Allocator, max: usize) IdleConnections { fn init(allocator: Allocator, max: usize) ConnectionManager {
return .{ return .{
.max = max, .max = max,
.count = 0, .count = 0,
@@ -2238,10 +2233,11 @@ const IdleConnections = struct {
.mutex = .{}, .mutex = .{},
.allocator = allocator, .allocator = allocator,
.node_pool = std.heap.MemoryPool(Node).init(allocator), .node_pool = std.heap.MemoryPool(Node).init(allocator),
.connection_pool = std.heap.MemoryPool(Connection).init(allocator),
}; };
} }
fn deinit(self: *IdleConnections) void { fn deinit(self: *ConnectionManager) void {
const allocator = self.allocator; const allocator = self.allocator;
self.mutex.lock(); self.mutex.lock();
@@ -2253,9 +2249,10 @@ const IdleConnections = struct {
node = next; node = next;
} }
self.node_pool.deinit(); self.node_pool.deinit();
self.connection_pool.deinit();
} }
fn get(self: *IdleConnections, secure: bool, host: []const u8, port: u16, blocking: bool) ?*Connection { fn get(self: *ConnectionManager, secure: bool, host: []const u8, port: u16, blocking: bool) ?*Connection {
self.mutex.lock(); self.mutex.lock();
defer self.mutex.unlock(); defer self.mutex.unlock();
@@ -2273,7 +2270,7 @@ const IdleConnections = struct {
return null; return null;
} }
fn put(self: *IdleConnections, connection: *Connection) !void { fn keepIdle(self: *ConnectionManager, connection: *Connection) !void {
self.mutex.lock(); self.mutex.lock();
defer self.mutex.unlock(); defer self.mutex.unlock();
@@ -2281,20 +2278,33 @@ const IdleConnections = struct {
if (self.count == self.max) { if (self.count == self.max) {
const oldest = self.idle.popFirst() orelse { const oldest = self.idle.popFirst() orelse {
std.debug.assert(self.max == 0); std.debug.assert(self.max == 0);
connection.deinit(self.allocator); self.destroy(connection);
return; return;
}; };
oldest.data.deinit(self.allocator); self.destroy(oldest.data);
// re-use the node // re-use the node
node = oldest; node = oldest;
} else { } else {
self.count += 1;
node = try self.node_pool.create(); node = try self.node_pool.create();
self.count += 1;
} }
node.data = connection; node.data = connection;
self.idle.append(node); self.idle.append(node);
} }
fn create(self: *ConnectionManager, host: []const u8) !struct { *Connection, []const u8 } {
const connection = try self.connection_pool.create();
errdefer self.connection_pool.destroy(connection);
const owned_host = try self.allocator.dupe(u8, host);
return .{ connection, owned_host };
}
fn destroy(self: *ConnectionManager, connection: *Connection) void {
connection.deinit(self.allocator);
self.connection_pool.destroy(connection);
}
}; };
const testing = @import("../testing.zig"); const testing = @import("../testing.zig");