From d524a79d43d6785d36e52929727199f51ace3f3b Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 15 Dec 2025 18:02:37 +0100 Subject: [PATCH 01/33] wip --- Cargo.lock | 32 ++++-- objectstore-server/Cargo.toml | 5 +- objectstore-server/src/auth/service.rs | 11 +- objectstore-server/src/endpoints/batch.rs | 112 +++++++++++++++++++++ objectstore-server/src/endpoints/common.rs | 8 ++ objectstore-server/src/endpoints/mod.rs | 5 +- objectstore-service/src/id.rs | 4 +- objectstore-service/src/lib.rs | 4 + 8 files changed, 163 insertions(+), 18 deletions(-) create mode 100644 objectstore-server/src/endpoints/batch.rs diff --git a/Cargo.lock b/Cargo.lock index cbf75912..8ae725b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -295,9 +295,9 @@ dependencies = [ [[package]] name = "axum" -version = "0.8.6" +version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a18ed336352031311f4e0b4dd2ff392d4fbb370777c9d18d7fc9d7359f73871" +checksum = "5b098575ebe77cb6d14fc7f32749631a6e44edbef6b796f89b020e99ba20d425" dependencies = [ "axum-core", "bytes", @@ -312,6 +312,7 @@ dependencies = [ "matchit", "memchr", "mime", + "multer", "percent-encoding", "pin-project-lite", "serde_core", @@ -347,21 +348,22 @@ dependencies = [ [[package]] name = "axum-extra" -version = "0.10.3" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9963ff19f40c6102c76756ef0a46004c0d58957d87259fc9208ff8441c12ab96" +checksum = "dbfe9f610fe4e99cf0cfcd03ccf8c63c28c616fe714d80475ef731f3b13dd21b" dependencies = [ "axum", "axum-core", "bytes", + "fastrand", + "futures-core", "futures-util", "http 1.3.1", "http-body", "http-body-util", "mime", + "multer", "pin-project-lite", - "rustversion", - "serde_core", "tower-layer", "tower-service", "tracing", @@ -2037,6 +2039,23 @@ dependencies = [ "uuid", ] +[[package]] +name = "multer" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 1.3.1", + "httparse", + "memchr", + "mime", + "spin", + "version_check", +] + [[package]] name = "multimap" version = "0.10.1" @@ -2204,6 +2223,7 @@ dependencies = [ "console", "elegant-departure", "figment", + "futures", "futures-util", "humantime", "humantime-serde", diff --git a/objectstore-server/Cargo.toml b/objectstore-server/Cargo.toml index 2e0386f9..7df99a0c 100644 --- a/objectstore-server/Cargo.toml +++ b/objectstore-server/Cargo.toml @@ -13,11 +13,12 @@ publish = false [dependencies] anyhow = { workspace = true } argh = "0.1.13" -axum = "0.8.4" -axum-extra = "0.10.1" +axum = { version = "0.8.4", features = ["multipart"] } +axum-extra = { version = "0.12.2", features = ["multipart"] } console = "0.16.1" elegant-departure = { version = "0.3.2", features = ["tokio"] } figment = { version = "0.10.19", features = ["env", "test", "yaml"] } +futures = { workspace = true } futures-util = { workspace = true } humantime = { workspace = true } humantime-serde = { workspace = true } diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index 8429ced1..f6474982 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -1,5 +1,5 @@ use objectstore_service::id::{ObjectContext, ObjectId}; -use objectstore_service::{PayloadStream, StorageService}; +use objectstore_service::{DeleteResult, GetResult, InsertResult, PayloadStream, StorageService}; use objectstore_types::{Metadata, Permission}; use crate::auth::AuthContext; @@ -55,7 +55,7 @@ impl AuthAwareService { key: Option, metadata: &Metadata, stream: PayloadStream, - ) -> anyhow::Result { + ) -> InsertResult { self.assert_authorized(Permission::ObjectWrite, &context)?; self.service .insert_object(context, key, metadata, stream) @@ -63,16 +63,13 @@ impl AuthAwareService { } /// Auth-aware wrapper around [`StorageService::get_object`]. - pub async fn get_object( - &self, - id: &ObjectId, - ) -> anyhow::Result> { + pub async fn get_object(&self, id: &ObjectId) -> GetResult { self.assert_authorized(Permission::ObjectRead, id.context())?; self.service.get_object(id).await } /// Auth-aware wrapper around [`StorageService::delete_object`]. - pub async fn delete_object(&self, id: &ObjectId) -> anyhow::Result<()> { + pub async fn delete_object(&self, id: &ObjectId) -> DeleteResult { self.assert_authorized(Permission::ObjectDelete, id.context())?; self.service.delete_object(id).await } diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs new file mode 100644 index 00000000..cded7324 --- /dev/null +++ b/objectstore-server/src/endpoints/batch.rs @@ -0,0 +1,112 @@ +use std::os::unix::fs::OpenOptionsExt; + +use axum::extract::{DefaultBodyLimit, Multipart}; +use axum::http::StatusCode; +use axum::response::{IntoResponse, IntoResponseParts}; +use axum::routing; +use axum::{Json, Router}; +use axum_extra::response::multiple::MultipartForm; +use futures::TryStreamExt; +use futures_util::StreamExt; +use objectstore_service::id::{ObjectContext, ObjectId}; +use objectstore_service::{DeleteResult, GetResult}; +use objectstore_types::Metadata; +use serde::{Deserialize, Serialize}; + +use crate::auth::AuthAwareService; +use crate::error::ApiResult; +use crate::extractors::Xt; +use crate::state::ServiceState; + +pub fn router() -> Router { + Router::new() + .route("/objects:batch/{usecase}/{scopes}/", routing::post(batch)) + .layer(DefaultBodyLimit::max(500 * 1_000_000)) +} + +#[derive(Deserialize, Debug)] +#[serde(tag = "op")] +enum Operation { + Get(String), + Insert(Option), + Delete(String), +} + +#[derive(Deserialize, Debug)] +struct RequestManifest { + operations: Vec, +} + +#[derive(Serialize, Debug)] +struct Response {} + +pub trait IntoPart { + fn into_part(self) -> axum_extra::response::multiple::Part; +} + +impl IntoPart for GetResult { + fn into_part(self) -> axum_extra::response::multiple::Part { + todo!() + } +} + +#[derive(Deserialize, Debug)] +struct ResultManifest {} + +impl IntoPart for ResultManifest { + fn into_part(self) -> axum_extra::response::multiple::Part { + todo!() + } +} + +async fn batch( + service: AuthAwareService, + Xt(context): Xt, + mut multipart: Multipart, +) -> ApiResult { + let Some(manifest) = multipart.next_field().await? else { + return Ok((StatusCode::BAD_REQUEST, "expected manifest").into_response()); + }; + + // TODO: enforce max size on the manifest + let manifest = manifest.bytes().await?; + let manifest = str::from_utf8(&manifest) + .map_err(|err| anyhow::Error::new(err).context("failed to parse manifest as UTF-8"))?; + let manifest: RequestManifest = serde_json::from_str(manifest) + .map_err(|err| anyhow::Error::new(err).context("failed to deserialize manifest"))?; + + let result_manifest = ResultManifest {}; + let mut parts = vec![]; + for operation in manifest.operations { + let result = match operation { + Operation::Get(key) => { + let result = service + .get_object(&ObjectId::new(context.clone(), key)) + .await; + parts.push(result.into_part()); + } + Operation::Insert(key) => { + service.insert_object( + context.clone(), + key, + &Metadata::default(), + multipart + .next_field() + .await + .unwrap() + .unwrap() + .map_err(|err| std::io::Error::other(err)) + .boxed(), + ); + } + Operation::Delete(key) => { + service + .delete_object(&ObjectId::new(context.clone(), key)) + .await; + } + }; + } + parts.insert(0, result_manifest.into_part()); + + Ok(MultipartForm::with_parts(parts).into_response()) +} diff --git a/objectstore-server/src/endpoints/common.rs b/objectstore-server/src/endpoints/common.rs index 8da1c300..5a3a1d5a 100644 --- a/objectstore-server/src/endpoints/common.rs +++ b/objectstore-server/src/endpoints/common.rs @@ -1,8 +1,10 @@ //! //! This is mostly adapted from +use axum::extract::multipart::MultipartError; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; +use objectstore_service::DeleteResult; pub enum AnyhowResponse { Error(anyhow::Error), @@ -40,3 +42,9 @@ impl From for AnyhowResponse { Self::Error(err) } } + +impl From for AnyhowResponse { + fn from(value: MultipartError) -> Self { + Self::Error(value.into()) + } +} diff --git a/objectstore-server/src/endpoints/mod.rs b/objectstore-server/src/endpoints/mod.rs index 6907977e..b9c0c468 100644 --- a/objectstore-server/src/endpoints/mod.rs +++ b/objectstore-server/src/endpoints/mod.rs @@ -6,12 +6,15 @@ use axum::Router; use crate::state::ServiceState; +mod batch; mod common; mod health; mod objects; pub fn routes() -> Router { - let routes_v1 = Router::new().merge(objects::router()); + let routes_v1 = Router::new() + .merge(objects::router()) + .merge(batch::router()); Router::new() .merge(health::router()) diff --git a/objectstore-service/src/id.rs b/objectstore-service/src/id.rs index 622cf652..fdd34177 100644 --- a/objectstore-service/src/id.rs +++ b/objectstore-service/src/id.rs @@ -11,9 +11,9 @@ use std::fmt; use objectstore_types::scope::{Scope, Scopes}; -/// Defines where an object belongs within the object store. +/// Defines where an object, or batch of objects, belongs within the object store. /// -/// This is part of the full object identifier, see [`ObjectId`]. +/// This is part of the full object identifier for single objects, see [`ObjectId`]. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ObjectContext { /// The usecase, or "product" this object belongs to. diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index 3566044a..0ddf4ab7 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -86,6 +86,10 @@ pub enum StorageConfig<'a> { }, } +pub type GetResult = anyhow::Result>; +pub type InsertResult = anyhow::Result; +pub type DeleteResult = anyhow::Result<()>; + impl StorageService { /// Creates a new `StorageService` with the specified configuration. pub async fn new( From e17e474ba09e39435bf9381e5a05ba4398a6ddfc Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 16 Dec 2025 14:06:34 +0100 Subject: [PATCH 02/33] pass around stream of inserts --- Cargo.lock | 2 + objectstore-server/Cargo.toml | 2 + objectstore-server/src/auth/service.rs | 16 ++++- objectstore-server/src/endpoints/batch.rs | 76 ++++++++--------------- objectstore-service/src/lib.rs | 31 +++++++++ 5 files changed, 77 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8ae725b5..f6b32397 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2220,11 +2220,13 @@ dependencies = [ "argh", "axum", "axum-extra", + "bytes", "console", "elegant-departure", "figment", "futures", "futures-util", + "http 1.3.1", "humantime", "humantime-serde", "jsonwebtoken", diff --git a/objectstore-server/Cargo.toml b/objectstore-server/Cargo.toml index 7df99a0c..d42b270f 100644 --- a/objectstore-server/Cargo.toml +++ b/objectstore-server/Cargo.toml @@ -15,11 +15,13 @@ anyhow = { workspace = true } argh = "0.1.13" axum = { version = "0.8.4", features = ["multipart"] } axum-extra = { version = "0.12.2", features = ["multipart"] } +bytes = { workspace = true } console = "0.16.1" elegant-departure = { version = "0.3.2", features = ["tokio"] } figment = { version = "0.10.19", features = ["env", "test", "yaml"] } futures = { workspace = true } futures-util = { workspace = true } +http = { workspace = true } humantime = { workspace = true } humantime-serde = { workspace = true } jsonwebtoken = { workspace = true } diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index f6474982..1eac96de 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -1,5 +1,9 @@ +use bytes::Bytes; +use futures::Stream; use objectstore_service::id::{ObjectContext, ObjectId}; -use objectstore_service::{DeleteResult, GetResult, InsertResult, PayloadStream, StorageService}; +use objectstore_service::{ + BatchInsertResult, DeleteResult, GetResult, InsertResult, PayloadStream, StorageService, +}; use objectstore_types::{Metadata, Permission}; use crate::auth::AuthContext; @@ -73,4 +77,14 @@ impl AuthAwareService { self.assert_authorized(Permission::ObjectDelete, id.context())?; self.service.delete_object(id).await } + + /// Auth-aware wrapper around [`StorageService::insert_objects`]. + pub async fn insert_objects( + &self, + context: ObjectContext, + inserts: impl Stream>, + ) -> BatchInsertResult { + self.assert_authorized(Permission::ObjectWrite, &context)?; + self.service.insert_objects(context, inserts).await + } } diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs index cded7324..9886be21 100644 --- a/objectstore-server/src/endpoints/batch.rs +++ b/objectstore-server/src/endpoints/batch.rs @@ -2,14 +2,15 @@ use std::os::unix::fs::OpenOptionsExt; use axum::extract::{DefaultBodyLimit, Multipart}; use axum::http::StatusCode; -use axum::response::{IntoResponse, IntoResponseParts}; +use axum::response::IntoResponse; +use axum::response::Response; use axum::routing; use axum::{Json, Router}; -use axum_extra::response::multiple::MultipartForm; +use axum_extra::response::multiple::{MultipartForm, Part}; use futures::TryStreamExt; -use futures_util::StreamExt; +use futures::stream::{self, StreamExt, unfold}; use objectstore_service::id::{ObjectContext, ObjectId}; -use objectstore_service::{DeleteResult, GetResult}; +use objectstore_service::{DeleteResult, GetResult, InsertResult}; use objectstore_types::Metadata; use serde::{Deserialize, Serialize}; @@ -37,28 +38,6 @@ struct RequestManifest { operations: Vec, } -#[derive(Serialize, Debug)] -struct Response {} - -pub trait IntoPart { - fn into_part(self) -> axum_extra::response::multiple::Part; -} - -impl IntoPart for GetResult { - fn into_part(self) -> axum_extra::response::multiple::Part { - todo!() - } -} - -#[derive(Deserialize, Debug)] -struct ResultManifest {} - -impl IntoPart for ResultManifest { - fn into_part(self) -> axum_extra::response::multiple::Part { - todo!() - } -} - async fn batch( service: AuthAwareService, Xt(context): Xt, @@ -75,38 +54,37 @@ async fn batch( let manifest: RequestManifest = serde_json::from_str(manifest) .map_err(|err| anyhow::Error::new(err).context("failed to deserialize manifest"))?; - let result_manifest = ResultManifest {}; - let mut parts = vec![]; - for operation in manifest.operations { - let result = match operation { + let inserts = unfold(multipart, |mut m| async move { + match m.next_field().await { + Ok(Some(field)) => { + let metadata = Metadata::from_headers(field.headers(), "").unwrap(); + let bytes = field + .bytes() + .await + .map_err(|e| anyhow::Error::new(e)) + .unwrap(); + Some((Ok((metadata, bytes)), m)) + } + Ok(None) => None, + Err(_) => todo!(), + } + }); + let insert_results = service.insert_objects(context.clone(), inserts).await; + + for operation in manifest.operations.into_iter() { + match operation { Operation::Get(key) => { let result = service .get_object(&ObjectId::new(context.clone(), key)) .await; - parts.push(result.into_part()); - } - Operation::Insert(key) => { - service.insert_object( - context.clone(), - key, - &Metadata::default(), - multipart - .next_field() - .await - .unwrap() - .unwrap() - .map_err(|err| std::io::Error::other(err)) - .boxed(), - ); } Operation::Delete(key) => { - service + let result = service .delete_object(&ObjectId::new(context.clone(), key)) .await; } + _ => (), }; } - parts.insert(0, result_manifest.into_part()); - - Ok(MultipartForm::with_parts(parts).into_response()) + Ok(StatusCode::OK.into_response()) } diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index 0ddf4ab7..531672b1 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -15,6 +15,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Instant; use bytes::{Bytes, BytesMut}; +use futures_util::Stream; use futures_util::{StreamExt, TryStreamExt, stream::BoxStream}; use objectstore_types::Metadata; @@ -88,6 +89,7 @@ pub enum StorageConfig<'a> { pub type GetResult = anyhow::Result>; pub type InsertResult = anyhow::Result; +pub type BatchInsertResult = anyhow::Result>; pub type DeleteResult = anyhow::Result<()>; impl StorageService { @@ -278,6 +280,35 @@ impl StorageService { Ok(()) } + + /// TODO + pub async fn insert_objects( + &self, + context: ObjectContext, + inserts: impl Stream>, + ) -> BatchInsertResult { + let mut inserts = Box::pin(inserts); + + let mut results = Vec::new(); + while let Some(item) = inserts.next().await { + let result = match item { + Ok((metadata, bytes)) => { + let id = ObjectId::optional(context.clone(), None); + let stream = futures_util::stream::once(async { Ok(bytes) }).boxed(); + + self.0 + .high_volume_backend + .put_object(&id, &metadata, stream) + .await?; + + Ok(id) + } + Err(e) => Err(e), + }; + results.push(result); + } + Ok(results) + } } fn is_tombstoned(result: &Option<(Metadata, PayloadStream)>) -> bool { From df0f4599c46475528d25d6514b183ba99483ccec Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 10:29:25 +0100 Subject: [PATCH 03/33] wip --- Cargo.lock | 14 ++ objectstore-server/Cargo.toml | 4 +- objectstore-server/src/endpoints/batch.rs | 85 ++---------- objectstore-server/src/extractors/batch.rs | 146 +++++++++++++++++++++ objectstore-server/src/extractors/mod.rs | 3 + objectstore-service/src/id.rs | 10 +- objectstore-service/src/lib.rs | 2 +- 7 files changed, 184 insertions(+), 80 deletions(-) create mode 100644 objectstore-server/src/extractors/batch.rs diff --git a/Cargo.lock b/Cargo.lock index f6b32397..33e52c8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -300,6 +300,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b098575ebe77cb6d14fc7f32749631a6e44edbef6b796f89b020e99ba20d425" dependencies = [ "axum-core", + "axum-macros", "bytes", "form_urlencoded", "futures-util", @@ -369,6 +370,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-macros" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "backtrace" version = "0.3.76" @@ -2232,6 +2244,8 @@ dependencies = [ "jsonwebtoken", "merni", "mimalloc", + "mime", + "multer", "nix", "num_cpus", "objectstore-service", diff --git a/objectstore-server/Cargo.toml b/objectstore-server/Cargo.toml index d42b270f..d39ac30f 100644 --- a/objectstore-server/Cargo.toml +++ b/objectstore-server/Cargo.toml @@ -13,7 +13,7 @@ publish = false [dependencies] anyhow = { workspace = true } argh = "0.1.13" -axum = { version = "0.8.4", features = ["multipart"] } +axum = { version = "0.8.4", features = ["multipart", "macros"] } axum-extra = { version = "0.12.2", features = ["multipart"] } bytes = { workspace = true } console = "0.16.1" @@ -27,6 +27,8 @@ humantime-serde = { workspace = true } jsonwebtoken = { workspace = true } merni = { workspace = true } mimalloc = { workspace = true } +mime = "0.3.17" +multer = "3.1.0" num_cpus = "1.17.0" objectstore-service = { workspace = true } objectstore-types = { workspace = true } diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs index 9886be21..4954e8b8 100644 --- a/objectstore-server/src/endpoints/batch.rs +++ b/objectstore-server/src/endpoints/batch.rs @@ -1,22 +1,13 @@ -use std::os::unix::fs::OpenOptionsExt; - -use axum::extract::{DefaultBodyLimit, Multipart}; +use axum::Router; +use axum::extract::DefaultBodyLimit; use axum::http::StatusCode; -use axum::response::IntoResponse; -use axum::response::Response; +use axum::response::{IntoResponse, Response}; use axum::routing; -use axum::{Json, Router}; -use axum_extra::response::multiple::{MultipartForm, Part}; -use futures::TryStreamExt; -use futures::stream::{self, StreamExt, unfold}; -use objectstore_service::id::{ObjectContext, ObjectId}; -use objectstore_service::{DeleteResult, GetResult, InsertResult}; -use objectstore_types::Metadata; -use serde::{Deserialize, Serialize}; +use objectstore_service::id::ObjectContext; use crate::auth::AuthAwareService; use crate::error::ApiResult; -use crate::extractors::Xt; +use crate::extractors::{BatchRequest, Xt}; use crate::state::ServiceState; pub fn router() -> Router { @@ -25,66 +16,10 @@ pub fn router() -> Router { .layer(DefaultBodyLimit::max(500 * 1_000_000)) } -#[derive(Deserialize, Debug)] -#[serde(tag = "op")] -enum Operation { - Get(String), - Insert(Option), - Delete(String), -} - -#[derive(Deserialize, Debug)] -struct RequestManifest { - operations: Vec, -} - async fn batch( - service: AuthAwareService, - Xt(context): Xt, - mut multipart: Multipart, -) -> ApiResult { - let Some(manifest) = multipart.next_field().await? else { - return Ok((StatusCode::BAD_REQUEST, "expected manifest").into_response()); - }; - - // TODO: enforce max size on the manifest - let manifest = manifest.bytes().await?; - let manifest = str::from_utf8(&manifest) - .map_err(|err| anyhow::Error::new(err).context("failed to parse manifest as UTF-8"))?; - let manifest: RequestManifest = serde_json::from_str(manifest) - .map_err(|err| anyhow::Error::new(err).context("failed to deserialize manifest"))?; - - let inserts = unfold(multipart, |mut m| async move { - match m.next_field().await { - Ok(Some(field)) => { - let metadata = Metadata::from_headers(field.headers(), "").unwrap(); - let bytes = field - .bytes() - .await - .map_err(|e| anyhow::Error::new(e)) - .unwrap(); - Some((Ok((metadata, bytes)), m)) - } - Ok(None) => None, - Err(_) => todo!(), - } - }); - let insert_results = service.insert_objects(context.clone(), inserts).await; - - for operation in manifest.operations.into_iter() { - match operation { - Operation::Get(key) => { - let result = service - .get_object(&ObjectId::new(context.clone(), key)) - .await; - } - Operation::Delete(key) => { - let result = service - .delete_object(&ObjectId::new(context.clone(), key)) - .await; - } - _ => (), - }; - } - Ok(StatusCode::OK.into_response()) + _service: AuthAwareService, + Xt(_context): Xt, + _request: BatchRequest, +) -> ApiResult { + Ok(StatusCode::NOT_IMPLEMENTED.into_response()) } diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs new file mode 100644 index 00000000..7fd436fe --- /dev/null +++ b/objectstore-server/src/extractors/batch.rs @@ -0,0 +1,146 @@ +use std::{fmt::Debug, path::Path, pin::Pin}; + +use anyhow::Context; +use axum::{ + RequestExt, Router, + body::{Body, Bytes}, + extract::{FromRequest, Request}, + http::{ + StatusCode, + header::{HeaderValue, USER_AGENT}, + }, + response::{IntoResponse, Response}, + routing::get, +}; +use bytes::Buf; +use futures::{Stream, stream}; +use http::header::CONTENT_TYPE; +use multer::{Constraints, Multipart, SizeLimit}; +use objectstore_service::{BACKEND_SIZE_THRESHOLD, id::ObjectKey}; +use objectstore_types::Metadata; +use serde::Deserialize; + +use crate::error::AnyhowResponse; + +#[derive(Deserialize, Debug)] +#[serde(tag = "op")] +pub enum Operation { + Get(ObjectKey), + Insert(Option), + Delete(ObjectKey), +} + +#[derive(Deserialize, Debug)] +pub struct Manifest { + pub operations: Vec, +} + +pub struct BatchRequest { + pub manifest: Manifest, + pub parts: Pin> + Send>>, +} + +const MANIFEST_FIELD_NAME: &'static str = "manifest"; + +impl Debug for BatchRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BatchRequest") + .field("manifest", &self.manifest) + .finish() + } +} + +impl FromRequest for BatchRequest +where + S: Send + Sync, +{ + type Rejection = AnyhowResponse; + + async fn from_request(request: Request, _: &S) -> Result { + let Some(content_type) = request + .headers() + .get(CONTENT_TYPE) + .and_then(|ct| ct.to_str().ok()) + else { + return Err((StatusCode::BAD_REQUEST, "expected valid Content-Type") + .into_response() + .into()); + }; + + let Ok(mime) = content_type.parse::() else { + return Err((StatusCode::BAD_REQUEST, "expected valid Content-Type") + .into_response() + .into()); + }; + if !(mime.type_() == mime::MULTIPART && mime.subtype() == "mixed") { + return Err(( + StatusCode::BAD_REQUEST, + "expected Content-Type: multipart/mixed", + ) + .into_response() + .into()); + } + + // XXX: `multer::parse_boundary` requires the content-type to be `multipart/form-data` + let content_type = content_type.replace("multipart/mixed", "multipart/form-data"); + let boundary = + multer::parse_boundary(content_type).context("failed to parse multipart boundary")?; + + let mut parts = Multipart::with_constraints( + request.into_body().into_data_stream(), + boundary, + Constraints::new().size_limit( + SizeLimit::new() + // 200 MiB: BigTable's maximum size for a single mutation + .whole_stream(200 * 1024 * 1024) + // A single operation serializes to (minimum) roughly 14 bytes, so this roughly + // means we accept a maximum of 10_000 operations per batch request + .for_field(MANIFEST_FIELD_NAME, 14 * 10_000) + // Each payload needs to be within the maximum size supported by BigTable + .per_field(BACKEND_SIZE_THRESHOLD as u64), + ), + ); + + let manifest = parts + .next_field() + .await + .context("failed to parse multipart part")? + .ok_or( + ( + StatusCode::BAD_REQUEST, + "expected at least one multipart part", + ) + .into_response(), + )?; + let manifest = manifest + .bytes() + .await + .context("failed to extract manifest")?; + let manifest = serde_json::from_reader::<_, Manifest>(manifest.reader()) + .context("failed to parse manifest")?; + + let parts = Box::pin(stream::unfold(parts, |mut m| async move { + match m.next_field().await { + Ok(Some(field)) => { + let metadata = match Metadata::from_headers(field.headers(), "") { + Ok(metadata) => metadata, + Err(err) => { + return Some((Err(err.into()), m)); + } + }; + let bytes = match field.bytes().await { + Ok(bytes) => bytes, + Err(err) => { + return Some((Err(err.into()), m)); + } + }; + Some((Ok((metadata, bytes)), m)) + } + Ok(None) => None, + Err(err) => Some((Err(err).context("failed to parse multipart part"), m)), + } + })); + + Ok(Self { manifest, parts }) + } +} diff --git a/objectstore-server/src/extractors/mod.rs b/objectstore-server/src/extractors/mod.rs index 5b270335..2bd79ed0 100644 --- a/objectstore-server/src/extractors/mod.rs +++ b/objectstore-server/src/extractors/mod.rs @@ -1,3 +1,4 @@ +mod batch; mod id; mod service; @@ -8,3 +9,5 @@ mod service; /// `Xt` for this to work. #[derive(Debug)] pub struct Xt(pub T); + +pub use batch::{BatchRequest, Manifest, Operation}; diff --git a/objectstore-service/src/id.rs b/objectstore-service/src/id.rs index fdd34177..7219d35d 100644 --- a/objectstore-service/src/id.rs +++ b/objectstore-service/src/id.rs @@ -10,11 +10,12 @@ use std::fmt; use objectstore_types::scope::{Scope, Scopes}; +use serde::Deserialize; /// Defines where an object, or batch of objects, belongs within the object store. /// /// This is part of the full object identifier for single objects, see [`ObjectId`]. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)] pub struct ObjectContext { /// The usecase, or "product" this object belongs to. /// @@ -62,7 +63,7 @@ pub struct ObjectContext { /// /// This consists of a usecase and the scopes, which make up the object's context and define where /// the object belongs within objectstore, as well as the unique key within the context. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)] pub struct ObjectId { /// The usecase and scopes this object belongs to. pub context: ObjectContext, @@ -73,9 +74,12 @@ pub struct ObjectId { /// a key makes a unique identifier. /// /// Keys can be assigned by the service. For this, use [`ObjectId::random`]. - pub key: String, + pub key: ObjectKey, } +/// A key that uniquely identifies an object within its usecase and scopes. +pub type ObjectKey = String; + impl ObjectId { /// Creates a new `ObjectId` with the given `context` and `key`. pub fn new(context: ObjectContext, key: String) -> Self { diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index 531672b1..3ecca695 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -23,7 +23,7 @@ use crate::backend::common::BoxedBackend; use crate::id::{ObjectContext, ObjectId}; /// The threshold up until which we will go to the "high volume" backend. -const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB +pub const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB enum BackendChoice { HighVolume, From 7f898eb4cb05701dc786df7d4925cca51b3246c3 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 10:31:12 +0100 Subject: [PATCH 04/33] wip --- Cargo.lock | 12 ------------ objectstore-server/Cargo.toml | 2 +- objectstore-service/src/id.rs | 5 ++--- 3 files changed, 3 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 33e52c8c..096f1c03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -300,7 +300,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b098575ebe77cb6d14fc7f32749631a6e44edbef6b796f89b020e99ba20d425" dependencies = [ "axum-core", - "axum-macros", "bytes", "form_urlencoded", "futures-util", @@ -370,17 +369,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "axum-macros" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "backtrace" version = "0.3.76" diff --git a/objectstore-server/Cargo.toml b/objectstore-server/Cargo.toml index d39ac30f..bca89e5c 100644 --- a/objectstore-server/Cargo.toml +++ b/objectstore-server/Cargo.toml @@ -13,7 +13,7 @@ publish = false [dependencies] anyhow = { workspace = true } argh = "0.1.13" -axum = { version = "0.8.4", features = ["multipart", "macros"] } +axum = { version = "0.8.4", features = ["multipart"] } axum-extra = { version = "0.12.2", features = ["multipart"] } bytes = { workspace = true } console = "0.16.1" diff --git a/objectstore-service/src/id.rs b/objectstore-service/src/id.rs index 7219d35d..5154ba72 100644 --- a/objectstore-service/src/id.rs +++ b/objectstore-service/src/id.rs @@ -10,12 +10,11 @@ use std::fmt; use objectstore_types::scope::{Scope, Scopes}; -use serde::Deserialize; /// Defines where an object, or batch of objects, belongs within the object store. /// /// This is part of the full object identifier for single objects, see [`ObjectId`]. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ObjectContext { /// The usecase, or "product" this object belongs to. /// @@ -63,7 +62,7 @@ pub struct ObjectContext { /// /// This consists of a usecase and the scopes, which make up the object's context and define where /// the object belongs within objectstore, as well as the unique key within the context. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ObjectId { /// The usecase and scopes this object belongs to. pub context: ObjectContext, From 5bd4a2c72a71c1b01e94d776acd9c518e9029284 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 10:39:46 +0100 Subject: [PATCH 05/33] wip --- objectstore-server/src/auth/service.rs | 4 ++- objectstore-server/src/endpoints/common.rs | 1 - objectstore-server/src/extractors/batch.rs | 13 +++------ objectstore-service/src/lib.rs | 34 +++++++--------------- 4 files changed, 17 insertions(+), 35 deletions(-) diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index 1eac96de..eee1ddc6 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -1,3 +1,5 @@ +use std::pin::Pin; + use bytes::Bytes; use futures::Stream; use objectstore_service::id::{ObjectContext, ObjectId}; @@ -82,7 +84,7 @@ impl AuthAwareService { pub async fn insert_objects( &self, context: ObjectContext, - inserts: impl Stream>, + inserts: Pin> + Send>>, ) -> BatchInsertResult { self.assert_authorized(Permission::ObjectWrite, &context)?; self.service.insert_objects(context, inserts).await diff --git a/objectstore-server/src/endpoints/common.rs b/objectstore-server/src/endpoints/common.rs index 5a3a1d5a..346cee3a 100644 --- a/objectstore-server/src/endpoints/common.rs +++ b/objectstore-server/src/endpoints/common.rs @@ -4,7 +4,6 @@ use axum::extract::multipart::MultipartError; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; -use objectstore_service::DeleteResult; pub enum AnyhowResponse { Error(anyhow::Error), diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index 7fd436fe..a91f922c 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -1,16 +1,11 @@ -use std::{fmt::Debug, path::Path, pin::Pin}; +use std::{fmt::Debug, pin::Pin}; use anyhow::Context; use axum::{ - RequestExt, Router, - body::{Body, Bytes}, + body::Bytes, extract::{FromRequest, Request}, - http::{ - StatusCode, - header::{HeaderValue, USER_AGENT}, - }, - response::{IntoResponse, Response}, - routing::get, + http::StatusCode, + response::IntoResponse, }; use bytes::Buf; use futures::{Stream, stream}; diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index 3ecca695..51576512 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -10,6 +10,7 @@ mod backend; pub mod id; use std::path::Path; +use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Instant; @@ -87,11 +88,16 @@ pub enum StorageConfig<'a> { }, } +/// Result type for get operations. pub type GetResult = anyhow::Result>; +/// Result type for insert operations. pub type InsertResult = anyhow::Result; -pub type BatchInsertResult = anyhow::Result>; +/// Result type for delete operations. pub type DeleteResult = anyhow::Result<()>; +/// Result type for batch insert operations. +pub type BatchInsertResult = anyhow::Result>; + impl StorageService { /// Creates a new `StorageService` with the specified configuration. pub async fn new( @@ -284,30 +290,10 @@ impl StorageService { /// TODO pub async fn insert_objects( &self, - context: ObjectContext, - inserts: impl Stream>, + _context: ObjectContext, + _inserts: Pin> + Send>>, ) -> BatchInsertResult { - let mut inserts = Box::pin(inserts); - - let mut results = Vec::new(); - while let Some(item) = inserts.next().await { - let result = match item { - Ok((metadata, bytes)) => { - let id = ObjectId::optional(context.clone(), None); - let stream = futures_util::stream::once(async { Ok(bytes) }).boxed(); - - self.0 - .high_volume_backend - .put_object(&id, &metadata, stream) - .await?; - - Ok(id) - } - Err(e) => Err(e), - }; - results.push(result); - } - Ok(results) + todo!(); } } From c7f0828a6e9a91d8edfba3051066aef798feaa1c Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 10:45:36 +0100 Subject: [PATCH 06/33] wip --- objectstore-server/src/extractors/batch.rs | 13 ++++++------- objectstore-service/src/lib.rs | 3 +++ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index a91f922c..4bd7d170 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -1,17 +1,16 @@ -use std::{fmt::Debug, pin::Pin}; +use std::fmt::Debug; use anyhow::Context; use axum::{ - body::Bytes, extract::{FromRequest, Request}, http::StatusCode, response::IntoResponse, }; use bytes::Buf; -use futures::{Stream, stream}; +use futures::stream; use http::header::CONTENT_TYPE; use multer::{Constraints, Multipart, SizeLimit}; -use objectstore_service::{BACKEND_SIZE_THRESHOLD, id::ObjectKey}; +use objectstore_service::{BACKEND_SIZE_THRESHOLD, InsertStream, id::ObjectKey}; use objectstore_types::Metadata; use serde::Deserialize; @@ -32,7 +31,7 @@ pub struct Manifest { pub struct BatchRequest { pub manifest: Manifest, - pub parts: Pin> + Send>>, + pub inserts: InsertStream, } const MANIFEST_FIELD_NAME: &'static str = "manifest"; @@ -114,7 +113,7 @@ where let manifest = serde_json::from_reader::<_, Manifest>(manifest.reader()) .context("failed to parse manifest")?; - let parts = Box::pin(stream::unfold(parts, |mut m| async move { + let inserts = Box::pin(stream::unfold(parts, |mut m| async move { match m.next_field().await { Ok(Some(field)) => { let metadata = match Metadata::from_headers(field.headers(), "") { @@ -136,6 +135,6 @@ where } })); - Ok(Self { manifest, parts }) + Ok(Self { manifest, inserts }) } } diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index 51576512..aec6f98e 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -95,6 +95,9 @@ pub type InsertResult = anyhow::Result; /// Result type for delete operations. pub type DeleteResult = anyhow::Result<()>; +/// Type alias to represent a stream of insert operations. +pub type InsertStream = + Pin> + Send>>; /// Result type for batch insert operations. pub type BatchInsertResult = anyhow::Result>; From 73311254366511f79d03d6ccc0f3b0a2a9d47a9d Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 10:46:08 +0100 Subject: [PATCH 07/33] wip --- objectstore-server/src/endpoints/common.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/objectstore-server/src/endpoints/common.rs b/objectstore-server/src/endpoints/common.rs index 346cee3a..8da1c300 100644 --- a/objectstore-server/src/endpoints/common.rs +++ b/objectstore-server/src/endpoints/common.rs @@ -1,7 +1,6 @@ //! //! This is mostly adapted from -use axum::extract::multipart::MultipartError; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; @@ -41,9 +40,3 @@ impl From for AnyhowResponse { Self::Error(err) } } - -impl From for AnyhowResponse { - fn from(value: MultipartError) -> Self { - Self::Error(value.into()) - } -} From e569b7aa0c8225ccba7f23fd85e0456dbba4be2d Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 10:48:12 +0100 Subject: [PATCH 08/33] wip --- objectstore-server/src/auth/service.rs | 5 +++-- objectstore-service/src/lib.rs | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index eee1ddc6..8a13af53 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -4,7 +4,8 @@ use bytes::Bytes; use futures::Stream; use objectstore_service::id::{ObjectContext, ObjectId}; use objectstore_service::{ - BatchInsertResult, DeleteResult, GetResult, InsertResult, PayloadStream, StorageService, + BatchInsertResult, DeleteResult, GetResult, InsertResult, InsertStream, PayloadStream, + StorageService, }; use objectstore_types::{Metadata, Permission}; @@ -84,7 +85,7 @@ impl AuthAwareService { pub async fn insert_objects( &self, context: ObjectContext, - inserts: Pin> + Send>>, + inserts: InsertStream, ) -> BatchInsertResult { self.assert_authorized(Permission::ObjectWrite, &context)?; self.service.insert_objects(context, inserts).await diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index aec6f98e..a8fef168 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -294,7 +294,7 @@ impl StorageService { pub async fn insert_objects( &self, _context: ObjectContext, - _inserts: Pin> + Send>>, + _inserts: InsertStream, ) -> BatchInsertResult { todo!(); } From 53a1abcfc873cb6dd4d9fdf8e1fb9453d9a021fe Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 10:55:32 +0100 Subject: [PATCH 09/33] wip --- objectstore-server/src/endpoints/batch.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs index 4954e8b8..3a9133a1 100644 --- a/objectstore-server/src/endpoints/batch.rs +++ b/objectstore-server/src/endpoints/batch.rs @@ -1,5 +1,4 @@ use axum::Router; -use axum::extract::DefaultBodyLimit; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; use axum::routing; @@ -11,9 +10,7 @@ use crate::extractors::{BatchRequest, Xt}; use crate::state::ServiceState; pub fn router() -> Router { - Router::new() - .route("/objects:batch/{usecase}/{scopes}/", routing::post(batch)) - .layer(DefaultBodyLimit::max(500 * 1_000_000)) + Router::new().route("/objects:batch/{usecase}/{scopes}/", routing::post(batch)) } async fn batch( From e1dcd1216052440eda4e16439ef448de7eac7d97 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 10:56:19 +0100 Subject: [PATCH 10/33] wip --- objectstore-server/src/auth/service.rs | 4 ---- objectstore-server/src/extractors/batch.rs | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index 8a13af53..373a385b 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -1,7 +1,3 @@ -use std::pin::Pin; - -use bytes::Bytes; -use futures::Stream; use objectstore_service::id::{ObjectContext, ObjectId}; use objectstore_service::{ BatchInsertResult, DeleteResult, GetResult, InsertResult, InsertStream, PayloadStream, diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index 4bd7d170..fdca2a51 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -34,7 +34,7 @@ pub struct BatchRequest { pub inserts: InsertStream, } -const MANIFEST_FIELD_NAME: &'static str = "manifest"; +const MANIFEST_FIELD_NAME: &str = "manifest"; impl Debug for BatchRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { From 01640d3c03c2d150325a0c59a92e6416544a9080 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 11:08:15 +0100 Subject: [PATCH 11/33] remove limits --- objectstore-server/src/extractors/batch.rs | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index fdca2a51..fc870a51 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -79,21 +79,7 @@ where let content_type = content_type.replace("multipart/mixed", "multipart/form-data"); let boundary = multer::parse_boundary(content_type).context("failed to parse multipart boundary")?; - - let mut parts = Multipart::with_constraints( - request.into_body().into_data_stream(), - boundary, - Constraints::new().size_limit( - SizeLimit::new() - // 200 MiB: BigTable's maximum size for a single mutation - .whole_stream(200 * 1024 * 1024) - // A single operation serializes to (minimum) roughly 14 bytes, so this roughly - // means we accept a maximum of 10_000 operations per batch request - .for_field(MANIFEST_FIELD_NAME, 14 * 10_000) - // Each payload needs to be within the maximum size supported by BigTable - .per_field(BACKEND_SIZE_THRESHOLD as u64), - ), - ); + let mut parts = Multipart::new(request.into_body().into_data_stream(), boundary); let manifest = parts .next_field() From 237171f1449c94655dce71387e9e96d8c91d373d Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 11:08:46 +0100 Subject: [PATCH 12/33] wip --- objectstore-service/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index a8fef168..838be53b 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -24,7 +24,7 @@ use crate::backend::common::BoxedBackend; use crate::id::{ObjectContext, ObjectId}; /// The threshold up until which we will go to the "high volume" backend. -pub const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB +const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB enum BackendChoice { HighVolume, From 0a8f2f02b49fdfc1f72062302a3f7294164a0e3a Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 11:09:17 +0100 Subject: [PATCH 13/33] wip --- objectstore-server/src/extractors/batch.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index fc870a51..64775b04 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -9,8 +9,8 @@ use axum::{ use bytes::Buf; use futures::stream; use http::header::CONTENT_TYPE; -use multer::{Constraints, Multipart, SizeLimit}; -use objectstore_service::{BACKEND_SIZE_THRESHOLD, InsertStream, id::ObjectKey}; +use multer::Multipart; +use objectstore_service::{InsertStream, id::ObjectKey}; use objectstore_types::Metadata; use serde::Deserialize; @@ -34,8 +34,6 @@ pub struct BatchRequest { pub inserts: InsertStream, } -const MANIFEST_FIELD_NAME: &str = "manifest"; - impl Debug for BatchRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("BatchRequest") From d5ae24fb5dd31a7144310e94deaf3abcd7de3709 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 11:36:05 +0100 Subject: [PATCH 14/33] add test --- objectstore-server/src/endpoints/common.rs | 1 + objectstore-server/src/extractors/batch.rs | 82 ++++++++++++++++++++-- 2 files changed, 78 insertions(+), 5 deletions(-) diff --git a/objectstore-server/src/endpoints/common.rs b/objectstore-server/src/endpoints/common.rs index 8da1c300..799a035c 100644 --- a/objectstore-server/src/endpoints/common.rs +++ b/objectstore-server/src/endpoints/common.rs @@ -4,6 +4,7 @@ use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; +#[derive(Debug)] pub enum AnyhowResponse { Error(anyhow::Error), Response(Response), diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index 64775b04..bcc41b67 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -16,15 +16,15 @@ use serde::Deserialize; use crate::error::AnyhowResponse; -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, PartialEq)] #[serde(tag = "op")] pub enum Operation { - Get(ObjectKey), - Insert(Option), - Delete(ObjectKey), + Get { key: ObjectKey }, + Insert { key: Option }, + Delete { key: ObjectKey }, } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, PartialEq)] pub struct Manifest { pub operations: Vec, } @@ -122,3 +122,75 @@ where Ok(Self { manifest, inserts }) } } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + use axum::body::Body; + use axum::http::{Request, header::CONTENT_TYPE}; + use futures::StreamExt; + use objectstore_types::{ExpirationPolicy, HEADER_EXPIRATION}; + + #[tokio::test] + async fn test_valid_request_works() { + let manifest = r#"{"operations":[{"op":"Insert"},{"op":"Get","key":"abc123"},{"op":"Insert","key":"xyz789"},{"op":"Delete","key":"def456"}]}"#; + let insert1_data = b"first blob data"; + let insert2_data = b"second blob data"; + let expiration = ExpirationPolicy::TimeToLive(Duration::from_hours(1)); + let body = format!( + "--boundary\r\n\ + Content-Type: application/json\r\n\ + \r\n\ + {manifest}\r\n\ + --boundary\r\n\ + Content-Type: application/octet-stream\r\n\ + \r\n\ + {insert1}\r\n\ + --boundary\r\n\ + Content-Type: text/plain\r\n\ + {HEADER_EXPIRATION}: {expiration}\r\n\ + \r\n\ + {insert2}\r\n\ + --boundary--\r\n", + insert1 = String::from_utf8_lossy(insert1_data), + insert2 = String::from_utf8_lossy(insert2_data), + ); + + let request = Request::builder() + .header(CONTENT_TYPE, "multipart/mixed; boundary=boundary") + .body(Body::from(body)) + .unwrap(); + + let batch_request = BatchRequest::from_request(request, &()).await.unwrap(); + + let expected_manifest = Manifest { + operations: vec![ + Operation::Insert { key: None }, + Operation::Get { + key: "abc123".to_string(), + }, + Operation::Insert { + key: Some("xyz789".to_string()), + }, + Operation::Delete { + key: "def456".to_string(), + }, + ], + }; + assert_eq!(batch_request.manifest, expected_manifest); + + let inserts: Vec<_> = batch_request.inserts.collect().await; + assert_eq!(inserts.len(), 2); + + let (metadata1, bytes1) = inserts[0].as_ref().unwrap(); + assert_eq!(metadata1.content_type, "application/octet-stream"); + assert_eq!(bytes1.as_ref(), insert1_data); + + let (metadata2, bytes2) = inserts[1].as_ref().unwrap(); + assert_eq!(metadata2.content_type, "text/plain"); + assert_eq!(metadata2.expiration_policy, expiration); + assert_eq!(bytes2.as_ref(), insert2_data); + } +} From 7aa0279e2df1bd9072c5ca7761086e74e5df6c23 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 11:46:51 +0100 Subject: [PATCH 15/33] missing debug impls --- objectstore-server/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/objectstore-server/src/lib.rs b/objectstore-server/src/lib.rs index b57d655c..541d9a3d 100644 --- a/objectstore-server/src/lib.rs +++ b/objectstore-server/src/lib.rs @@ -2,6 +2,7 @@ //! //! This builds on top of the [`objectstore_service`], and exposes the underlying storage layer as //! an `HTTP` layer which can serve files directly to *external clients* and our SDK. +#![warn(missing_debug_implementations)] pub mod auth; pub mod cli; From 2b14ae632fe5f2807ecffe47fabd8858d507a910 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 11:54:14 +0100 Subject: [PATCH 16/33] rename --- objectstore-server/src/extractors/batch.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index bcc41b67..1592b076 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -97,25 +97,25 @@ where let manifest = serde_json::from_reader::<_, Manifest>(manifest.reader()) .context("failed to parse manifest")?; - let inserts = Box::pin(stream::unfold(parts, |mut m| async move { - match m.next_field().await { + let inserts = Box::pin(stream::unfold(parts, |mut parts| async move { + match parts.next_field().await { Ok(Some(field)) => { let metadata = match Metadata::from_headers(field.headers(), "") { Ok(metadata) => metadata, Err(err) => { - return Some((Err(err.into()), m)); + return Some((Err(err.into()), parts)); } }; let bytes = match field.bytes().await { Ok(bytes) => bytes, Err(err) => { - return Some((Err(err.into()), m)); + return Some((Err(err.into()), parts)); } }; - Some((Ok((metadata, bytes)), m)) + Some((Ok((metadata, bytes)), parts)) } Ok(None) => None, - Err(err) => Some((Err(err).context("failed to parse multipart part"), m)), + Err(err) => Some((Err(err).context("failed to parse multipart part"), parts)), } })); From f958828535a90ee297078c2c107f9739a94a2450 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 12:01:41 +0100 Subject: [PATCH 17/33] skill issues --- Cargo.lock | 23 ++++++++++++++++++ objectstore-server/Cargo.toml | 1 + objectstore-server/src/extractors/batch.rs | 27 ++++++---------------- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 096f1c03..4ca6ed82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -238,6 +238,28 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -2218,6 +2240,7 @@ version = "0.1.0" dependencies = [ "anyhow", "argh", + "async-stream", "axum", "axum-extra", "bytes", diff --git a/objectstore-server/Cargo.toml b/objectstore-server/Cargo.toml index bca89e5c..65243f24 100644 --- a/objectstore-server/Cargo.toml +++ b/objectstore-server/Cargo.toml @@ -13,6 +13,7 @@ publish = false [dependencies] anyhow = { workspace = true } argh = "0.1.13" +async-stream = "0.3.6" axum = { version = "0.8.4", features = ["multipart"] } axum-extra = { version = "0.12.2", features = ["multipart"] } bytes = { workspace = true } diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index 1592b076..90825fc1 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -1,6 +1,7 @@ use std::fmt::Debug; use anyhow::Context; +use async_stream::try_stream; use axum::{ extract::{FromRequest, Request}, http::StatusCode, @@ -97,27 +98,13 @@ where let manifest = serde_json::from_reader::<_, Manifest>(manifest.reader()) .context("failed to parse manifest")?; - let inserts = Box::pin(stream::unfold(parts, |mut parts| async move { - match parts.next_field().await { - Ok(Some(field)) => { - let metadata = match Metadata::from_headers(field.headers(), "") { - Ok(metadata) => metadata, - Err(err) => { - return Some((Err(err.into()), parts)); - } - }; - let bytes = match field.bytes().await { - Ok(bytes) => bytes, - Err(err) => { - return Some((Err(err.into()), parts)); - } - }; - Some((Ok((metadata, bytes)), parts)) - } - Ok(None) => None, - Err(err) => Some((Err(err).context("failed to parse multipart part"), parts)), + let inserts = Box::pin(async_stream::try_stream! { + while let Some(field) = parts.next_field().await? { + let metadata = Metadata::from_headers(field.headers(), "")?; + let bytes = field.bytes().await?; + yield (metadata, bytes); } - })); + }); Ok(Self { manifest, inserts }) } From 941f53ac62cec7560197fc82b91e7121be0f3f13 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 12:24:22 +0100 Subject: [PATCH 18/33] rename all lowercase --- objectstore-server/src/extractors/batch.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index 90825fc1..1d05201b 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -18,7 +18,7 @@ use serde::Deserialize; use crate::error::AnyhowResponse; #[derive(Deserialize, Debug, PartialEq)] -#[serde(tag = "op")] +#[serde(tag = "op", rename_all = "lowercase")] pub enum Operation { Get { key: ObjectKey }, Insert { key: Option }, @@ -122,7 +122,7 @@ mod tests { #[tokio::test] async fn test_valid_request_works() { - let manifest = r#"{"operations":[{"op":"Insert"},{"op":"Get","key":"abc123"},{"op":"Insert","key":"xyz789"},{"op":"Delete","key":"def456"}]}"#; + let manifest = r#"{"operations":[{"op":"insert"},{"op":"get","key":"abc123"},{"op":"insert","key":"xyz789"},{"op":"delete","key":"def456"}]}"#; let insert1_data = b"first blob data"; let insert2_data = b"second blob data"; let expiration = ExpirationPolicy::TimeToLive(Duration::from_hours(1)); From 3228609c1e2f1ee1e8aec0ec2064e09e96a7a586 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 18 Dec 2025 14:55:39 +0100 Subject: [PATCH 19/33] refactor to new structure --- objectstore-server/src/extractors/batch.rs | 194 +++++++++++++-------- objectstore-server/src/extractors/mod.rs | 2 +- 2 files changed, 123 insertions(+), 73 deletions(-) diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index 1d05201b..9a687124 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -1,48 +1,96 @@ -use std::fmt::Debug; +use std::{fmt::Debug, pin::Pin}; use anyhow::Context; -use async_stream::try_stream; use axum::{ extract::{FromRequest, Request}, http::StatusCode, response::IntoResponse, }; -use bytes::Buf; -use futures::stream; +use bytes::Bytes; +use futures::Stream; use http::header::CONTENT_TYPE; -use multer::Multipart; -use objectstore_service::{InsertStream, id::ObjectKey}; +use multer::Field; +use multer::{Constraints, Multipart, SizeLimit}; +use objectstore_service::id::ObjectKey; use objectstore_types::Metadata; -use serde::Deserialize; use crate::error::AnyhowResponse; -#[derive(Deserialize, Debug, PartialEq)] -#[serde(tag = "op", rename_all = "lowercase")] +#[derive(Debug)] +pub struct GetOperation { + pub key: ObjectKey, +} + +#[derive(Debug)] +pub struct InsertOperation { + pub key: Option, + pub metadata: Metadata, + pub payload: Bytes, +} + +#[derive(Debug)] +pub struct DeleteOperation { + pub key: ObjectKey, +} + +#[derive(Debug)] pub enum Operation { - Get { key: ObjectKey }, - Insert { key: Option }, - Delete { key: ObjectKey }, + Get(GetOperation), + Insert(InsertOperation), + Delete(DeleteOperation), } -#[derive(Deserialize, Debug, PartialEq)] -pub struct Manifest { - pub operations: Vec, +impl Operation { + async fn try_from_field(field: Field<'_>) -> anyhow::Result { + let kind = field + .headers() + .get(HEADER_BATCH_OPERATION_KIND) + .ok_or(anyhow::anyhow!( + "missing {HEADER_BATCH_OPERATION_KIND} header" + ))?; + let kind = kind + .to_str() + .context(format!("invalid {HEADER_BATCH_OPERATION_KIND} header"))? + .to_lowercase(); + + let key = match field.headers().get(HEADER_BATCH_OPERATION_KEY) { + Some(key) => Some(key.to_str().context("invalid object key")?.to_owned()), + None => None, + }; + + let operation = match kind.as_str() { + "get" => { + let key = key.context("missing object key for get operation")?; + Operation::Get(GetOperation { key }) + } + "insert" => Operation::Insert(InsertOperation { + key, + metadata: Metadata::from_headers(field.headers(), "")?, + payload: field.bytes().await?, + }), + "delete" => { + let key = key.context("missing object key for delet operation")?; + Operation::Delete(DeleteOperation { key }) + } + _ => anyhow::bail!("invalid {HEADER_BATCH_OPERATION_KIND} header"), + }; + Ok(operation) + } } pub struct BatchRequest { - pub manifest: Manifest, - pub inserts: InsertStream, + pub operations: Pin> + Send>>, } impl Debug for BatchRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("BatchRequest") - .field("manifest", &self.manifest) - .finish() + f.debug_struct("BatchRequest").finish() } } +pub const HEADER_BATCH_OPERATION_KIND: &str = "x-sn-batch-operation-kind"; +pub const HEADER_BATCH_OPERATION_KEY: &str = "x-sn-batch-operation-key"; + impl FromRequest for BatchRequest where S: Send + Sync, @@ -78,35 +126,28 @@ where let content_type = content_type.replace("multipart/mixed", "multipart/form-data"); let boundary = multer::parse_boundary(content_type).context("failed to parse multipart boundary")?; - let mut parts = Multipart::new(request.into_body().into_data_stream(), boundary); - - let manifest = parts - .next_field() - .await - .context("failed to parse multipart part")? - .ok_or( - ( - StatusCode::BAD_REQUEST, - "expected at least one multipart part", - ) - .into_response(), - )?; - let manifest = manifest - .bytes() - .await - .context("failed to extract manifest")?; - let manifest = serde_json::from_reader::<_, Manifest>(manifest.reader()) - .context("failed to parse manifest")?; - - let inserts = Box::pin(async_stream::try_stream! { + let mut parts = Multipart::with_constraints( + request.into_body().into_data_stream(), + boundary, + Constraints::new().size_limit( + // TODO(lcian): tentative limits that should be tested + SizeLimit::new() + .per_field(1024 * 1024) // 1 MB + .whole_stream(1024 * 1024 * 1024), // 1 GB + ), + ); + let operations = Box::pin(async_stream::try_stream! { + let mut count = 0; while let Some(field) = parts.next_field().await? { - let metadata = Metadata::from_headers(field.headers(), "")?; - let bytes = field.bytes().await?; - yield (metadata, bytes); + if count >= 1000 { + Err(anyhow::anyhow!("exceeded limit of 1000 operations per batch request"))?; + } + count += 1; + yield Operation::try_from_field(field).await?; } }); - Ok(Self { manifest, inserts }) + Ok(Self { operations }) } } @@ -122,24 +163,33 @@ mod tests { #[tokio::test] async fn test_valid_request_works() { - let manifest = r#"{"operations":[{"op":"insert"},{"op":"get","key":"abc123"},{"op":"insert","key":"xyz789"},{"op":"delete","key":"def456"}]}"#; let insert1_data = b"first blob data"; let insert2_data = b"second blob data"; let expiration = ExpirationPolicy::TimeToLive(Duration::from_hours(1)); let body = format!( "--boundary\r\n\ - Content-Type: application/json\r\n\ + {HEADER_BATCH_OPERATION_KEY}: test0\r\n\ + {HEADER_BATCH_OPERATION_KIND}: get\r\n\ + \r\n\ \r\n\ - {manifest}\r\n\ --boundary\r\n\ + {HEADER_BATCH_OPERATION_KEY}: test1\r\n\ + {HEADER_BATCH_OPERATION_KIND}: insert\r\n\ Content-Type: application/octet-stream\r\n\ \r\n\ {insert1}\r\n\ --boundary\r\n\ - Content-Type: text/plain\r\n\ + {HEADER_BATCH_OPERATION_KEY}: test2\r\n\ + {HEADER_BATCH_OPERATION_KIND}: insert\r\n\ {HEADER_EXPIRATION}: {expiration}\r\n\ + Content-Type: text/plain\r\n\ \r\n\ {insert2}\r\n\ + --boundary\r\n\ + {HEADER_BATCH_OPERATION_KEY}: test3\r\n\ + {HEADER_BATCH_OPERATION_KIND}: delete\r\n\ + \r\n\ + \r\n\ --boundary--\r\n", insert1 = String::from_utf8_lossy(insert1_data), insert2 = String::from_utf8_lossy(insert2_data), @@ -152,32 +202,32 @@ mod tests { let batch_request = BatchRequest::from_request(request, &()).await.unwrap(); - let expected_manifest = Manifest { - operations: vec![ - Operation::Insert { key: None }, - Operation::Get { - key: "abc123".to_string(), - }, - Operation::Insert { - key: Some("xyz789".to_string()), - }, - Operation::Delete { - key: "def456".to_string(), - }, - ], + let operations: Vec<_> = batch_request.operations.collect().await; + assert_eq!(operations.len(), 4); + + let Operation::Get(get_op) = &operations[0].as_ref().unwrap() else { + panic!("expected get operation"); }; - assert_eq!(batch_request.manifest, expected_manifest); + assert_eq!(get_op.key, "test0"); - let inserts: Vec<_> = batch_request.inserts.collect().await; - assert_eq!(inserts.len(), 2); + let Operation::Insert(insert_op1) = &operations[1].as_ref().unwrap() else { + panic!("expected insert operation"); + }; + assert_eq!(insert_op1.key.as_ref().unwrap(), "test1"); + assert_eq!(insert_op1.metadata.content_type, "application/octet-stream"); + assert_eq!(insert_op1.payload.as_ref(), insert1_data); - let (metadata1, bytes1) = inserts[0].as_ref().unwrap(); - assert_eq!(metadata1.content_type, "application/octet-stream"); - assert_eq!(bytes1.as_ref(), insert1_data); + let Operation::Insert(insert_op2) = &operations[2].as_ref().unwrap() else { + panic!("expected insert operation"); + }; + assert_eq!(insert_op2.key.as_ref().unwrap(), "test2"); + assert_eq!(insert_op2.metadata.content_type, "text/plain"); + assert_eq!(insert_op2.metadata.expiration_policy, expiration); + assert_eq!(insert_op2.payload.as_ref(), insert2_data); - let (metadata2, bytes2) = inserts[1].as_ref().unwrap(); - assert_eq!(metadata2.content_type, "text/plain"); - assert_eq!(metadata2.expiration_policy, expiration); - assert_eq!(bytes2.as_ref(), insert2_data); + let Operation::Delete(delete_op) = &operations[3].as_ref().unwrap() else { + panic!("expected delete operation"); + }; + assert_eq!(delete_op.key, "test3"); } } diff --git a/objectstore-server/src/extractors/mod.rs b/objectstore-server/src/extractors/mod.rs index 2bd79ed0..8cc4d34b 100644 --- a/objectstore-server/src/extractors/mod.rs +++ b/objectstore-server/src/extractors/mod.rs @@ -10,4 +10,4 @@ mod service; #[derive(Debug)] pub struct Xt(pub T); -pub use batch::{BatchRequest, Manifest, Operation}; +pub use batch::{BatchRequest, Operation, GetOperation, InsertOperation, DeleteOperation}; From 2ba690dead43d52da9b654035ee9cd537f662226 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 19 Dec 2025 13:57:31 +0100 Subject: [PATCH 20/33] improve --- objectstore-server/src/extractors/batch.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index 9a687124..b7dafc74 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -7,7 +7,7 @@ use axum::{ response::IntoResponse, }; use bytes::Bytes; -use futures::Stream; +use futures::{StreamExt, stream::BoxStream}; use http::header::CONTENT_TYPE; use multer::Field; use multer::{Constraints, Multipart, SizeLimit}; @@ -79,7 +79,7 @@ impl Operation { } pub struct BatchRequest { - pub operations: Pin> + Send>>, + pub operations: BoxStream<'static, anyhow::Result>, } impl Debug for BatchRequest { @@ -136,7 +136,7 @@ where .whole_stream(1024 * 1024 * 1024), // 1 GB ), ); - let operations = Box::pin(async_stream::try_stream! { + let operations = async_stream::try_stream! { let mut count = 0; while let Some(field) = parts.next_field().await? { if count >= 1000 { @@ -145,7 +145,8 @@ where count += 1; yield Operation::try_from_field(field).await?; } - }); + } + .boxed(); Ok(Self { operations }) } From e01966b94ff181bb986c56db14acc29ed9cd0f7c Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 19 Dec 2025 14:06:49 +0100 Subject: [PATCH 21/33] wip --- objectstore-server/src/endpoints/batch.rs | 2 +- objectstore-server/src/extractors/batch.rs | 28 ++++++++++------------ 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs index 3a9133a1..47448cd0 100644 --- a/objectstore-server/src/endpoints/batch.rs +++ b/objectstore-server/src/endpoints/batch.rs @@ -5,7 +5,7 @@ use axum::routing; use objectstore_service::id::ObjectContext; use crate::auth::AuthAwareService; -use crate::error::ApiResult; +use crate::endpoints::common::ApiResult; use crate::extractors::{BatchRequest, Xt}; use crate::state::ServiceState; diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index b7dafc74..51efcd5a 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -1,10 +1,10 @@ -use std::{fmt::Debug, pin::Pin}; +use std::fmt::Debug; use anyhow::Context; use axum::{ extract::{FromRequest, Request}, http::StatusCode, - response::IntoResponse, + response::{IntoResponse, Response}, }; use bytes::Bytes; use futures::{StreamExt, stream::BoxStream}; @@ -14,8 +14,6 @@ use multer::{Constraints, Multipart, SizeLimit}; use objectstore_service::id::ObjectKey; use objectstore_types::Metadata; -use crate::error::AnyhowResponse; - #[derive(Debug)] pub struct GetOperation { pub key: ObjectKey, @@ -95,7 +93,7 @@ impl FromRequest for BatchRequest where S: Send + Sync, { - type Rejection = AnyhowResponse; + type Rejection = Response; async fn from_request(request: Request, _: &S) -> Result { let Some(content_type) = request @@ -103,29 +101,29 @@ where .get(CONTENT_TYPE) .and_then(|ct| ct.to_str().ok()) else { - return Err((StatusCode::BAD_REQUEST, "expected valid Content-Type") - .into_response() - .into()); + return Err((StatusCode::BAD_REQUEST, "expected valid Content-Type").into_response()); }; let Ok(mime) = content_type.parse::() else { - return Err((StatusCode::BAD_REQUEST, "expected valid Content-Type") - .into_response() - .into()); + return Err((StatusCode::BAD_REQUEST, "expected valid Content-Type").into_response()); }; if !(mime.type_() == mime::MULTIPART && mime.subtype() == "mixed") { return Err(( StatusCode::BAD_REQUEST, "expected Content-Type: multipart/mixed", ) - .into_response() - .into()); + .into_response()); } // XXX: `multer::parse_boundary` requires the content-type to be `multipart/form-data` let content_type = content_type.replace("multipart/mixed", "multipart/form-data"); - let boundary = - multer::parse_boundary(content_type).context("failed to parse multipart boundary")?; + let Ok(boundary) = multer::parse_boundary(content_type) else { + return Err(( + StatusCode::BAD_REQUEST, + "failed to parse multipart boundary", + ) + .into_response()); + }; let mut parts = Multipart::with_constraints( request.into_body().into_data_stream(), boundary, From ddab6cbbc8be514022b0ba79521e594bb2ba1ce8 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 19 Dec 2025 14:08:44 +0100 Subject: [PATCH 22/33] simplify --- objectstore-server/src/auth/service.rs | 13 +------------ objectstore-service/src/lib.rs | 17 ----------------- 2 files changed, 1 insertion(+), 29 deletions(-) diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index 373a385b..c9074281 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -1,7 +1,6 @@ use objectstore_service::id::{ObjectContext, ObjectId}; use objectstore_service::{ - BatchInsertResult, DeleteResult, GetResult, InsertResult, InsertStream, PayloadStream, - StorageService, + DeleteResult, GetResult, InsertResult, InsertStream, PayloadStream, StorageService, }; use objectstore_types::{Metadata, Permission}; @@ -76,14 +75,4 @@ impl AuthAwareService { self.assert_authorized(Permission::ObjectDelete, id.context())?; self.service.delete_object(id).await } - - /// Auth-aware wrapper around [`StorageService::insert_objects`]. - pub async fn insert_objects( - &self, - context: ObjectContext, - inserts: InsertStream, - ) -> BatchInsertResult { - self.assert_authorized(Permission::ObjectWrite, &context)?; - self.service.insert_objects(context, inserts).await - } } diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index 838be53b..35374560 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -10,13 +10,11 @@ mod backend; pub mod id; use std::path::Path; -use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Instant; use bytes::{Bytes, BytesMut}; -use futures_util::Stream; use futures_util::{StreamExt, TryStreamExt, stream::BoxStream}; use objectstore_types::Metadata; @@ -95,12 +93,6 @@ pub type InsertResult = anyhow::Result; /// Result type for delete operations. pub type DeleteResult = anyhow::Result<()>; -/// Type alias to represent a stream of insert operations. -pub type InsertStream = - Pin> + Send>>; -/// Result type for batch insert operations. -pub type BatchInsertResult = anyhow::Result>; - impl StorageService { /// Creates a new `StorageService` with the specified configuration. pub async fn new( @@ -289,15 +281,6 @@ impl StorageService { Ok(()) } - - /// TODO - pub async fn insert_objects( - &self, - _context: ObjectContext, - _inserts: InsertStream, - ) -> BatchInsertResult { - todo!(); - } } fn is_tombstoned(result: &Option<(Metadata, PayloadStream)>) -> bool { From c67fb7fa0cbf4c11092e8d062e6e236bd155c333 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 19 Dec 2025 14:10:48 +0100 Subject: [PATCH 23/33] improve --- objectstore-service/src/lib.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index 35374560..98cbdb33 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -119,7 +119,7 @@ impl StorageService { key: Option, metadata: &Metadata, mut stream: PayloadStream, - ) -> anyhow::Result { + ) -> InsertResult { let start = Instant::now(); let mut first_chunk = BytesMut::new(); @@ -224,10 +224,7 @@ impl StorageService { } /// Streams the contents of an object stored at the given key. - pub async fn get_object( - &self, - id: &ObjectId, - ) -> anyhow::Result> { + pub async fn get_object(&self, id: &ObjectId) -> GetResult { let start = Instant::now(); let mut backend_choice = "high-volume"; @@ -264,7 +261,7 @@ impl StorageService { } /// Deletes an object stored at the given key, if it exists. - pub async fn delete_object(&self, id: &ObjectId) -> anyhow::Result<()> { + pub async fn delete_object(&self, id: &ObjectId) -> DeleteResult { let start = Instant::now(); if let Some((metadata, _stream)) = self.0.high_volume_backend.get_object(id).await? { From dbee8861f1aeab96fb48c30ce6f25ac26f947a39 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 19 Dec 2025 14:12:09 +0100 Subject: [PATCH 24/33] lint --- objectstore-server/src/auth/service.rs | 4 +--- objectstore-server/src/extractors/mod.rs | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index c9074281..f6474982 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -1,7 +1,5 @@ use objectstore_service::id::{ObjectContext, ObjectId}; -use objectstore_service::{ - DeleteResult, GetResult, InsertResult, InsertStream, PayloadStream, StorageService, -}; +use objectstore_service::{DeleteResult, GetResult, InsertResult, PayloadStream, StorageService}; use objectstore_types::{Metadata, Permission}; use crate::auth::AuthContext; diff --git a/objectstore-server/src/extractors/mod.rs b/objectstore-server/src/extractors/mod.rs index 8cc4d34b..839463f0 100644 --- a/objectstore-server/src/extractors/mod.rs +++ b/objectstore-server/src/extractors/mod.rs @@ -10,4 +10,4 @@ mod service; #[derive(Debug)] pub struct Xt(pub T); -pub use batch::{BatchRequest, Operation, GetOperation, InsertOperation, DeleteOperation}; +pub use batch::{BatchRequest, DeleteOperation, GetOperation, InsertOperation, Operation}; From ea0df2cdd791ab18dab3f048b4423a52175a2690 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 15 Jan 2026 12:42:06 +0100 Subject: [PATCH 25/33] wip --- objectstore-server/src/endpoints/common.rs | 83 ++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/objectstore-server/src/endpoints/common.rs b/objectstore-server/src/endpoints/common.rs index e69de29b..2306ca4d 100644 --- a/objectstore-server/src/endpoints/common.rs +++ b/objectstore-server/src/endpoints/common.rs @@ -0,0 +1,83 @@ +//! Common types and utilities for API endpoints. + +use std::error::Error; + +use axum::Json; +use axum::http::StatusCode; +use axum::response::{IntoResponse, Response}; +use objectstore_service::ServiceError; +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +use crate::auth::AuthError; + +/// Error type for API operations. +#[derive(Debug, Error)] +pub enum ApiError { + /// Errors indicating malformed or illegal requests. + #[error("client error: {0}")] + Client(String), + + /// Authorization/authentication errors. + #[error("auth error: {0}")] + Auth(#[from] AuthError), + + /// Service errors, indicating that something went wrong when receiving or executing a request. + #[error("service error: {0}")] + Service(#[from] ServiceError), +} + +/// Result type for API operations. +pub type ApiResult = Result; + +/// A JSON error response returned by the API. +#[derive(Serialize, Deserialize, Debug)] +pub struct ApiErrorResponse { + /// The main error message. + #[serde(default)] + detail: Option, + /// Chain of error causes. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + causes: Vec, +} + +impl ApiErrorResponse { + /// Creates an error response from an error, extracting the full cause chain. + pub fn from_error(error: &E) -> Self { + let detail = Some(error.to_string()); + + let mut causes = Vec::new(); + let mut source = error.source(); + while let Some(s) = source { + causes.push(s.to_string()); + source = s.source(); + } + + Self { detail, causes } + } +} + +impl IntoResponse for ApiError { + fn into_response(self) -> Response { + let status = match &self { + ApiError::Client(_) => StatusCode::BAD_REQUEST, + + ApiError::Auth(AuthError::BadRequest(_)) => StatusCode::BAD_REQUEST, + ApiError::Auth(AuthError::ValidationFailure(_)) + | ApiError::Auth(AuthError::VerificationFailure) => StatusCode::UNAUTHORIZED, + ApiError::Auth(AuthError::NotPermitted) => StatusCode::FORBIDDEN, + ApiError::Auth(AuthError::InternalError(_)) => { + tracing::error!(error = &self as &dyn Error, "auth system error"); + StatusCode::INTERNAL_SERVER_ERROR + } + + ApiError::Service(_) => { + tracing::error!(error = &self as &dyn Error, "error handling request"); + StatusCode::INTERNAL_SERVER_ERROR + } + }; + + let body = ApiErrorResponse::from_error(&self); + (status, Json(body)).into_response() + } +} From be27f6abe05e971b3375669d0526b373ee8b5afd Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 15 Jan 2026 12:42:53 +0100 Subject: [PATCH 26/33] wip --- objectstore-server/src/extractors/batch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index 51efcd5a..36823416 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -67,7 +67,7 @@ impl Operation { payload: field.bytes().await?, }), "delete" => { - let key = key.context("missing object key for delet operation")?; + let key = key.context("missing object key for delete operation")?; Operation::Delete(DeleteOperation { key }) } _ => anyhow::bail!("invalid {HEADER_BATCH_OPERATION_KIND} header"), From 82975c19b1db92c51932284616b666a25e86dc04 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 15 Jan 2026 12:59:26 +0100 Subject: [PATCH 27/33] wip don't push --- Cargo.lock | 1 - objectstore-server/Cargo.toml | 1 - objectstore-server/src/extractors/batch.rs | 53 ++++------------------ 3 files changed, 8 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 04b8202f..cbfc6cd0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2256,7 +2256,6 @@ dependencies = [ "merni", "mimalloc", "mime", - "multer", "nix", "num_cpus", "objectstore-service", diff --git a/objectstore-server/Cargo.toml b/objectstore-server/Cargo.toml index f49b124a..f27b52d3 100644 --- a/objectstore-server/Cargo.toml +++ b/objectstore-server/Cargo.toml @@ -29,7 +29,6 @@ jsonwebtoken = { workspace = true } merni = { workspace = true } mimalloc = { workspace = true } mime = "0.3.17" -multer = "3.1.0" num_cpus = "1.17.0" objectstore-service = { workspace = true } objectstore-types = { workspace = true } diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index 36823416..c26128f1 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -2,15 +2,12 @@ use std::fmt::Debug; use anyhow::Context; use axum::{ - extract::{FromRequest, Request}, + extract::{FromRequest, Multipart, Request, multipart::Field}, http::StatusCode, response::{IntoResponse, Response}, }; use bytes::Bytes; use futures::{StreamExt, stream::BoxStream}; -use http::header::CONTENT_TYPE; -use multer::Field; -use multer::{Constraints, Multipart, SizeLimit}; use objectstore_service::id::ObjectKey; use objectstore_types::Metadata; @@ -64,7 +61,7 @@ impl Operation { "insert" => Operation::Insert(InsertOperation { key, metadata: Metadata::from_headers(field.headers(), "")?, - payload: field.bytes().await?, + payload: field.bytes().await.map_err(|e| anyhow::anyhow!("{e}"))?, }), "delete" => { let key = key.context("missing object key for delete operation")?; @@ -95,48 +92,14 @@ where { type Rejection = Response; - async fn from_request(request: Request, _: &S) -> Result { - let Some(content_type) = request - .headers() - .get(CONTENT_TYPE) - .and_then(|ct| ct.to_str().ok()) - else { - return Err((StatusCode::BAD_REQUEST, "expected valid Content-Type").into_response()); - }; + async fn from_request(request: Request, state: &S) -> Result { + let mut multipart = Multipart::from_request(request, state) + .await + .map_err(|e| (StatusCode::BAD_REQUEST, e.body_text()).into_response())?; - let Ok(mime) = content_type.parse::() else { - return Err((StatusCode::BAD_REQUEST, "expected valid Content-Type").into_response()); - }; - if !(mime.type_() == mime::MULTIPART && mime.subtype() == "mixed") { - return Err(( - StatusCode::BAD_REQUEST, - "expected Content-Type: multipart/mixed", - ) - .into_response()); - } - - // XXX: `multer::parse_boundary` requires the content-type to be `multipart/form-data` - let content_type = content_type.replace("multipart/mixed", "multipart/form-data"); - let Ok(boundary) = multer::parse_boundary(content_type) else { - return Err(( - StatusCode::BAD_REQUEST, - "failed to parse multipart boundary", - ) - .into_response()); - }; - let mut parts = Multipart::with_constraints( - request.into_body().into_data_stream(), - boundary, - Constraints::new().size_limit( - // TODO(lcian): tentative limits that should be tested - SizeLimit::new() - .per_field(1024 * 1024) // 1 MB - .whole_stream(1024 * 1024 * 1024), // 1 GB - ), - ); let operations = async_stream::try_stream! { let mut count = 0; - while let Some(field) = parts.next_field().await? { + while let Some(field) = multipart.next_field().await.map_err(|e| anyhow::anyhow!("{e}"))? { if count >= 1000 { Err(anyhow::anyhow!("exceeded limit of 1000 operations per batch request"))?; } @@ -195,7 +158,7 @@ mod tests { ); let request = Request::builder() - .header(CONTENT_TYPE, "multipart/mixed; boundary=boundary") + .header(CONTENT_TYPE, "multipart/form-data; boundary=boundary") .body(Body::from(body)) .unwrap(); From c22837f52450fef8db6f4cb690a5589524ea08c7 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 15 Jan 2026 15:35:56 +0100 Subject: [PATCH 28/33] wip --- objectstore-server/src/endpoints/batch.rs | 7 ++++++- objectstore-server/src/extractors/batch.rs | 24 +++++++++++++++------- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs index 47448cd0..f00b6669 100644 --- a/objectstore-server/src/endpoints/batch.rs +++ b/objectstore-server/src/endpoints/batch.rs @@ -1,4 +1,5 @@ use axum::Router; +use axum::extract::DefaultBodyLimit; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; use axum::routing; @@ -9,8 +10,12 @@ use crate::endpoints::common::ApiResult; use crate::extractors::{BatchRequest, Xt}; use crate::state::ServiceState; +const MAX_BODY_SIZE: usize = 1024 * 1024 * 1024; // 1 GB + pub fn router() -> Router { - Router::new().route("/objects:batch/{usecase}/{scopes}/", routing::post(batch)) + Router::new() + .route("/objects:batch/{usecase}/{scopes}/", routing::post(batch)) + .layer(DefaultBodyLimit::max(MAX_BODY_SIZE)) } async fn batch( diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index c26128f1..2dc19763 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -58,11 +58,18 @@ impl Operation { let key = key.context("missing object key for get operation")?; Operation::Get(GetOperation { key }) } - "insert" => Operation::Insert(InsertOperation { - key, - metadata: Metadata::from_headers(field.headers(), "")?, - payload: field.bytes().await.map_err(|e| anyhow::anyhow!("{e}"))?, - }), + "insert" => { + let metadata = Metadata::from_headers(field.headers(), "")?; + let payload = field.bytes().await.map_err(|e| anyhow::anyhow!("{e}"))?; + if payload.len() > MAX_FIELD_SIZE { + anyhow::bail!("field size exceeds {MAX_FIELD_SIZE} bytes limit"); + } + Operation::Insert(InsertOperation { + key, + metadata, + payload, + }) + } "delete" => { let key = key.context("missing object key for delete operation")?; Operation::Delete(DeleteOperation { key }) @@ -86,6 +93,9 @@ impl Debug for BatchRequest { pub const HEADER_BATCH_OPERATION_KIND: &str = "x-sn-batch-operation-kind"; pub const HEADER_BATCH_OPERATION_KEY: &str = "x-sn-batch-operation-key"; +const MAX_FIELD_SIZE: usize = 1024 * 1024; // 1 MB +const MAX_OPERATIONS: usize = 1000; + impl FromRequest for BatchRequest where S: Send + Sync, @@ -100,8 +110,8 @@ where let operations = async_stream::try_stream! { let mut count = 0; while let Some(field) = multipart.next_field().await.map_err(|e| anyhow::anyhow!("{e}"))? { - if count >= 1000 { - Err(anyhow::anyhow!("exceeded limit of 1000 operations per batch request"))?; + if count >= MAX_OPERATIONS { + Err(anyhow::anyhow!("exceeded limit of {MAX_OPERATIONS} operations per batch request"))?; } count += 1; yield Operation::try_from_field(field).await?; From 026073d97c78f0e06c3d84fe117d1571a98d57eb Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 16 Jan 2026 12:51:05 +0100 Subject: [PATCH 29/33] improve --- Cargo.lock | 9 +-- objectstore-server/Cargo.toml | 2 +- objectstore-server/src/auth/service.rs | 2 +- objectstore-server/src/endpoints/batch.rs | 1 + objectstore-server/src/extractors/batch.rs | 89 ++++++++++++++++------ objectstore-server/src/extractors/mod.rs | 4 +- 6 files changed, 76 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cbfc6cd0..5a1dbd7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -370,22 +370,21 @@ dependencies = [ [[package]] name = "axum-extra" -version = "0.12.2" +version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbfe9f610fe4e99cf0cfcd03ccf8c63c28c616fe714d80475ef731f3b13dd21b" +checksum = "9963ff19f40c6102c76756ef0a46004c0d58957d87259fc9208ff8441c12ab96" dependencies = [ "axum", "axum-core", "bytes", - "fastrand", - "futures-core", "futures-util", "http 1.3.1", "http-body", "http-body-util", "mime", - "multer", "pin-project-lite", + "rustversion", + "serde_core", "tower-layer", "tower-service", "tracing", diff --git a/objectstore-server/Cargo.toml b/objectstore-server/Cargo.toml index f27b52d3..8bc8538b 100644 --- a/objectstore-server/Cargo.toml +++ b/objectstore-server/Cargo.toml @@ -15,7 +15,7 @@ anyhow = { workspace = true } argh = "0.1.13" async-stream = "0.3.6" axum = { version = "0.8.4", features = ["multipart"] } -axum-extra = { version = "0.12.2", features = ["multipart"] } +axum-extra = "0.10.1" bytes = { workspace = true } console = "0.16.1" elegant-departure = { version = "0.3.2", features = ["tokio"] } diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index d925fc57..8c9ce1cc 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -1,5 +1,5 @@ use objectstore_service::id::{ObjectContext, ObjectId}; -use objectstore_service::{DeleteResult, GetResult, InsertResult, PayloadStream, StorageService}; +use objectstore_service::{PayloadStream, StorageService}; use objectstore_types::{Metadata, Permission}; use crate::auth::AuthContext; diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs index f00b6669..54e5aa5d 100644 --- a/objectstore-server/src/endpoints/batch.rs +++ b/objectstore-server/src/endpoints/batch.rs @@ -15,6 +15,7 @@ const MAX_BODY_SIZE: usize = 1024 * 1024 * 1024; // 1 GB pub fn router() -> Router { Router::new() .route("/objects:batch/{usecase}/{scopes}/", routing::post(batch)) + // Enforced by https://github.com/tokio-rs/axum/blob/4404f27cea206b0dca63637b1c76dff23772a5cc/axum/src/extract/multipart.rs#L78 .layer(DefaultBodyLimit::max(MAX_BODY_SIZE)) } diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index 2dc19763..ea20b26d 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -1,15 +1,36 @@ use std::fmt::Debug; -use anyhow::Context; -use axum::{ - extract::{FromRequest, Multipart, Request, multipart::Field}, - http::StatusCode, - response::{IntoResponse, Response}, +use axum::extract::{ + FromRequest, Multipart, Request, + multipart::{Field, MultipartError, MultipartRejection}, }; use bytes::Bytes; use futures::{StreamExt, stream::BoxStream}; use objectstore_service::id::ObjectKey; use objectstore_types::Metadata; +use thiserror::Error; + +use crate::endpoints::common::ApiError; + +/// Errors that can occur when processing or executing batch operations. +#[derive(Debug, Error)] +pub enum BatchError { + /// Malformed request. + #[error("bad request: {0}")] + BadRequest(String), + + /// Errors in parsing or reading a multipart request body. + #[error("multipart error: {0}")] + Multipart(#[from] MultipartError), + + /// Errors related to de/serialization and parsing of object metadata. + #[error("metadata error: {0}")] + Metadata(#[from] objectstore_types::Error), + + /// Size or cardinality limit exceeded. + #[error("batch limit exceeded: {0}")] + LimitExceeded(String), +} #[derive(Debug)] pub struct GetOperation { @@ -36,33 +57,49 @@ pub enum Operation { } impl Operation { - async fn try_from_field(field: Field<'_>) -> anyhow::Result { + async fn try_from_field(field: Field<'_>) -> Result { let kind = field .headers() .get(HEADER_BATCH_OPERATION_KIND) - .ok_or(anyhow::anyhow!( - "missing {HEADER_BATCH_OPERATION_KIND} header" - ))?; + .ok_or_else(|| { + BatchError::BadRequest(format!("missing {HEADER_BATCH_OPERATION_KIND} header")) + })?; let kind = kind .to_str() - .context(format!("invalid {HEADER_BATCH_OPERATION_KIND} header"))? + .map_err(|_| { + BatchError::BadRequest(format!( + "unable to convert {HEADER_BATCH_OPERATION_KIND} header value to string" + )) + })? .to_lowercase(); let key = match field.headers().get(HEADER_BATCH_OPERATION_KEY) { - Some(key) => Some(key.to_str().context("invalid object key")?.to_owned()), + Some(key) => Some( + key.to_str() + .map_err(|_| { + BatchError::BadRequest(format!( + "unable to convert {HEADER_BATCH_OPERATION_KEY} header value to string" + )) + })? + .to_owned(), + ), None => None, }; let operation = match kind.as_str() { "get" => { - let key = key.context("missing object key for get operation")?; + let key = key.ok_or_else(|| { + BatchError::BadRequest("missing object key for get operation".to_string()) + })?; Operation::Get(GetOperation { key }) } "insert" => { let metadata = Metadata::from_headers(field.headers(), "")?; - let payload = field.bytes().await.map_err(|e| anyhow::anyhow!("{e}"))?; + let payload = field.bytes().await?; if payload.len() > MAX_FIELD_SIZE { - anyhow::bail!("field size exceeds {MAX_FIELD_SIZE} bytes limit"); + return Err(BatchError::LimitExceeded(format!( + "individual request in batch exceeds body size limit of {MAX_FIELD_SIZE} bytes" + ))); } Operation::Insert(InsertOperation { key, @@ -71,17 +108,23 @@ impl Operation { }) } "delete" => { - let key = key.context("missing object key for delete operation")?; + let key = key.ok_or_else(|| { + BatchError::BadRequest("missing object key for delete operation".to_string()) + })?; Operation::Delete(DeleteOperation { key }) } - _ => anyhow::bail!("invalid {HEADER_BATCH_OPERATION_KIND} header"), + _ => { + return Err(BatchError::BadRequest(format!( + "invalid operation kind: {kind}" + ))); + } }; Ok(operation) } } pub struct BatchRequest { - pub operations: BoxStream<'static, anyhow::Result>, + pub operations: BoxStream<'static, Result>, } impl Debug for BatchRequest { @@ -100,18 +143,18 @@ impl FromRequest for BatchRequest where S: Send + Sync, { - type Rejection = Response; + type Rejection = MultipartRejection; async fn from_request(request: Request, state: &S) -> Result { - let mut multipart = Multipart::from_request(request, state) - .await - .map_err(|e| (StatusCode::BAD_REQUEST, e.body_text()).into_response())?; + let mut multipart = Multipart::from_request(request, state).await?; let operations = async_stream::try_stream! { let mut count = 0; - while let Some(field) = multipart.next_field().await.map_err(|e| anyhow::anyhow!("{e}"))? { + while let Some(field) = multipart.next_field().await? { if count >= MAX_OPERATIONS { - Err(anyhow::anyhow!("exceeded limit of {MAX_OPERATIONS} operations per batch request"))?; + Err(BatchError::LimitExceeded(format!( + "exceeded {MAX_OPERATIONS} operations per batch request" + )))?; } count += 1; yield Operation::try_from_field(field).await?; diff --git a/objectstore-server/src/extractors/mod.rs b/objectstore-server/src/extractors/mod.rs index 839463f0..cdf76231 100644 --- a/objectstore-server/src/extractors/mod.rs +++ b/objectstore-server/src/extractors/mod.rs @@ -10,4 +10,6 @@ mod service; #[derive(Debug)] pub struct Xt(pub T); -pub use batch::{BatchRequest, DeleteOperation, GetOperation, InsertOperation, Operation}; +pub use batch::{ + BatchError, BatchRequest, DeleteOperation, GetOperation, InsertOperation, Operation, +}; From 23fff9fc8e712ceff7744a58007195477253a7b4 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 16 Jan 2026 12:51:16 +0100 Subject: [PATCH 30/33] clippy --- objectstore-server/src/extractors/batch.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index ea20b26d..dbb144b9 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -10,7 +10,6 @@ use objectstore_service::id::ObjectKey; use objectstore_types::Metadata; use thiserror::Error; -use crate::endpoints::common::ApiError; /// Errors that can occur when processing or executing batch operations. #[derive(Debug, Error)] From 2eebc3ea87e1213858a60158dff5efdec96a6238 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 16 Jan 2026 13:02:20 +0100 Subject: [PATCH 31/33] remove deps --- Cargo.lock | 2 -- objectstore-server/Cargo.toml | 2 -- 2 files changed, 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5a1dbd7c..0dd83c64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2248,13 +2248,11 @@ dependencies = [ "figment", "futures", "futures-util", - "http 1.3.1", "humantime", "humantime-serde", "jsonwebtoken", "merni", "mimalloc", - "mime", "nix", "num_cpus", "objectstore-service", diff --git a/objectstore-server/Cargo.toml b/objectstore-server/Cargo.toml index 8bc8538b..064c88d0 100644 --- a/objectstore-server/Cargo.toml +++ b/objectstore-server/Cargo.toml @@ -22,13 +22,11 @@ elegant-departure = { version = "0.3.2", features = ["tokio"] } figment = { version = "0.10.19", features = ["env", "test", "yaml"] } futures = { workspace = true } futures-util = { workspace = true } -http = { workspace = true } humantime = { workspace = true } humantime-serde = { workspace = true } jsonwebtoken = { workspace = true } merni = { workspace = true } mimalloc = { workspace = true } -mime = "0.3.17" num_cpus = "1.17.0" objectstore-service = { workspace = true } objectstore-types = { workspace = true } From 3560858a0f4907f2111474ef7959d9ba67a0102f Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 16 Jan 2026 13:09:14 +0100 Subject: [PATCH 32/33] tests --- objectstore-server/src/extractors/batch.rs | 55 +++++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index dbb144b9..f1632d64 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -10,7 +10,6 @@ use objectstore_service::id::ObjectKey; use objectstore_types::Metadata; use thiserror::Error; - /// Errors that can occur when processing or executing batch operations. #[derive(Debug, Error)] pub enum BatchError { @@ -244,4 +243,58 @@ mod tests { }; assert_eq!(delete_op.key, "test3"); } + + #[tokio::test] + async fn test_max_operations_limit_enforced() { + let mut body = String::new(); + for i in 0..(MAX_OPERATIONS + 1) { + body.push_str(&format!( + "--boundary\r\n\ + {HEADER_BATCH_OPERATION_KEY}: test{i}\r\n\ + {HEADER_BATCH_OPERATION_KIND}: get\r\n\ + \r\n\ + \r\n" + )); + } + body.push_str("--boundary--\r\n"); + + let request = Request::builder() + .header(CONTENT_TYPE, "multipart/form-data; boundary=boundary") + .body(Body::from(body)) + .unwrap(); + + let batch_request = BatchRequest::from_request(request, &()).await.unwrap(); + let operations: Vec<_> = batch_request.operations.collect().await; + + assert_eq!(operations.len(), MAX_OPERATIONS + 1); + matches!( + &operations[MAX_OPERATIONS], + Err(BatchError::LimitExceeded(_)) + ); + } + + #[tokio::test] + async fn test_operation_body_size_limit_enforced() { + let large_payload = "x".repeat(MAX_FIELD_SIZE + 1); + let body = format!( + "--boundary\r\n\ + {HEADER_BATCH_OPERATION_KEY}: test\r\n\ + {HEADER_BATCH_OPERATION_KIND}: insert\r\n\ + Content-Type: application/octet-stream\r\n\ + \r\n\ + {large_payload}\r\n\ + --boundary--\r\n", + ); + + let request = Request::builder() + .header(CONTENT_TYPE, "multipart/form-data; boundary=boundary") + .body(Body::from(body)) + .unwrap(); + + let batch_request = BatchRequest::from_request(request, &()).await.unwrap(); + let operations: Vec<_> = batch_request.operations.collect().await; + + assert_eq!(operations.len(), 1); + assert!(matches!(&operations[0], Err(BatchError::LimitExceeded(_)))); + } } From 0cb5d9c2276324ceb7199f5500efe2c2a9dee436 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 19 Jan 2026 11:09:14 +0100 Subject: [PATCH 33/33] improve --- objectstore-service/src/lib.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index 0897f101..0fd1a021 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -88,13 +88,6 @@ pub enum StorageConfig<'a> { }, } -/// Result type for get operations. -pub type GetResult = anyhow::Result>; -/// Result type for insert operations. -pub type InsertResult = anyhow::Result; -/// Result type for delete operations. -pub type DeleteResult = anyhow::Result<()>; - impl StorageService { /// Creates a new `StorageService` with the specified configuration. pub async fn new(