Working sendLater (I/O timeout)

Signed-off-by: Francis Bouvier <francis@lightpanda.io>
This commit is contained in:
Francis Bouvier
2024-04-15 17:36:22 +02:00
parent defab0c774
commit cfd6fc9532
4 changed files with 62 additions and 44 deletions

View File

@@ -17,13 +17,12 @@ pub fn browser(
action: []const u8, action: []const u8,
scanner: *std.json.Scanner, scanner: *std.json.Scanner,
ctx: *Ctx, ctx: *Ctx,
comptime sendFn: SendFn,
) ![]const u8 { ) ![]const u8 {
const method = std.meta.stringToEnum(BrowserMethods, action) orelse const method = std.meta.stringToEnum(BrowserMethods, action) orelse
return error.UnknownBrowserMethod; return error.UnknownBrowserMethod;
return switch (method) { return switch (method) {
.getVersion => browserGetVersion(alloc, id, scanner, ctx, sendFn), .getVersion => browserGetVersion(alloc, id, scanner, ctx),
.setDownloadBehavior => browserSetDownloadBehavior(alloc, id, scanner, ctx, sendFn), .setDownloadBehavior => browserSetDownloadBehavior(alloc, id, scanner, ctx),
}; };
} }
@@ -38,7 +37,6 @@ fn browserGetVersion(
id: u64, id: u64,
_: *std.json.Scanner, _: *std.json.Scanner,
_: *Ctx, _: *Ctx,
comptime _: SendFn,
) ![]const u8 { ) ![]const u8 {
const Res = struct { const Res = struct {
protocolVersion: []const u8, protocolVersion: []const u8,
@@ -63,7 +61,6 @@ fn browserSetDownloadBehavior(
id: u64, id: u64,
scanner: *std.json.Scanner, scanner: *std.json.Scanner,
_: *Ctx, _: *Ctx,
comptime _: SendFn,
) ![]const u8 { ) ![]const u8 {
const Params = struct { const Params = struct {
behavior: []const u8, behavior: []const u8,

View File

@@ -16,7 +16,6 @@ pub fn do(
alloc: std.mem.Allocator, alloc: std.mem.Allocator,
s: []const u8, s: []const u8,
ctx: *Ctx, ctx: *Ctx,
comptime sendFn: SendFn,
) ![]const u8 { ) ![]const u8 {
var scanner = std.json.Scanner.initCompleteInput(alloc, s); var scanner = std.json.Scanner.initCompleteInput(alloc, s);
defer scanner.deinit(); defer scanner.deinit();
@@ -29,15 +28,15 @@ pub fn do(
try checkKey("method", (try scanner.next()).string); try checkKey("method", (try scanner.next()).string);
const method = (try scanner.next()).string; const method = (try scanner.next()).string;
std.log.debug("cmd: id {any}, method {s}\n", .{ id, method }); std.log.debug("cmd: id {any}, method {s}", .{ id, method });
var iter = std.mem.splitScalar(u8, method, '.'); var iter = std.mem.splitScalar(u8, method, '.');
const domain = std.meta.stringToEnum(Domains, iter.first()) orelse const domain = std.meta.stringToEnum(Domains, iter.first()) orelse
return error.UnknonwDomain; return error.UnknonwDomain;
return switch (domain) { return switch (domain) {
.Browser => browser(alloc, id, iter.next().?, &scanner, ctx, sendFn), .Browser => browser(alloc, id, iter.next().?, &scanner, ctx),
.Target => target(alloc, id, iter.next().?, &scanner, ctx, sendFn), .Target => target(alloc, id, iter.next().?, &scanner, ctx),
}; };
} }

View File

@@ -18,12 +18,11 @@ pub fn target(
action: []const u8, action: []const u8,
scanner: *std.json.Scanner, scanner: *std.json.Scanner,
ctx: *Ctx, ctx: *Ctx,
comptime sendFn: SendFn,
) ![]const u8 { ) ![]const u8 {
const method = std.meta.stringToEnum(TargetMethods, action) orelse const method = std.meta.stringToEnum(TargetMethods, action) orelse
return error.UnknownTargetMethod; return error.UnknownTargetMethod;
return switch (method) { return switch (method) {
.setAutoAttach => tagetSetAutoAttach(alloc, id, scanner, ctx, sendFn), .setAutoAttach => tagetSetAutoAttach(alloc, id, scanner, ctx),
// .getTargetInfo => tagetGetTargetInfo(alloc, id, scanner), // .getTargetInfo => tagetGetTargetInfo(alloc, id, scanner),
}; };
} }
@@ -33,7 +32,6 @@ fn tagetSetAutoAttach(
id: u64, id: u64,
scanner: *std.json.Scanner, scanner: *std.json.Scanner,
_: *Ctx, _: *Ctx,
comptime _: SendFn,
) ![]const u8 { ) ![]const u8 {
const Params = struct { const Params = struct {
autoAttach: bool, autoAttach: bool,
@@ -52,7 +50,6 @@ fn tagetGetTargetInfo(
id: u64, id: u64,
scanner: *std.json.Scanner, scanner: *std.json.Scanner,
_: *Ctx, _: *Ctx,
comptime _: SendFn,
) ![]const u8 { ) ![]const u8 {
_ = scanner; _ = scanner;

View File

@@ -12,7 +12,10 @@ pub const CmdContext = struct {
js_env: *public.Env, js_env: *public.Env,
socket: std.os.socket_t, socket: std.os.socket_t,
completion: *public.IO.Completion, completion: *public.IO.Completion,
buf: []u8,
read_buf: []u8,
write_buf: []const u8 = undefined,
close: bool = false, close: bool = false,
try_catch: public.TryCatch, try_catch: public.TryCatch,
@@ -21,6 +24,10 @@ pub const CmdContext = struct {
fn alloc(self: *CmdContext) std.mem.Allocator { fn alloc(self: *CmdContext) std.mem.Allocator {
return self.js_env.nat_ctx.alloc; return self.js_env.nat_ctx.alloc;
} }
fn loop(self: *CmdContext) *public.Loop {
return self.js_env.nat_ctx.loop;
}
}; };
fn respCallback( fn respCallback(
@@ -36,30 +43,48 @@ fn respCallback(
std.log.debug("send ok", .{}); std.log.debug("send ok", .{});
} }
pub fn send(ctx: *CmdContext, msg: []const u8) !void { fn timeoutCallback(
defer ctx.alloc().free(msg); ctx: *CmdContext,
return osSend(ctx, msg); completion: *public.IO.Completion,
result: public.IO.TimeoutError!void,
) void {
std.log.debug("sending after", .{});
_ = result catch |err| {
ctx.close = true;
std.debug.print("timeout error: {s}\n", .{@errorName(err)});
return;
};
ctx.alloc().destroy(completion);
send(ctx, ctx.write_buf) catch unreachable;
} }
pub const SendFn = (fn (*CmdContext, []const u8) anyerror!void); pub fn sendLater(ctx: *CmdContext, msg: []const u8) !void {
ctx.write_buf = msg;
// NOTE: it seems we can't use the same completion for concurrent
// recv and timeout operations
// TODO: maybe instead of allocating this each time we can create
// a timeout_completion on the context?
// Not sure if there is several concurrent timeout operations on the same context
const completion = try ctx.alloc().create(public.IO.Completion);
ctx.loop().io.timeout(*CmdContext, ctx, timeoutCallback, completion, 1000);
}
fn osSend(ctx: *CmdContext, msg: []const u8) !void { fn send(ctx: *CmdContext, msg: []const u8) !void {
defer ctx.alloc().free(msg); defer ctx.alloc().free(msg);
const s = try std.os.write(ctx.socket, msg); const s = try std.os.write(ctx.socket, msg);
std.log.debug("send ok {d}", .{s}); std.log.debug("send ok {d}", .{s});
} }
fn loopSend(ctx: *CmdContext, msg: []const u8) !void { fn loopSend(ctx: *CmdContext, msg: []const u8) !void {
ctx.buf = ctx.buf[0..msg.len]; ctx.write_buf = msg;
@memcpy(ctx.buf, msg); ctx.loop().io.send(
ctx.alloc().free(msg);
ctx.js_env.nat_ctx.loop.io.send(
*CmdContext, *CmdContext,
ctx, ctx,
respCallback, respCallback,
ctx.completion, ctx.completion,
ctx.socket, ctx.socket,
ctx.buf, ctx.write_buf,
); );
} }
@@ -69,13 +94,14 @@ fn cmdCallback(
completion: *public.IO.Completion, completion: *public.IO.Completion,
result: public.IO.RecvError!usize, result: public.IO.RecvError!usize,
) void { ) void {
// ctx.completion = completion;
const size = result catch |err| { const size = result catch |err| {
ctx.close = true; ctx.close = true;
std.debug.print("recv error: {s}\n", .{@errorName(err)}); std.debug.print("recv error: {s}\n", .{@errorName(err)});
return; return;
}; };
const input = ctx.buf[0..size]; const input = ctx.read_buf[0..size];
// close on exit command // close on exit command
if (std.mem.eql(u8, input, "exit")) { if (std.mem.eql(u8, input, "exit")) {
@@ -83,28 +109,27 @@ fn cmdCallback(
return; return;
} }
// continue receving messages asynchronously std.debug.print("\ninput {s}\n", .{input});
defer { const res = cdp.do(ctx.alloc(), input, ctx) catch |err| {
ctx.js_env.nat_ctx.loop.io.recv(
*CmdContext,
ctx,
cmdCallback,
completion,
ctx.socket,
ctx.buf,
);
}
std.debug.print("input {s}\n", .{input});
const res = cdp.do(ctx.alloc(), input, ctx, loopSend) catch |err| {
std.log.debug("error: {any}\n", .{err}); std.log.debug("error: {any}\n", .{err});
loopSend(ctx, "{}") catch unreachable; send(ctx, "{}") catch unreachable;
// TODO: return proper error // TODO: return proper error
return; return;
}; };
std.log.debug("res {s}", .{res}); std.log.debug("res {s}", .{res});
loopSend(ctx, res) catch unreachable; sendLater(ctx, res) catch unreachable;
std.log.debug("finish", .{});
// continue receving messages asynchronously
ctx.loop().io.recv(
*CmdContext,
ctx,
cmdCallback,
completion,
ctx.socket,
ctx.read_buf,
);
} }
// I/O connection context // I/O connection context
@@ -123,13 +148,13 @@ fn connCallback(
ctx.cmdContext.socket = result catch |err| @panic(@errorName(err)); ctx.cmdContext.socket = result catch |err| @panic(@errorName(err));
// launch receving messages asynchronously // launch receving messages asynchronously
ctx.cmdContext.js_env.nat_ctx.loop.io.recv( ctx.cmdContext.loop().io.recv(
*CmdContext, *CmdContext,
ctx.cmdContext, ctx.cmdContext,
cmdCallback, cmdCallback,
completion, completion,
ctx.cmdContext.socket, ctx.cmdContext.socket,
ctx.cmdContext.buf, ctx.cmdContext.read_buf,
); );
} }
@@ -165,7 +190,7 @@ pub fn execJS(
var cmd_ctx = CmdContext{ var cmd_ctx = CmdContext{
.js_env = js_env, .js_env = js_env,
.socket = undefined, .socket = undefined,
.buf = &input, .read_buf = &input,
.try_catch = try_catch, .try_catch = try_catch,
.completion = &completion, .completion = &completion,
}; };