diff --git a/src/cdp/browser.zig b/src/cdp/browser.zig index ff20c329..b3f04c21 100644 --- a/src/cdp/browser.zig +++ b/src/cdp/browser.zig @@ -17,13 +17,12 @@ pub fn browser( action: []const u8, scanner: *std.json.Scanner, ctx: *Ctx, - comptime sendFn: SendFn, ) ![]const u8 { const method = std.meta.stringToEnum(BrowserMethods, action) orelse return error.UnknownBrowserMethod; return switch (method) { - .getVersion => browserGetVersion(alloc, id, scanner, ctx, sendFn), - .setDownloadBehavior => browserSetDownloadBehavior(alloc, id, scanner, ctx, sendFn), + .getVersion => browserGetVersion(alloc, id, scanner, ctx), + .setDownloadBehavior => browserSetDownloadBehavior(alloc, id, scanner, ctx), }; } @@ -38,7 +37,6 @@ fn browserGetVersion( id: u64, _: *std.json.Scanner, _: *Ctx, - comptime _: SendFn, ) ![]const u8 { const Res = struct { protocolVersion: []const u8, @@ -63,7 +61,6 @@ fn browserSetDownloadBehavior( id: u64, scanner: *std.json.Scanner, _: *Ctx, - comptime _: SendFn, ) ![]const u8 { const Params = struct { behavior: []const u8, diff --git a/src/cdp/cdp.zig b/src/cdp/cdp.zig index 7d33bc66..dcfb4f9f 100644 --- a/src/cdp/cdp.zig +++ b/src/cdp/cdp.zig @@ -16,7 +16,6 @@ pub fn do( alloc: std.mem.Allocator, s: []const u8, ctx: *Ctx, - comptime sendFn: SendFn, ) ![]const u8 { var scanner = std.json.Scanner.initCompleteInput(alloc, s); defer scanner.deinit(); @@ -29,15 +28,15 @@ pub fn do( try checkKey("method", (try scanner.next()).string); const method = (try scanner.next()).string; - std.log.debug("cmd: id {any}, method {s}\n", .{ id, method }); + std.log.debug("cmd: id {any}, method {s}", .{ id, method }); var iter = std.mem.splitScalar(u8, method, '.'); const domain = std.meta.stringToEnum(Domains, iter.first()) orelse return error.UnknonwDomain; return switch (domain) { - .Browser => browser(alloc, id, iter.next().?, &scanner, ctx, sendFn), - .Target => target(alloc, id, iter.next().?, &scanner, ctx, sendFn), + .Browser => browser(alloc, id, iter.next().?, &scanner, ctx), + .Target => target(alloc, id, iter.next().?, &scanner, ctx), }; } diff --git a/src/cdp/target.zig b/src/cdp/target.zig index cf861086..bca669cb 100644 --- a/src/cdp/target.zig +++ b/src/cdp/target.zig @@ -18,12 +18,11 @@ pub fn target( action: []const u8, scanner: *std.json.Scanner, ctx: *Ctx, - comptime sendFn: SendFn, ) ![]const u8 { const method = std.meta.stringToEnum(TargetMethods, action) orelse return error.UnknownTargetMethod; return switch (method) { - .setAutoAttach => tagetSetAutoAttach(alloc, id, scanner, ctx, sendFn), + .setAutoAttach => tagetSetAutoAttach(alloc, id, scanner, ctx), // .getTargetInfo => tagetGetTargetInfo(alloc, id, scanner), }; } @@ -33,7 +32,6 @@ fn tagetSetAutoAttach( id: u64, scanner: *std.json.Scanner, _: *Ctx, - comptime _: SendFn, ) ![]const u8 { const Params = struct { autoAttach: bool, @@ -52,7 +50,6 @@ fn tagetGetTargetInfo( id: u64, scanner: *std.json.Scanner, _: *Ctx, - comptime _: SendFn, ) ![]const u8 { _ = scanner; diff --git a/src/server.zig b/src/server.zig index 54f1d695..3f4d1443 100644 --- a/src/server.zig +++ b/src/server.zig @@ -12,7 +12,10 @@ pub const CmdContext = struct { js_env: *public.Env, socket: std.os.socket_t, completion: *public.IO.Completion, - buf: []u8, + + read_buf: []u8, + write_buf: []const u8 = undefined, + close: bool = false, try_catch: public.TryCatch, @@ -21,6 +24,10 @@ pub const CmdContext = struct { fn alloc(self: *CmdContext) std.mem.Allocator { return self.js_env.nat_ctx.alloc; } + + fn loop(self: *CmdContext) *public.Loop { + return self.js_env.nat_ctx.loop; + } }; fn respCallback( @@ -36,30 +43,48 @@ fn respCallback( std.log.debug("send ok", .{}); } -pub fn send(ctx: *CmdContext, msg: []const u8) !void { - defer ctx.alloc().free(msg); - return osSend(ctx, msg); +fn timeoutCallback( + ctx: *CmdContext, + completion: *public.IO.Completion, + result: public.IO.TimeoutError!void, +) void { + std.log.debug("sending after", .{}); + _ = result catch |err| { + ctx.close = true; + std.debug.print("timeout error: {s}\n", .{@errorName(err)}); + return; + }; + + ctx.alloc().destroy(completion); + send(ctx, ctx.write_buf) catch unreachable; } -pub const SendFn = (fn (*CmdContext, []const u8) anyerror!void); +pub fn sendLater(ctx: *CmdContext, msg: []const u8) !void { + ctx.write_buf = msg; + // NOTE: it seems we can't use the same completion for concurrent + // recv and timeout operations + // TODO: maybe instead of allocating this each time we can create + // a timeout_completion on the context? + // Not sure if there is several concurrent timeout operations on the same context + const completion = try ctx.alloc().create(public.IO.Completion); + ctx.loop().io.timeout(*CmdContext, ctx, timeoutCallback, completion, 1000); +} -fn osSend(ctx: *CmdContext, msg: []const u8) !void { +fn send(ctx: *CmdContext, msg: []const u8) !void { defer ctx.alloc().free(msg); const s = try std.os.write(ctx.socket, msg); std.log.debug("send ok {d}", .{s}); } fn loopSend(ctx: *CmdContext, msg: []const u8) !void { - ctx.buf = ctx.buf[0..msg.len]; - @memcpy(ctx.buf, msg); - ctx.alloc().free(msg); - ctx.js_env.nat_ctx.loop.io.send( + ctx.write_buf = msg; + ctx.loop().io.send( *CmdContext, ctx, respCallback, ctx.completion, ctx.socket, - ctx.buf, + ctx.write_buf, ); } @@ -69,13 +94,14 @@ fn cmdCallback( completion: *public.IO.Completion, result: public.IO.RecvError!usize, ) void { + // ctx.completion = completion; const size = result catch |err| { ctx.close = true; std.debug.print("recv error: {s}\n", .{@errorName(err)}); return; }; - const input = ctx.buf[0..size]; + const input = ctx.read_buf[0..size]; // close on exit command if (std.mem.eql(u8, input, "exit")) { @@ -83,28 +109,27 @@ fn cmdCallback( return; } - // continue receving messages asynchronously - defer { - ctx.js_env.nat_ctx.loop.io.recv( - *CmdContext, - ctx, - cmdCallback, - completion, - ctx.socket, - ctx.buf, - ); - } - - std.debug.print("input {s}\n", .{input}); - const res = cdp.do(ctx.alloc(), input, ctx, loopSend) catch |err| { + std.debug.print("\ninput {s}\n", .{input}); + const res = cdp.do(ctx.alloc(), input, ctx) catch |err| { std.log.debug("error: {any}\n", .{err}); - loopSend(ctx, "{}") catch unreachable; + send(ctx, "{}") catch unreachable; // TODO: return proper error return; }; std.log.debug("res {s}", .{res}); - loopSend(ctx, res) catch unreachable; + sendLater(ctx, res) catch unreachable; + std.log.debug("finish", .{}); + + // continue receving messages asynchronously + ctx.loop().io.recv( + *CmdContext, + ctx, + cmdCallback, + completion, + ctx.socket, + ctx.read_buf, + ); } // I/O connection context @@ -123,13 +148,13 @@ fn connCallback( ctx.cmdContext.socket = result catch |err| @panic(@errorName(err)); // launch receving messages asynchronously - ctx.cmdContext.js_env.nat_ctx.loop.io.recv( + ctx.cmdContext.loop().io.recv( *CmdContext, ctx.cmdContext, cmdCallback, completion, ctx.cmdContext.socket, - ctx.cmdContext.buf, + ctx.cmdContext.read_buf, ); } @@ -165,7 +190,7 @@ pub fn execJS( var cmd_ctx = CmdContext{ .js_env = js_env, .socket = undefined, - .buf = &input, + .read_buf = &input, .try_catch = try_catch, .completion = &completion, };