Server-Sent Events (SSE)
Server-Sent Events (SSE) is a standard that allows a web server to push events to a client over a single, long-lived HTTP connection. It's a simple and efficient way to send real-time updates from the server to the client. warp provides built-in support for SSE.
Creating Events
Events are created using the warp::sse::Event builder. You can set various fields on an event:
- data(..): The main payload of the event. Can be a simple string or multiple lines.
- json_data(..): A helper to serialize a type T: Serialize into JSON and set it as the data.
- event(..): A name for the event, which allows clients to listen for specific event types.
- id(..): A unique ID for the event. If the connection drops, the client sends the ID of the last event it received in a Last-Event-ID header when it reconnects, allowing the server to resume the stream.
- retry(..): A Duration suggesting how long the client should wait before attempting to reconnect if the connection is lost.
- comment(..): A comment, which is ignored by clients but can be used for keep-alive messages.
use std::time::Duration;
use warp::sse::Event;

// An event with simple text data
let event1 = Event::default().data("some data");

// A named event with multi-line data, an ID, and a retry timeout
let event2 = Event::default()
    .event("message")
    .data("first line\nsecond line")
    .id("123")
    .retry(Duration::from_secs(5));
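To make the fields concrete, here is a rough, std-only sketch of how such an event looks on the wire in the text/event-stream format defined by the SSE standard. WireEvent and render are hypothetical names used only for illustration; this is not warp's Event type, and the exact whitespace and field order warp emits may differ.

```rust
use std::time::Duration;

/// Hypothetical stand-in for an SSE event; NOT warp's `Event` type.
#[derive(Default)]
struct WireEvent {
    comment: Option<String>,
    event: Option<String>,
    data: Option<String>,
    id: Option<String>,
    retry: Option<Duration>,
}

/// Render the event as `text/event-stream` text.
/// Assumes `comment`, `event` and `id` contain no line breaks.
fn render(ev: &WireEvent) -> String {
    let mut out = String::new();
    if let Some(comment) = &ev.comment {
        // A line starting with ':' is a comment and is ignored by clients.
        out.push_str(&format!(": {}\n", comment));
    }
    if let Some(name) = &ev.event {
        out.push_str(&format!("event: {}\n", name));
    }
    if let Some(data) = &ev.data {
        // Each line of the payload gets its own `data:` field.
        for line in data.split('\n') {
            out.push_str(&format!("data: {}\n", line));
        }
    }
    if let Some(id) = &ev.id {
        out.push_str(&format!("id: {}\n", id));
    }
    if let Some(retry) = &ev.retry {
        // The retry field is an integer number of milliseconds.
        out.push_str(&format!("retry: {}\n", retry.as_millis()));
    }
    // A blank line terminates the event.
    out.push('\n');
    out
}
```

Rendering the same fields as event2 above gives the lines event: message, data: first line, data: second line, id: 123 and retry: 5000, followed by a blank line. Note that the multi-line payload is split into one data field per line, and the client joins them back together with newlines.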
Replying with an Event Stream
To send SSE to a client, you need a Stream of Result<Event, E>. The warp::sse::reply() function takes such a stream and converts it into a valid Reply.
warp sets the required HTTP headers:
- Content-Type: text/event-stream
- Cache-Control: no-cache
use std::convert::Infallible;
use std::time::Duration;

use futures_util::StreamExt;
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;
use warp::{sse::Event, Filter};

let routes = warp::path("ticks").and(warp::get()).map(|| {
    let mut counter: u64 = 0;
    // Create a stream that produces an event every second
    let interval = interval(Duration::from_secs(1));
    let stream = IntervalStream::new(interval);
    let event_stream = stream.map(move |_| {
        counter += 1;
        // Name the error type explicitly; the closure alone does not pin it down
        Ok::<_, Infallible>(Event::default().data(counter.to_string()))
    });
    // Convert the stream into an SSE reply
    warp::sse::reply(event_stream)
});
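The ticks example does not set an id on its events. If each event carried its counter value as the id, a reconnecting client would send the last value it saw in the Last-Event-ID header, and the server could choose where to restart. The following is a minimal std-only sketch of that decision; first_counter is a hypothetical helper, and reading the header from the request is left out (warp's sse module offers a last_event_id filter for that, as far as I know).

```rust
/// Pick the first counter value to send to a (re)connecting client.
///
/// `last_event_id` is the raw `Last-Event-ID` header value, if the client
/// sent one. Assumes event ids are the decimal counter values.
fn first_counter(last_event_id: Option<&str>) -> u64 {
    last_event_id
        // Ignore values that are not a valid u64 instead of failing the request.
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        // Resume with the event after the last one the client received.
        .map(|last| last.saturating_add(1))
        // Fresh connection (or unusable header): start from the first tick.
        .unwrap_or(1)
}
```

A fresh client starts at 1, matching the example above where the counter is incremented before the first event is sent. A client that reports 41 resumes at 42, and a malformed header falls back to 1 rather than rejecting the connection, which is a design choice of this sketch rather than anything warp prescribes.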
Keep-Alive
Proxies or firewalls might close an SSE connection if it's inactive for too long. To prevent this, you can send periodic comments. warp provides a convenient keep_alive utility for this.
use warp::sse;
let stream = /* your event stream */;
// Wrap the stream to send a comment every 15 seconds of inactivity.
let keep_alive_stream = sse::keep_alive().stream(stream);
let reply = sse::reply(keep_alive_stream);
You can customize the interval and the comment text using the KeepAlive builder:
use std::time::Duration;
use warp::sse;
let keep_alive = sse::keep_alive()
.interval(Duration::from_secs(10))
.text("ping");
let stream = keep_alive.stream(/* your stream */);
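Keep-alive comments are safe to send because a conforming client drops them while parsing. The std-only sketch below shows a simplified version of that client-side parsing so the effect is visible; Parsed and parse_events are hypothetical names, and the sketch only handles \n line endings and the event and data fields, which is less than the SSE standard requires.

```rust
/// Hypothetical parsed event, as a client would see it.
#[derive(Debug, PartialEq)]
struct Parsed {
    event: Option<String>,
    data: String,
}

/// Parse a `text/event-stream` body into the events a client would dispatch.
/// Simplified: only `\n` line endings, only `event` and `data` fields.
fn parse_events(body: &str) -> Vec<Parsed> {
    let mut events = Vec::new();
    let mut name: Option<String> = None;
    let mut data: Vec<&str> = Vec::new();

    for line in body.split('\n') {
        if line.is_empty() {
            // A blank line ends the current event. Nothing is dispatched
            // unless at least one `data` field was seen.
            if !data.is_empty() {
                events.push(Parsed { event: name.take(), data: data.join("\n") });
                data.clear();
            } else {
                name = None;
            }
            continue;
        }
        if line.starts_with(':') {
            // Comment line, e.g. a keep-alive ping: ignored entirely.
            continue;
        }
        // Split "field: value"; a single leading space in the value is dropped.
        let (field, value) = match line.split_once(':') {
            Some((f, v)) => (f, v.strip_prefix(' ').unwrap_or(v)),
            None => (line, ""),
        };
        match field {
            "event" => name = Some(value.to_string()),
            "data" => data.push(value),
            _ => {} // `id`, `retry` and unknown fields are skipped in this sketch
        }
    }
    events
}
```

Feeding it a body that contains a ": ping" comment followed by a real event yields only the real event, so the keep-alive traffic keeps the connection busy without ever reaching application code.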