mirror of
https://github.com/lightpanda-io/browser.git
synced 2026-03-22 04:34:44 +00:00
mcp: simplify request processing to single-threaded
This commit is contained in:
@@ -138,8 +138,7 @@ fn run(allocator: Allocator, main_arena: Allocator) !void {
|
|||||||
var mcp_server = try lp.mcp.Server.init(allocator, app);
|
var mcp_server = try lp.mcp.Server.init(allocator, app);
|
||||||
defer mcp_server.deinit();
|
defer mcp_server.deinit();
|
||||||
|
|
||||||
try mcp_server.start();
|
try lp.mcp.router.processRequests(mcp_server);
|
||||||
lp.mcp.router.processRequests(mcp_server);
|
|
||||||
},
|
},
|
||||||
else => unreachable,
|
else => unreachable,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,11 +15,6 @@ browser: *lp.Browser,
|
|||||||
session: *lp.Session,
|
session: *lp.Session,
|
||||||
page: *lp.Page,
|
page: *lp.Page,
|
||||||
|
|
||||||
io_thread: ?std.Thread = null,
|
|
||||||
queue_mutex: std.Thread.Mutex = .{},
|
|
||||||
queue_condition: std.Thread.Condition = .{},
|
|
||||||
message_queue: std.ArrayListUnmanaged([]const u8) = .empty,
|
|
||||||
|
|
||||||
is_running: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
|
is_running: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
|
||||||
|
|
||||||
stdout_mutex: std.Thread.Mutex = .{},
|
stdout_mutex: std.Thread.Mutex = .{},
|
||||||
@@ -30,7 +25,6 @@ pub fn init(allocator: std.mem.Allocator, app: *App) !*Self {
|
|||||||
|
|
||||||
self.allocator = allocator;
|
self.allocator = allocator;
|
||||||
self.app = app;
|
self.app = app;
|
||||||
self.message_queue = .empty;
|
|
||||||
|
|
||||||
self.http_client = try app.http.createClient(allocator);
|
self.http_client = try app.http.createClient(allocator);
|
||||||
errdefer self.http_client.deinit();
|
errdefer self.http_client.deinit();
|
||||||
@@ -50,14 +44,7 @@ pub fn init(allocator: std.mem.Allocator, app: *App) !*Self {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn deinit(self: *Self) void {
|
pub fn deinit(self: *Self) void {
|
||||||
self.stop();
|
self.is_running.store(false, .seq_cst);
|
||||||
if (self.io_thread) |*thread| {
|
|
||||||
thread.join();
|
|
||||||
}
|
|
||||||
for (self.message_queue.items) |msg| {
|
|
||||||
self.allocator.free(msg);
|
|
||||||
}
|
|
||||||
self.message_queue.deinit(self.allocator);
|
|
||||||
|
|
||||||
self.browser.deinit();
|
self.browser.deinit();
|
||||||
self.allocator.destroy(self.browser);
|
self.allocator.destroy(self.browser);
|
||||||
@@ -67,63 +54,6 @@ pub fn deinit(self: *Self) void {
|
|||||||
self.allocator.destroy(self);
|
self.allocator.destroy(self);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn start(self: *Self) !void {
|
|
||||||
self.is_running.store(true, .seq_cst);
|
|
||||||
self.io_thread = try std.Thread.spawn(.{}, ioWorker, .{self});
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn stop(self: *Self) void {
|
|
||||||
self.is_running.store(false, .seq_cst);
|
|
||||||
self.queue_mutex.lock();
|
|
||||||
self.queue_condition.signal();
|
|
||||||
self.queue_mutex.unlock();
|
|
||||||
}
|
|
||||||
|
|
||||||
fn ioWorker(self: *Self) void {
|
|
||||||
var stdin_file = std.fs.File.stdin();
|
|
||||||
var stdin_buf: [8192]u8 = undefined;
|
|
||||||
var stdin = stdin_file.reader(&stdin_buf);
|
|
||||||
|
|
||||||
while (self.is_running.load(.seq_cst)) {
|
|
||||||
const msg_or_err = stdin.interface.adaptToOldInterface().readUntilDelimiterAlloc(self.allocator, '\n', 1024 * 1024 * 10);
|
|
||||||
if (msg_or_err) |msg| {
|
|
||||||
if (msg.len == 0) {
|
|
||||||
self.allocator.free(msg);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.queue_mutex.lock();
|
|
||||||
self.message_queue.append(self.allocator, msg) catch |err| {
|
|
||||||
lp.log.err(.app, "MCP Queue failed", .{ .err = err });
|
|
||||||
self.allocator.free(msg);
|
|
||||||
};
|
|
||||||
self.queue_mutex.unlock();
|
|
||||||
self.queue_condition.signal();
|
|
||||||
} else |err| {
|
|
||||||
if (err == error.EndOfStream) {
|
|
||||||
self.stop();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
lp.log.err(.app, "MCP IO Error", .{ .err = err });
|
|
||||||
std.Thread.sleep(100 * std.time.ns_per_ms);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn getNextMessage(self: *Self) ?[]const u8 {
|
|
||||||
self.queue_mutex.lock();
|
|
||||||
defer self.queue_mutex.unlock();
|
|
||||||
|
|
||||||
while (self.message_queue.items.len == 0 and self.is_running.load(.seq_cst)) {
|
|
||||||
self.queue_condition.wait(&self.queue_mutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (self.message_queue.items.len > 0) {
|
|
||||||
return self.message_queue.orderedRemove(0);
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn sendResponse(self: *Self, response: anytype) !void {
|
pub fn sendResponse(self: *Self, response: anytype) !void {
|
||||||
self.stdout_mutex.lock();
|
self.stdout_mutex.lock();
|
||||||
defer self.stdout_mutex.unlock();
|
defer self.stdout_mutex.unlock();
|
||||||
|
|||||||
@@ -8,20 +8,30 @@ const resources = @import("resources.zig");
|
|||||||
const Server = @import("Server.zig");
|
const Server = @import("Server.zig");
|
||||||
const tools = @import("tools.zig");
|
const tools = @import("tools.zig");
|
||||||
|
|
||||||
pub fn processRequests(server: *Server) void {
|
pub fn processRequests(server: *Server) !void {
|
||||||
|
var stdin_file = std.fs.File.stdin();
|
||||||
|
var stdin_buf: [8192]u8 = undefined;
|
||||||
|
var stdin = stdin_file.reader(&stdin_buf);
|
||||||
|
|
||||||
|
server.is_running.store(true, .seq_cst);
|
||||||
|
|
||||||
while (server.is_running.load(.seq_cst)) {
|
while (server.is_running.load(.seq_cst)) {
|
||||||
if (server.getNextMessage()) |msg| {
|
const msg = stdin.interface.adaptToOldInterface().readUntilDelimiterAlloc(server.allocator, '\n', 1024 * 1024 * 10) catch |err| {
|
||||||
defer server.allocator.free(msg);
|
if (err == error.EndOfStream) break;
|
||||||
|
return err;
|
||||||
|
};
|
||||||
|
defer server.allocator.free(msg);
|
||||||
|
|
||||||
// Critical: Per-request Arena
|
if (msg.len == 0) continue;
|
||||||
var arena = std.heap.ArenaAllocator.init(server.allocator);
|
|
||||||
defer arena.deinit();
|
|
||||||
|
|
||||||
handleMessage(server, arena.allocator(), msg) catch |err| {
|
// Critical: Per-request Arena
|
||||||
log.err(.app, "MCP Error processing message", .{ .err = err });
|
var arena = std.heap.ArenaAllocator.init(server.allocator);
|
||||||
// We should ideally send a parse error response back, but it's hard to extract the ID if parsing failed entirely.
|
defer arena.deinit();
|
||||||
};
|
|
||||||
}
|
handleMessage(server, arena.allocator(), msg) catch |err| {
|
||||||
|
log.err(.app, "MCP Error processing message", .{ .err = err });
|
||||||
|
// We should ideally send a parse error response back, but it's hard to extract the ID if parsing failed entirely.
|
||||||
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user