// Copyright (C) 2023-2024  Lightpanda (Selecy SAS)
//
// Francis Bouvier
// Pierre Tachoire
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.

const std = @import("std");
const builtin = @import("builtin");
const posix = std.posix;
const io = std.io;
const assert = std.debug.assert;

const tcp = @import("tcp.zig");

/// Socket stream whose single-buffer reads and writes are delegated to a
/// `tcp.Conn` (`receive` / `send`) instead of being issued on the socket
/// directly. Exposes `io.Reader` / `io.Writer` adapters over `read` / `write`.
pub const Stream = struct {
    /// Allocator used by `close` to destroy `conn`.
    alloc: std.mem.Allocator,
    /// Connection object that performs `receive` and `send` for this stream.
    conn: *tcp.Conn,
    /// Socket handed to `conn` on every operation and closed by `close`.
    handle: posix.socket_t,

    pub const ReadError = posix.ReadError;
    pub const WriteError = posix.WriteError;

    pub const Reader = io.Reader(Stream, ReadError, read);
    pub const Writer = io.Writer(Stream, WriteError, write);

    /// Closes the socket, then destroys `conn` with `alloc`.
    /// The stream must not be used afterwards.
    pub fn close(self: Stream) void {
        // Deferred so the socket is closed first, then the connection freed.
        defer self.alloc.destroy(self.conn);
        posix.close(self.handle);
    }

    /// Returns an `io.Reader` backed by `read`.
    pub fn reader(self: Stream) Reader {
        return Reader{ .context = self };
    }

    /// Returns an `io.Writer` backed by `write`.
    pub fn writer(self: Stream) Writer {
        return Writer{ .context = self };
    }

    /// Reads into `buffer` through `conn.receive` and returns the number of
    /// bytes read. Every failure reported by `conn.receive` is surfaced as
    /// `error.Unexpected`.
    pub fn read(self: Stream, buffer: []u8) ReadError!usize {
        return self.conn.receive(self.handle, buffer) catch return error.Unexpected;
    }

    /// Vectored read performed with `posix.readv` on the socket handle.
    // NOTE(review): unlike `read`, this does not go through `conn.receive`;
    // confirm that bypassing the connection here is intended.
    pub fn readv(s: Stream, iovecs: []const posix.iovec) ReadError!usize {
        return posix.readv(s.handle, iovecs);
    }

    /// Fills `buffer` completely unless the stream ends first, and returns
    /// the number of bytes read. A result smaller than `buffer.len` means the
    /// end of the stream was reached, which is not an error.
    pub fn readAll(s: Stream, buffer: []u8) ReadError!usize {
        return s.readAtLeast(buffer, buffer.len);
    }

    /// Calls `read` as few times as needed until at least `len` bytes of
    /// `buffer` are filled, and returns the number of bytes read. A result
    /// smaller than `len` means the end of the stream was reached, which is
    /// not an error. Asserts `len <= buffer.len`.
    pub fn readAtLeast(s: Stream, buffer: []u8, len: usize) ReadError!usize {
        assert(len <= buffer.len);
        var filled: usize = 0;
        while (filled < len) {
            const got = try s.read(buffer[filled..]);
            // A zero-length read marks the end of the stream.
            if (got == 0) return filled;
            filled += got;
        }
        return filled;
    }

    /// Writes `buffer` through `conn.send` and returns the number of bytes
    /// written, which may be fewer than `buffer.len`.
    ///
    /// `AccessDenied`, `WouldBlock`, `ConnectionResetByPeer` and `BrokenPipe`
    /// are passed through, `MessageTooBig` is reported as `FileTooBig`, and
    /// any other failure becomes `error.Unexpected`.
    ///
    /// TODO in evented I/O mode, this implementation incorrectly uses the event loop's
    /// file system thread instead of non-blocking. It needs to be reworked to properly
    /// use non-blocking I/O.
    pub fn write(self: Stream, buffer: []const u8) WriteError!usize {
        const sent = self.conn.send(self.handle, buffer) catch |err| {
            return switch (err) {
                error.AccessDenied => error.AccessDenied,
                error.WouldBlock => error.WouldBlock,
                error.ConnectionResetByPeer => error.ConnectionResetByPeer,
                error.MessageTooBig => error.FileTooBig,
                error.BrokenPipe => error.BrokenPipe,
                else => error.Unexpected,
            };
        };
        return sent;
    }

    /// Calls `write` repeatedly until every byte of `bytes` has been written.
    pub fn writeAll(self: Stream, bytes: []const u8) WriteError!void {
        var sent: usize = 0;
        while (sent < bytes.len) sent += try self.write(bytes[sent..]);
    }

    /// Writes only the first buffer of `iovecs` (via `write`) and returns the
    /// number of bytes written; returns 0 when `iovecs` is empty.
    /// See https://github.com/ziglang/zig/issues/7699
    /// See equivalent function: `std.fs.File.writev`.
    pub fn writev(self: Stream, iovecs: []const posix.iovec_const) WriteError!usize {
        if (iovecs.len == 0) return 0;
        const head = iovecs[0];
        return self.write(head.base[0..head.len]);
    }

    /// Writes every buffer in `iovecs`, in order.
    /// The `iovecs` parameter is mutable because this function needs to mutate the fields in
    /// order to handle partial writes from the underlying OS layer.
    /// See https://github.com/ziglang/zig/issues/7699
    /// See equivalent function: `std.fs.File.writevAll`.
    pub fn writevAll(self: Stream, iovecs: []posix.iovec_const) WriteError!void {
        if (iovecs.len == 0) return;

        var cursor: usize = 0;
        while (true) {
            var pending = try self.writev(iovecs[cursor..]);
            // Step past every buffer the last write fully consumed.
            while (pending >= iovecs[cursor].len) {
                pending -= iovecs[cursor].len;
                cursor += 1;
                if (cursor >= iovecs.len) return;
            }
            // Partial write: advance the current buffer past the bytes already sent.
            iovecs[cursor].base += pending;
            iovecs[cursor].len -= pending;
        }
    }
};