Cleaner merge

Switch to non-blocking sockets.

Fix TLS handshake/receive/send ordering
This commit is contained in:
Karl Seguin
2025-03-20 23:09:20 +08:00
parent feb2046549
commit 22aa126b29
12 changed files with 271 additions and 203 deletions

View File

@@ -5,8 +5,8 @@
.fingerprint = 0xda130f3af836cea0, .fingerprint = 0xda130f3af836cea0,
.dependencies = .{ .dependencies = .{
.tls = .{ .tls = .{
.url = "https://github.com/ianic/tls.zig/archive/21aeaa9dd90f89fb86b0cd597f201a2680236f06.tar.gz", .url = "https://github.com/ianic/tls.zig/archive/96b923fcdaa6371617154857cef7b8337778cbe2.tar.gz",
.hash = "1220e584a5962cfba7c2f8d13151754bf76338c9916fedfd9b7a754501b9d9276c61", .hash = "122031f94565d7420a155b6eaec65aaa02acc80e75e6f0947899be2106bc3055b1ec",
}, },
}, },
} }

View File

@@ -2,7 +2,7 @@ const std = @import("std");
const Loop = @import("jsruntime").Loop; const Loop = @import("jsruntime").Loop;
const Allocator = std.mem.Allocator; const Allocator = std.mem.Allocator;
const HttpClient = @import("http/Client.zig"); const HttpClient = @import("http/client.zig").Client;
const Telemetry = @import("telemetry/telemetry.zig").Telemetry; const Telemetry = @import("telemetry/telemetry.zig").Telemetry;
const log = std.log.scoped(.app); const log = std.log.scoped(.app);
@@ -38,7 +38,7 @@ pub const App = struct {
.allocator = allocator, .allocator = allocator,
.telemetry = undefined, .telemetry = undefined,
.app_dir_path = app_dir_path, .app_dir_path = app_dir_path,
.http_client = .{ .allocator = allocator }, .http_client = try HttpClient.init(allocator, 5, null),
}; };
app.telemetry = Telemetry.init(app, run_mode); app.telemetry = Telemetry.init(app, run_mode);

View File

@@ -43,7 +43,7 @@ const Location = @import("../html/location.zig").Location;
const storage = @import("../storage/storage.zig"); const storage = @import("../storage/storage.zig");
const http = @import("../http/client.zig"); const HttpClient = @import("../http/client.zig").Client;
const UserContext = @import("../user_context.zig").UserContext; const UserContext = @import("../user_context.zig").UserContext;
const polyfill = @import("../polyfill/polyfill.zig"); const polyfill = @import("../polyfill/polyfill.zig");
@@ -60,13 +60,13 @@ pub const Browser = struct {
app: *App, app: *App,
session: ?*Session, session: ?*Session,
allocator: Allocator, allocator: Allocator,
http_client: *http.Client, http_client: *HttpClient,
session_pool: SessionPool, session_pool: SessionPool,
page_arena: std.heap.ArenaAllocator, page_arena: std.heap.ArenaAllocator,
const SessionPool = std.heap.MemoryPool(Session); const SessionPool = std.heap.MemoryPool(Session);
pub fn init(app: *App) !Browser { pub fn init(app: *App) Browser {
const allocator = app.allocator; const allocator = app.allocator;
return .{ return .{
.app = app, .app = app,
@@ -74,7 +74,6 @@ pub const Browser = struct {
.allocator = allocator, .allocator = allocator,
.http_client = &app.http_client, .http_client = &app.http_client,
.session_pool = SessionPool.init(allocator), .session_pool = SessionPool.init(allocator),
.http_client = try http.Client.init(allocator, 5, null),
.page_arena = std.heap.ArenaAllocator.init(allocator), .page_arena = std.heap.ArenaAllocator.init(allocator),
}; };
} }
@@ -127,7 +126,7 @@ pub const Session = struct {
// TODO move the shed to the browser? // TODO move the shed to the browser?
storage_shed: storage.Shed, storage_shed: storage.Shed,
page: ?Page = null, page: ?Page = null,
http_client: *http.Client, http_client: *HttpClient,
jstypes: [Types.len]usize = undefined, jstypes: [Types.len]usize = undefined,
@@ -139,7 +138,7 @@ pub const Session = struct {
.env = undefined, .env = undefined,
.browser = browser, .browser = browser,
.inspector = undefined, .inspector = undefined,
.http_client = &browser.http_client, .http_client = browser.http_client,
.storage_shed = storage.Shed.init(allocator), .storage_shed = storage.Shed.init(allocator),
.arena = std.heap.ArenaAllocator.init(allocator), .arena = std.heap.ArenaAllocator.init(allocator),
.window = Window.create(null, .{ .agent = user_agent }), .window = Window.create(null, .{ .agent = user_agent }),
@@ -434,7 +433,7 @@ pub const Page = struct {
// replace the user context document with the new one. // replace the user context document with the new one.
try session.env.setUserContext(.{ try session.env.setUserContext(.{
.document = html_doc, .document = html_doc,
.http_client = self.session.browser.http_client, .http_client = @ptrCast(self.session.http_client),
}); });
// browse the DOM tree to retrieve scripts // browse the DOM tree to retrieve scripts

View File

@@ -73,13 +73,13 @@ pub fn CDPT(comptime TypeProvider: type) type {
pub const Browser = TypeProvider.Browser; pub const Browser = TypeProvider.Browser;
pub const Session = TypeProvider.Session; pub const Session = TypeProvider.Session;
pub fn init(app: *App, client: TypeProvider.Client) !Self { pub fn init(app: *App, client: TypeProvider.Client) Self {
const allocator = app.allocator; const allocator = app.allocator;
return .{ return .{
.client = client, .client = client,
.allocator = allocator, .allocator = allocator,
.browser_context = null, .browser_context = null,
.browser = try Browser.init(app), .browser = Browser.init(app),
.message_arena = std.heap.ArenaAllocator.init(allocator), .message_arena = std.heap.ArenaAllocator.init(allocator),
.browser_context_pool = std.heap.MemoryPool(BrowserContext(Self)).init(allocator), .browser_context_pool = std.heap.MemoryPool(BrowserContext(Self)).init(allocator),
}; };

View File

@@ -1,3 +1,21 @@
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std"); const std = @import("std");
const json = std.json; const json = std.json;
@@ -17,7 +35,7 @@ const Browser = struct {
session: ?*Session = null, session: ?*Session = null,
arena: std.heap.ArenaAllocator, arena: std.heap.ArenaAllocator,
pub fn init(app: *App) !Browser { pub fn init(app: *App) Browser {
return .{ return .{
.arena = std.heap.ArenaAllocator.init(app.allocator), .arena = std.heap.ArenaAllocator.init(app.allocator),
}; };

View File

@@ -252,10 +252,13 @@ pub const Request = struct {
}; };
if (self.secure) { if (self.secure) {
async_handler.connection.protocol = .{ .tls_client = try tls.asyn.Client(AsyncHandlerT.TLSHandler).init(self.arena, .{ .handler = async_handler }, .{ async_handler.connection.protocol = .{
.host = self.host(), .tls_client = try tls.asyn.Client(AsyncHandlerT.TLSHandler).init(self.arena, .{ .handler = async_handler }, .{
.root_ca = self._client.root_ca, .host = self.host(),
}) }; .root_ca = self._client.root_ca,
// .key_log_callback = tls.config.key_log.callback
}),
};
} }
loop.connect(AsyncHandlerT, async_handler, &async_handler.read_completion, AsyncHandlerT.connected, socket, address); loop.connect(AsyncHandlerT, async_handler, &async_handler.read_completion, AsyncHandlerT.connected, socket, address);
@@ -274,6 +277,8 @@ pub const Request = struct {
if (!self._has_host_header) { if (!self._has_host_header) {
try self.headers.append(arena, .{ .name = "Host", .value = self.host() }); try self.headers.append(arena, .{ .name = "Host", .value = self.host() });
} }
try self.headers.append(arena, .{ .name = "User-Agent", .value = "Lightpanda/1.0" });
} }
// Sets up the request for redirecting. // Sets up the request for redirecting.
@@ -442,6 +447,7 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
const ProcessStatus = enum { const ProcessStatus = enum {
done, done,
wait,
need_more, need_more,
}; };
@@ -466,7 +472,6 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
node.data = data; node.data = data;
self.send_queue.append(node); self.send_queue.append(node);
if (self.send_queue.len > 1) { if (self.send_queue.len > 1) {
// if we already had a message in the queue, then our send loop // if we already had a message in the queue, then our send loop
// is already setup. // is already setup.
@@ -488,16 +493,20 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
const n = n_ catch |err| { const n = n_ catch |err| {
return self.handleError("Write error", err); return self.handleError("Write error", err);
}; };
const node = self.send_queue.popFirst().?;
const node = self.send_queue.first.?;
const data = node.data; const data = node.data;
if (n < data.len) { var next: ?*SendQueue.Node = node;
if (n == data.len) {
_ = self.send_queue.popFirst();
next = node.next;
} else {
// didn't send all the data, we prematurely popped this off // didn't send all the data, we prematurely popped this off
// (because, in most cases, it _will_ send all the data) // (because, in most cases, it _will_ send all the data)
node.data = data[n..]; node.data = data[n..];
self.send_queue.prepend(node);
} }
if (self.send_queue.first) |next| { if (next) |next_| {
// we still have data to send // we still have data to send
self.loop.send( self.loop.send(
Self, Self,
@@ -505,12 +514,12 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
&self.send_completion, &self.send_completion,
sent, sent,
self.socket, self.socket,
next.data, next_.data,
); );
return; return;
} }
self.connection.sent(self.state) catch |err| { self.connection.sent() catch |err| {
self.handleError("Processing sent data", err); self.handleError("Processing sent data", err);
}; };
} }
@@ -546,6 +555,11 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
}; };
switch (status) { switch (status) {
.wait => {
// Happens when we're transitioning from handshaking to
// sending the request. Don't continue the read loop. Let
// the request get sent before we try to read again.
},
.need_more => self.receive(), .need_more => self.receive(),
.done => { .done => {
const redirect = self.redirect orelse { const redirect = self.redirect orelse {
@@ -610,7 +624,7 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
} }
if (done == true) { if (done == true) {
return .need_more; return .done;
} }
// With chunked-encoding, it's possible that we we've only // With chunked-encoding, it's possible that we we've only
@@ -638,8 +652,8 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
fn deinit(self: *Connection) void { fn deinit(self: *Connection) void {
switch (self.protocol) { switch (self.protocol) {
.tls_client => |*tls_client| tls_client.deinit(),
.plain => {}, .plain => {},
.tls_client => |*tls_client| tls_client.deinit(),
} }
} }
@@ -656,59 +670,14 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
handler.receive(); handler.receive();
}, },
.plain => { .plain => {
handler.state = .header; // queue everything up
handler.state = .body;
const header = try handler.request.buildHeader(); const header = try handler.request.buildHeader();
return handler.send(header); handler.send(header);
}, if (handler.request.body) |body| {
} handler.send(body);
}
fn sent(self: *Connection, state: SendState) !void {
const handler = self.handler;
std.debug.assert(handler.state == state);
switch (self.protocol) {
.tls_client => |*tls_client| {
switch (state) {
.handshake => {
// Our send is complete, but it was part of the
// TLS handshake. This isn't data we need to
// worry about.
},
.header => {
// we WERE sending the header, but that's done
handler.state = .body;
if (handler.request.body) |body| {
try tls_client.send(body);
}
},
.body => {
// We've finished sending the body. For non TLS
// we'll start receiving. But here, for TLS,
// we started a receive loop as soon as the c
// connection was established.
},
}
},
.plain => {
switch (state) {
.handshake => unreachable,
.header => {
// we WERE sending the header, but that's done
handler.state = .body;
if (handler.request.body) |body| {
handler.send(body);
} else {
// No body? time to start reading the response
handler.receive();
}
},
.body => {
// we're done sending the body, time to start
// reading the response
handler.receive();
},
} }
handler.receive();
}, },
} }
} }
@@ -723,6 +692,8 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
const pos = handler.read_pos; const pos = handler.read_pos;
const end = pos + n; const end = pos + n;
const is_handshaking = handler.state == .handshake;
const used = tls_client.onRecv(read_buf[0..end]) catch |err| switch (err) { const used = tls_client.onRecv(read_buf[0..end]) catch |err| switch (err) {
// https://github.com/ianic/tls.zig/pull/9 // https://github.com/ianic/tls.zig/pull/9
// we currently have no way to break out of the TLS handling // we currently have no way to break out of the TLS handling
@@ -738,30 +709,79 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
if (used == end) { if (used == end) {
// 1 - It used up all the data that we gave it // 1 - It used up all the data that we gave it
handler.read_pos = 0; handler.read_pos = 0;
} else if (used == 0) { if (is_handshaking and handler.state == .header) {
// we're transitioning from handshaking to
// sending the request. We should not be
// receiving data right now. This is particularly
// important becuase our socket is currently in
// blocking mode (until promise resolution is
// complete). If we try to receive now, we'll
// block the loop
return .wait;
}
// If we're here, we're either still handshaking
// (in which case we need more data), or we
// we're reading the response and we need more data
// (else we would have gotten TLSHandlerDone)
return .need_more;
}
if (used == 0) {
// 2 - It didn't use any of the data (i.e there // 2 - It didn't use any of the data (i.e there
// wasn't a full record) // wasn't a full record)
handler.read_pos = end; handler.read_pos = end;
} else { return .need_more;
// 3 - It used some of the data, but had leftover
// (i.e. there was 1+ full records AND an incomplete
// record). We need to maintain the "leftover" data
// for subsequent reads.
const extra = end - used;
std.mem.copyForwards(u8, read_buf, read_buf[extra..end]);
handler.read_pos = extra;
} }
// 3 - It used some of the data, but had leftover
// (i.e. there was 1+ full records AND an incomplete
// record). We need to maintain the "leftover" data
// for subsequent reads.
// Remember that our read_buf is the MAX possible TLS // Remember that our read_buf is the MAX possible TLS
// record size. So as long as we make sure that the start // record size. So as long as we make sure that the start
// of a record is at read_buf[0], we know that we'll // of a record is at read_buf[0], we know that we'll
// always have enough space for 1 record. // always have enough space for 1 record.
const unused = end - used;
std.mem.copyForwards(u8, read_buf, read_buf[unused..end]);
handler.read_pos = unused;
// an incomplete record means there must be more data
return .need_more; return .need_more;
}, },
.plain => return handler.processData(read_buf[0..n]), .plain => return handler.processData(read_buf[0..n]),
} }
} }
fn sent(self: *Connection) !void {
switch (self.protocol) {
.tls_client => |*tls_client| {
const handler = self.handler;
switch (handler.state) {
.handshake => {
// Our send is complete, but it was part of the
// TLS handshake. This isn't data we need to
// worry about.
},
.header => {
// we WERE sending the header, but that's done
handler.state = .body;
if (handler.request.body) |body| {
try tls_client.send(body);
} else {
// no body to send, start receiving the response
handler.receive();
}
},
.body => handler.receive(),
}
},
.plain => {
// For plain, we already queued the header, the body
// and the reader!
},
}
}
}; };
// Separate struct just to keep it a bit cleaner. tls.zig requires // Separate struct just to keep it a bit cleaner. tls.zig requires
@@ -774,12 +794,15 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
// Callback from tls.zig indicating that the handshake is complete // Callback from tls.zig indicating that the handshake is complete
pub fn onConnect(self: TLSHandler) void { pub fn onConnect(self: TLSHandler) void {
var handler = self.handler; var handler = self.handler;
handler.state = .header;
const header = handler.request.buildHeader() catch |err| { const header = handler.request.buildHeader() catch |err| {
return handler.handleError("out of memory", err); return handler.handleError("out of memory", err);
}; };
handler.state = .header;
handler.connection.protocol.tls_client.send(header) catch |err| { const tls_client = &handler.connection.protocol.tls_client;
return handler.handleError("TLS send", err); tls_client.send(header) catch |err| {
return handler.handleError("TLS send header", err);
}; };
} }
@@ -798,7 +821,9 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
handler.handleError("Premature server response", error.InvalidServerResonse); handler.handleError("Premature server response", error.InvalidServerResonse);
return error.InvalidServerResonse; return error.InvalidServerResonse;
} }
switch (handler.processData(data)) { switch (handler.processData(data)) {
.wait => unreachable, // processData never returns this
.need_more => {}, .need_more => {},
.done => return error.TLSHandlerDone, // https://github.com/ianic/tls.zig/pull/9 .done => return error.TLSHandlerDone, // https://github.com/ianic/tls.zig/pull/9
} }
@@ -818,10 +843,13 @@ const SyncHandler = struct {
var connection: Connection = undefined; var connection: Connection = undefined;
if (request.secure) { if (request.secure) {
connection = .{ .tls = try tls.client(std.net.Stream{ .handle = socket }, .{ connection = .{
.host = request.host(), .tls = try tls.client(std.net.Stream{ .handle = socket }, .{
.root_ca = request._client.root_ca, .host = request.host(),
}) }; .root_ca = request._client.root_ca,
// .key_log_callback = tls.config.key_log.callback,
}),
};
} else { } else {
connection = .{ .plain = socket }; connection = .{ .plain = socket };
} }
@@ -1700,11 +1728,12 @@ test "HttpClient: sync with body" {
try testing.expectEqual("over 9000!", try res.next()); try testing.expectEqual("over 9000!", try res.next());
try testing.expectEqual(201, res.header.status); try testing.expectEqual(201, res.header.status);
try testing.expectEqual(4, res.header.count()); try testing.expectEqual(5, res.header.count());
try testing.expectEqual("close", res.header.get("connection")); try testing.expectEqual("close", res.header.get("connection"));
try testing.expectEqual("10", res.header.get("content-length")); try testing.expectEqual("10", res.header.get("content-length"));
try testing.expectEqual("127.0.0.1", res.header.get("_host")); try testing.expectEqual("127.0.0.1", res.header.get("_host"));
try testing.expectEqual("Close", res.header.get("_connection")); try testing.expectEqual("Close", res.header.get("_connection"));
try testing.expectEqual("Lightpanda/1.0", res.header.get("_user-agent"));
} }
test "HttpClient: sync tls with body" { test "HttpClient: sync tls with body" {
@@ -1750,11 +1779,12 @@ test "HttpClient: sync redirect from TLS to Plaintext" {
arr.appendSliceAssumeCapacity(data); arr.appendSliceAssumeCapacity(data);
} }
try testing.expectEqual(201, res.header.status); try testing.expectEqual(201, res.header.status);
try testing.expectEqual(4, res.header.count()); try testing.expectEqual(5, res.header.count());
try testing.expectEqual("close", res.header.get("connection")); try testing.expectEqual("close", res.header.get("connection"));
try testing.expectEqual("10", res.header.get("content-length")); try testing.expectEqual("10", res.header.get("content-length"));
try testing.expectEqual("127.0.0.1", res.header.get("_host")); try testing.expectEqual("127.0.0.1", res.header.get("_host"));
try testing.expectEqual("Close", res.header.get("_connection")); try testing.expectEqual("Close", res.header.get("_connection"));
try testing.expectEqual("Lightpanda/1.0", res.header.get("_user-agent"));
} }
} }
@@ -1792,11 +1822,12 @@ test "HttpClient: sync GET redirect" {
try testing.expectEqual("over 9000!", try res.next()); try testing.expectEqual("over 9000!", try res.next());
try testing.expectEqual(201, res.header.status); try testing.expectEqual(201, res.header.status);
try testing.expectEqual(4, res.header.count()); try testing.expectEqual(5, res.header.count());
try testing.expectEqual("close", res.header.get("connection")); try testing.expectEqual("close", res.header.get("connection"));
try testing.expectEqual("10", res.header.get("content-length")); try testing.expectEqual("10", res.header.get("content-length"));
try testing.expectEqual("127.0.0.1", res.header.get("_host")); try testing.expectEqual("127.0.0.1", res.header.get("_host"));
try testing.expectEqual("Close", res.header.get("_connection")); try testing.expectEqual("Close", res.header.get("_connection"));
try testing.expectEqual("Lightpanda/1.0", res.header.get("_user-agent"));
} }
test "HttpClient: async connect error" { test "HttpClient: async connect error" {
@@ -1886,6 +1917,7 @@ test "HttpClient: async with body" {
"connection", "close", "connection", "close",
"content-length", "10", "content-length", "10",
"_host", "127.0.0.1", "_host", "127.0.0.1",
"_user-agent", "Lightpanda/1.0",
"_connection", "Close", "_connection", "Close",
}); });
} }
@@ -1917,6 +1949,7 @@ test "HttpClient: async redirect" {
"connection", "close", "connection", "close",
"content-length", "10", "content-length", "10",
"_host", "127.0.0.1", "_host", "127.0.0.1",
"_user-agent", "Lightpanda/1.0",
"_connection", "Close", "_connection", "Close",
}); });
} }

View File

@@ -88,7 +88,7 @@ fn testExecFn(
std.debug.print("documentHTMLClose error: {s}\n", .{@errorName(err)}); std.debug.print("documentHTMLClose error: {s}\n", .{@errorName(err)});
}; };
var http_client = try @import("http/client.zig").Client.init(alloc, 5, null); var http_client = try @import("http/client.zig").Client.init(alloc, 5, null );
defer http_client.deinit(); defer http_client.deinit();
try js_env.setUserContext(.{ try js_env.setUserContext(.{

View File

@@ -1,3 +1,21 @@
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std"); const std = @import("std");
const builtin = @import("builtin"); const builtin = @import("builtin");
const parser = @import("netsurf"); const parser = @import("netsurf");
@@ -204,7 +222,7 @@ fn serveCDP(address: std.net.Address) !void {
const server = @import("server.zig"); const server = @import("server.zig");
wg.finish(); wg.finish();
server.run(&app, address, std.time.ns_per_s * 2) catch |err| { server.run(app, address, std.time.ns_per_s * 2) catch |err| {
std.debug.print("CDP server error: {}", .{err}); std.debug.print("CDP server error: {}", .{err});
return err; return err;
}; };

View File

@@ -445,7 +445,7 @@ pub const Client = struct {
}; };
self.mode = .websocket; self.mode = .websocket;
self.cdp = try CDP.init(self.server.app, self); self.cdp = CDP.init(self.server.app, self);
return self.send(arena, response); return self.send(arena, response);
} }

View File

@@ -7,6 +7,7 @@ const Allocator = std.mem.Allocator;
const App = @import("../app.zig").App; const App = @import("../app.zig").App;
const telemetry = @import("telemetry.zig"); const telemetry = @import("telemetry.zig");
const HttpClient = @import("../http/client.zig").Client;
const log = std.log.scoped(.telemetry); const log = std.log.scoped(.telemetry);
const URL = "https://telemetry.lightpanda.io"; const URL = "https://telemetry.lightpanda.io";
@@ -20,7 +21,7 @@ pub const LightPanda = struct {
allocator: Allocator, allocator: Allocator,
mutex: std.Thread.Mutex, mutex: std.Thread.Mutex,
cond: Thread.Condition, cond: Thread.Condition,
client: *std.http.Client, client: *HttpClient,
node_pool: std.heap.MemoryPool(List.Node), node_pool: std.heap.MemoryPool(List.Node),
const List = std.DoublyLinkedList(LightPandaEvent); const List = std.DoublyLinkedList(LightPandaEvent);
@@ -34,7 +35,7 @@ pub const LightPanda = struct {
.thread = null, .thread = null,
.running = true, .running = true,
.allocator = allocator, .allocator = allocator,
.client = @ptrCast(&app.http_client), .client = &app.http_client,
.uri = std.Uri.parse(URL) catch unreachable, .uri = std.Uri.parse(URL) catch unreachable,
.node_pool = std.heap.MemoryPool(List.Node).init(allocator), .node_pool = std.heap.MemoryPool(List.Node).init(allocator),
}; };
@@ -72,7 +73,6 @@ pub const LightPanda = struct {
} }
fn run(self: *LightPanda) void { fn run(self: *LightPanda) void {
const client = self.client;
var arr: std.ArrayListUnmanaged(u8) = .{}; var arr: std.ArrayListUnmanaged(u8) = .{};
defer arr.deinit(self.allocator); defer arr.deinit(self.allocator);
@@ -82,7 +82,7 @@ pub const LightPanda = struct {
while (self.pending.first != null) { while (self.pending.first != null) {
const b = self.collectBatch(&batch); const b = self.collectBatch(&batch);
self.mutex.unlock(); self.mutex.unlock();
self.postEvent(b, client, &arr) catch |err| { self.postEvent(b, &arr) catch |err| {
log.warn("Telementry reporting error: {}", .{err}); log.warn("Telementry reporting error: {}", .{err});
}; };
self.mutex.lock(); self.mutex.lock();
@@ -94,7 +94,7 @@ pub const LightPanda = struct {
} }
} }
fn postEvent(self: *const LightPanda, events: []LightPandaEvent, client: *std.http.Client, arr: *std.ArrayListUnmanaged(u8)) !void { fn postEvent(self: *const LightPanda, events: []LightPandaEvent, arr: *std.ArrayListUnmanaged(u8)) !void {
defer arr.clearRetainingCapacity(); defer arr.clearRetainingCapacity();
var writer = arr.writer(self.allocator); var writer = arr.writer(self.allocator);
for (events) |event| { for (events) |event| {
@@ -102,16 +102,15 @@ pub const LightPanda = struct {
try writer.writeByte('\n'); try writer.writeByte('\n');
} }
var response_header_buffer: [2048]u8 = undefined; var req = try self.client.request(.POST, self.uri);
const result = try client.fetch(.{ defer req.deinit();
.method = .POST, req.body = arr.items;
.payload = arr.items,
.response_storage = .ignore, // drain the response
.location = .{ .uri = self.uri }, var res = try req.sendSync(.{});
.server_header_buffer = &response_header_buffer, while (try res.next()) |_| {}
}); if (res.header.status != 200) {
if (result.status != .ok) { log.warn("server error status: {d}", .{res.header.status});
log.warn("server error status: {}", .{result.status});
} }
} }

View File

@@ -1,3 +1,21 @@
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std"); const std = @import("std");
pub const allocator = std.testing.allocator; pub const allocator = std.testing.allocator;
@@ -172,4 +190,3 @@ pub const Random = struct {
return instance.?.random(); return instance.?.random();
} }
}; };
>>>>>>> eaccbd0 (replace zig-async-io and std.http.Client with a custom HTTP client)

View File

@@ -108,6 +108,7 @@ pub const XMLHttpRequest = struct {
headers: Headers, headers: Headers,
sync: bool = true, sync: bool = true,
err: ?anyerror = null, err: ?anyerror = null,
last_dispatch: i64 = 0,
// TODO uncomment this field causes casting issue with // TODO uncomment this field causes casting issue with
// XMLHttpRequestEventTarget. I think it's dueto an alignement issue, but // XMLHttpRequestEventTarget. I think it's dueto an alignement issue, but
@@ -143,8 +144,6 @@ pub const XMLHttpRequest = struct {
response_obj: ?ResponseObj = null, response_obj: ?ResponseObj = null,
send_flag: bool = false, send_flag: bool = false,
payload: ?[]const u8 = null,
pub const prototype = *XMLHttpRequestEventTarget; pub const prototype = *XMLHttpRequestEventTarget;
pub const mem_guarantied = true; pub const mem_guarantied = true;
@@ -298,9 +297,6 @@ pub const XMLHttpRequest = struct {
if (self.url) |v| alloc.free(v); if (self.url) |v| alloc.free(v);
self.url = null; self.url = null;
if (self.payload) |v| alloc.free(v);
self.payload = null;
if (self.response_obj) |v| v.deinit(); if (self.response_obj) |v| v.deinit();
self.response_obj = null; self.response_obj = null;
@@ -472,41 +468,30 @@ pub const XMLHttpRequest = struct {
if (self.state != .opened) return DOMError.InvalidState; if (self.state != .opened) return DOMError.InvalidState;
if (self.send_flag) return DOMError.InvalidState; if (self.send_flag) return DOMError.InvalidState;
// The body argument provides the request body, if any, and is ignored
// if the request method is GET or HEAD.
// https://xhr.spec.whatwg.org/#the-send()-method
// var used_body: ?XMLHttpRequestBodyInit = null;
if (body != null and self.method != .GET and self.method != .HEAD) {
// TODO If body is a Document, then set thiss request body to body, serialized, converted, and UTF-8 encoded.
const body_init = XMLHttpRequestBodyInit{ .String = body.? };
// keep the user content type from request headers.
if (self.headers.has("Content-Type")) {
// https://fetch.spec.whatwg.org/#bodyinit-safely-extract
try self.headers.append("Content-Type", try body_init.contentType());
}
// copy the payload
if (self.payload) |v| alloc.free(v);
self.payload = try body_init.dupe(alloc);
}
log.debug("{any} {any}", .{ self.method, self.uri }); log.debug("{any} {any}", .{ self.method, self.uri });
self.send_flag = true; self.send_flag = true;
self.priv_state = .open; self.priv_state = .open;
self.request = try self.client.request(self.method, self.uri); self.request = try self.client.request(self.method, self.uri);
var request = &self.request.?; var request = &self.request.?;
errdefer request.deinit(); errdefer request.deinit();
for (self.headers.list.items) |hdr| { for (self.headers.list.items) |hdr| {
try request.addHeader(hdr.name, hdr.value, .{}); try request.addHeader(hdr.name, hdr.value, .{});
} }
request.body = self.payload;
// The body argument provides the request body, if any, and is ignored
// if the request method is GET or HEAD.
// https://xhr.spec.whatwg.org/#the-send()-method
// var used_body: ?XMLHttpRequestBodyInit = null;
if (body) |b| {
if (self.method != .GET and self.method != .HEAD) {
request.body = try alloc.dupe(u8, b);
try request.addHeader("Content-Type", "text/plain; charset=UTF-8", .{});
}
}
try request.sendAsync(loop, self, .{}); try request.sendAsync(loop, self, .{});
} }
@@ -540,22 +525,22 @@ pub const XMLHttpRequest = struct {
self.dispatchProgressEvent("loadstart", .{ .loaded = 0, .total = 0 }); self.dispatchProgressEvent("loadstart", .{ .loaded = 0, .total = 0 });
self.state = .loading; self.state = .loading;
self.dispatchEvt("readystatechange");
} }
if (progress.data) |data| { if (progress.data) |data| {
const buf = &self.response_bytes; try self.response_bytes.appendSlice(self.alloc, data);
}
try buf.appendSlice(self.alloc, data);
const total_len = buf.items.len;
// TODO: don't dispatch this more than once every 50ms
// dispatch a progress event progress.
self.dispatchEvt("readystatechange");
const loaded = self.response_bytes.items.len;
const now = std.time.milliTimestamp();
if (now - self.last_dispatch > 50) {
// don't send this more than once every 50ms
self.dispatchProgressEvent("progress", .{ self.dispatchProgressEvent("progress", .{
.total = total_len, .total = loaded,
.loaded = total_len, .loaded = loaded,
}); });
self.last_dispatch = now;
} }
if (progress.done == false) { if (progress.done == false) {
@@ -566,11 +551,10 @@ pub const XMLHttpRequest = struct {
self.send_flag = false; self.send_flag = false;
self.dispatchEvt("readystatechange"); self.dispatchEvt("readystatechange");
const total_len = self.response_bytes.items.len;
// dispatch a progress event load. // dispatch a progress event load.
self.dispatchProgressEvent("load", .{ .loaded = total_len, .total = total_len }); self.dispatchProgressEvent("load", .{ .loaded = loaded, .total = loaded });
// dispatch a progress event loadend. // dispatch a progress event loadend.
self.dispatchProgressEvent("loadend", .{ .loaded = total_len, .total = total_len }); self.dispatchProgressEvent("loadend", .{ .loaded = loaded, .total = loaded });
} }
fn onErr(self: *XMLHttpRequest, err: anyerror) void { fn onErr(self: *XMLHttpRequest, err: anyerror) void {
@@ -863,59 +847,59 @@ pub fn testExecFn(
}; };
try checkCases(js_env, &send); try checkCases(js_env, &send);
// var document = [_]Case{ var document = [_]Case{
// .{ .src = "const req2 = new XMLHttpRequest()", .ex = "undefined" }, .{ .src = "const req2 = new XMLHttpRequest()", .ex = "undefined" },
// .{ .src = "req2.open('GET', 'https://httpbin.io/html')", .ex = "undefined" }, .{ .src = "req2.open('GET', 'https://httpbin.io/html')", .ex = "undefined" },
// .{ .src = "req2.responseType = 'document'", .ex = "document" }, .{ .src = "req2.responseType = 'document'", .ex = "document" },
// .{ .src = "req2.send()", .ex = "undefined" }, .{ .src = "req2.send()", .ex = "undefined" },
// // Each case executed waits for all loop callaback calls. // Each case executed waits for all loop callaback calls.
// // So the url has been retrieved. // So the url has been retrieved.
// .{ .src = "req2.status", .ex = "200" }, .{ .src = "req2.status", .ex = "200" },
// .{ .src = "req2.statusText", .ex = "OK" }, .{ .src = "req2.statusText", .ex = "OK" },
// .{ .src = "req2.response instanceof Document", .ex = "true" }, .{ .src = "req2.response instanceof Document", .ex = "true" },
// .{ .src = "req2.responseXML instanceof Document", .ex = "true" }, .{ .src = "req2.responseXML instanceof Document", .ex = "true" },
// }; };
// try checkCases(js_env, &document); try checkCases(js_env, &document);
// var json = [_]Case{ var json = [_]Case{
// .{ .src = "const req3 = new XMLHttpRequest()", .ex = "undefined" }, .{ .src = "const req3 = new XMLHttpRequest()", .ex = "undefined" },
// .{ .src = "req3.open('GET', 'https://httpbin.io/json')", .ex = "undefined" }, .{ .src = "req3.open('GET', 'https://httpbin.io/json')", .ex = "undefined" },
// .{ .src = "req3.responseType = 'json'", .ex = "json" }, .{ .src = "req3.responseType = 'json'", .ex = "json" },
// .{ .src = "req3.send()", .ex = "undefined" }, .{ .src = "req3.send()", .ex = "undefined" },
// // Each case executed waits for all loop callaback calls. // Each case executed waits for all loop callaback calls.
// // So the url has been retrieved. // So the url has been retrieved.
// .{ .src = "req3.status", .ex = "200" }, .{ .src = "req3.status", .ex = "200" },
// .{ .src = "req3.statusText", .ex = "OK" }, .{ .src = "req3.statusText", .ex = "OK" },
// .{ .src = "req3.response.slideshow.author", .ex = "Yours Truly" }, .{ .src = "req3.response.slideshow.author", .ex = "Yours Truly" },
// }; };
// try checkCases(js_env, &json); try checkCases(js_env, &json);
// var post = [_]Case{ var post = [_]Case{
// .{ .src = "const req4 = new XMLHttpRequest()", .ex = "undefined" }, .{ .src = "const req4 = new XMLHttpRequest()", .ex = "undefined" },
// .{ .src = "req4.open('POST', 'https://httpbin.io/post')", .ex = "undefined" }, .{ .src = "req4.open('POST', 'https://httpbin.io/post')", .ex = "undefined" },
// .{ .src = "req4.send('foo')", .ex = "undefined" }, .{ .src = "req4.send('foo')", .ex = "undefined" },
// // Each case executed waits for all loop callaback calls. // Each case executed waits for all loop callaback calls.
// // So the url has been retrieved. // So the url has been retrieved.
// .{ .src = "req4.status", .ex = "200" }, .{ .src = "req4.status", .ex = "200" },
// .{ .src = "req4.statusText", .ex = "OK" }, .{ .src = "req4.statusText", .ex = "OK" },
// .{ .src = "req4.responseText.length > 64", .ex = "true" }, .{ .src = "req4.responseText.length > 64", .ex = "true" },
// }; };
// try checkCases(js_env, &post); try checkCases(js_env, &post);
// var cbk = [_]Case{ var cbk = [_]Case{
// .{ .src = "const req5 = new XMLHttpRequest()", .ex = "undefined" }, .{ .src = "const req5 = new XMLHttpRequest()", .ex = "undefined" },
// .{ .src = "req5.open('GET', 'https://httpbin.io/json')", .ex = "undefined" }, .{ .src = "req5.open('GET', 'https://httpbin.io/json')", .ex = "undefined" },
// .{ .src = "var status = 0; req5.onload = function () { status = this.status };", .ex = "function () { status = this.status }" }, .{ .src = "var status = 0; req5.onload = function () { status = this.status };", .ex = "function () { status = this.status }" },
// .{ .src = "req5.send()", .ex = "undefined" }, .{ .src = "req5.send()", .ex = "undefined" },
// // Each case executed waits for all loop callaback calls. // Each case executed waits for all loop callaback calls.
// // So the url has been retrieved. // So the url has been retrieved.
// .{ .src = "status", .ex = "200" }, .{ .src = "status", .ex = "200" },
// }; };
// try checkCases(js_env, &cbk); try checkCases(js_env, &cbk);
} }