WebSockets

warp provides first-class support for handling WebSocket upgrades. This feature allows for real-time, two-way communication between the client and server.

To enable WebSocket support, you need to add the websocket feature in your Cargo.toml.

warp = { version = "0.4", features = ["server", "websocket"] }

The ws() Filter

The core of warp's WebSocket functionality is the warp::ws() filter. This filter checks for the necessary headers to perform a WebSocket upgrade (Connection: upgrade, Upgrade: websocket, etc.). If the headers are present, it extracts a warp::ws::Ws object, which is used to complete the upgrade.

Upgrading the Connection

The Ws object has an on_upgrade method. You provide this method with a closure or function that will be executed once the WebSocket handshake is complete. This function receives the WebSocket object, which represents the live connection.

Here is a simple echo server example:

use futures_util::{StreamExt, SinkExt};
use warp::Filter;
use warp::ws::{Message, WebSocket};

// The main filter that handles the WebSocket upgrade.
let echo = warp::path("echo")
    .and(warp::ws())
    .map(|ws: warp::ws::Ws| {
        // The `on_upgrade` method hands off the connection to our handler.
        ws.on_upgrade(handle_websocket)
    });

// The handler function for a single WebSocket connection.
async fn handle_websocket(websocket: WebSocket) {
    // Split the WebSocket into a sender and a receiver.
    let (mut tx, mut rx) = websocket.split();

    // Loop over incoming messages.
    while let Some(result) = rx.next().await {
        match result {
            Ok(msg) => {
                println!("received message: {:?}", msg);
                // Echo the message back to the client.
                if let Err(e) = tx.send(msg).await {
                    eprintln!("websocket send error: {}", e);
                    break;
                }
            },
            Err(e) => {
                eprintln!("websocket error: {}", e);
                break;
            }
        }
    }
}

Handling Messages

The WebSocket object implements both futures::Sink (for sending messages) and futures::Stream (for receiving messages).

  • Receiving: You can loop over the stream using while let Some(result) = rx.next().await.
  • Sending: You can send messages using tx.send(message).await.

Messages are represented by the warp::ws::Message type, which can be text, binary, ping, pong, or close frames.

A Complete Chat Example

For a more advanced and practical example, the websockets_chat.rs file in the examples directory demonstrates a multi-user chat application. It shows how to manage the state of multiple connected clients and broadcast messages to all of them.

Key concepts from the chat example include: - Using Arc<RwLock<HashMap<...>>> to maintain a shared state of connected users. - Assigning a unique ID to each connected user. - Broadcasting messages from one user to all other connected users. - Handling user disconnections by removing them from the shared state.