Javier

Exploring SSE with Rust and Tide by streaming tweets

Hi there! Welcome back. Last time we explored how to use WebSockets in Tide, and this time we will explore how to use Server-Sent Events (SSE).

SSE is a lightweight alternative to WebSockets when you only need to send updates from the server to the client.

Tide not only has built-in support for SSE but also ships an example we can run, so let's start by running that example.

The simplest way to run an example is to clone the repo and run cargo run --example <example name>:

cargo run --example sse
(...)
     Running `target/debug/examples/sse`
tide::log Logger started
    level Info
tide::server Server listening on http://[::1]:8080

And in another terminal:

curl -v localhost:8080/sse
*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> GET /sse HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.54.0
> Accept: */*
>
< HTTP/1.1 200 OK
< cache-control: no-cache
< content-type: text/event-stream
< date: Tue, 02 Feb 2021 13:26:54 GMT
< transfer-encoding: chunked
<
event:fruit
data:banana

event:fruit
data:apple

Nice! That works as expected, and you can see that the content-type is text/event-stream. Now we can check the code of the Tide example...

use tide::sse;

#[async_std::main]
async fn main() -> Result<(), std::io::Error> {
    tide::log::start();
    let mut app = tide::new();
    app.at("/sse").get(sse::endpoint(|_req, sender| async move {
        sender.send("fruit", "banana", None).await?;
        sender.send("fruit", "apple", None).await?;
        Ok(())
    }));
    app.listen("localhost:8080").await?;
    Ok(())
}

As you can see, this example defines an endpoint at /sse, and the closure receives two arguments: the request and a sender that is then used to send events to the client.

This is a great example! It gives us the basics to explore how to use SSE in other scenarios.
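To make the curl output above easier to read, here is a minimal sketch of how one event could be framed on the wire. It is not Tide's implementation: format_sse_event is a hypothetical helper that uses only the standard library and follows the field names seen in the output (event, data), plus the optional id that matches the third argument of sender.send.

```rust
/// Hypothetical helper (not part of Tide): builds one text/event-stream frame.
fn format_sse_event(event: &str, data: &str, id: Option<&str>) -> String {
    let mut frame = String::new();
    // The id field is optional, like the `None` passed in the Tide example.
    if let Some(id) = id {
        frame.push_str("id:");
        frame.push_str(id);
        frame.push('\n');
    }
    frame.push_str("event:");
    frame.push_str(event);
    frame.push('\n');
    // A payload with several lines becomes several `data:` fields.
    for line in data.split('\n') {
        frame.push_str("data:");
        frame.push_str(line);
        frame.push('\n');
    }
    // An empty line terminates the event.
    frame.push('\n');
    frame
}
```

Calling format_sse_event("fruit", "banana", None) yields the same three lines curl printed for the first event: the event field, the data field and the empty line that closes the frame.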


As the title says, our goal is to stream tweets using SSE. In this first exploration we will try to mimic the behavior of the socket.io home page, but with SSE and tracking the words rust and http-rs.


Create the application structure

Let's start by creating the application, adding the dependencies and setting up the basic structure.

❯ cargo init tide-sse-tweets
     Created binary (application) package
❯ cd tide-sse-tweets/
❯ cargo add tide async-std
    Updating 'https://github.com/rust-lang/crates.io-index' index
      Adding tide v0.16.0 to dependencies
      Adding async-std v1.9.0 to dependencies

❯ mkdir -p public/{css,js}

Now, enable the attributes feature for async-std in your Cargo.toml, since we will need it for the #[async_std::main] attribute.

async-std = { version = "1.9.0", features = [ "attributes" ] }

Like in the other Tide example apps, we will be using Skeleton for some nice styling, so you can go ahead and put the required files in public/css.

Connect the dots...

Let's start with some Rust code :). In our main.rs file, let's create the basic structure of our application.

use tide::sse;

#[async_std::main]
async fn main() -> Result<(), std::io::Error> {
    dotenv::dotenv().ok();

    tide::log::start();

    let mut app = tide::new();

    // serve public dir for assets
    app.at("/public").serve_dir("./public/")?;

    // index route
    app.at("/").serve_file("public/index.html")?;


    // sse route
    app.at("/sse")
        .get(sse::endpoint(|_req, sender| async move {
            // TODO

            Ok(())
        }));

    let port = std::env::var("PORT").unwrap_or_else(|_| "8080".to_string());
    let addr = format!("0.0.0.0:{}", port);
    app.listen(addr).await?;

    Ok(())
}

Here we are creating the Tide server with an index route. Note that the serve_file method was added in the Tide v0.16.0 release, and that we are using dotenv to load the env vars from the .env file.

Nice! We have the basic structure in place, and we left the SSE endpoint blank so we can implement the logic to stream tweets.

We will be using the twitter-stream crate to connect to the Twitter API and track our topics.

As we saw in the first example, the SSE endpoint gives us a sender to send data to the client, so we need to somehow connect this sender with the tweets we receive from the Twitter API. For this purpose we will use a broadcast channel, which allows us to broadcast each received tweet to all connected clients.
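To illustrate the idea of a broadcast channel, here is a minimal sketch built only on the standard library. It is not the channel used in this post: Broadcast, subscribe and send are hypothetical names, and the sketch is synchronous, while the real application uses an async channel. The point is only to show the fan-out: every subscriber gets its own copy of each value.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Hypothetical, synchronous stand-in for a broadcast channel.
#[derive(Clone)]
struct Broadcast<T> {
    // One sender per subscriber, shared between all clones of the channel.
    subscribers: Arc<Mutex<Vec<Sender<T>>>>,
}

impl<T: Clone> Broadcast<T> {
    fn new() -> Self {
        Self {
            subscribers: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Registers a new subscriber and returns its receiving end.
    fn subscribe(&self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.lock().unwrap().push(tx);
        rx
    }

    /// Sends a clone of `value` to every subscriber, drops the ones whose
    /// receiver is gone, and returns how many subscribers were reached.
    fn send(&self, value: &T) -> usize {
        let mut subscribers = self.subscribers.lock().unwrap();
        subscribers.retain(|tx| tx.send(value.clone()).is_ok());
        subscribers.len()
    }
}
```

In this sketch each SSE client would call subscribe once when it connects, and the tracker would call send for every tweet. A client that disconnects drops its receiver, so the next send removes it from the list.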

First we need to create the broadcast channel and store it in our State to make it available in every request.

#[derive(Clone, Debug)]
struct State {
    broadcaster: BroadcastChannel<Tweet>,
}

(...)

#[async_std::main]
async fn main() -> Result<(), std::io::Error> {
    dotenv::dotenv().ok();

    tide::log::start();

    let broadcaster = BroadcastChannel::new();
    let mut app = tide::with_state(State { broadcaster });

(...)
}

And now we can listen to it in our SSE endpoint:

    // sse route
    app.at("/sse")
        .get(sse::endpoint(|req: Request<State>, sender| async move {
            // subscribe once per client instead of cloning the channel on every iteration
            let mut tweets = req.state().broadcaster.clone();
            while let Some(tweet) = tweets.next().await {
                sender.send("tweet", json!(tweet).to_string(), None).await?;
            }

            Ok(())
        }));

Here we are listening for tweets from the broadcaster and sending them through the SSE connection to the clients. We are also using a Tweet struct (and a User struct) to serialize only the fields we want to use and stream.

#[derive(Deserialize, Serialize, Clone, Debug)]
struct Tweet {
    id: u64,
    text: String,
    user: User,
    timestamp_ms: String,
}

#[derive(Deserialize, Serialize, Clone, Debug)]
struct User {
    id: u64,
    screen_name: String,
    profile_image_url_https: String,
}

Great! We already have one side of the channel, so now we need to code the other side. We will create a fn that spawns a new task to track the topics from Twitter and send each tweet through the broadcast channel.

async fn spawn_tracker(broadcaster: BroadcastChannel<Tweet>) {
    let token = Token::from_parts(
        std::env::var("TW_CONSUMER_KEY").expect("missing env var TW_CONSUMER_KEY"),
        std::env::var("TW_CONSUMER_SECRET").expect("missing env var TW_CONSUMER_SECRET"),
        std::env::var("TW_TOKEN").expect("missing env var TW_TOKEN"),
        std::env::var("TW_SECRET").expect("missing env var TW_SECRET")
    );

    task::spawn(async move {
        let mut tracker = twitter_stream::Builder::new(token.as_ref());
        let mut stream = tracker
            .track("@_httprs,@rustlang,rust")
            .listen()
            .try_flatten_stream();

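        while let Some(json) = stream.next().await {
            // stop tracking on a stream error instead of panicking inside the task
            let json = match json {
                Ok(json) => json,
                Err(_) => {
                    println!("Error reading from the twitter stream");
                    break;
                }
            };
            // keep only the messages that deserialize as a tweet
            if let Ok(StreamMessage::Tweet(tw)) = serde_json::from_str(&json) {
                println!("received a tweet! ... , {}", tw.text);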
                match broadcaster.send(&tw).await {
                    Ok(_) => {}
                    Err(_) => {
                        println!("Error sending to broadcaster")
                    }
                };
            }
        }
    });
}

So, we receive the broadcaster as an argument and then use it inside the spawned task to notify every subscriber that we received a new tweet.
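The tracker above stops at the first missing env var. As an optional variation, here is a small sketch that checks the four credentials together and reports every missing name at once. load_credentials and REQUIRED_VARS are hypothetical names and not part of the original application; the lookup is passed in as a closure so the helper can be tried without touching the real environment.

```rust
/// Names of the env vars the tracker reads (taken from the code above).
const REQUIRED_VARS: [&str; 4] = [
    "TW_CONSUMER_KEY",
    "TW_CONSUMER_SECRET",
    "TW_TOKEN",
    "TW_SECRET",
];

/// Hypothetical helper: looks up every required variable through `lookup`
/// and returns either all four values (in the order above) or the names
/// of the variables that are missing or empty.
fn load_credentials<F>(lookup: F) -> Result<Vec<String>, Vec<&'static str>>
where
    F: Fn(&str) -> Option<String>,
{
    let mut values = Vec::new();
    let mut missing = Vec::new();
    for &name in REQUIRED_VARS.iter() {
        match lookup(name) {
            Some(value) if !value.is_empty() => values.push(value),
            _ => missing.push(name),
        }
    }
    if missing.is_empty() {
        Ok(values)
    } else {
        Err(missing)
    }
}
```

In the application it could be called as load_credentials(|name| std::env::var(name).ok()), and the four values could then be passed to Token::from_parts in the same order as before.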

Awesome!! We just connected the dots... time to run it (cargo run) and see it in action...

tide-sse-tweets


That's all for today. We copied the base functionality of the socket.io home page, but using SSE with Tide :). In the next note we will extend this application to allow users to create their own trend tracks. You can check the repo for the complete version and check the Rust trends.

As always, I write this as a learning journal, so there could be a more elegant and correct way to do it, and any feedback is welcome.

Thanks!