mirror of
https://github.com/lightpanda-io/browser.git
synced 2025-10-29 07:03:29 +00:00
Merge pull request #311 from lightpanda-io/msg_size_encode
Some checks are pending
wpt / web platform tests (push) Waiting to run
wpt / perf-fmt (push) Blocked by required conditions
zig-test / zig build dev (push) Waiting to run
zig-test / zig build release (push) Waiting to run
zig-test / zig test (push) Waiting to run
zig-test / perf-fmt (push) Blocked by required conditions
Some checks are pending
wpt / web platform tests (push) Waiting to run
wpt / perf-fmt (push) Blocked by required conditions
zig-test / zig build dev (push) Waiting to run
zig-test / zig build release (push) Waiting to run
zig-test / zig test (push) Waiting to run
zig-test / perf-fmt (push) Blocked by required conditions
Msg size encode
This commit is contained in:
@@ -19,6 +19,7 @@
|
|||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
|
|
||||||
const ws = @import("websocket");
|
const ws = @import("websocket");
|
||||||
|
const Msg = @import("msg.zig").Msg;
|
||||||
|
|
||||||
const log = std.log.scoped(.handler);
|
const log = std.log.scoped(.handler);
|
||||||
|
|
||||||
@@ -45,7 +46,7 @@ pub const Stream = struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn closeCDP(self: *const Stream) void {
|
fn closeCDP(self: *const Stream) void {
|
||||||
const close_msg: []const u8 = "5:close";
|
const close_msg: []const u8 = .{ 5, 0 } ++ "close";
|
||||||
self.recv(close_msg) catch |err| {
|
self.recv(close_msg) catch |err| {
|
||||||
log.err("stream close error: {any}", .{err});
|
log.err("stream close error: {any}", .{err});
|
||||||
};
|
};
|
||||||
@@ -82,8 +83,10 @@ pub const Handler = struct {
|
|||||||
self.stream.closeCDP();
|
self.stream.closeCDP();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn clientMessage(self: *Handler, alloc: std.mem.Allocator, data: []const u8) !void {
|
pub fn clientMessage(self: *Handler, data: []const u8) !void {
|
||||||
const msg = try std.fmt.allocPrint(alloc, "{d}:{s}", .{ data.len, data });
|
var header: [2]u8 = undefined;
|
||||||
try self.stream.recv(msg);
|
Msg.setSize(data.len, &header);
|
||||||
|
try self.stream.recv(&header);
|
||||||
|
try self.stream.recv(data);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ const websocket = @import("websocket");
|
|||||||
const Browser = @import("browser/browser.zig").Browser;
|
const Browser = @import("browser/browser.zig").Browser;
|
||||||
const server = @import("server.zig");
|
const server = @import("server.zig");
|
||||||
const handler = @import("handler.zig");
|
const handler = @import("handler.zig");
|
||||||
|
const MaxSize = @import("msg.zig").MaxSize;
|
||||||
|
|
||||||
const parser = @import("netsurf");
|
const parser = @import("netsurf");
|
||||||
const apiweb = @import("apiweb.zig");
|
const apiweb = @import("apiweb.zig");
|
||||||
@@ -274,6 +275,8 @@ pub fn main() !void {
|
|||||||
var ws = try websocket.Server(handler.Handler).init(alloc, .{
|
var ws = try websocket.Server(handler.Handler).init(alloc, .{
|
||||||
.port = opts.port,
|
.port = opts.port,
|
||||||
.address = opts.host,
|
.address = opts.host,
|
||||||
|
.max_message_size = MaxSize + 14, // overhead websocket
|
||||||
|
.max_conn = 1,
|
||||||
.handshake = .{
|
.handshake = .{
|
||||||
.timeout = 3,
|
.timeout = 3,
|
||||||
.max_size = 1024,
|
.max_size = 1024,
|
||||||
|
|||||||
102
src/msg.zig
102
src/msg.zig
@@ -18,44 +18,47 @@
|
|||||||
|
|
||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
|
|
||||||
/// MsgBuffer returns messages from a raw text read stream,
|
pub const MsgSize = 16 * 1204; // 16KB
|
||||||
/// according to the following format `<msg_size>:<msg>`.
|
pub const HeaderSize = 2;
|
||||||
|
pub const MaxSize = HeaderSize + MsgSize;
|
||||||
|
|
||||||
|
pub const Msg = struct {
|
||||||
|
pub fn getSize(data: []const u8) usize {
|
||||||
|
return std.mem.readInt(u16, data[0..HeaderSize], .little);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn setSize(len: usize, header: *[2]u8) void {
|
||||||
|
std.mem.writeInt(u16, header, @intCast(len), .little);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Buffer returns messages from a raw text read stream,
|
||||||
|
/// with the message size being encoded on the 2 first bytes (little endian)
|
||||||
/// It handles both:
|
/// It handles both:
|
||||||
/// - combined messages in one read
|
/// - combined messages in one read
|
||||||
/// - single message in several reads (multipart)
|
/// - single message in several reads (multipart)
|
||||||
/// It's safe (and a good practice) to reuse the same MsgBuffer
|
/// It's safe (and a good practice) to reuse the same Buffer
|
||||||
/// on several reads of the same stream.
|
/// on several reads of the same stream.
|
||||||
pub const MsgBuffer = struct {
|
pub const Buffer = struct {
|
||||||
size: usize = 0,
|
|
||||||
buf: []u8,
|
buf: []u8,
|
||||||
|
size: usize = 0,
|
||||||
pos: usize = 0,
|
pos: usize = 0,
|
||||||
|
|
||||||
const MaxSize = 1024 * 1024; // 1MB
|
fn isFinished(self: *const Buffer) bool {
|
||||||
|
|
||||||
pub fn init(alloc: std.mem.Allocator, size: usize) std.mem.Allocator.Error!MsgBuffer {
|
|
||||||
const buf = try alloc.alloc(u8, size);
|
|
||||||
return .{ .buf = buf };
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn deinit(self: MsgBuffer, alloc: std.mem.Allocator) void {
|
|
||||||
alloc.free(self.buf);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn isFinished(self: *MsgBuffer) bool {
|
|
||||||
return self.pos >= self.size;
|
return self.pos >= self.size;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn isEmpty(self: MsgBuffer) bool {
|
fn isEmpty(self: *const Buffer) bool {
|
||||||
return self.size == 0 and self.pos == 0;
|
return self.size == 0 and self.pos == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn reset(self: *MsgBuffer) void {
|
fn reset(self: *Buffer) void {
|
||||||
self.size = 0;
|
self.size = 0;
|
||||||
self.pos = 0;
|
self.pos = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// read input
|
// read input
|
||||||
pub fn read(self: *MsgBuffer, alloc: std.mem.Allocator, input: []const u8) !struct {
|
pub fn read(self: *Buffer, input: []const u8) !struct {
|
||||||
msg: []const u8,
|
msg: []const u8,
|
||||||
left: []const u8,
|
left: []const u8,
|
||||||
} {
|
} {
|
||||||
@@ -64,11 +67,9 @@ pub const MsgBuffer = struct {
|
|||||||
// msg size
|
// msg size
|
||||||
var msg_size: usize = undefined;
|
var msg_size: usize = undefined;
|
||||||
if (self.isEmpty()) {
|
if (self.isEmpty()) {
|
||||||
// parse msg size metadata
|
// decode msg size header
|
||||||
const size_pos = std.mem.indexOfScalar(u8, _input, ':') orelse return error.InputWithoutSize;
|
msg_size = Msg.getSize(_input);
|
||||||
const size_str = _input[0..size_pos];
|
_input = _input[HeaderSize..];
|
||||||
msg_size = try std.fmt.parseInt(u32, size_str, 10);
|
|
||||||
_input = _input[size_pos + 1 ..];
|
|
||||||
} else {
|
} else {
|
||||||
msg_size = self.size;
|
msg_size = self.size;
|
||||||
}
|
}
|
||||||
@@ -77,7 +78,7 @@ pub const MsgBuffer = struct {
|
|||||||
const is_multipart = !self.isEmpty() or _input.len < msg_size;
|
const is_multipart = !self.isEmpty() or _input.len < msg_size;
|
||||||
if (is_multipart) {
|
if (is_multipart) {
|
||||||
|
|
||||||
// set msg size on empty MsgBuffer
|
// set msg size on empty Buffer
|
||||||
if (self.isEmpty()) {
|
if (self.isEmpty()) {
|
||||||
self.size = msg_size;
|
self.size = msg_size;
|
||||||
}
|
}
|
||||||
@@ -90,18 +91,7 @@ pub const MsgBuffer = struct {
|
|||||||
return error.MsgTooBig;
|
return error.MsgTooBig;
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if the current input can fit in MsgBuffer
|
// copy the current input into Buffer
|
||||||
if (new_pos > self.buf.len) {
|
|
||||||
// we want to realloc at least:
|
|
||||||
// - a size big enough to fit the entire input (ie. new_pos)
|
|
||||||
// - a size big enough (ie. current msg size + starting buffer size)
|
|
||||||
// to avoid multiple reallocation
|
|
||||||
const new_size = @max(self.buf.len + self.size, new_pos);
|
|
||||||
// resize the MsgBuffer to fit
|
|
||||||
self.buf = try alloc.realloc(self.buf, new_size);
|
|
||||||
}
|
|
||||||
|
|
||||||
// copy the current input into MsgBuffer
|
|
||||||
// NOTE: we could use @memcpy but it's not Thread-safe (alias problem)
|
// NOTE: we could use @memcpy but it's not Thread-safe (alias problem)
|
||||||
// see https://www.openmymind.net/Zigs-memcpy-copyForwards-and-copyBackwards/
|
// see https://www.openmymind.net/Zigs-memcpy-copyForwards-and-copyBackwards/
|
||||||
// Intead we just use std.mem.copyForwards
|
// Intead we just use std.mem.copyForwards
|
||||||
@@ -123,47 +113,45 @@ pub const MsgBuffer = struct {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
fn doTest(nb: *u8) void {
|
test "Buffer" {
|
||||||
nb.* += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
test "MsgBuffer" {
|
|
||||||
const Case = struct {
|
const Case = struct {
|
||||||
input: []const u8,
|
input: []const u8,
|
||||||
nb: u8,
|
nb: u8,
|
||||||
};
|
};
|
||||||
const alloc = std.testing.allocator;
|
|
||||||
const cases = [_]Case{
|
const cases = [_]Case{
|
||||||
// simple
|
// simple
|
||||||
.{ .input = "2:ok", .nb = 1 },
|
.{ .input = .{ 2, 0 } ++ "ok", .nb = 1 },
|
||||||
// combined
|
// combined
|
||||||
.{ .input = "2:ok3:foo7:bar2:ok", .nb = 3 }, // "bar2:ok" is a message, no need to escape "2:" here
|
.{ .input = .{ 2, 0 } ++ "ok" ++ .{ 3, 0 } ++ "foo", .nb = 2 },
|
||||||
// multipart
|
// multipart
|
||||||
.{ .input = "9:multi", .nb = 0 },
|
.{ .input = .{ 9, 0 } ++ "multi", .nb = 0 },
|
||||||
.{ .input = "part", .nb = 1 },
|
.{ .input = "part", .nb = 1 },
|
||||||
// multipart & combined
|
// multipart & combined
|
||||||
.{ .input = "9:multi", .nb = 0 },
|
.{ .input = .{ 9, 0 } ++ "multi", .nb = 0 },
|
||||||
.{ .input = "part2:ok", .nb = 2 },
|
.{ .input = "part" ++ .{ 2, 0 } ++ "ok", .nb = 2 },
|
||||||
// multipart & combined with other multipart
|
// multipart & combined with other multipart
|
||||||
.{ .input = "9:multi", .nb = 0 },
|
.{ .input = .{ 9, 0 } ++ "multi", .nb = 0 },
|
||||||
.{ .input = "part8:co", .nb = 1 },
|
.{ .input = "part" ++ .{ 8, 0 } ++ "co", .nb = 1 },
|
||||||
.{ .input = "mbined", .nb = 1 },
|
.{ .input = "mbined", .nb = 1 },
|
||||||
// several multipart
|
// several multipart
|
||||||
.{ .input = "23:multi", .nb = 0 },
|
.{ .input = .{ 23, 0 } ++ "multi", .nb = 0 },
|
||||||
.{ .input = "several", .nb = 0 },
|
.{ .input = "several", .nb = 0 },
|
||||||
.{ .input = "complex", .nb = 0 },
|
.{ .input = "complex", .nb = 0 },
|
||||||
.{ .input = "part", .nb = 1 },
|
.{ .input = "part", .nb = 1 },
|
||||||
// combined & multipart
|
// combined & multipart
|
||||||
.{ .input = "2:ok9:multi", .nb = 1 },
|
.{ .input = .{ 2, 0 } ++ "ok" ++ .{ 9, 0 } ++ "multi", .nb = 1 },
|
||||||
.{ .input = "part", .nb = 1 },
|
.{ .input = "part", .nb = 1 },
|
||||||
};
|
};
|
||||||
var msg_buf = try MsgBuffer.init(alloc, 10);
|
|
||||||
defer msg_buf.deinit(alloc);
|
var b: [MaxSize]u8 = undefined;
|
||||||
|
var buf = Buffer{ .buf = &b };
|
||||||
|
|
||||||
for (cases) |case| {
|
for (cases) |case| {
|
||||||
var nb: u8 = 0;
|
var nb: u8 = 0;
|
||||||
var input: []const u8 = case.input;
|
var input = case.input;
|
||||||
while (input.len > 0) {
|
while (input.len > 0) {
|
||||||
const parts = msg_buf.read(alloc, input) catch |err| {
|
const parts = buf.read(input) catch |err| {
|
||||||
if (err == error.MsgMultipart) break; // go to the next case input
|
if (err == error.MsgMultipart) break; // go to the next case input
|
||||||
return err;
|
return err;
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -30,7 +30,8 @@ const CloseError = jsruntime.IO.CloseError;
|
|||||||
const CancelError = jsruntime.IO.CancelError;
|
const CancelError = jsruntime.IO.CancelError;
|
||||||
const TimeoutError = jsruntime.IO.TimeoutError;
|
const TimeoutError = jsruntime.IO.TimeoutError;
|
||||||
|
|
||||||
const MsgBuffer = @import("msg.zig").MsgBuffer;
|
const MsgBuffer = @import("msg.zig").Buffer;
|
||||||
|
const MaxSize = @import("msg.zig").MaxSize;
|
||||||
const Browser = @import("browser/browser.zig").Browser;
|
const Browser = @import("browser/browser.zig").Browser;
|
||||||
const cdp = @import("cdp/cdp.zig");
|
const cdp = @import("cdp/cdp.zig");
|
||||||
|
|
||||||
@@ -161,7 +162,7 @@ pub const Ctx = struct {
|
|||||||
// read and execute input
|
// read and execute input
|
||||||
var input: []const u8 = self.read_buf[0..size];
|
var input: []const u8 = self.read_buf[0..size];
|
||||||
while (input.len > 0) {
|
while (input.len > 0) {
|
||||||
const parts = self.msg_buf.read(self.alloc(), input) catch |err| {
|
const parts = self.msg_buf.read(input) catch |err| {
|
||||||
if (err == error.MsgMultipart) {
|
if (err == error.MsgMultipart) {
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
@@ -434,8 +435,8 @@ pub fn handle(
|
|||||||
|
|
||||||
// create buffers
|
// create buffers
|
||||||
var read_buf: [BufReadSize]u8 = undefined;
|
var read_buf: [BufReadSize]u8 = undefined;
|
||||||
var msg_buf = try MsgBuffer.init(loop.alloc, BufReadSize * 256); // 256KB
|
var buf: [MaxSize]u8 = undefined;
|
||||||
defer msg_buf.deinit(loop.alloc);
|
var msg_buf = MsgBuffer{ .buf = &buf };
|
||||||
|
|
||||||
// create I/O completions
|
// create I/O completions
|
||||||
var accept_completion: Completion = undefined;
|
var accept_completion: Completion = undefined;
|
||||||
|
|||||||
Reference in New Issue
Block a user