messages working
This commit is contained in:
parent
1893eb0599
commit
01e9b47bc7
6 changed files with 165 additions and 111 deletions
66
src/main.rs
66
src/main.rs
|
|
@ -6,36 +6,44 @@ mod websoket_connection;
|
|||
use std::time::Duration;
|
||||
use std::{path::Path, str::FromStr};
|
||||
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::{self, AsyncWriteExt};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::time;
|
||||
|
||||
use crate::websoket_connection::{FrameType, WebsocketConnection};
|
||||
use crate::websoket_connection::{FrameType, WebsocketRead, WebsocketWrite};
|
||||
use crate::{
|
||||
request::{Connection, ServerPath},
|
||||
response::{Response, ResponseCode, ResponseHeader},
|
||||
};
|
||||
|
||||
use tokio::sync;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> tokio::io::Result<()> {
|
||||
let listener = TcpListener::bind("127.0.0.1:8080").await?;
|
||||
let (sender, _) = sync::broadcast::channel(16);
|
||||
loop {
|
||||
let (stream, _) = listener.accept().await?;
|
||||
|
||||
tokio::spawn(handle_connection(stream));
|
||||
let receiver = sender.subscribe();
|
||||
let sender = sender.clone();
|
||||
tokio::spawn(handle_connection(stream, receiver, sender));
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_connection(stream: TcpStream) -> tokio::io::Result<()> {
|
||||
async fn handle_connection(
|
||||
stream: TcpStream,
|
||||
receiver: sync::broadcast::Receiver<String>,
|
||||
sender: sync::broadcast::Sender<String>,
|
||||
) -> tokio::io::Result<()> {
|
||||
if let Some(ws) = handle_http_connection(stream).await? {
|
||||
handle_websocket(ws).await?
|
||||
handle_websocket(ws, receiver, sender).await?
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
async fn handle_http_connection(
|
||||
mut stream: TcpStream,
|
||||
) -> tokio::io::Result<Option<websoket_connection::WebsocketConnection>> {
|
||||
) -> tokio::io::Result<Option<(WebsocketRead, WebsocketWrite)>> {
|
||||
let mut timeout = 500;
|
||||
loop {
|
||||
let req = match time::timeout(
|
||||
|
|
@ -63,7 +71,7 @@ async fn handle_http_connection(
|
|||
Err(_) => Response::new().with_code(ResponseCode::NotFound),
|
||||
}
|
||||
}
|
||||
["websocket"] => match WebsocketConnection::initialize_connection(req, stream).await {
|
||||
["websocket"] => match websoket_connection::initialize_connection(req, stream).await {
|
||||
Ok(ws) => {
|
||||
return Ok(Some(ws));
|
||||
}
|
||||
|
|
@ -91,15 +99,45 @@ async fn handle_http_connection(
|
|||
Ok(None)
|
||||
}
|
||||
|
||||
async fn handle_websocket(mut web_socket: WebsocketConnection) -> tokio::io::Result<()> {
|
||||
async fn handle_websocket(
|
||||
mut web_socket: (WebsocketRead, WebsocketWrite),
|
||||
receiver: sync::broadcast::Receiver<String>,
|
||||
sender: sync::broadcast::Sender<String>,
|
||||
) -> tokio::io::Result<()> {
|
||||
tokio::spawn(broadcast_message(web_socket.1, receiver));
|
||||
|
||||
loop {
|
||||
let message = web_socket.read_next_message().await?;
|
||||
let message = web_socket.0.read_next_message().await?;
|
||||
|
||||
if message.frame_type == FrameType::TextFrame {
|
||||
println!("{}", String::from_utf8_lossy(&message.data));
|
||||
web_socket
|
||||
.send_message(FrameType::TextFrame, "message_received".as_bytes())
|
||||
.await?;
|
||||
let s = String::from_utf8_lossy(&message.data).to_string();
|
||||
println!("{}", s);
|
||||
let _ = sender.send(s);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum BroadcastError {
|
||||
IoError(io::Error),
|
||||
BroadcastError(sync::broadcast::error::RecvError),
|
||||
}
|
||||
|
||||
async fn broadcast_message(
|
||||
mut write: WebsocketWrite,
|
||||
mut receiver: sync::broadcast::Receiver<String>,
|
||||
) -> Result<(), BroadcastError> {
|
||||
loop {
|
||||
let new_message = match receiver.recv().await {
|
||||
Ok(s) => s,
|
||||
Err(e) => return Err(BroadcastError::BroadcastError(e)),
|
||||
};
|
||||
|
||||
match write
|
||||
.send_message(FrameType::TextFrame, new_message.as_bytes())
|
||||
.await
|
||||
{
|
||||
Ok(()) => {}
|
||||
Err(e) => return Err(BroadcastError::IoError(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue