Server-Sent Events (SSE)

Server-Sent Events (SSE) is a standard that allows a web server to push events to a client over a single, long-lived HTTP connection. It's a simple and efficient way to send real-time updates from the server to the client. warp provides built-in support for SSE.

Creating Events

Events are created using the warp::sse::Event builder. You can set various fields on an event:

  • data(..): The main payload of the event. Can be a simple string or multiple lines.
  • json_data(..): A helper that serializes a value of any type T: Serialize to JSON and sets it as the data. Because serialization can fail, it returns a Result rather than the event itself.
  • event(..): A name for the event, which allows clients to listen for specific event types.
  • id(..): A unique ID for the event. When a client reconnects after losing the connection, it sends the ID of the last event it received in a Last-Event-ID header, which allows the server to resume the stream from that point.
  • retry(..): A Duration suggesting how long the client should wait before attempting to reconnect if the connection is lost.
  • comment(..): A comment, which is ignored by clients but can be used for keep-alive messages.

use std::time::Duration;
use warp::sse::Event;

// An event with simple text data
let event1 = Event::default().data("some data");

// A named event with multi-line data, an ID, and a retry timeout
let event2 = Event::default()
    .event("message")
    .data("first line\nsecond line")
    .id("123")
    .retry(Duration::from_secs(5));
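
To make the fields above more concrete, here is a rough sketch of how they map onto the text/event-stream wire format. It uses only the standard library; WireEvent and to_wire are hypothetical names invented for this illustration and are not part of warp, and the order in which warp itself writes the fields may differ.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the fields of an SSE event (not warp's `Event`).
#[derive(Default)]
struct WireEvent<'a> {
    comment: Option<&'a str>,
    event: Option<&'a str>,
    data: Option<&'a str>,
    id: Option<&'a str>,
    retry: Option<Duration>,
}

/// Serialize the event into `text/event-stream` framing.
fn to_wire(ev: &WireEvent<'_>) -> String {
    let mut out = String::new();
    if let Some(comment) = ev.comment {
        // A line starting with a colon is a comment; clients ignore it.
        out.push_str(&format!(": {}\n", comment));
    }
    if let Some(name) = ev.event {
        out.push_str(&format!("event: {}\n", name));
    }
    if let Some(data) = ev.data {
        // Each line of a multi-line payload gets its own `data:` field.
        for line in data.split('\n') {
            out.push_str(&format!("data: {}\n", line));
        }
    }
    if let Some(id) = ev.id {
        out.push_str(&format!("id: {}\n", id));
    }
    if let Some(retry) = ev.retry {
        // The retry field is an integer number of milliseconds.
        out.push_str(&format!("retry: {}\n", retry.as_millis()));
    }
    // A blank line terminates the event.
    out.push('\n');
    out
}
```

Under these assumptions, an event built like event2 above would be written as the lines "event: message", "data: first line", "data: second line", "id: 123" and "retry: 5000", followed by an empty line.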

Replying with an Event Stream

To send events to a client, you need a Stream of Result<Event, E>, where E is an error type that implements std::error::Error. The warp::sse::reply() function takes such a stream and converts it into a valid Reply.

warp sets the required HTTP headers:

  • Content-Type: text/event-stream
  • Cache-Control: no-cache

use std::convert::Infallible;
use std::time::Duration;
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;
use futures_util::StreamExt;
use warp::{sse::Event, Filter};

let routes = warp::path("ticks").and(warp::get()).map(|| {
    let mut counter: u64 = 0;
    // Create a stream that produces an event every second
    let interval = interval(Duration::from_secs(1));
    let stream = IntervalStream::new(interval);
    let event_stream = stream.map(move |_| {
        counter += 1;
        // The error type must be spelled out; the stream itself never fails.
        Ok::<_, Infallible>(Event::default().data(counter.to_string()))
    });

    // Convert the stream into an SSE reply
    warp::sse::reply(event_stream)
});
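
The id(..) field described earlier only pays off if the server acts on the Last-Event-ID header when a client reconnects. The following is a minimal sketch of that decision for a counter stream like the one above, assuming each event were also given .id(counter.to_string()), which the example above does not do. resume_from is a hypothetical helper, not a warp function; in a warp route the raw header value could be obtained with a filter such as warp::header::optional.

```rust
/// Decide which counter value to send first, given the raw `Last-Event-ID`
/// header value (if any). Assumes event IDs are the decimal counter values.
fn resume_from(last_event_id: Option<&str>) -> u64 {
    match last_event_id.and_then(|raw| raw.trim().parse::<u64>().ok()) {
        // The client already saw `last_seen`, so continue with the next value.
        Some(last_seen) => last_seen.saturating_add(1),
        // No header or an unparsable ID: start the stream from the beginning.
        None => 1,
    }
}
```

Treating a malformed ID the same as a missing one is a design choice made for this sketch; a stricter server might reject the request instead.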

Keep-Alive

Proxies or firewalls might close an SSE connection if it's inactive for too long. To prevent this, you can send periodic comments. warp provides a convenient keep_alive utility for this.

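use std::convert::Infallible;
use futures_util::stream;
use warp::sse::{self, Event};

// A one-event placeholder; substitute your own event stream.
let stream = stream::iter(vec![Ok::<_, Infallible>(Event::default().data("hello"))]);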

// Wrap the stream to send a comment every 15 seconds of inactivity.
let keep_alive_stream = sse::keep_alive().stream(stream);

let reply = sse::reply(keep_alive_stream);

You can customize the interval and the comment text using the KeepAlive builder:

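use std::convert::Infallible;
use std::time::Duration;
use futures_util::stream;
use warp::sse::{self, Event};

let keep_alive = sse::keep_alive()
    .interval(Duration::from_secs(10))
    .text("ping");

// A one-event placeholder; substitute your own event stream.
let events = stream::iter(vec![Ok::<_, Infallible>(Event::default().data("hello"))]);
let stream = keep_alive.stream(events);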
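
To illustrate what "inactivity" means here, the sketch below models the timing only: it assumes the keep-alive timer restarts whenever an event or a comment is sent, and computes when comments would go out for a given list of event times. keep_alive_times is a hypothetical standard-library-only helper written for this illustration; it is not part of warp and does not reproduce warp's implementation.

```rust
/// Return the times (in ms) at which a keep-alive comment would be sent.
/// Assumes `event_times_ms` is sorted ascending, the timer starts at t = 0,
/// and the timer restarts after every event and after every comment.
fn keep_alive_times(event_times_ms: &[u64], interval_ms: u64, end_ms: u64) -> Vec<u64> {
    assert!(interval_ms > 0, "interval must be positive");
    let mut comments = Vec::new();
    let mut deadline = interval_ms;
    let mut events = event_times_ms.iter().copied().peekable();
    loop {
        match events.peek() {
            Some(&t) if t <= deadline && t <= end_ms => {
                // An event arrived before the timer fired: restart the timer.
                deadline = t + interval_ms;
                events.next();
            }
            _ => {
                if deadline > end_ms {
                    break;
                }
                // The connection was idle for a full interval: send a comment.
                comments.push(deadline);
                deadline += interval_ms;
            }
        }
    }
    comments
}
```

With a 10-second interval and events at 4 s, 9 s and 30 s, this model sends comments at 19 s, 29 s and 40 s over a 40-second window: a busy stream produces few comments, while an idle one produces one per interval.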