Exploring SSE with Rust and Tide by streaming tweets
Hi there! Welcome back. Last time we explored how to use WebSockets in Tide, and this time we will explore how to use Server-Sent Events (SSE).
SSE is a lightweight alternative to WebSockets if you only need to send updates from the server to the client.
Tide not only has built-in support for SSE but also ships an example we can run, so let's start by running that example.
The simplest way to run an example is to clone the repo and run
cargo run --example <example name>
cargo run --example sse
(...)
Running `target/debug/examples/sse`
tide::log Logger started
level Info
tide::server Server listening on http://[::1]:8080
And in another terminal
curl -v localhost:8080/sse
* Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> GET /sse HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.54.0
> Accept: */*
>
< HTTP/1.1 200 OK
< cache-control: no-cache
< content-type: text/event-stream
< date: Tue, 02 Feb 2021 13:26:54 GMT
< transfer-encoding: chunked
<
event:fruit
data:banana
event:fruit
data:apple
Nice! That works as expected, and you can see the content-type text/event-stream in the response headers. Now we can check the code of the Tide example:
...
use tide::sse;

#[async_std::main]
async fn main() -> Result<(), std::io::Error> {
    tide::log::start();
    let mut app = tide::new();
    app.at("/sse").get(sse::endpoint(|_req, sender| async move {
        sender.send("fruit", "banana", None).await?;
        sender.send("fruit", "apple", None).await?;
        Ok(())
    }));
    app.listen("localhost:8080").await?;
    Ok(())
}
As you can see, this example defines an endpoint at /sse, and the closure receives two arguments: the request and a sender, which is then used to send events to the client.
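To make the curl output above a bit less magical, here is a minimal sketch of how a single event could be framed in the text/event-stream format. It uses only the standard library, and `format_sse_event` is a hypothetical helper I made up for illustration; it is not Tide's actual implementation, it just mirrors the (name, data, id) arguments of the sender.

```rust
/// Formats one Server-Sent Event in the text/event-stream wire format.
/// Hypothetical helper for illustration; not part of Tide's API.
fn format_sse_event(name: &str, data: &str, id: Option<&str>) -> String {
    let mut out = String::new();
    // The id field is optional, so it is only written when provided.
    if let Some(id) = id {
        out.push_str("id:");
        out.push_str(id);
        out.push('\n');
    }
    out.push_str("event:");
    out.push_str(name);
    out.push('\n');
    // Each line of the payload gets its own `data:` field.
    for line in data.split('\n') {
        out.push_str("data:");
        out.push_str(line);
        out.push('\n');
    }
    // A blank line terminates the event.
    out.push('\n');
    out
}

fn main() {
    print!("{}", format_sse_event("fruit", "banana", None));
    print!("{}", format_sse_event("fruit", "apple", None));
}
```

The blank line at the end of each event is what tells the client that the event is complete and can be dispatched.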
This is a great example! It gives us the basics to explore how to use SSE in other scenarios.
As the title says, our goal is to stream tweets using SSE. In this first exploration we will try to mimic the behavior of the socket.io home page, but with SSE and tracking the words rust and http-rs.
Create the application structure
Let's start by creating the application and setting up the dependencies and the basic structure.
❯ cargo init tide-sse-tweets
Created binary (application) package
❯ cd tide-sse-tweets/
❯ cargo add tide async-std
Updating 'https://github.com/rust-lang/crates.io-index' index
Adding tide v0.16.0 to dependencies
Adding async-std v1.9.0 to dependencies
❯ mkdir -p public/{css,js}
Now enable the attributes feature for async-std in your Cargo.toml, since we will need it for the #[async_std::main] macro.
async-std = { version = "1.9.0", features = [ "attributes" ] }
Like in the other Tide example apps, we will be using Skeleton for some nice styling, so you can go ahead and put the required files in public/css.
Connect the dots...
Let's start with some Rust code :). In our main.rs file, let's create the basic structure of our application.
use tide::sse;

#[async_std::main]
async fn main() -> Result<(), std::io::Error> {
    dotenv::dotenv().ok();
    tide::log::start();
    let mut app = tide::new();
    // serve public dir for assets
    app.at("/public").serve_dir("./public/")?;
    // index route
    app.at("/").serve_file("public/index.html")?;
    // sse route
    app.at("/sse")
        .get(sse::endpoint(|_req, sender| async move {
            // TODO
            Ok(())
        }));
    // read the port from the environment, falling back to 8080
    let port = std::env::var("PORT").unwrap_or_else(|_| "8080".to_string());
    let addr = format!("0.0.0.0:{}", port);
    app.listen(addr).await?;
    Ok(())
}
Here we are creating the Tide server with an index route. Note that the serve_file method was added in the Tide v0.16.0 release. We are also using dotenv to load the env vars from the .env file.
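The PORT handling is easy to overlook, so here is a small standard-library sketch of the same fallback logic pulled out into a function. The name `listen_addr` is hypothetical and only exists for this example; the app above does the same thing inline.

```rust
use std::env;

/// Builds the listen address from an optional port value, defaulting to 8080.
/// `listen_addr` is a hypothetical helper name used only for this sketch.
fn listen_addr(port: Option<String>) -> String {
    let port = port.unwrap_or_else(|| "8080".to_string());
    format!("0.0.0.0:{}", port)
}

fn main() {
    // `env::var` returns Err when the variable is unset; `.ok()` turns that into None.
    let addr = listen_addr(env::var("PORT").ok());
    println!("would listen on {}", addr);
}
```

Binding to 0.0.0.0 instead of localhost makes the server reachable from outside the machine, which is usually what a hosting platform that sets PORT expects.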
Nice! We have the basic structure in place, and we left the SSE endpoint blank for now; next we implement the logic to stream tweets.
We will be using the twitter-stream crate to connect to the Twitter API and track our topics.
As we saw in the first example, the SSE endpoint gives us a sender to send data to the client, so we need to somehow connect this sender with the tweets we receive from the Twitter API. For this purpose we will use a broadcast channel that allows us to broadcast each received tweet to all connected clients.
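If the idea of a broadcast channel is new to you, the following is a rough sketch of the fan-out behavior using only std::sync::mpsc. The `Broadcaster` type here is hypothetical and synchronous; it is not the async channel used in the app, it only illustrates that every subscriber gets its own copy of each message.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Minimal fan-out channel: every subscriber receives a clone of each message.
/// Hypothetical type for illustration only; not the channel used in this post.
struct Broadcaster<T: Clone> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> Broadcaster<T> {
    fn new() -> Self {
        Broadcaster { subscribers: Vec::new() }
    }

    /// Registers a new client and returns its receiving end.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Sends a clone of `msg` to every live subscriber and drops
    /// subscribers whose receiver has gone away.
    /// Returns the number of subscribers that got the message.
    fn send(&mut self, msg: &T) -> usize {
        self.subscribers.retain(|tx| tx.send(msg.clone()).is_ok());
        self.subscribers.len()
    }
}

fn main() {
    let mut broadcaster = Broadcaster::new();
    let client_a = broadcaster.subscribe();
    let client_b = broadcaster.subscribe();

    let delivered = broadcaster.send(&String::from("hello from rust"));
    println!("delivered to {} clients", delivered);
    println!("a got: {}", client_a.recv().unwrap());
    println!("b got: {}", client_b.recv().unwrap());
}
```

In the real app each SSE connection plays the role of a subscriber, and the Twitter tracker is the only producer.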
First we need to create the broadcast channel and store it in our State to make it available in every request.
#[derive(Clone, Debug)]
struct State {
    broadcaster: BroadcastChannel<Tweet>,
}
(...)
#[async_std::main]
async fn main() -> Result<(), std::io::Error> {
    dotenv::dotenv().ok();
    tide::log::start();
    let broadcaster = BroadcastChannel::new();
    let mut app = tide::with_state(State { broadcaster });
    (...)
}
And now we can listen on it in our SSE endpoint:
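// sse route
app.at("/sse")
    .get(sse::endpoint(|req: Request<State>, sender| async move {
        // Clone the channel once per client and reuse it; re-cloning on every
        // iteration creates a fresh receiver each time and may miss tweets.
        let mut broadcaster = req.state().broadcaster.clone();
        while let Some(tweet) = broadcaster.next().await {
            sender.send("tweet", json!(tweet).to_string(), None).await?;
        }
        Ok(())
    }));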
Here we are listening for tweets from the broadcaster and sending them through the SSE connection to the clients. We are also using a Tweet struct (and a User struct) to serialize the fields we want to stream.
#[derive(Deserialize, Serialize, Clone, Debug)]
struct Tweet {
    id: u64,
    text: String,
    user: User,
    timestamp_ms: String,
}

#[derive(Deserialize, Serialize, Clone, Debug)]
struct User {
    id: u64,
    screen_name: String,
    profile_image_url_https: String,
}
Great! We already have one side of the channel; now we need to code the other side. We will create a fn that spawns a new task to track the topics on Twitter and send each tweet through the broadcast channel.
async fn spawn_tracker(broadcaster: BroadcastChannel<Tweet>) {
    let token = Token::from_parts(
        std::env::var("TW_CONSUMER_KEY").expect("missing env var TW_CONSUMER_KEY"),
        std::env::var("TW_CONSUMER_SECRET").expect("missing env var TW_CONSUMER_SECRET"),
        std::env::var("TW_TOKEN").expect("missing env var TW_TOKEN"),
        std::env::var("TW_SECRET").expect("missing env var TW_SECRET")
    );
    task::spawn(async move {
        let mut tracker = twitter_stream::Builder::new(token.as_ref());
        let mut stream = tracker
            .track("@_httprs,@rustlang,rust")
            .listen()
            .try_flatten_stream();
        while let Some(json) = stream.next().await {
            if let Ok(StreamMessage::Tweet(tw)) = serde_json::from_str(&json.unwrap()) {
                println!("receive a tweet! ... , {}", tw.text);
                match broadcaster.send(&tw).await {
                    Ok(_) => {}
                    Err(_) => {
                        println!("Error sending to broadcaster")
                    }
                };
            }
        }
    });
}
So, we receive the broadcaster as an argument and then use it inside the spawned task to notify clients that we received a new tweet.
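The shape of that tracker (a background worker that parses incoming messages, keeps only the ones it understands and forwards them) can be sketched with plain threads and std::sync::mpsc. Everything here is a stand-in: `spawn_worker` is a hypothetical name, the raw strings play the role of the Twitter stream, and a u32 plays the role of a parsed Tweet. Unlike the code above, this sketch stops when the receiver is gone instead of just logging.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Spawns a worker that parses raw messages and forwards only the ones
/// that parse successfully, similar in spirit to the `if let Ok(...)` filter.
/// `spawn_worker` and the u32 payload are stand-ins for this sketch.
fn spawn_worker(raw_messages: Vec<String>) -> Receiver<u32> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        for raw in raw_messages {
            // Skip anything that is not a valid payload instead of panicking.
            if let Ok(value) = raw.trim().parse::<u32>() {
                if tx.send(value).is_err() {
                    // The receiver is gone, so there is nobody left to notify.
                    eprintln!("Error sending to channel");
                    break;
                }
            }
        }
        // `tx` is dropped here, which ends the receiver's iteration.
    });
    rx
}

fn main() {
    let raw = vec!["1".to_string(), "keep-alive".to_string(), "42".to_string()];
    let rx = spawn_worker(raw);
    let received: Vec<u32> = rx.iter().collect();
    println!("{:?}", received);
}
```

Messages that fail to parse are silently skipped, much like stream messages that are not tweets are ignored in the tracker.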
Awesome!! We just connected the dots... time to run it (cargo run) and see it in action.
That's all for today. We copied the base functionality of the socket.io home page but using SSE with Tide :). In the next note we will extend this application to allow users to create their own trend tracks. You can check the repo for the complete version and check the Rust trends.
As always, I write this as a learning journal; there could be a more elegant and correct way to do it, and any feedback is welcome.
Thanks!