diff --git a/src/cdp/cdp.zig b/src/cdp/cdp.zig index a9624556..df5003f2 100644 --- a/src/cdp/cdp.zig +++ b/src/cdp/cdp.zig @@ -226,8 +226,7 @@ pub fn sendEvent( const resp = Resp{ .method = name, .params = params, .sessionId = sessionID }; const event_msg = try stringify(alloc, resp); - defer alloc.free(event_msg); - try server.sendSync(ctx, event_msg); + try server.sendAsync(ctx, event_msg); } fn getParams( diff --git a/src/cdp/page.zig b/src/cdp/page.zig index 1c5076c5..8404ea42 100644 --- a/src/cdp/page.zig +++ b/src/cdp/page.zig @@ -331,8 +331,7 @@ fn navigate( .loaderId = ctx.state.loaderID, }; const res = try result(alloc, msg.id, Resp, resp, msg.sessionID); - defer alloc.free(res); - try server.sendSync(ctx, res); + try server.sendAsync(ctx, res); // TODO: at this point do we need async the following actions to be async? diff --git a/src/cdp/target.zig b/src/cdp/target.zig index ba0e977c..e91c4573 100644 --- a/src/cdp/target.zig +++ b/src/cdp/target.zig @@ -256,8 +256,7 @@ fn disposeBrowserContext( // output const res = try result(alloc, msg.id, null, .{}, null); - defer alloc.free(res); - try server.sendSync(ctx, res); + try server.sendAsync(ctx, res); return error.DisposeBrowserContext; } @@ -345,8 +344,7 @@ fn closeTarget( success: bool = true, }; const res = try result(alloc, msg.id, Resp, Resp{}, null); - defer alloc.free(res); - try server.sendSync(ctx, res); + try server.sendAsync(ctx, res); // Inspector.detached event const InspectorDetached = struct { diff --git a/src/msg.zig b/src/msg.zig index c7b6d353..b3d4da5a 100644 --- a/src/msg.zig +++ b/src/msg.zig @@ -55,89 +55,72 @@ pub const MsgBuffer = struct { } // read input - // - `do_func` is a callback to execute on each message of the input - // - `data` is an arbitrary user data that will be forwarded to the do_func callback - pub fn read( - self: *MsgBuffer, - alloc: std.mem.Allocator, - input: []const u8, - data: anytype, - comptime do_func: fn (data: @TypeOf(data), msg: []const u8) anyerror!void, - ) !void { + pub fn read(self: *MsgBuffer, alloc: std.mem.Allocator, input: []const u8) !struct { + msg: []const u8, + left: []const u8, + } { var _input = input; // make input writable - while (true) { - var msg: []const u8 = undefined; - - // msg size - var msg_size: usize = undefined; - if (self.isEmpty()) { - // parse msg size metadata - const size_pos = std.mem.indexOfScalar(u8, _input, ':') orelse return error.InputWithoutSize; - const size_str = _input[0..size_pos]; - msg_size = try std.fmt.parseInt(u32, size_str, 10); - _input = _input[size_pos + 1 ..]; - } else { - msg_size = self.size; - } - - // multipart - const is_multipart = !self.isEmpty() or _input.len < msg_size; - if (is_multipart) { - - // set msg size on empty MsgBuffer - if (self.isEmpty()) { - self.size = msg_size; - } - - // get the new position of the cursor - const new_pos = self.pos + _input.len; - - // check max limit size - if (new_pos > MaxSize) { - return error.MsgTooBig; - } - - // check if the current input can fit in MsgBuffer - if (new_pos > self.buf.len) { - // we want to realloc at least: - // - a size big enough to fit the entire input (ie. new_pos) - // - a size big enough (ie. current size + starting size) - // to avoid multiple reallocation - const new_size = @max(self.buf.len + self.size, new_pos); - // resize the MsgBuffer to fit - self.buf = try alloc.realloc(self.buf, new_size); - } - - // copy the current input into MsgBuffer - @memcpy(self.buf[self.pos..new_pos], _input[0..]); - - // set the new cursor position - self.pos = new_pos; - - // if multipart is not finished, go fetch the next input - if (!self.isFinished()) return; - - // otherwhise multipart is finished, use its buffer as input - _input = self.buf[0..self.pos]; - self.reset(); - } - - // handle several JSON msg in 1 read - const is_combined = _input.len > msg_size; - msg = _input[0..msg_size]; - if (is_combined) { - _input = _input[msg_size..]; - } - - try @call(.auto, do_func, .{ data, msg }); - - if (!is_combined) break; + // msg size + var msg_size: usize = undefined; + if (self.isEmpty()) { + // parse msg size metadata + const size_pos = std.mem.indexOfScalar(u8, _input, ':') orelse return error.InputWithoutSize; + const size_str = _input[0..size_pos]; + msg_size = try std.fmt.parseInt(u32, size_str, 10); + _input = _input[size_pos + 1 ..]; + } else { + msg_size = self.size; } + + // multipart + const is_multipart = !self.isEmpty() or _input.len < msg_size; + if (is_multipart) { + + // set msg size on empty MsgBuffer + if (self.isEmpty()) { + self.size = msg_size; + } + + // get the new position of the cursor + const new_pos = self.pos + _input.len; + + // check max limit size + if (new_pos > MaxSize) { + return error.MsgTooBig; + } + + // check if the current input can fit in MsgBuffer + if (new_pos > self.buf.len) { + // we want to realloc at least: + // - a size big enough to fit the entire input (ie. new_pos) + // - a size big enough (ie. current msg size + starting buffer size) + // to avoid multiple reallocation + const new_size = @max(self.buf.len + self.size, new_pos); + // resize the MsgBuffer to fit + self.buf = try alloc.realloc(self.buf, new_size); + } + + // copy the current input into MsgBuffer + @memcpy(self.buf[self.pos..new_pos], _input[0..]); + + // set the new cursor position + self.pos = new_pos; + + // if multipart is not finished, go fetch the next input + if (!self.isFinished()) return error.MsgMultipart; + + // otherwhise multipart is finished, use its buffer as input + _input = self.buf[0..self.pos]; + self.reset(); + } + + // handle several JSON msg in 1 read + return .{ .msg = _input[0..msg_size], .left = _input[msg_size..] }; } }; -fn doTest(nb: *u8, _: []const u8) anyerror!void { +fn doTest(nb: *u8) void { nb.* += 1; } @@ -171,12 +154,19 @@ test "MsgBuffer" { .{ .input = "2:ok9:multi", .nb = 1 }, .{ .input = "part", .nb = 1 }, }; - var nb: u8 = undefined; var msg_buf = try MsgBuffer.init(alloc, 10); defer msg_buf.deinit(alloc); for (cases) |case| { - nb = 0; - try msg_buf.read(alloc, case.input, &nb, doTest); + var nb: u8 = 0; + var input: []const u8 = case.input; + while (input.len > 0) { + const parts = msg_buf.read(alloc, input) catch |err| { + if (err == error.MsgMultipart) break; // go to the next case input + return err; + }; + nb += 1; + input = parts.left; + } try std.testing.expect(nb == case.nb); } } diff --git a/src/server.zig b/src/server.zig index 2b8ee379..6c428a99 100644 --- a/src/server.zig +++ b/src/server.zig @@ -128,17 +128,6 @@ pub const Ctx = struct { return; } - // input - const input = self.read_buf[0..size]; - - // read and execute input - self.msg_buf.read(self.alloc(), input, self, Ctx.do) catch |err| { - if (err != error.Closed) { - log.err("do error: {any}", .{err}); - } - return; - }; - // set connection timestamp self.last_active = std.time.Instant.now() catch |err| { log.err("read timestamp error: {any}", .{err}); @@ -154,6 +143,26 @@ pub const Ctx = struct { self.conn_socket, self.read_buf, ); + + // read and execute input + var input: []const u8 = self.read_buf[0..size]; + while (input.len > 0) { + const parts = self.msg_buf.read(self.alloc(), input) catch |err| { + if (err == error.MsgMultipart) { + return; + } else { + log.err("msg read error: {any}", .{err}); + return; + } + }; + input = parts.left; + // execute + self.do(parts.msg) catch |err| { + if (err != error.Closed) { + log.err("do error: {any}", .{err}); + } + }; + } } fn timeoutCbk(self: *Ctx, completion: *Completion, result: TimeoutError!void) void { @@ -327,9 +336,8 @@ pub const Ctx = struct { tpl, .{ msg_open, cdp.ContextSessionID }, ); - defer ctx.alloc().free(s); - try sendSync(ctx, s); + try sendAsync(ctx, s); } pub fn onInspectorResp(ctx_opaque: *anyopaque, _: u32, msg: []const u8) void { @@ -392,10 +400,6 @@ pub fn sendAsync(ctx: *Ctx, msg: []const u8) !void { ctx.loop.io.send(*Send, sd, Send.asyncCbk, &sd.completion, ctx.conn_socket, msg); } -pub fn sendSync(ctx: *Ctx, msg: []const u8) !void { - _ = try std.posix.write(ctx.conn_socket, msg); -} - // Listen // ------