Re-enable telemetry

Start work on supporting navigation events (clicks, form submission).
This commit is contained in:
Karl Seguin
2025-08-01 21:58:24 +08:00
parent 94e8964f69
commit f65a39a3e3
18 changed files with 818 additions and 588 deletions

View File

@@ -48,8 +48,9 @@ const MAX_MESSAGE_SIZE = 512 * 1024 + 14;
const Server = struct {
app: *App,
allocator: Allocator,
loop: *Loop,
allocator: Allocator,
client: ?*Client = null,
// internal fields
listener: posix.socket_t,
@@ -96,6 +97,7 @@ const Server = struct {
const client = try self.allocator.create(Client);
client.* = Client.init(socket, self);
client.start();
self.client = client;
if (log.enabled(.app, .info)) {
var address: std.net.Address = undefined;
@@ -107,6 +109,7 @@ const Server = struct {
fn releaseClient(self: *Server, client: *Client) void {
self.allocator.destroy(client);
self.client = null;
}
};
@@ -163,9 +166,7 @@ pub const Client = struct {
const SendQueue = std.DoublyLinkedList(Outgoing);
const Self = @This();
fn init(socket: posix.socket_t, server: *Server) Self {
fn init(socket: posix.socket_t, server: *Server) Client {
return .{
.cdp = null,
.mode = .http,
@@ -185,7 +186,7 @@ pub const Client = struct {
};
}
fn maybeDeinit(self: *Self) void {
fn maybeDeinit(self: *Client) void {
if (self.read_pending or self.write_pending) {
// We cannot do anything as long as we still have these pending
// They should not be pending for long as we're only here after
@@ -222,7 +223,7 @@ pub const Client = struct {
}
}
fn close(self: *Self) void {
fn close(self: *Client) void {
log.info(.app, "client disconnected", .{});
self.connected = false;
// recv only, because we might have pending writes we'd like to get
@@ -231,14 +232,14 @@ pub const Client = struct {
self.maybeDeinit();
}
fn start(self: *Self) void {
fn start(self: *Client) void {
self.queueRead();
self.queueTimeout();
}
fn queueRead(self: *Self) void {
fn queueRead(self: *Client) void {
self.server.loop.io.recv(
*Self,
*Client,
self,
callbackRead,
&self.read_completion,
@@ -248,7 +249,7 @@ pub const Client = struct {
self.read_pending = true;
}
fn callbackRead(self: *Self, _: *Completion, result: RecvError!usize) void {
fn callbackRead(self: *Client, _: *Completion, result: RecvError!usize) void {
self.read_pending = false;
if (self.connected == false) {
self.maybeDeinit();
@@ -277,11 +278,11 @@ pub const Client = struct {
}
}
fn readBuf(self: *Self) []u8 {
fn readBuf(self: *Client) []u8 {
return self.reader.readBuf();
}
fn processData(self: *Self, len: usize) !bool {
fn processData(self: *Client, len: usize) !bool {
self.last_active = now();
self.reader.len += len;
@@ -294,7 +295,7 @@ pub const Client = struct {
}
}
fn processHTTPRequest(self: *Self) !void {
fn processHTTPRequest(self: *Client) !void {
std.debug.assert(self.reader.pos == 0);
const request = self.reader.buf[0..self.reader.len];
@@ -330,7 +331,7 @@ pub const Client = struct {
self.reader.len = 0;
}
fn handleHTTPRequest(self: *Self, request: []u8) !void {
fn handleHTTPRequest(self: *Client, request: []u8) !void {
if (request.len < 18) {
// 18 is [generously] the smallest acceptable HTTP request
return error.InvalidRequest;
@@ -365,7 +366,7 @@ pub const Client = struct {
return error.NotFound;
}
fn upgradeConnection(self: *Self, request: []u8) !void {
fn upgradeConnection(self: *Client, request: []u8) !void {
// our caller already confirmed that we have a trailing \r\n\r\n
const request_line_end = std.mem.indexOfScalar(u8, request, '\r') orelse unreachable;
const request_line = request[0..request_line_end];
@@ -462,7 +463,7 @@ pub const Client = struct {
return self.send(arena, response);
}
fn writeHTTPErrorResponse(self: *Self, comptime status: u16, comptime body: []const u8) void {
fn writeHTTPErrorResponse(self: *Client, comptime status: u16, comptime body: []const u8) void {
const response = std.fmt.comptimePrint(
"HTTP/1.1 {d} \r\nConnection: Close\r\nContent-Length: {d}\r\n\r\n{s}",
.{ status, body.len, body },
@@ -473,7 +474,7 @@ pub const Client = struct {
self.send(null, response) catch {};
}
fn processWebsocketMessage(self: *Self) !bool {
fn processWebsocketMessage(self: *Client) !bool {
errdefer self.close();
var reader = &self.reader;
@@ -517,7 +518,7 @@ pub const Client = struct {
return true;
}
fn sendPong(self: *Self, data: []const u8) !void {
fn sendPong(self: *Client, data: []const u8) !void {
if (data.len == 0) {
return self.send(null, &EMPTY_PONG);
}
@@ -539,7 +540,7 @@ pub const Client = struct {
// writev, so we need to get creative. We'll JSON serialize to a
// buffer, where the first 10 bytes are reserved. We can then backfill
// the header and send the slice.
pub fn sendJSON(self: *Self, message: anytype, opts: std.json.StringifyOptions) !void {
pub fn sendJSON(self: *Client, message: anytype, opts: std.json.StringifyOptions) !void {
var arena = ArenaAllocator.init(self.server.allocator);
errdefer arena.deinit();
@@ -557,7 +558,7 @@ pub const Client = struct {
}
pub fn sendJSONRaw(
self: *Self,
self: *Client,
arena: ArenaAllocator,
buf: std.ArrayListUnmanaged(u8),
) !void {
@@ -567,9 +568,9 @@ pub const Client = struct {
return self.send(arena, framed);
}
fn queueTimeout(self: *Self) void {
fn queueTimeout(self: *Client) void {
self.server.loop.io.timeout(
*Self,
*Client,
self,
callbackTimeout,
&self.timeout_completion,
@@ -578,7 +579,7 @@ pub const Client = struct {
self.timeout_pending = true;
}
fn callbackTimeout(self: *Self, _: *Completion, result: TimeoutError!void) void {
fn callbackTimeout(self: *Client, _: *Completion, result: TimeoutError!void) void {
self.timeout_pending = false;
if (self.connected == false) {
if (self.read_pending == false and self.write_pending == false) {
@@ -614,7 +615,7 @@ pub const Client = struct {
self.queueTimeout();
}
fn send(self: *Self, arena: ?ArenaAllocator, data: []const u8) !void {
fn send(self: *Client, arena: ?ArenaAllocator, data: []const u8) !void {
const node = try self.send_queue_node_pool.create();
errdefer self.send_queue_node_pool.destroy(node);
@@ -632,7 +633,7 @@ pub const Client = struct {
self.queueSend();
}
fn queueSend(self: *Self) void {
fn queueSend(self: *Client) void {
if (self.connected == false) {
return;
}
@@ -643,7 +644,7 @@ pub const Client = struct {
};
self.server.loop.io.send(
*Self,
*Client,
self,
sendCallback,
&self.write_completion,
@@ -653,7 +654,7 @@ pub const Client = struct {
self.write_pending = true;
}
fn sendCallback(self: *Self, _: *Completion, result: SendError!usize) void {
fn sendCallback(self: *Client, _: *Completion, result: SendError!usize) void {
self.write_pending = false;
if (self.connected == false) {
self.maybeDeinit();
@@ -1054,12 +1055,20 @@ pub fn run(
// - JS callbacks events from scripts
// var http_client = app.http_client;
while (true) {
// // @newhttp
// // This is a temporary hack for the newhttp work. The issue is that we
// // now have 2 event loops.
// if (http_client.active > 0) {
// _ = try http_client.tick(10);
// }
// @newhttp. This is a hack. We used to just have 1 loop, so we could
// sleep it it "forever" and any activity (message to this server,
// JS callback, http data) would wake it up.
// Now we have 2 loops. If we block on one, the other won't get woken
// up. We don't block "forever" but even 10ms adds a bunch of latency
// since this is called in a loop.
// Hopefully this is temporary and we can remove the io loop and then
// only have 1 loop. But, until then, we need to check both loops and
// pay some blocking penalty.
if (server.client) |client| {
if (client.cdp) |*cdp| {
cdp.pageWait();
}
}
try loop.io.run_for_ns(10 * std.time.ns_per_ms);
}