async: refacto with comptime generation

This commit is contained in:
Pierre Tachoire
2024-01-30 16:59:00 +01:00
parent c200f60d7d
commit 2fa66f93fd
4 changed files with 90 additions and 97 deletions

View File

@@ -2,27 +2,64 @@ const std = @import("std");
const net = std.net;
const Stream = @import("stream.zig").Stream;
const Loop = @import("jsruntime").Loop;
const NetworkImpl = Loop.Network(Conn.Command);
const ConnectCmd = struct {
const Self = @This();
// Conn is a TCP connection using jsruntime Loop async I/O.
// connect, send and receive are blocking, but use async I/O in the background.
// Client doesn't own the socket used for the connection, the caller is
// responsible for closing it.
pub const Conn = struct {
const Command = struct {
impl: NetworkImpl,
done: bool = false,
err: ?anyerror = null,
ln: usize = 0,
fn ok(self: *Command, err: ?anyerror, ln: usize) void {
self.err = err;
self.ln = ln;
self.done = true;
}
fn wait(self: *Command) !usize {
while (!self.done) try self.impl.tick();
if (self.err) |err| return err;
return self.ln;
}
pub fn onConnect(self: *Command, err: ?anyerror) void {
self.ok(err, 0);
}
pub fn onSend(self: *Command, ln: usize, err: ?anyerror) void {
self.ok(err, ln);
}
pub fn onReceive(self: *Command, ln: usize, err: ?anyerror) void {
self.ok(err, ln);
}
};
loop: *Loop,
socket: std.os.socket_t,
err: ?anyerror = null,
done: bool = false,
fn run(self: *Self, addr: std.net.Address) !void {
self.loop.connect(*Self, self, callback, self.socket, addr);
pub fn connect(self: *Conn, socket: std.os.socket_t, address: std.net.Address) !void {
var cmd = Command{ .impl = undefined };
cmd.impl = NetworkImpl.init(self.loop, &cmd);
cmd.impl.connect(socket, address);
_ = try cmd.wait();
}
fn callback(self: *Self, _: std.os.socket_t, err: ?anyerror) void {
self.err = err;
self.done = true;
pub fn send(self: *Conn, socket: std.os.socket_t, buffer: []const u8) !usize {
var cmd = Command{ .impl = undefined };
cmd.impl = NetworkImpl.init(self.loop, &cmd);
cmd.impl.send(socket, buffer);
return try cmd.wait();
}
fn wait(self: *Self) !void {
while (!self.done) try self.loop.tick();
if (self.err) |err| return err;
pub fn receive(self: *Conn, socket: std.os.socket_t, buffer: []u8) !usize {
var cmd = Command{ .impl = undefined };
cmd.impl = NetworkImpl.init(self.loop, &cmd);
cmd.impl.receive(socket, buffer);
return try cmd.wait();
}
};
@@ -34,7 +71,7 @@ pub fn tcpConnectToHost(alloc: std.mem.Allocator, loop: *Loop, name: []const u8,
if (list.addrs.len == 0) return error.UnknownHostName;
for (list.addrs) |addr| {
return tcpConnectToAddress(loop, addr) catch |err| switch (err) {
return tcpConnectToAddress(alloc, loop, addr) catch |err| switch (err) {
error.ConnectionRefused => {
continue;
},
@@ -44,19 +81,17 @@ pub fn tcpConnectToHost(alloc: std.mem.Allocator, loop: *Loop, name: []const u8,
return std.os.ConnectError.ConnectionRefused;
}
pub fn tcpConnectToAddress(loop: *Loop, addr: net.Address) !Stream {
const sockfd = try loop.open(addr.any.family, std.os.SOCK.STREAM, std.os.IPPROTO.TCP);
pub fn tcpConnectToAddress(alloc: std.mem.Allocator, loop: *Loop, addr: net.Address) !Stream {
const sockfd = try std.os.socket(addr.any.family, std.os.SOCK.STREAM, std.os.IPPROTO.TCP);
errdefer std.os.closeSocket(sockfd);
var cmd = ConnectCmd{
.loop = loop,
.socket = sockfd,
};
try cmd.run(addr);
try cmd.wait();
var conn = try alloc.create(Conn);
conn.* = Conn{ .loop = loop };
try conn.connect(sockfd, addr);
return Stream{
.loop = loop,
.alloc = alloc,
.conn = conn,
.handle = sockfd,
};
}