Exploring websockets rooms with tide
Hi there! Welcome back. This time we will explore how to implement WebSocket rooms using tide and tide-websockets.
The general goal is to let clients select (subscribe to) a room and receive updates produced by another task, filtered so that each room only receives its own updates.
For our use case we will use a task that tracks tweets related to some topics from the Twitter API. We will split these topics into different rooms, so clients can subscribe to a room over a WebSocket connection and receive only the tweets related to that room's topics.
So, let's start by creating our app and adding some deps.
Base setup
$ cargo init tide-ws-rooms && cd tide-ws-rooms
$ cargo add tide tide-websockets async-std
Updating 'https://github.com/rust-lang/crates.io-index' index
Adding tide v0.16.0 to dependencies
Adding tide-websockets v0.3.0 to dependencies
Adding async-std v1.9.0 to dependencies
Great! Let's now add the basic structure of the app (and enable the attributes feature of async-std).
// Cargo.toml
[dependencies]
tide = "0.16.0"
tide-websockets = "0.3.0"
async-std = { version = "1.9.0", features = ["attributes"] }
// main.rs
use tide_websockets::{Message as WSMessage, WebSocket};
#[async_std::main]
async fn main() -> Result<(), std::io::Error> {
tide::log::start();
let mut app = tide::new();
// serve public dir for assets
app.at("/public").serve_dir("./public/")?;
// index route
app.at("/").serve_file("public/index.html")?;
// ws route
app.at("/ws")
.get(WebSocket::new(|_req, wsc| async move {
println!("new ws connection");
Ok(())
}));
let port = std::env::var("PORT").unwrap_or_else(|_| "8080".to_string());
let addr = format!("0.0.0.0:{}", port);
app.listen(addr).await?;
Ok(())
}
As you can see, we are serving an index and a public directory with CSS assets. The front-end of this app is very similar to the one we used in exploring-sse-with-rust-and-tide-by-streaming-tweets, and you can also check the code in the repo. We are also defining a ws route to handle WebSocket connections; for the moment it only prints a line when a connection arrives.
Let's run it to see if we can establish the ws connection.
$ cargo run
Awesome, we created a new ws connection. You can also see the message in the server logs:
tide::log::middleware <-- Request received
method GET
path /ws
tide::log::middleware --> Response sent
method GET
path /ws
status 101 - Switching Protocols
duration 181.388µs
new ws connection
Setup tracking
Let's now add the logic to track the topics from the Twitter API. We will use the same logic we used to explore SSE, so let's add those lines to our main file.
// main.rs
use tide_websockets::{Message as WSMessage, WebSocket};
use async_std::task;
use broadcaster::BroadcastChannel;
use serde::de;
use serde::{Deserialize, Serialize};
use twitter_stream::Token;
use futures::prelude::*;
#[derive(Debug, serde::Deserialize)]
pub struct RequestBody {
topics: Vec<String>,
}
#[derive(Clone, Debug)]
struct State {
broadcaster: BroadcastChannel<Tweet>
}
#[derive(Deserialize)]
#[serde(untagged)]
enum StreamMessage {
Tweet(Tweet),
Other(de::IgnoredAny),
}
#[derive(Deserialize, Serialize, Clone, Debug)]
struct Tweet {
id: u64,
id_str: String,
text: String,
user: User,
timestamp_ms: String,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
struct User {
id: u64,
screen_name: String,
profile_image_url_https: String,
}
async fn spawn_tracker(broadcaster: BroadcastChannel<Tweet>, topics: String) {
println!("topics : {}", topics);
let token = Token::from_parts(
std::env::var("TW_CONSUMER_KEY").expect("missing env var TW_CONSUMER_KEY"),
std::env::var("TW_CONSUMER_SECRET").expect("missing env var TW_CONSUMER_SECRET"),
std::env::var("TW_TOKEN").expect("missing env var TW_TOKEN"),
std::env::var("TW_SECRET").expect("missing env var TW_SECRET"),
);
task::spawn(async move {
let mut tracker = twitter_stream::Builder::new(token.as_ref());
let mut stream = tracker.track(&topics).listen().try_flatten_stream();
while let Some(json) = stream.next().await {
if let Ok(StreamMessage::Tweet(tw)) = serde_json::from_str(&json.unwrap()) {
//println!("receive a tweet! ... , {}", tw.text);
match broadcaster.send(&tw).await {
Ok(_) => {}
Err(_) => {
println!("Error sending to broadcaster")
}
};
}
}
});
}
#[async_std::main]
async fn main() -> Result<(), std::io::Error> {
dotenv::dotenv().ok();
tide::log::start();
let broadcaster = BroadcastChannel::new();
spawn_tracker(broadcaster.clone(), "hello-world".to_string()).await;
let mut app = tide::with_state(State { broadcaster });
// serve public dir for assets
app.at("/public").serve_dir("./public/")?;
// index route
app.at("/").serve_file("public/index.html")?;
// ws route
app.at("/ws")
.get(WebSocket::new(|_req, wsc| async move {
println!("new ws connection");
Ok(())
}));
let port = std::env::var("PORT").unwrap_or_else(|_| "8080".to_string());
let addr = format!("0.0.0.0:{}", port);
app.listen(addr).await?;
Ok(())
}
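We added a couple of things:
- The deps we need.
- Structs to deserialize tweets.
- A spawn_tracker function that tracks the given topics and sends every tweet to the broadcaster.
- An app State struct to hold the broadcaster.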
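The snippet above also uses a few crates that the Cargo.toml from the base setup does not list yet. Going by its use statements and the dotenv call, something like the following should add them. This is a sketch: versions are left for cargo to choose, so they may differ from the ones in the repo, and the twitter-stream release presumably needs to be one built on Tokio 1.x, since that is the runtime the panic below complains about.

```shell
# crates used by the tracking code (no versions pinned here, cargo picks them)
cargo add broadcaster dotenv futures serde_json twitter-stream
# serde with the derive feature, for #[derive(Deserialize, Serialize)]
cargo add serde --features derive
```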
Let's run it again to check that it works as expected (remember to set up the .env file with the TW_CONSUMER_KEY, TW_CONSUMER_SECRET, TW_TOKEN and TW_SECRET API credentials).
tide::log Logger started
level Info
topics : hello-world
tide::server Server listening on http://0.0.0.0:8080
thread 'async-std/runtime' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /Users/pepo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.3.0/src/runtime/blocking/pool.rs:85:33
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Nice, the spawned task just panicked. I forgot to mention that the crate we use to track topics from the Twitter API uses tokio, so we need to enable the tokio1 feature in async-std:
async-std = { version = "1.9.0", features = ["attributes", "tokio1"] }
And try again...
tide::log Logger started
level Info
topics : hello-world
tide::server Server listening on http://0.0.0.0:8080
Nice! Now it's working :)
Connecting the dots
Now we have the two parts we want to connect: on one side the ws connections, and on the other a task that produces tweets.
But first, let me mention the Sink trait. If you look at the tide-websockets repo, implementing this trait has been requested, so that users could split the ws connection, move the tx part out of the handler and use it in other parts of the app (for example, in the producer, to send the tweets directly to the clients). However, not implementing Sink has been a deliberate choice in the http-rs ecosystem, as you can see in the PR comment. A different approach, sending messages to the handler through some kind of channel or broadcaster, is also mentioned in the comments (check this comment), and that is the path we will take to connect the dots :)
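To make the "send messages to the handler" idea concrete, here is a synchronous, std-only toy model of the pattern. ToyBroadcaster is hypothetical and is not the broadcaster crate's API: the producer keeps one Sender per subscriber and clones every message to each of them, while each "connection" only reads from its own Receiver, so the producer never touches a socket.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Toy broadcaster (hypothetical): one Sender per subscriber, every message cloned to all.
struct ToyBroadcaster<T> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> ToyBroadcaster<T> {
    fn new() -> Self {
        ToyBroadcaster { subscribers: Vec::new() }
    }

    /// Each "ws handler" calls this once and then reads from its own Receiver.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Producer side: send a clone to every subscriber, dropping the ones that hung up.
    fn send(&mut self, msg: &T) {
        self.subscribers.retain(|tx| tx.send(msg.clone()).is_ok());
    }
}

fn main() {
    let mut broadcaster: ToyBroadcaster<String> = ToyBroadcaster::new();
    let conn_a = broadcaster.subscribe();
    let conn_b = broadcaster.subscribe();

    broadcaster.send(&"tweet #1".to_string());

    // Both connections get their own copy of the message.
    println!("{:?}", conn_a.try_recv()); // Ok("tweet #1")
    println!("{:?}", conn_b.try_recv()); // Ok("tweet #1")

    drop(conn_b); // a client disconnects
    broadcaster.send(&"tweet #2".to_string());
    println!("{} subscriber(s) left", broadcaster.subscribers.len()); // 1 subscriber(s) left
}
```

In the app the same roles are played asynchronously: the tracker task is the producer and each ws handler reads from its own clone of the BroadcastChannel.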
Now that the approach is clear, let's think about how to implement it. If you check the code, you will notice that the broadcaster is already there, but no one is listening to its messages. We now need a mechanism to represent the rooms and allow users to subscribe to one or another. The rooms will also hold the logic that decides whether a received message (a tweet, in our case) should be forwarded to the connection or not.
Let's add the Room struct now...
use regex::Regex;

#[derive(Clone, Debug)]
struct Room {
    topics: Vec<String>,
    regex: Regex,
}

impl Room {
    // true if the tweet text matches any of the room's topics
    pub fn should_send(&self, text: &str) -> bool {
        self.regex.is_match(text)
    }
}
Every room has a vector of topics and a regex used to decide whether we need to forward the tweet or not. To use Regex we also need to add the regex crate to our deps.
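To see what the \b anchors in the room regex buy us, here is a rough std-only approximation of should_send. It is not how the regex crate works internally: it assumes every topic starts and ends with a word character (letter, digit or _), it treats Unicode alphanumerics as word characters, and, like a pattern without the (?i) flag, it is case-sensitive.

```rust
/// Rough stand-in for `\b` (assumption): letters, digits and '_' count as word characters.
fn is_word_char(c: char) -> bool {
    c.is_alphanumeric() || c == '_'
}

/// true if `topic` appears in `text` as a whole word, like `(\btopic\b)`.
fn matches_word(text: &str, topic: &str) -> bool {
    text.match_indices(topic).any(|(start, m)| {
        let before_ok = text[..start].chars().next_back().map_or(true, |c| !is_word_char(c));
        let after_ok = text[start + m.len()..].chars().next().map_or(true, |c| !is_word_char(c));
        before_ok && after_ok
    })
}

/// Like the alternation `(\bt1\b)|(\bt2\b)|...`: any topic matching is enough.
fn should_send(text: &str, topics: &[&str]) -> bool {
    topics.iter().any(|t| matches_word(text, t))
}

fn main() {
    let topics = ["rust", "tokio"];
    println!("{}", should_send("Learning rust today", &topics)); // true
    println!("{}", should_send("I trust you", &topics)); // false: "rust" is inside "trust"
    println!("{}", should_send("Rust 2021", &topics)); // false: case-sensitive
}
```

The last case hints at a design choice: Twitter's track filter is not case-sensitive, so prefixing the generated pattern with (?i) may be worth considering if rooms should also accept "Rust".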
Now, to make it easy to add more rooms, let's add a couple of helper functions that build the topics vector and the regex pattern from a &str, so we can keep each room's topics in a resource file (one topic per line).
fn get_topics(input_str: &str) -> Vec<String> {
    // one topic per line; skip blank lines (e.g. the trailing newline at the end of the file)
    input_str
        .lines()
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(|s| s.to_string())
        .collect()
}
fn get_regex(input_str: &str) -> String {
    // alternation of whole-word matches: (\btopic1\b)|(\btopic2\b)|...
    get_topics(input_str)
        .iter()
        .map(|s| format!(r"(\b{}\b)", s))
        .collect::<Vec<_>>()
        .join("|")
}
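A quick std-only check of why blank lines matter for these helpers: splitting a file that ends with a newline on '\n' alone keeps an empty last element, and that empty topic turns into (\b\b), a pattern that matches at any word boundary, so the room would accept nearly every tweet. The function names below are illustrative only.

```rust
/// Split on '\n' only.
fn naive_topics(input: &str) -> Vec<&str> {
    input.split('\n').collect()
}

/// Split into lines, trim whitespace (including a Windows '\r') and skip blank lines.
fn clean_topics(input: &str) -> Vec<&str> {
    input.lines().map(str::trim).filter(|s| !s.is_empty()).collect()
}

/// Same pattern shape as get_regex: (\btopic\b) joined with '|'.
fn to_pattern(topics: &[&str]) -> String {
    topics
        .iter()
        .map(|t| format!(r"(\b{}\b)", t))
        .collect::<Vec<_>>()
        .join("|")
}

fn main() {
    // Most editors save files with a trailing newline, and include_str! keeps it.
    let file = "lakers\nceltics\n";

    println!("{:?}", naive_topics(file)); // ["lakers", "celtics", ""]
    println!("{}", to_pattern(&naive_topics(file))); // (\blakers\b)|(\bceltics\b)|(\b\b)
    println!("{}", to_pattern(&clean_topics(file))); // (\blakers\b)|(\bceltics\b)
}
```

The same empty topic would also leave a stray comma in the comma-separated string passed to the tracker.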
And now in our main.rs code we can include the topic files and use them to construct the rooms:
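// inside main(), after creating the broadcaster
// (needs use regex::Regex; and use std::collections::HashMap;)
let nba_input = include_str!("../public/nba.txt");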
let rust_input = include_str!("../public/rust.txt");
let premier_input = include_str!("../public/premier.txt");
let nba_room = Room {
topics: get_topics(nba_input),
regex: Regex::new(&get_regex(nba_input)).unwrap(),
};
let nba_topics_str = nba_room.topics.join(",");
let rust_room = Room {
topics: get_topics(rust_input),
regex: Regex::new(&get_regex(rust_input)).unwrap(),
};
let rust_topics_str = rust_room.topics.join(",");
let premier_room = Room {
topics: get_topics(premier_input),
regex: Regex::new(&get_regex(premier_input)).unwrap(),
};
let premier_topics_str = premier_room.topics.join(",");
let mut rooms: HashMap<String, Room> = HashMap::new();
rooms.insert("nba".to_string(), nba_room);
rooms.insert("rust".to_string(), rust_room);
rooms.insert("premier".to_string(), premier_room);
// spawn tracker
let topics_str = format!(
"{},{},{}",
nba_topics_str, rust_topics_str, premier_topics_str
);
spawn_tracker(broadcaster.clone(), topics_str).await;
let mut app = tide::with_state(State { broadcaster, rooms });
We also add the rooms HashMap to the State (so State now holds broadcaster and rooms: HashMap<String, Room>), and we pass the topics from all the files to the tracker task.
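The three room blocks above repeat the same steps. As a hedged sketch of one way to fold them into a loop, here is a hypothetical build_rooms helper that takes (room name, file contents) pairs and returns the room name to topics map plus the comma-separated track string. The Regex field is left out so the example runs with std only; in the app you would build the Room with Regex::new as above.

```rust
use std::collections::HashMap;

/// Hypothetical helper: (room name, file contents) pairs -> (room name -> topics, track string).
fn build_rooms(inputs: &[(&str, &str)]) -> (HashMap<String, Vec<String>>, String) {
    let mut rooms = HashMap::new();
    let mut all_topics: Vec<String> = Vec::new();
    for (name, input) in inputs {
        // one topic per line, blank lines skipped
        let topics: Vec<String> = input
            .lines()
            .map(str::trim)
            .filter(|s| !s.is_empty())
            .map(String::from)
            .collect();
        all_topics.extend(topics.iter().cloned());
        rooms.insert(name.to_string(), topics);
    }
    // the tracker receives every room's topics as one comma-separated string
    (rooms, all_topics.join(","))
}

fn main() {
    let inputs = [("nba", "lakers\nceltics\n"), ("rust", "rustlang\ntokio\n")];
    let (rooms, track) = build_rooms(&inputs);
    println!("{}", track); // lakers,celtics,rustlang,tokio
    println!("{:?}", rooms.get("rust")); // Some(["rustlang", "tokio"])
}
```

Adding a room then becomes one more (name, include_str!(...)) pair instead of another copy of the block.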
Now we need to finish the logic of the ws handler to forward the appropriate tweets.
// ws route
// Either comes from futures_util (use futures_util::future::Either;),
// and StreamExt (map/next) from the futures::prelude::* import
app.at("/ws")
    .get(WebSocket::new(|req: tide::Request<State>, wsc| async move {
        let state = req.state().clone();
        let rooms = state.rooms;
        let broadcaster = state.broadcaster;
        // merge client messages (Left) and tweets from the broadcaster (Right) into one stream
        let mut combined_stream = futures_util::stream::select(
            wsc.clone().map(Either::Left),
            broadcaster.map(Either::Right),
        );
        // by default we put new connections in the nba room
        let mut current_room = rooms.get("nba");
        while let Some(item) = combined_stream.next().await {
            match item {
                // a text message from the client selects the room by name
                Either::Left(Ok(WSMessage::Text(message))) => {
                    println!("message : {}", message);
                    current_room = rooms.get(&message);
                }
                // a tweet: forward it only if it matches the current room's regex
                Either::Right(tweet) => {
                    if let Some(room) = current_room {
                        if room.should_send(&tweet.text) {
                            wsc.send_json(&tweet).await?;
                        }
                    }
                }
                // anything else (close, binary, ping, read errors) ends the handler
                _ => {
                    return Err(tide::http::format_err!("no idea"));
                }
            }
        }
        Ok(())
    }));
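The core of the handler is a small state machine over the combined stream. Here is a std-only model of that loop, replaying a list of events instead of awaiting real streams. Event, run_connection and the word check used in place of the room regex are all illustrative assumptions, and the model skips the error arm of the real handler.

```rust
use std::collections::HashMap;

/// Simplified items of the combined stream: Left = client text, Right = tweet.
enum Event {
    ClientText(String),
    Tweet(String),
}

/// Replays events through the handler's room-switching logic and
/// returns the tweets that would have been sent to this client.
fn run_connection(events: Vec<Event>, rooms: &HashMap<&str, Vec<&str>>) -> Vec<String> {
    let mut sent = Vec::new();
    // new connections start in the nba room
    let mut current_room = rooms.get("nba");
    for event in events {
        match event {
            // the client picks a room by name; an unknown name leaves it in no room
            Event::ClientText(name) => current_room = rooms.get(name.as_str()),
            Event::Tweet(text) => {
                if let Some(topics) = current_room {
                    // stand-in for room.should_send: any word equals a topic
                    let hit = text
                        .split(|c: char| !c.is_alphanumeric())
                        .any(|w| topics.iter().any(|t| *t == w));
                    if hit {
                        sent.push(text);
                    }
                }
            }
        }
    }
    sent
}

fn main() {
    let mut rooms = HashMap::new();
    rooms.insert("nba", vec!["lakers", "celtics"]);
    rooms.insert("rust", vec!["rustlang", "tokio"]);

    let events = vec![
        Event::Tweet("go lakers".to_string()),        // sent: default room is nba
        Event::Tweet("tokio 1.0 is out".to_string()), // skipped: not an nba topic
        Event::ClientText("rust".to_string()),        // client switches room
        Event::Tweet("tokio 1.0 is out".to_string()), // sent
        Event::Tweet("go lakers".to_string()),        // skipped
    ];
    println!("{:?}", run_connection(events, &rooms)); // ["go lakers", "tokio 1.0 is out"]
}
```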
Notice that we added the futures-util dependency to use in the handler.
Let's try one more time; now we should receive tweets in the web page.
Nice! It's working, and we can select a room and see the tweets related to its topics.
But if you look at the code, you will notice that every connection runs its room's regex on every tweet, so with C open connections each tweet is matched C times, and that isn't good. We can improve the code to get better performance.
Improving performance
We want to change our code to run each room's regex only once per tweet, that is, N times per tweet, where N is the number of rooms, no matter how many connections are open.
To achieve this, let's start by moving the filtering to the tracker and producing a RoomMessage instead of a Tweet; it will hold the tweet and the identifier of the room.
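Add the id to the Room and the RoomMessage struct to our code. The id should be the same key we use in the rooms HashMap ("nba", "rust", "premier"), because the tracker uses that key as the room_id and the handler compares it with room.id.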
#[derive(Clone, Debug)]
struct Room {
id: String,
topics: Vec<String>,
regex: Regex,
}
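// Clone + Debug: the broadcaster hands a clone of each message to every receiver,
// and State (which now holds BroadcastChannel<RoomMessage>) derives Debug
#[derive(Clone, Debug)]
struct RoomMessage {
    room_id: String,
    tweet: Tweet,
}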
And now we need to make some changes:
- Update the State broadcaster to use RoomMessage (BroadcastChannel<RoomMessage>).
- Filter in the tracker.
- Remove the regex from the handler and use an if to decide whether we need to send.
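Here is a std-only sketch of how the work is split after these changes: the tracker checks each tweet once per room and emits one tagged message per matching room, and the handler only compares ids. The simplified RoomMessage (text instead of a Tweet), tag_tweet, should_forward and the word check standing in for the regex are illustrative assumptions; a BTreeMap is used only to get a deterministic room order for the example.

```rust
use std::collections::BTreeMap;

/// Simplified RoomMessage: the tweet text plus the id of the room it matched.
#[derive(Clone, Debug, PartialEq)]
struct RoomMessage {
    room_id: String,
    text: String,
}

/// Tracker side: one check per room, one tagged message per matching room.
fn tag_tweet(text: &str, rooms: &BTreeMap<&str, Vec<&str>>) -> Vec<RoomMessage> {
    let words: Vec<&str> = text.split(|c: char| !c.is_alphanumeric()).collect();
    rooms
        .iter()
        .filter(|(_, topics)| topics.iter().any(|t| words.iter().any(|w| w == t)))
        .map(|(id, _)| RoomMessage { room_id: id.to_string(), text: text.to_string() })
        .collect()
}

/// Handler side: no regex any more, just an id comparison.
fn should_forward(msg: &RoomMessage, current_room_id: &str) -> bool {
    msg.room_id == current_room_id
}

fn main() {
    let mut rooms = BTreeMap::new();
    rooms.insert("nba", vec!["lakers"]);
    rooms.insert("rust", vec!["rustlang", "tokio"]);

    // one tweet can belong to more than one room
    let msgs = tag_tweet("lakers fans learning rustlang", &rooms);
    for m in &msgs {
        println!("{} <- {}", m.room_id, m.text);
    }
    // a connection sitting in the rust room only forwards its own messages
    let forwarded: Vec<_> = msgs.iter().filter(|m| should_forward(m, "rust")).collect();
    println!("{}", forwarded.len()); // 1
}
```

Note that a tweet matching several rooms is broadcast once per room, so every connection receives (and discards by id) the messages for the other rooms.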
Let's change the tracker...
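// `rooms` (the HashMap<String, Room>) now has to be passed to spawn_tracker and moved into this task
task::spawn(async move {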
let mut tracker = twitter_stream::Builder::new(token.as_ref());
let mut stream = tracker.track(&topics).listen().try_flatten_stream();
while let Some(json) = stream.next().await {
if let Ok(StreamMessage::Tweet(tw)) = serde_json::from_str(&json.unwrap()) {
//println!("receive a tweet! ... , {}", tw.text);
for (key, room) in &rooms {
if room.should_send(&tw.text) {
let msg = RoomMessage {
room_id: key.to_string(),
tweet: tw.clone()
};
match broadcaster.send(&msg).await {
Ok(_) => {}
Err(_) => {
println!("Error sending to broadcaster")
}
};
}
}
}
}
});
And the handler:
app.at("/ws")
.get(WebSocket::new(|req: tide::Request<State>, wsc| async move {
let state = req.state().clone();
let rooms = state.rooms;
let broadcaster = state.broadcaster.clone();
let mut combined_stream = futures_util::stream::select(
wsc.clone().map(Either::Left),
broadcaster.clone().map(Either::Right),
);
// by default we put new connections in the nba room
let mut current_room = rooms.get("nba");
while let Some(item) = combined_stream.next().await {
match item {
Either::Left(Ok(WSMessage::Text(message))) => {
println!("message : {}", message);
current_room = rooms.get(&message);
}
Either::Right(room_message) => {
if let Some(room) = current_room {
if room.id == room_message.room_id {
wsc.send_json(&room_message.tweet).await?;
}
}
}
_o => {
return Err(tide::http::format_err!("no idea"));
}
}
}
Ok(())
}));
Nice! We improved the performance: each tweet is now matched against each room's regex once, in the tracker, instead of once per connection.
We can run it again and see it in action...
That's all for today. We explored how to use rooms with WebSockets in Tide and improved the performance of our first implementation. This version has a fixed, hardcoded set of rooms/topics; in the next note we will make this dynamic to allow users to create their own rooms.
You can check the complete code in the repo and see it in action in the labs link.
As always, I write this as a learning journal, and there could be other, more elegant and correct ways to do it; any feedback is welcome.
Thanks!