mirror of
https://github.com/lightpanda-io/browser.git
synced 2025-10-29 07:03:29 +00:00
Merge pull request #310 from lightpanda-io/websockets
websocket: first implementation
This commit is contained in:
4
.gitmodules
vendored
4
.gitmodules
vendored
@@ -28,3 +28,7 @@
|
|||||||
[submodule "vendor/zig-async-io"]
|
[submodule "vendor/zig-async-io"]
|
||||||
path = vendor/zig-async-io
|
path = vendor/zig-async-io
|
||||||
url = git@github.com:lightpanda-io/zig-async-io.git
|
url = git@github.com:lightpanda-io/zig-async-io.git
|
||||||
|
[submodule "vendor/websocket.zig"]
|
||||||
|
path = vendor/websocket.zig
|
||||||
|
url = git@github.com:lightpanda-io/websocket.zig.git
|
||||||
|
branch = lightpanda
|
||||||
|
|||||||
@@ -168,6 +168,11 @@ fn common(
|
|||||||
.root_source_file = b.path("vendor/tls.zig/src/main.zig"),
|
.root_source_file = b.path("vendor/tls.zig/src/main.zig"),
|
||||||
});
|
});
|
||||||
step.root_module.addImport("tls", tlsmod);
|
step.root_module.addImport("tls", tlsmod);
|
||||||
|
|
||||||
|
const wsmod = b.addModule("websocket", .{
|
||||||
|
.root_source_file = b.path("vendor/websocket.zig/src/websocket.zig"),
|
||||||
|
});
|
||||||
|
step.root_module.addImport("websocket", wsmod);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn moduleNetSurf(b: *std.Build, target: std.Build.ResolvedTarget) !*std.Build.Module {
|
fn moduleNetSurf(b: *std.Build, target: std.Build.ResolvedTarget) !*std.Build.Module {
|
||||||
|
|||||||
@@ -193,7 +193,7 @@ pub fn sendEvent(
|
|||||||
const resp = Resp{ .method = name, .params = params, .sessionId = sessionID };
|
const resp = Resp{ .method = name, .params = params, .sessionId = sessionID };
|
||||||
|
|
||||||
const event_msg = try stringify(alloc, resp);
|
const event_msg = try stringify(alloc, resp);
|
||||||
try server.sendAsync(ctx, event_msg);
|
try ctx.send(event_msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Common
|
// Common
|
||||||
|
|||||||
@@ -323,7 +323,7 @@ fn navigate(
|
|||||||
.loaderId = ctx.state.loaderID,
|
.loaderId = ctx.state.loaderID,
|
||||||
};
|
};
|
||||||
const res = try result(alloc, input.id, Resp, resp, input.sessionId);
|
const res = try result(alloc, input.id, Resp, resp, input.sessionId);
|
||||||
try server.sendAsync(ctx, res);
|
try ctx.send(res);
|
||||||
|
|
||||||
// TODO: at this point do we need async the following actions to be async?
|
// TODO: at this point do we need async the following actions to be async?
|
||||||
|
|
||||||
|
|||||||
@@ -292,7 +292,7 @@ fn disposeBrowserContext(
|
|||||||
|
|
||||||
// output
|
// output
|
||||||
const res = try result(alloc, input.id, null, .{}, null);
|
const res = try result(alloc, input.id, null, .{}, null);
|
||||||
try server.sendAsync(ctx, res);
|
try ctx.send(res);
|
||||||
|
|
||||||
return error.DisposeBrowserContext;
|
return error.DisposeBrowserContext;
|
||||||
}
|
}
|
||||||
@@ -378,7 +378,7 @@ fn closeTarget(
|
|||||||
success: bool = true,
|
success: bool = true,
|
||||||
};
|
};
|
||||||
const res = try result(alloc, input.id, Resp, Resp{}, null);
|
const res = try result(alloc, input.id, Resp, Resp{}, null);
|
||||||
try server.sendAsync(ctx, res);
|
try ctx.send(res);
|
||||||
|
|
||||||
// Inspector.detached event
|
// Inspector.detached event
|
||||||
const InspectorDetached = struct {
|
const InspectorDetached = struct {
|
||||||
|
|||||||
89
src/handler.zig
Normal file
89
src/handler.zig
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
|
||||||
|
//
|
||||||
|
// Francis Bouvier <francis@lightpanda.io>
|
||||||
|
// Pierre Tachoire <pierre@lightpanda.io>
|
||||||
|
//
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as
|
||||||
|
// published by the Free Software Foundation, either version 3 of the
|
||||||
|
// License, or (at your option) any later version.
|
||||||
|
//
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
const std = @import("std");
|
||||||
|
|
||||||
|
const ws = @import("websocket");
|
||||||
|
|
||||||
|
const log = std.log.scoped(.handler);
|
||||||
|
|
||||||
|
pub const Stream = struct {
|
||||||
|
addr: std.net.Address,
|
||||||
|
socket: std.posix.socket_t = undefined,
|
||||||
|
ws_conn: *ws.Conn = undefined,
|
||||||
|
|
||||||
|
fn connectCDP(self: *Stream) !void {
|
||||||
|
const flags: u32 = std.posix.SOCK.STREAM;
|
||||||
|
const proto = blk: {
|
||||||
|
if (self.addr.any.family == std.posix.AF.UNIX) break :blk @as(u32, 0);
|
||||||
|
break :blk std.posix.IPPROTO.TCP;
|
||||||
|
};
|
||||||
|
const socket = try std.posix.socket(self.addr.any.family, flags, proto);
|
||||||
|
|
||||||
|
try std.posix.connect(
|
||||||
|
socket,
|
||||||
|
&self.addr.any,
|
||||||
|
self.addr.getOsSockLen(),
|
||||||
|
);
|
||||||
|
log.debug("connected to Stream server", .{});
|
||||||
|
self.socket = socket;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn closeCDP(self: *const Stream) void {
|
||||||
|
const close_msg: []const u8 = "5:close";
|
||||||
|
self.recv(close_msg) catch |err| {
|
||||||
|
log.err("stream close error: {any}", .{err});
|
||||||
|
};
|
||||||
|
std.posix.close(self.socket);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn start(self: *Stream, ws_conn: *ws.Conn) !void {
|
||||||
|
try self.connectCDP();
|
||||||
|
self.ws_conn = ws_conn;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn recv(self: *const Stream, data: []const u8) !void {
|
||||||
|
var pos: usize = 0;
|
||||||
|
while (pos < data.len) {
|
||||||
|
const len = try std.posix.write(self.socket, data[pos..]);
|
||||||
|
pos += len;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send(self: *const Stream, data: []const u8) !void {
|
||||||
|
return self.ws_conn.write(data);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
pub const Handler = struct {
|
||||||
|
stream: *Stream,
|
||||||
|
|
||||||
|
pub fn init(_: ws.Handshake, ws_conn: *ws.Conn, stream: *Stream) !Handler {
|
||||||
|
try stream.start(ws_conn);
|
||||||
|
return .{ .stream = stream };
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn close(self: *Handler) void {
|
||||||
|
self.stream.closeCDP();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn clientMessage(self: *Handler, alloc: std.mem.Allocator, data: []const u8) !void {
|
||||||
|
const msg = try std.fmt.allocPrint(alloc, "{d}:{s}", .{ data.len, data });
|
||||||
|
try self.stream.recv(msg);
|
||||||
|
}
|
||||||
|
};
|
||||||
190
src/main.zig
190
src/main.zig
@@ -17,13 +17,14 @@
|
|||||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
const posix = std.posix;
|
|
||||||
const builtin = @import("builtin");
|
const builtin = @import("builtin");
|
||||||
|
|
||||||
const jsruntime = @import("jsruntime");
|
const jsruntime = @import("jsruntime");
|
||||||
|
const websocket = @import("websocket");
|
||||||
|
|
||||||
const Browser = @import("browser/browser.zig").Browser;
|
const Browser = @import("browser/browser.zig").Browser;
|
||||||
const server = @import("server.zig");
|
const server = @import("server.zig");
|
||||||
|
const handler = @import("handler.zig");
|
||||||
|
|
||||||
const parser = @import("netsurf");
|
const parser = @import("netsurf");
|
||||||
const apiweb = @import("apiweb.zig");
|
const apiweb = @import("apiweb.zig");
|
||||||
@@ -32,103 +33,12 @@ pub const Types = jsruntime.reflect(apiweb.Interfaces);
|
|||||||
pub const UserContext = apiweb.UserContext;
|
pub const UserContext = apiweb.UserContext;
|
||||||
pub const IO = @import("asyncio").Wrapper(jsruntime.Loop);
|
pub const IO = @import("asyncio").Wrapper(jsruntime.Loop);
|
||||||
|
|
||||||
|
// Simple blocking websocket connection model
|
||||||
|
// ie. 1 thread per ws connection without thread pool and epoll/kqueue
|
||||||
|
pub const websocket_blocking = true;
|
||||||
|
|
||||||
const log = std.log.scoped(.cli);
|
const log = std.log.scoped(.cli);
|
||||||
|
|
||||||
// Inspired by std.net.StreamServer in Zig < 0.12
|
|
||||||
pub const StreamServer = struct {
|
|
||||||
/// Copied from `Options` on `init`.
|
|
||||||
kernel_backlog: u31,
|
|
||||||
reuse_address: bool,
|
|
||||||
reuse_port: bool,
|
|
||||||
nonblocking: bool,
|
|
||||||
|
|
||||||
/// `undefined` until `listen` returns successfully.
|
|
||||||
listen_address: std.net.Address,
|
|
||||||
|
|
||||||
sockfd: ?posix.socket_t,
|
|
||||||
|
|
||||||
pub const Options = struct {
|
|
||||||
/// How many connections the kernel will accept on the application's behalf.
|
|
||||||
/// If more than this many connections pool in the kernel, clients will start
|
|
||||||
/// seeing "Connection refused".
|
|
||||||
kernel_backlog: u31 = 128,
|
|
||||||
|
|
||||||
/// Enable SO.REUSEADDR on the socket.
|
|
||||||
reuse_address: bool = false,
|
|
||||||
|
|
||||||
/// Enable SO.REUSEPORT on the socket.
|
|
||||||
reuse_port: bool = false,
|
|
||||||
|
|
||||||
/// Non-blocking mode.
|
|
||||||
nonblocking: bool = false,
|
|
||||||
};
|
|
||||||
|
|
||||||
/// After this call succeeds, resources have been acquired and must
|
|
||||||
/// be released with `deinit`.
|
|
||||||
pub fn init(options: Options) StreamServer {
|
|
||||||
return StreamServer{
|
|
||||||
.sockfd = null,
|
|
||||||
.kernel_backlog = options.kernel_backlog,
|
|
||||||
.reuse_address = options.reuse_address,
|
|
||||||
.reuse_port = options.reuse_port,
|
|
||||||
.nonblocking = options.nonblocking,
|
|
||||||
.listen_address = undefined,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Release all resources. The `StreamServer` memory becomes `undefined`.
|
|
||||||
pub fn deinit(self: *StreamServer) void {
|
|
||||||
self.close();
|
|
||||||
self.* = undefined;
|
|
||||||
}
|
|
||||||
|
|
||||||
fn setSockOpt(fd: posix.socket_t, level: i32, option: u32, value: c_int) !void {
|
|
||||||
try posix.setsockopt(fd, level, option, &std.mem.toBytes(value));
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn listen(self: *StreamServer, address: std.net.Address) !void {
|
|
||||||
const sock_flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC;
|
|
||||||
var use_sock_flags: u32 = sock_flags;
|
|
||||||
if (self.nonblocking) use_sock_flags |= posix.SOCK.NONBLOCK;
|
|
||||||
const proto = if (address.any.family == posix.AF.UNIX) @as(u32, 0) else posix.IPPROTO.TCP;
|
|
||||||
|
|
||||||
const sockfd = try posix.socket(address.any.family, use_sock_flags, proto);
|
|
||||||
self.sockfd = sockfd;
|
|
||||||
errdefer {
|
|
||||||
posix.close(sockfd);
|
|
||||||
self.sockfd = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
// socket options
|
|
||||||
if (self.reuse_address) {
|
|
||||||
try setSockOpt(sockfd, posix.SOL.SOCKET, posix.SO.REUSEADDR, 1);
|
|
||||||
}
|
|
||||||
if (@hasDecl(posix.SO, "REUSEPORT") and self.reuse_port) {
|
|
||||||
try setSockOpt(sockfd, posix.SOL.SOCKET, posix.SO.REUSEPORT, 1);
|
|
||||||
}
|
|
||||||
if (builtin.target.os.tag == .linux) { // posix.TCP not available on MacOS
|
|
||||||
// WARNING: disable Nagle's alogrithm to avoid latency issues
|
|
||||||
try setSockOpt(sockfd, posix.IPPROTO.TCP, posix.TCP.NODELAY, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
var socklen = address.getOsSockLen();
|
|
||||||
try posix.bind(sockfd, &address.any, socklen);
|
|
||||||
try posix.listen(sockfd, self.kernel_backlog);
|
|
||||||
try posix.getsockname(sockfd, &self.listen_address.any, &socklen);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Stop listening. It is still necessary to call `deinit` after stopping listening.
|
|
||||||
/// Calling `deinit` will automatically call `close`. It is safe to call `close` when
|
|
||||||
/// not listening.
|
|
||||||
pub fn close(self: *StreamServer) void {
|
|
||||||
if (self.sockfd) |fd| {
|
|
||||||
posix.close(fd);
|
|
||||||
self.sockfd = null;
|
|
||||||
self.listen_address = undefined;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
const usage =
|
const usage =
|
||||||
\\usage: {s} [options] [URL]
|
\\usage: {s} [options] [URL]
|
||||||
\\
|
\\
|
||||||
@@ -139,7 +49,7 @@ const usage =
|
|||||||
\\
|
\\
|
||||||
\\ -h, --help Print this help message and exit.
|
\\ -h, --help Print this help message and exit.
|
||||||
\\ --host Host of the CDP server (default "127.0.0.1")
|
\\ --host Host of the CDP server (default "127.0.0.1")
|
||||||
\\ --port Port of the CDP server (default "3245")
|
\\ --port Port of the CDP server (default "9222")
|
||||||
\\ --timeout Timeout for incoming connections of the CDP server (in seconds, default "3")
|
\\ --timeout Timeout for incoming connections of the CDP server (in seconds, default "3")
|
||||||
\\ --dump Dump document in stdout (fetch mode only)
|
\\ --dump Dump document in stdout (fetch mode only)
|
||||||
\\
|
\\
|
||||||
@@ -170,10 +80,11 @@ const CliMode = union(CliModeTag) {
|
|||||||
host: []const u8 = Host,
|
host: []const u8 = Host,
|
||||||
port: u16 = Port,
|
port: u16 = Port,
|
||||||
timeout: u8 = Timeout,
|
timeout: u8 = Timeout,
|
||||||
|
tcp: bool = false, // undocumented TCP mode
|
||||||
|
|
||||||
// default options
|
// default options
|
||||||
const Host = "127.0.0.1";
|
const Host = "127.0.0.1";
|
||||||
const Port = 3245;
|
const Port = 9222;
|
||||||
const Timeout = 3; // in seconds
|
const Timeout = 3; // in seconds
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -235,6 +146,10 @@ const CliMode = union(CliModeTag) {
|
|||||||
return printUsageExit(execname, 1);
|
return printUsageExit(execname, 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (std.mem.eql(u8, "--tcp", opt)) {
|
||||||
|
_server.tcp = true;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// unknown option
|
// unknown option
|
||||||
if (std.mem.startsWith(u8, opt, "--")) {
|
if (std.mem.startsWith(u8, opt, "--")) {
|
||||||
@@ -317,33 +232,64 @@ pub fn main() !void {
|
|||||||
defer cli_mode.deinit();
|
defer cli_mode.deinit();
|
||||||
|
|
||||||
switch (cli_mode) {
|
switch (cli_mode) {
|
||||||
.server => |mode| {
|
.server => |opts| {
|
||||||
|
|
||||||
// server
|
// Stream server
|
||||||
var srv = StreamServer.init(.{
|
const addr = blk: {
|
||||||
.reuse_address = true,
|
if (opts.tcp) {
|
||||||
.reuse_port = true,
|
break :blk opts.addr;
|
||||||
.nonblocking = true,
|
} else {
|
||||||
});
|
const unix_path = "/tmp/lightpanda";
|
||||||
defer srv.deinit();
|
std.fs.deleteFileAbsolute(unix_path) catch {}; // file could not exists
|
||||||
|
break :blk try std.net.Address.initUnix(unix_path);
|
||||||
srv.listen(mode.addr) catch |err| {
|
}
|
||||||
log.err("address (host:port) {any}\n", .{err});
|
|
||||||
return printUsageExit(mode.execname, 1);
|
|
||||||
};
|
};
|
||||||
defer srv.close();
|
const socket = server.listen(addr) catch |err| {
|
||||||
log.info("Server mode: listening on {s}:{d}...", .{ mode.host, mode.port });
|
log.err("Server listen error: {any}\n", .{err});
|
||||||
|
return printUsageExit(opts.execname, 1);
|
||||||
|
};
|
||||||
|
defer std.posix.close(socket);
|
||||||
|
log.debug("Server opts: listening internally on {any}...", .{addr});
|
||||||
|
|
||||||
|
const timeout = std.time.ns_per_s * @as(u64, opts.timeout);
|
||||||
|
|
||||||
// loop
|
// loop
|
||||||
var loop = try jsruntime.Loop.init(alloc);
|
var loop = try jsruntime.Loop.init(alloc);
|
||||||
defer loop.deinit();
|
defer loop.deinit();
|
||||||
|
|
||||||
// listen
|
// TCP server mode
|
||||||
try server.listen(alloc, &loop, srv.sockfd.?, std.time.ns_per_s * @as(u64, mode.timeout));
|
if (opts.tcp) {
|
||||||
|
return server.handle(alloc, &loop, socket, null, timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
// start stream server in separate thread
|
||||||
|
var stream = handler.Stream{ .addr = addr };
|
||||||
|
const cdp_thread = try std.Thread.spawn(
|
||||||
|
.{ .allocator = alloc },
|
||||||
|
server.handle,
|
||||||
|
.{ alloc, &loop, socket, &stream, timeout },
|
||||||
|
);
|
||||||
|
|
||||||
|
// Websocket server
|
||||||
|
var ws = try websocket.Server(handler.Handler).init(alloc, .{
|
||||||
|
.port = opts.port,
|
||||||
|
.address = opts.host,
|
||||||
|
.handshake = .{
|
||||||
|
.timeout = 3,
|
||||||
|
.max_size = 1024,
|
||||||
|
// since we aren't using hanshake.headers
|
||||||
|
// we can set this to 0 to save a few bytes.
|
||||||
|
.max_headers = 0,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
defer ws.deinit();
|
||||||
|
|
||||||
|
try ws.listen(&stream);
|
||||||
|
cdp_thread.join();
|
||||||
},
|
},
|
||||||
|
|
||||||
.fetch => |mode| {
|
.fetch => |opts| {
|
||||||
log.debug("Fetch mode: url {s}, dump {any}", .{ mode.url, mode.dump });
|
log.debug("Fetch mode: url {s}, dump {any}", .{ opts.url, opts.dump });
|
||||||
|
|
||||||
// vm
|
// vm
|
||||||
const vm = jsruntime.VM.init();
|
const vm = jsruntime.VM.init();
|
||||||
@@ -361,21 +307,21 @@ pub fn main() !void {
|
|||||||
// page
|
// page
|
||||||
const page = try browser.session.createPage();
|
const page = try browser.session.createPage();
|
||||||
|
|
||||||
_ = page.navigate(mode.url, null) catch |err| switch (err) {
|
_ = page.navigate(opts.url, null) catch |err| switch (err) {
|
||||||
error.UnsupportedUriScheme, error.UriMissingHost => {
|
error.UnsupportedUriScheme, error.UriMissingHost => {
|
||||||
log.err("'{s}' is not a valid URL ({any})\n", .{ mode.url, err });
|
log.err("'{s}' is not a valid URL ({any})\n", .{ opts.url, err });
|
||||||
return printUsageExit(mode.execname, 1);
|
return printUsageExit(opts.execname, 1);
|
||||||
},
|
},
|
||||||
else => {
|
else => {
|
||||||
log.err("'{s}' fetching error ({any})s\n", .{ mode.url, err });
|
log.err("'{s}' fetching error ({any})s\n", .{ opts.url, err });
|
||||||
return printUsageExit(mode.execname, 1);
|
return printUsageExit(opts.execname, 1);
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
try page.wait();
|
try page.wait();
|
||||||
|
|
||||||
// dump
|
// dump
|
||||||
if (mode.dump) {
|
if (opts.dump) {
|
||||||
try page.dump(std.io.getStdOut());
|
try page.dump(std.io.getStdOut());
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -102,7 +102,10 @@ pub const MsgBuffer = struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// copy the current input into MsgBuffer
|
// copy the current input into MsgBuffer
|
||||||
@memcpy(self.buf[self.pos..new_pos], _input[0..]);
|
// NOTE: we could use @memcpy but it's not Thread-safe (alias problem)
|
||||||
|
// see https://www.openmymind.net/Zigs-memcpy-copyForwards-and-copyBackwards/
|
||||||
|
// Intead we just use std.mem.copyForwards
|
||||||
|
std.mem.copyForwards(u8, self.buf[self.pos..new_pos], _input[0..]);
|
||||||
|
|
||||||
// set the new cursor position
|
// set the new cursor position
|
||||||
self.pos = new_pos;
|
self.pos = new_pos;
|
||||||
|
|||||||
@@ -19,6 +19,8 @@
|
|||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
const builtin = @import("builtin");
|
const builtin = @import("builtin");
|
||||||
|
|
||||||
|
const Stream = @import("handler.zig").Stream;
|
||||||
|
|
||||||
const jsruntime = @import("jsruntime");
|
const jsruntime = @import("jsruntime");
|
||||||
const Completion = jsruntime.IO.Completion;
|
const Completion = jsruntime.IO.Completion;
|
||||||
const AcceptError = jsruntime.IO.AcceptError;
|
const AcceptError = jsruntime.IO.AcceptError;
|
||||||
@@ -49,6 +51,7 @@ const MaxStdOutSize = 512; // ensure debug msg are not too long
|
|||||||
|
|
||||||
pub const Ctx = struct {
|
pub const Ctx = struct {
|
||||||
loop: *jsruntime.Loop,
|
loop: *jsruntime.Loop,
|
||||||
|
stream: ?*Stream,
|
||||||
|
|
||||||
// internal fields
|
// internal fields
|
||||||
accept_socket: std.posix.socket_t,
|
accept_socket: std.posix.socket_t,
|
||||||
@@ -117,8 +120,8 @@ pub const Ctx = struct {
|
|||||||
std.debug.assert(completion == self.conn_completion);
|
std.debug.assert(completion == self.conn_completion);
|
||||||
|
|
||||||
const size = result catch |err| {
|
const size = result catch |err| {
|
||||||
if (err == error.Canceled) {
|
if (self.isClosed() and err == error.FileDescriptorInvalid) {
|
||||||
log.debug("read canceled", .{});
|
log.debug("read has been canceled", .{});
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
log.err("read error: {any}", .{err});
|
log.err("read error: {any}", .{err});
|
||||||
@@ -199,7 +202,7 @@ pub const Ctx = struct {
|
|||||||
if (now.since(self.last_active.?) > self.timeout) {
|
if (now.since(self.last_active.?) > self.timeout) {
|
||||||
// close current connection
|
// close current connection
|
||||||
log.debug("conn timeout, closing...", .{});
|
log.debug("conn timeout, closing...", .{});
|
||||||
self.cancelAndClose();
|
self.close();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -213,19 +216,6 @@ pub const Ctx = struct {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn cancelCbk(self: *Ctx, completion: *Completion, result: CancelError!void) void {
|
|
||||||
std.debug.assert(completion == self.accept_completion);
|
|
||||||
|
|
||||||
_ = result catch |err| {
|
|
||||||
log.err("cancel error: {any}", .{err});
|
|
||||||
self.err = err;
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
log.debug("cancel done", .{});
|
|
||||||
|
|
||||||
self.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
// shortcuts
|
// shortcuts
|
||||||
// ---------
|
// ---------
|
||||||
|
|
||||||
@@ -262,7 +252,7 @@ pub const Ctx = struct {
|
|||||||
if (std.mem.eql(u8, cmd, "close")) {
|
if (std.mem.eql(u8, cmd, "close")) {
|
||||||
// close connection
|
// close connection
|
||||||
log.info("close cmd, closing conn...", .{});
|
log.info("close cmd, closing conn...", .{});
|
||||||
self.cancelAndClose();
|
self.close();
|
||||||
return error.Closed;
|
return error.Closed;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -283,30 +273,27 @@ pub const Ctx = struct {
|
|||||||
|
|
||||||
// send result
|
// send result
|
||||||
if (!std.mem.eql(u8, res, "")) {
|
if (!std.mem.eql(u8, res, "")) {
|
||||||
return sendAsync(self, res);
|
return self.send(res);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn cancelAndClose(self: *Ctx) void {
|
pub fn send(self: *Ctx, msg: []const u8) !void {
|
||||||
if (isLinux) { // cancel is only available on Linux
|
if (self.stream) |stream| {
|
||||||
self.loop.io.cancel(
|
// if we have a stream connection, just write on it
|
||||||
*Ctx,
|
defer self.alloc().free(msg);
|
||||||
self,
|
try stream.send(msg);
|
||||||
Ctx.cancelCbk,
|
|
||||||
self.accept_completion,
|
|
||||||
self.conn_completion,
|
|
||||||
);
|
|
||||||
} else {
|
} else {
|
||||||
self.close();
|
// otherwise write asynchronously on the socket connection
|
||||||
|
return sendAsync(self, msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn close(self: *Ctx) void {
|
fn close(self: *Ctx) void {
|
||||||
std.posix.close(self.conn_socket);
|
|
||||||
|
|
||||||
// conn is closed
|
// conn is closed
|
||||||
log.debug("connection closed", .{});
|
|
||||||
self.last_active = null;
|
self.last_active = null;
|
||||||
|
std.posix.close(self.conn_socket);
|
||||||
|
log.debug("connection closed", .{});
|
||||||
|
|
||||||
// restart a new browser session in case of re-connect
|
// restart a new browser session in case of re-connect
|
||||||
if (!self.sessionNew) {
|
if (!self.sessionNew) {
|
||||||
@@ -362,7 +349,7 @@ pub const Ctx = struct {
|
|||||||
.{ msg_open, cdp.ContextSessionID },
|
.{ msg_open, cdp.ContextSessionID },
|
||||||
);
|
);
|
||||||
|
|
||||||
try sendAsync(ctx, s);
|
try ctx.send(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn onInspectorResp(ctx_opaque: *anyopaque, _: u32, msg: []const u8) void {
|
pub fn onInspectorResp(ctx_opaque: *anyopaque, _: u32, msg: []const u8) void {
|
||||||
@@ -422,16 +409,17 @@ const Send = struct {
|
|||||||
|
|
||||||
pub fn sendAsync(ctx: *Ctx, msg: []const u8) !void {
|
pub fn sendAsync(ctx: *Ctx, msg: []const u8) !void {
|
||||||
const sd = try Send.init(ctx, msg);
|
const sd = try Send.init(ctx, msg);
|
||||||
ctx.loop.io.send(*Send, sd, Send.asyncCbk, &sd.completion, ctx.conn_socket, msg);
|
ctx.loop.io.send(*Send, sd, Send.asyncCbk, &sd.completion, ctx.conn_socket, sd.msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Listen
|
// Listener and handler
|
||||||
// ------
|
// --------------------
|
||||||
|
|
||||||
pub fn listen(
|
pub fn handle(
|
||||||
alloc: std.mem.Allocator,
|
alloc: std.mem.Allocator,
|
||||||
loop: *jsruntime.Loop,
|
loop: *jsruntime.Loop,
|
||||||
server_socket: std.posix.socket_t,
|
server_socket: std.posix.socket_t,
|
||||||
|
stream: ?*Stream,
|
||||||
timeout: u64,
|
timeout: u64,
|
||||||
) anyerror!void {
|
) anyerror!void {
|
||||||
|
|
||||||
@@ -458,6 +446,7 @@ pub fn listen(
|
|||||||
// for accepting connections and receving messages
|
// for accepting connections and receving messages
|
||||||
var ctx = Ctx{
|
var ctx = Ctx{
|
||||||
.loop = loop,
|
.loop = loop,
|
||||||
|
.stream = stream,
|
||||||
.browser = &browser,
|
.browser = &browser,
|
||||||
.sessionNew = true,
|
.sessionNew = true,
|
||||||
.read_buf = &read_buf,
|
.read_buf = &read_buf,
|
||||||
@@ -497,3 +486,43 @@ pub fn listen(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn setSockOpt(fd: std.posix.socket_t, level: i32, option: u32, value: c_int) !void {
|
||||||
|
try std.posix.setsockopt(fd, level, option, &std.mem.toBytes(value));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn isUnixSocket(addr: std.net.Address) bool {
|
||||||
|
return addr.any.family == std.posix.AF.UNIX;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn listen(address: std.net.Address) !std.posix.socket_t {
|
||||||
|
|
||||||
|
// create socket
|
||||||
|
const flags = std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC | std.posix.SOCK.NONBLOCK;
|
||||||
|
const proto = if (isUnixSocket(address)) @as(u32, 0) else std.posix.IPPROTO.TCP;
|
||||||
|
const sockfd = try std.posix.socket(address.any.family, flags, proto);
|
||||||
|
errdefer std.posix.close(sockfd);
|
||||||
|
|
||||||
|
// socket options
|
||||||
|
if (@hasDecl(std.posix.SO, "REUSEPORT")) {
|
||||||
|
try setSockOpt(sockfd, std.posix.SOL.SOCKET, std.posix.SO.REUSEPORT, 1);
|
||||||
|
} else {
|
||||||
|
try setSockOpt(sockfd, std.posix.SOL.SOCKET, std.posix.SO.REUSEADDR, 1);
|
||||||
|
}
|
||||||
|
if (!isUnixSocket(address)) {
|
||||||
|
if (builtin.target.os.tag == .linux) { // posix.TCP not available on MacOS
|
||||||
|
// WARNING: disable Nagle's alogrithm to avoid latency issues
|
||||||
|
try setSockOpt(sockfd, std.posix.IPPROTO.TCP, std.posix.TCP.NODELAY, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// bind & listen
|
||||||
|
var socklen = address.getOsSockLen();
|
||||||
|
try std.posix.bind(sockfd, &address.any, socklen);
|
||||||
|
const kernel_backlog = 1; // default value is 128. Here we just want 1 connection
|
||||||
|
try std.posix.listen(sockfd, kernel_backlog);
|
||||||
|
var listen_address: std.net.Address = undefined;
|
||||||
|
try std.posix.getsockname(sockfd, &listen_address.any, &socklen);
|
||||||
|
|
||||||
|
return sockfd;
|
||||||
|
}
|
||||||
|
|||||||
1
vendor/websocket.zig
vendored
Submodule
1
vendor/websocket.zig
vendored
Submodule
Submodule vendor/websocket.zig added at 215493b71e
Reference in New Issue
Block a user