Try to sniff the mime type based on the body content

Synchronous body reader now exposes a peek() function to get the first few bytes
from the response body. This will be no less than 100 bytes (assuming the body
is that big), but could be more. Streaming API, via res.next() continues to work
as-is even if peek() is called.

Introduce Mime.sniff() that detects a few common types - the ones that we care
about right now - from the body content.
This commit is contained in:
Karl Seguin
2025-04-22 10:54:29 +08:00
parent 66ec087416
commit b9f61466ba
5 changed files with 343 additions and 160 deletions

View File

@@ -32,9 +32,13 @@ const Loop = @import("../runtime/loop.zig").Loop;
const log = std.log.scoped(.http_client);
// We might need to peek at the body to try and sniff the content-type.
// While we only need a few bytes, in most cases we need to ignore leading
// whitespace, so we want to get a reasonable-sized chunk.
const PEEK_BUF_LEN = 1024;
const BUFFER_LEN = 32 * 1024;
// The longest individual header line that we support
const MAX_HEADER_LINE_LEN = 4096;
// Thread-safe. Holds our root certificate, connection pool and state pool
@@ -900,6 +904,7 @@ const SyncHandler = struct {
// object which can be iterated to get the body.
std.debug.assert(result.done or reader.body_reader != null);
std.debug.assert(result.data == null);
return .{
._buf = buf,
._request = request,
@@ -907,6 +912,8 @@ const SyncHandler = struct {
._done = result.done,
._connection = connection,
._data = result.unprocessed,
._peek_len = 0,
._peek_buf = state.peek_buf,
.header = reader.response,
};
}
@@ -1046,7 +1053,7 @@ const Reader = struct {
// Still parsing the header
// what data do we have leftover in `data`.
// What data do we have leftover in `data`?
// When header_done == true, then this is part (or all) of the body
// When header_done == false, then this is a header line that we didn't
// have enough data for.
@@ -1504,23 +1511,49 @@ pub const Progress = struct {
header: ResponseHeader,
};
// The value that we return from a synchronous requst.
// The value that we return from a synchronous request.
pub const Response = struct {
_reader: Reader,
_request: *Request,
_buf: []u8,
_connection: SyncHandler.Connection,
// the buffer to read the peeked data into
_peek_buf: []u8,
// the length of data we've peeked. The peeked_data is _peek_buf[0.._peek_len].
// It's possible for peek_len > 0 and _done == true, in which case, the
// _peeked data should be emitted once and subsequent calls to `next` should
// return null.
_peek_len: usize,
// What we'll read from the socket into. This is the State's read_buf
_buf: []u8,
// Whether or not we're done reading the response. When true, next will
// return null.
_done: bool,
// Any data we over-read while parsing the header. This will be returned on
// the first call to next();
// Data that we've read. This can be set when the Response is first created
// from extra data received while parsing the body. Or, it can be set
// when `next` is called and we read more data from the socket.
_data: ?[]u8 = null,
header: ResponseHeader,
pub fn next(self: *Response) !?[]u8 {
var buf = self._buf;
// it's possible for peek_len > - and done == true. This would happen
// when, while peeking, we reached the end of the data. In that case,
// we return the peeked data once, and on subsequent call, we'll return
// null normally, because done == true;
const pl = self._peek_len;
if (pl > 0) {
self._peek_len = 0;
return self._peek_buf[0..pl];
}
return self._nextIgnorePeek(self._buf);
}
fn _nextIgnorePeek(self: *Response, buf: []u8) !?[]u8 {
while (true) {
if (try self.processData()) |data| {
return data;
@@ -1541,14 +1574,38 @@ pub const Response = struct {
self._data = result.unprocessed; // for the next call
return result.data;
}
pub fn peek(self: *Response) ![]u8 {
while (true) {
var peek_buf = self._peek_buf;
const peek_len = self._peek_len;
const data = (try self._nextIgnorePeek(peek_buf[peek_len..])) orelse {
return peek_buf[0..peek_len];
};
const peek_end = peek_len + data.len;
@memcpy(peek_buf[peek_len..peek_end], data);
self._peek_len = peek_end;
if (peek_end > 100) {
return peek_buf[peek_len..peek_end];
}
}
}
};
// Pooled and re-used when creating a request
const State = struct {
// used for reading chunks of payload data.
// We might be asked to peek at the response, i.e. to sniff the mime type.
// This will require storing any peeked data so that, later, if we stream
// the body, we can present a cohesive body.
peek_buf: []u8,
// Used for reading chunks of payload data.
read_buf: []u8,
// use for writing data. If you're wondering why BOTH a read_buf and a
// Used for writing data. If you're wondering why BOTH a read_buf and a
// write_buf, even though HTTP is req -> resp, it's for TLS, which has
// bidirectional data.
write_buf: []u8,
@@ -1561,7 +1618,10 @@ const State = struct {
// response headers.
arena: ArenaAllocator,
fn init(allocator: Allocator, header_size: usize, buf_size: usize) !State {
fn init(allocator: Allocator, header_size: usize, peek_size: usize, buf_size: usize) !State {
const peek_buf = try allocator.alloc(u8, peek_size);
errdefer allocator.free(peek_buf);
const read_buf = try allocator.alloc(u8, buf_size);
errdefer allocator.free(read_buf);
@@ -1572,6 +1632,7 @@ const State = struct {
errdefer allocator.free(header_buf);
return .{
.peek_buf = peek_buf,
.read_buf = read_buf,
.write_buf = write_buf,
.header_buf = header_buf,
@@ -1585,6 +1646,7 @@ const State = struct {
fn deinit(self: *State) void {
const allocator = self.arena.child_allocator;
allocator.free(self.peek_buf);
allocator.free(self.read_buf);
allocator.free(self.write_buf);
allocator.free(self.header_buf);
@@ -1611,7 +1673,7 @@ const StatePool = struct {
for (0..count) |i| {
const state = try allocator.create(State);
errdefer allocator.destroy(state);
state.* = try State.init(allocator, MAX_HEADER_LINE_LEN, BUFFER_LEN);
state.* = try State.init(allocator, MAX_HEADER_LINE_LEN, PEEK_BUF_LEN, BUFFER_LEN);
states[i] = state;
started += 1;
}
@@ -1662,7 +1724,7 @@ const StatePool = struct {
const testing = @import("../testing.zig");
test "HttpClient Reader: fuzz" {
var state = try State.init(testing.allocator, 1024, 1024);
var state = try State.init(testing.allocator, 1024, 1024, 100);
defer state.deinit();
var res = TestResponse.init();
@@ -1773,18 +1835,23 @@ test "HttpClient: sync connect error" {
}
test "HttpClient: sync no body" {
var client = try testClient();
defer client.deinit();
for (0..2) |i| {
var client = try testClient();
defer client.deinit();
const uri = try Uri.parse("http://127.0.0.1:9582/http_client/simple");
var req = try client.request(.GET, &uri);
var res = try req.sendSync(.{});
const uri = try Uri.parse("http://127.0.0.1:9582/http_client/simple");
var req = try client.request(.GET, &uri);
var res = try req.sendSync(.{});
try testing.expectEqual(null, try res.next());
try testing.expectEqual(200, res.header.status);
try testing.expectEqual(2, res.header.count());
try testing.expectEqual("close", res.header.get("connection"));
try testing.expectEqual("0", res.header.get("content-length"));
if (i == 0) {
try testing.expectEqual("", try res.peek());
}
try testing.expectEqual(null, try res.next());
try testing.expectEqual(200, res.header.status);
try testing.expectEqual(2, res.header.count());
try testing.expectEqual("close", res.header.get("connection"));
try testing.expectEqual("0", res.header.get("content-length"));
}
}
test "HttpClient: sync tls no body" {
@@ -1804,21 +1871,26 @@ test "HttpClient: sync tls no body" {
}
test "HttpClient: sync with body" {
var client = try testClient();
defer client.deinit();
for (0..2) |i| {
var client = try testClient();
defer client.deinit();
const uri = try Uri.parse("http://127.0.0.1:9582/http_client/echo");
var req = try client.request(.GET, &uri);
var res = try req.sendSync(.{});
const uri = try Uri.parse("http://127.0.0.1:9582/http_client/echo");
var req = try client.request(.GET, &uri);
var res = try req.sendSync(.{});
try testing.expectEqual("over 9000!", try res.next());
try testing.expectEqual(201, res.header.status);
try testing.expectEqual(5, res.header.count());
try testing.expectEqual("close", res.header.get("connection"));
try testing.expectEqual("10", res.header.get("content-length"));
try testing.expectEqual("127.0.0.1", res.header.get("_host"));
try testing.expectEqual("Close", res.header.get("_connection"));
try testing.expectEqual("Lightpanda/1.0", res.header.get("_user-agent"));
if (i == 0) {
try testing.expectEqual("over 9000!", try res.peek());
}
try testing.expectEqual("over 9000!", try res.next());
try testing.expectEqual(201, res.header.status);
try testing.expectEqual(5, res.header.count());
try testing.expectEqual("close", res.header.get("connection"));
try testing.expectEqual("10", res.header.get("content-length"));
try testing.expectEqual("127.0.0.1", res.header.get("_host"));
try testing.expectEqual("Close", res.header.get("_connection"));
try testing.expectEqual("Lightpanda/1.0", res.header.get("_user-agent"));
}
}
test "HttpClient: sync tls with body" {