Async boundaries in Sail
Sail is async from end to end. Spark Connect is a gRPC protocol, and every request flows through Tonic, the Sail session manager, and the Tokio actor system. Reading one real handler teaches you most of how production async Rust composes.
A real gRPC handler
This is the entry point for "execute this Spark plan":

    #[tonic::async_trait]
    impl SparkConnectService for SparkConnectServer {
        type ExecutePlanStream = ExecutePlanResponseStream;

        async fn execute_plan(
            &self,
            request: Request<ExecutePlanRequest>,
        ) -> Result<Response<Self::ExecutePlanStream>, Status> {
            let request = request.into_inner();
            debug!("{request:?}");
            let session_id = request.session_id;
            let user_id = request.user_context.map(|u| u.user_id).unwrap_or_default();
            let metadata = ExecutorMetadata {
                operation_id: request
                    .operation_id
                    .unwrap_or_else(|| Uuid::new_v4().to_string()),
                tags: request.tags,
                reattachable: is_reattachable(&request.request_options),
            };
            let ctx = self
                .session_manager
                .get_or_create_session_context(session_id, user_id)
                .await
                .map_err(SparkError::from)?;
            let Plan { op_type: op } = request.plan.required("plan")?;
            let op = op.required("plan op")?;
            let stream = match op {
                plan::OpType::Root(relation) => {
                    service::handle_execute_relation(&ctx, relation, metadata).await?
                }
                plan::OpType::Command(Command { command_type: command }) => {
                    let command = command.required("command")?;
                    handle_command(&ctx, command, metadata).await?
                }
                plan::OpType::CompressedOperation(_) => {
                    return Err(Status::unimplemented("compressed operation plan"));
                }
            };
            Ok(Response::new(stream))
        }
    }

Walk through it:
- `#[tonic::async_trait]` lets the trait have async methods that work with dynamic dispatch by rewriting each `async fn` to return a boxed future (Tonic uses object-safe traits internally).
- `request.into_inner()` unwraps the gRPC envelope to get the actual `ExecutePlanRequest`.
- Optional fields and defaults. Protobuf 3 fields are optional in spirit; the code uses `.unwrap_or_default()` and `.unwrap_or_else(|| Uuid::new_v4().to_string())` to supply fallback values. Note that the fallback operation ID is a freshly generated random UUID, not a fixed value.
- `.await.map_err(SparkError::from)?`: every async call propagates its error through `?`. The explicit `.map_err` is there because `?` applies only a single `From` conversion and the handler returns `Status`: the session manager's error is first converted to `SparkError`, and `?` then converts that `SparkError` into `Status`.
- Custom helper `.required("plan")` is a Sail extension method that turns a `None` into `Err(SparkError::MissingField("plan"))` and unwraps a `Some`. This is how the codebase keeps the Clippy lint `unwrap_used = "deny"` working at protobuf boundaries.
- `match` on the request variant, with three arms: two dispatch to handlers, and the third returns `Status::unimplemented(...)` rather than panicking on unsupported input. That is the contract a gRPC server makes with its callers.
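The two error-handling points above can be sketched with the standard library alone. This is a minimal illustration, not Sail's code: `SessionError`, `Status`, the `SparkError` variants, the `RequiredField` trait, `lookup_session`, and `handle` are all hypothetical stand-ins chosen to mirror the names in the handler.

```rust
// Hypothetical stand-ins for the real error types; only the shape matters here.
#[derive(Debug, PartialEq)]
struct SessionError(String);

#[derive(Debug, PartialEq)]
enum SparkError {
    MissingField(&'static str),
    Session(String),
}

#[derive(Debug, PartialEq)]
struct Status(String);

impl From<SessionError> for SparkError {
    fn from(e: SessionError) -> Self {
        SparkError::Session(e.0)
    }
}

impl From<SparkError> for Status {
    fn from(e: SparkError) -> Self {
        match e {
            SparkError::MissingField(name) => Status(format!("missing field: {name}")),
            SparkError::Session(msg) => Status(format!("session error: {msg}")),
        }
    }
}

// Extension trait (assumed shape): a missing field becomes a typed error, not a panic.
trait RequiredField<T> {
    fn required(self, name: &'static str) -> Result<T, SparkError>;
}

impl<T> RequiredField<T> for Option<T> {
    fn required(self, name: &'static str) -> Result<T, SparkError> {
        self.ok_or(SparkError::MissingField(name))
    }
}

// Stand-in for the session manager call; fails on an empty session id.
fn lookup_session(id: &str) -> Result<String, SessionError> {
    if id.is_empty() {
        Err(SessionError("empty session id".to_string()))
    } else {
        Ok(format!("ctx-{id}"))
    }
}

fn handle(session_id: &str, plan: Option<u32>) -> Result<(String, u32), Status> {
    // Two hops: SessionError -> SparkError (explicit), then SparkError -> Status (via `?`).
    let ctx = lookup_session(session_id).map_err(SparkError::from)?;
    // One hop: `required` already yields SparkError, so `?` alone is enough.
    let plan = plan.required("plan")?;
    Ok((ctx, plan))
}
```

Removing the `.map_err(SparkError::from)` call makes the first line of `handle` fail to compile in this sketch, because there is no direct `From<SessionError> for Status`.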
What this teaches about async
Async generics: retry with a Future-returning closure
The Sail server has a generic retry primitive. It teaches you how to take "an async thing" as a parameter:

    impl RetryStrategy {
        pub async fn run<F, Fut, T, E>(&self, mut f: F) -> Result<T, E>
        where
            F: FnMut() -> Fut + Send,
            Fut: Future<Output = Result<T, E>> + Send,
            T: Send + 'static,
            E: std::fmt::Display + Send + 'static,
        {
            let mut delay = self.delay();
            let mut attempt = 0;
            loop {
                let result = f().await;
                match result {
                    x @ Ok(_) => return x,
                    Err(e) => {
                        warn!("retryable operation failed: {e}");
                        if let Some(delay) = delay.next() {
                            tokio::time::sleep(delay).await;
                        } else {
                            return Err(e);
                        }
                    }
                }
                attempt += 1;
            }
        }
    }

The bound block is the lesson:

    where
        F: FnMut() -> Fut + Send,
        Fut: Future<Output = Result<T, E>> + Send,
        T: Send + 'static,
        E: std::fmt::Display + Send + 'static,

Translation in plain English:

- `F` is a closure I can call multiple times (`FnMut`). It returns "some future" (`Fut`).
- That future, when polled, yields a `Result<T, E>`.
- `T` and `E` are sendable across threads and own their data (`'static`). `E` is also `Display`, so each failure can be logged.
- `F` and `Fut` are also sendable, so the `run` future as a whole can be `Send` and therefore safe to spawn.
This is the canonical "I want a function that takes an async callable" signature. Memorize it. When agents propose retry, circuit breaker, or rate limiter primitives, this is the shape they should produce.
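To see the signature from the caller's side, here is a minimal std-only sketch. It is not Sail's `RetryStrategy`: the free function `retry`, its `max_attempts` parameter, the tiny `block_on` executor, and `flaky_demo` are assumptions made for illustration, and the backoff sleep is omitted so that no runtime crate is needed.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Same bounds as above: a closure that can be called repeatedly and
// returns a fresh future on every call.
async fn retry<F, Fut, T, E>(max_attempts: usize, mut f: F) -> Result<T, E>
where
    F: FnMut() -> Fut + Send,
    Fut: Future<Output = Result<T, E>> + Send,
    T: Send + 'static,
    E: std::fmt::Display + Send + 'static,
{
    let mut attempt = 1usize;
    loop {
        match f().await {
            Ok(value) => return Ok(value),
            Err(e) => {
                eprintln!("attempt {attempt} failed: {e}");
                if attempt >= max_attempts {
                    return Err(e);
                }
                attempt += 1;
            }
        }
    }
}

// Minimal single-threaded executor so the sketch runs without Tokio.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<Fut: Future>(fut: Fut) -> Fut::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}

// The closure mutates captured state (hence `FnMut`) and hands back a new
// future per call; the first two calls fail, the third succeeds.
fn flaky_demo() -> (Result<u32, String>, u32) {
    let mut calls = 0u32;
    let result = block_on(retry(5, || {
        calls += 1;
        let n = calls;
        async move {
            if n < 3 {
                Err(format!("call {n} failed"))
            } else {
                Ok(n)
            }
        }
    }));
    (result, calls)
}
```

The closure has to build a new future on each call because a future can only be driven to completion once; that is why the parameter is "a function returning a future" rather than a future itself.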
Common async failure modes (what Sail does not do)
Things to grep for in an agent's async PR that Sail's handler does not exhibit:
| Failure mode | What it looks like |
|---|---|
| `std::sync::Mutex` held across `.await` | `let g = mutex.lock().unwrap(); something.await; ...` |
| Hand-rolled `Future` impl | `impl Future for MyThing { ... poll ... }` when `async fn` would work |
| `tokio::spawn` with the `JoinHandle` dropped | `tokio::spawn(work);` is fire and forget: errors and panics in the task go unobserved |
| `block_on` inside async code | `tokio::runtime::Handle::current().block_on(...)` panics when called from within the runtime; other blocking calls stall the worker thread and can deadlock the executor |
| Missing `Send` bound on an `async fn` that gets spawned | Compiles standalone, fails at the spawn site |
A clean handler like the one above has none of these. Use it as your template when reviewing agent-written async code.
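As one concrete case, the first row of the table can be checked at compile time. The sketch below uses only the standard library; `assert_send`, `flush`, `record`, and `poll_once` are hypothetical helpers, with `assert_send` standing in for the `Send` requirement that a spawn site such as `tokio::spawn` imposes.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Stand-in for a spawn site: accepts only futures that are `Send`.
fn assert_send<F: Future + Send>(fut: F) -> F {
    fut
}

// Stand-in for some async work; completes immediately.
async fn flush(_snapshot: u64) {}

// The guard lives only inside the inner block, so it is dropped before the
// `.await` and the returned future stays `Send`.
async fn record(counter: Arc<Mutex<u64>>) -> u64 {
    let snapshot = {
        let mut guard = counter.lock().unwrap_or_else(|e| e.into_inner());
        *guard += 1;
        *guard
    };
    flush(snapshot).await;
    snapshot
}

// The failure mode, left commented out because `assert_send` rejects it
// (`MutexGuard` is not `Send`, and here it is alive across the `.await`):
//
// async fn record_bad(counter: Arc<Mutex<u64>>) -> u64 {
//     let mut guard = counter.lock().unwrap_or_else(|e| e.into_inner());
//     *guard += 1;
//     flush(*guard).await;
//     *guard
// }

// Polls a future once with a no-op waker; enough for futures that never wait.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let out = fut.as_mut().poll(&mut cx);
    out
}
```

Copying the value out and letting the guard go out of scope is usually the simplest fix; an async-aware mutex is the alternative when the lock genuinely has to be held across the await point.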