Async boundaries in Sail

Sail is async from end to end. Spark Connect is a gRPC protocol, and every request flows through Tonic (the gRPC server), the Sail session manager, and the actor system Sail runs on the Tokio runtime. Reading one real handler shows most of how async Rust composes in production code.

A real gRPC handler

This is the entry point for "execute this Spark plan":

crates/sail-spark-connect/src/server.rs
#[tonic::async_trait]
impl SparkConnectService for SparkConnectServer {
    type ExecutePlanStream = ExecutePlanResponseStream;
 
    async fn execute_plan(
        &self,
        request: Request<ExecutePlanRequest>,
    ) -> Result<Response<Self::ExecutePlanStream>, Status> {
        let request = request.into_inner();
        debug!("{request:?}");
        let session_id = request.session_id;
        let user_id = request.user_context.map(|u| u.user_id).unwrap_or_default();
        let metadata = ExecutorMetadata {
            operation_id: request
                .operation_id
                .unwrap_or_else(|| Uuid::new_v4().to_string()),
            tags: request.tags,
            reattachable: is_reattachable(&request.request_options),
        };
        let ctx = self
            .session_manager
            .get_or_create_session_context(session_id, user_id)
            .await
            .map_err(SparkError::from)?;
        let Plan { op_type: op } = request.plan.required("plan")?;
        let op = op.required("plan op")?;
        let stream = match op {
            plan::OpType::Root(relation) => {
                service::handle_execute_relation(&ctx, relation, metadata).await?
            }
            plan::OpType::Command(Command { command_type: command }) => {
                let command = command.required("command")?;
                handle_command(&ctx, command, metadata).await?
            }
            plan::OpType::CompressedOperation(_) => {
                return Err(Status::unimplemented("compressed operation plan"));
            }
        };
        Ok(Response::new(stream))
    }
}

Walk through it:

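  1. #[tonic::async_trait] (a re-export of the async-trait crate) rewrites every async fn in the impl into a method that returns a boxed Send future, which is the form the Tonic-generated SparkConnectService trait expects. Boxing the future is also what keeps a trait with async methods usable through dynamic dispatch (dyn Trait).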
  2. request.into_inner() unwraps the gRPC envelope to get the actual ExecutePlanRequest.
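  3. Optional fields and defaults. Protobuf 3 fields can be absent, and the generated Rust types expose message fields and optional fields as Option<T>. The code supplies defaults with .unwrap_or_default() (an empty user ID when no user context is sent) and .unwrap_or_else(|| Uuid::new_v4().to_string()) (a fresh operation ID when the client omits one). The second default is random, not deterministic: each such request gets a new UUID v4.
  4. .await.map_err(SparkError::from)? shows how errors cross type boundaries. The ? operator applies exactly one From::from conversion to the error, so the session manager's error is first converted into SparkError explicitly, and ? then converts that SparkError into the tonic Status the handler returns. The later ? calls on .required(...) rely on the same SparkError-to-Status conversion.
  5. Custom helper .required("plan") is a Sail extension method that turns an absent Option<T> into Err(SparkError::MissingField("plan")). This is how the codebase keeps unwrap_used = "deny" (a Clippy lint setting) workable at protobuf boundaries: a missing field becomes an error returned to the client, never an .unwrap() panic.
  6. match on the plan's op type. Root relations go to service::handle_execute_relation, commands go to handle_command, and CompressedOperation, which Sail does not implement, returns Status::unimplemented(...) rather than panicking. The match names every variant with no wildcard arm, so a variant added to the protocol later shows up as a compile error here instead of falling through silently. Answering unsupported input with a typed gRPC status is the contract a gRPC server makes with its callers.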
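
Items 4 to 6 can be reproduced without Tonic. The sketch below is a minimal, dependency-free approximation, not Sail's code: Status stands in for tonic::Status, and SessionError, AppError (in the role SparkError plays), the RequiredExt trait, OpType, get_session and execute are all hypothetical names. It shows the two-hop error conversion, a .required()-style extension trait, and an exhaustive match that rejects an unsupported variant with a typed status.

```rust
// Hypothetical stand-in for tonic::Status so the sketch needs no dependencies.
#[derive(Debug, PartialEq)]
struct Status {
    code: &'static str,
    message: String,
}

// Hypothetical lower-level error (think: the session manager's error type).
// Deliberately has no `From<SessionError> for Status` impl.
#[derive(Debug)]
struct SessionError(String);

// Hypothetical application error, playing the role SparkError plays in Sail.
#[derive(Debug)]
enum AppError {
    MissingField(&'static str),
    Session(String),
}

impl From<SessionError> for AppError {
    fn from(e: SessionError) -> Self {
        AppError::Session(e.0)
    }
}

// `?` calls this impl when an AppError escapes a function returning Result<_, Status>.
impl From<AppError> for Status {
    fn from(e: AppError) -> Self {
        match e {
            AppError::MissingField(field) => Status {
                code: "invalid_argument",
                message: format!("missing field: {field}"),
            },
            AppError::Session(msg) => Status { code: "internal", message: msg },
        }
    }
}

// Extension trait in the spirit of Sail's `.required(...)`; the names are assumptions.
trait RequiredExt<T> {
    fn required(self, field: &'static str) -> Result<T, AppError>;
}

impl<T> RequiredExt<T> for Option<T> {
    fn required(self, field: &'static str) -> Result<T, AppError> {
        self.ok_or(AppError::MissingField(field))
    }
}

// Hypothetical request variants mirroring plan::OpType.
enum OpType {
    Root(String),
    Command(String),
    Compressed,
}

fn get_session(user_id: &str) -> Result<String, SessionError> {
    if user_id.is_empty() {
        Err(SessionError("no user context".to_string()))
    } else {
        Ok(format!("session-for-{user_id}"))
    }
}

fn execute(user_id: &str, op: Option<OpType>) -> Result<String, Status> {
    // `?` applies exactly one From conversion, so SessionError is converted to
    // AppError explicitly; `?` then converts AppError into Status.
    let ctx = get_session(user_id).map_err(AppError::from)?;
    // No unwrap(): a missing field becomes a typed error instead of a panic.
    let op = op.required("plan op")?;
    // Exhaustive match with no wildcard arm: unsupported variants are rejected explicitly.
    match op {
        OpType::Root(relation) => Ok(format!("{ctx}: relation {relation}")),
        OpType::Command(command) => Ok(format!("{ctx}: command {command}")),
        OpType::Compressed => Err(Status {
            code: "unimplemented",
            message: "compressed operation plan".to_string(),
        }),
    }
}
```

For example, execute("alice", None) fails with code "invalid_argument", while execute("", ...) fails earlier with "internal" because the session lookup runs before the plan is inspected.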

What this teaches about async

Async generics: retry with a Future-returning closure

The Sail server has a generic retry primitive. It shows how to accept an async operation as a parameter:

crates/sail-server/src/retry.rs
impl RetryStrategy {
    pub async fn run<F, Fut, T, E>(&self, mut f: F) -> Result<T, E>
    where
        F: FnMut() -> Fut + Send,
        Fut: Future<Output = Result<T, E>> + Send,
        T: Send + 'static,
        E: std::fmt::Display + Send + 'static,
    {
        let mut delay = self.delay();
        let mut attempt = 0;
        loop {
            let result = f().await;
            match result {
                x @ Ok(_) => return x,
                Err(e) => {
                    warn!("retryable operation failed: {e}");
                    if let Some(delay) = delay.next() {
                        tokio::time::sleep(delay).await;
                    } else {
                        return Err(e);
                    }
                }
            }
            attempt += 1;
        }
    }
}

The bound block is the lesson:

where
    F: FnMut() -> Fut + Send,
    Fut: Future<Output = Result<T, E>> + Send,
    T: Send + 'static,
    E: std::fmt::Display + Send + 'static,

Translation in plain English:

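  • F is a closure that can be called repeatedly (FnMut), possibly mutating its captured state. Each call returns a new future (Fut). That matters: a future that has completed cannot be polled again, so retrying means calling f() to build a fresh one.
  • That future, when awaited, yields a Result<T, E>.
  • T and E can move across threads (Send) and hold no borrowed data ('static). E also implements Display so each failure can be logged with warn!.
  • F and Fut are Send too, because the loop holds both across .await points. With these bounds the future returned by run is Send (provided RetryStrategy is Sync and its delay iterator is Send), so it can be awaited from a Tonic handler or from a task spawned on Tokio's multi-threaded runtime.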
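
To see these bounds at work, here is a stripped-down retry with the same where clause. It is a sketch, not Sail's RetryStrategy: retry, demo_flaky, block_on and assert_send are hypothetical helpers, the attempt budget is a plain count, and there is no sleep between attempts, so the example runs on std alone with a tiny hand-written executor instead of Tokio. The closure mutates a captured counter (which is why it must be FnMut) and returns a fresh async move block on every call, and assert_send fails to compile if the resulting future is not Send.

```rust
use std::fmt::Display;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Minimal executor so the sketch runs without Tokio (assumption: no real I/O).
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical simplified retry with the same bounds as RetryStrategy::run,
// but a fixed attempt budget and no delay between attempts.
async fn retry<F, Fut, T, E>(max_attempts: u32, mut f: F) -> Result<T, E>
where
    F: FnMut() -> Fut + Send,
    Fut: Future<Output = Result<T, E>> + Send,
    T: Send + 'static,
    E: Display + Send + 'static,
{
    let mut attempt = 1;
    loop {
        // Every call to f() builds a brand-new future to await.
        match f().await {
            Ok(value) => return Ok(value),
            Err(e) if attempt < max_attempts => {
                eprintln!("attempt {attempt} failed: {e}"); // needs E: Display
                attempt += 1;
            }
            Err(e) => return Err(e),
        }
    }
}

// Compiles only when T is Send.
fn assert_send<T: Send>(_: &T) {}

// Fails twice, then succeeds; returns the result and the number of calls made.
fn demo_flaky() -> (Result<u32, String>, u32) {
    let mut calls = 0u32;
    let fut = retry(5, || {
        calls += 1; // mutating captured state is why F must be FnMut
        let n = calls;
        async move {
            if n < 3 {
                Err(format!("transient failure #{n}"))
            } else {
                Ok(n)
            }
        }
    });
    assert_send(&fut); // the bounds make the retry future Send
    let result = block_on(fut);
    (result, calls)
}
```

demo_flaky fails twice, succeeds on the third call, and returns (Ok(3), 3).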

This is the standard stable-Rust signature for "a function that takes an async callable" when the resulting future must be Send. Memorize it. When agents propose retry, circuit breaker, or rate limiter primitives, this is the shape they should produce.
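
As an illustration, here is the same bound shape applied to a circuit breaker. This is a hypothetical, deliberately minimal sketch (CircuitBreaker, BreakerError and block_on are illustrative names, not part of Sail): it opens after threshold consecutive failures and never closes again, with no half-open state or reset timer. Because call invokes its operation at most once, FnOnce is enough where retry needs FnMut.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Minimal executor so the sketch runs without Tokio (assumption: no real I/O).
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

/// Why a call through the breaker failed.
#[derive(Debug, PartialEq)]
enum BreakerError<E> {
    /// The breaker is open, so the operation was not attempted.
    Open,
    /// The operation ran and returned this error.
    Inner(E),
}

/// Hypothetical minimal breaker: opens after `threshold` consecutive failures
/// and stays open (no half-open state or reset timer in this sketch).
struct CircuitBreaker {
    threshold: u32,
    consecutive_failures: u32,
}

impl CircuitBreaker {
    fn new(threshold: u32) -> Self {
        Self { threshold, consecutive_failures: 0 }
    }

    fn is_open(&self) -> bool {
        self.consecutive_failures >= self.threshold
    }

    // Same F/Fut shape as retry; FnOnce suffices because f runs at most once.
    async fn call<F, Fut, T, E>(&mut self, f: F) -> Result<T, BreakerError<E>>
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<T, E>> + Send,
        T: Send + 'static,
        E: Send + 'static,
    {
        if self.is_open() {
            return Err(BreakerError::Open);
        }
        match f().await {
            Ok(value) => {
                self.consecutive_failures = 0; // a success ends the failure streak
                Ok(value)
            }
            Err(e) => {
                self.consecutive_failures += 1;
                Err(BreakerError::Inner(e))
            }
        }
    }
}
```

A production breaker would also need a reset timer, a half-open probe, and shared state behind a lock so many tasks can use it; those are left out to keep the F/Fut shape in focus.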

Common async failure modes (what Sail does not do)

Things to grep for in an agent's async PR that Sail's handler does not exhibit:

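| Failure mode | What it looks like |
|---|---|
| std::sync::Mutex guard held across .await | let g = mutex.lock().unwrap(); something.await; ... The guard makes the future !Send, and a contended lock blocks the executor thread. |
| Hand-rolled Future impl | impl Future for MyThing { ... poll ... } where an async fn would work. |
| JoinHandle from tokio::spawn dropped | tokio::spawn(work); Fire-and-forget: the task's errors and panics are never observed. |
| block_on inside async code | tokio::runtime::Handle::current().block_on(...) panics when called from an async context; other executors' block_on (for example futures::executor::block_on) instead stalls a worker thread and can deadlock. |
| Non-Send future passed to tokio::spawn | An async fn that holds an Rc, a RefCell borrow, or a std MutexGuard across .await (or a generic one missing + Send bounds) compiles standalone and fails only at the spawn site. |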
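
The first row is the one that shows up most often in review. The sketch below shows the usual fix under simple assumptions: report and send_to_backend are hypothetical functions, and block_on is a minimal std-only executor standing in for Tokio. Copying the value out inside a block drops the MutexGuard before the .await, so the future stays Send; moving the .await into that block would make assert_send fail to compile.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Minimal executor so the sketch runs without Tokio (assumption: no real I/O).
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical stand-in for a network call; any .await point behaves the same way.
async fn send_to_backend(value: u64) -> u64 {
    value * 2
}

// Rejected shape (kept as a comment because it is the bug to look for):
//     let guard = counter.lock().unwrap();
//     send_to_backend(*guard).await; // guard held across .await: future is !Send
//
// Fixed shape: finish with the lock inside a block so the guard drops first.
async fn report(counter: Arc<Mutex<u64>>) -> u64 {
    let snapshot = {
        // Recover the guard even if another thread panicked while holding the lock.
        let mut guard = counter.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        *guard += 1;
        *guard
    }; // guard dropped here, before any .await
    send_to_backend(snapshot).await
}

// Compiles only when T is Send.
fn assert_send<T: Send>(_: &T) {}
```

When the lock genuinely must be held across an .await, an async-aware mutex such as tokio::sync::Mutex is the usual alternative; for short critical sections like this one, the scoped std guard is simpler.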

A clean handler like execute_plan above has none of these. Use it as your template when reviewing agent-written async code.