Compare commits
No commits in common. "27513863cf89102d0e7b03ba136f650484dc58b8" and "1893eb059902935e54e0b589d4d61f3a96ec5c91" have entirely different histories.
27513863cf
...
1893eb0599
7 changed files with 109 additions and 174 deletions
0
main.rs
Normal file
0
main.rs
Normal file
|
|
@ -10,10 +10,6 @@ html {
|
||||||
height: 100%;
|
height: 100%;
|
||||||
}
|
}
|
||||||
|
|
||||||
li {
|
|
||||||
color: #fff;
|
|
||||||
}
|
|
||||||
|
|
||||||
body {
|
body {
|
||||||
display: flex;
|
display: flex;
|
||||||
flex-direction: row;
|
flex-direction: row;
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,17 @@
|
||||||
<link rel="stylesheet" href="/public/index.css">
|
<link rel="stylesheet" href="/public/index.css">
|
||||||
<meta charset="UTF-8"/>
|
<meta charset="UTF-8"/>
|
||||||
<title>Hello World!</title>
|
<title>Hello World!</title>
|
||||||
<script src="/public/index.js"></script>
|
<script>
|
||||||
|
const socket = new WebSocket("ws://localhost:8080/websocket");
|
||||||
|
|
||||||
|
socket.addEventListener("open", (event) => {
|
||||||
|
socket.send("Hello Server!");
|
||||||
|
});
|
||||||
|
|
||||||
|
socket.addEventListener("message", (event) => {
|
||||||
|
console.log("Message from server ", event.data);
|
||||||
|
});
|
||||||
|
</script>
|
||||||
</head>
|
</head>
|
||||||
|
|
||||||
<body>
|
<body>
|
||||||
|
|
@ -17,14 +27,6 @@
|
||||||
|
|
||||||
<main>
|
<main>
|
||||||
|
|
||||||
<form onsubmit="myFunction(); return false;">
|
|
||||||
<label for="fname">Message:</label>
|
|
||||||
<input type="text" id="inp" name="fname"><br>
|
|
||||||
<input type="submit" value="Submit">
|
|
||||||
</form>
|
|
||||||
|
|
||||||
<ul id="messages"></ul>
|
|
||||||
|
|
||||||
</main>
|
</main>
|
||||||
</body>
|
</body>
|
||||||
</html>
|
</html>
|
||||||
|
|
|
||||||
|
|
@ -1,14 +0,0 @@
|
||||||
const socket = new WebSocket("ws://localhost:8080/websocket");
|
|
||||||
|
|
||||||
socket.addEventListener("message", (event) => {
|
|
||||||
let messages = document.getElementById("messages");
|
|
||||||
let item = document.createElement("li");
|
|
||||||
item.innerHTML = event.data;
|
|
||||||
messages.appendChild(item);
|
|
||||||
});
|
|
||||||
|
|
||||||
function myFunction() {
|
|
||||||
socket.send(document.getElementById("inp").value);
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
68
src/main.rs
68
src/main.rs
|
|
@ -6,44 +6,36 @@ mod websoket_connection;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::{path::Path, str::FromStr};
|
use std::{path::Path, str::FromStr};
|
||||||
|
|
||||||
use tokio::io::{self, AsyncWriteExt};
|
use tokio::io::AsyncWriteExt;
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
use tokio::time;
|
use tokio::time;
|
||||||
|
|
||||||
use crate::websoket_connection::{FrameType, WebsocketRead, WebsocketWrite};
|
use crate::websoket_connection::{FrameType, WebsocketConnection};
|
||||||
use crate::{
|
use crate::{
|
||||||
request::{Connection, ServerPath},
|
request::{Connection, ServerPath},
|
||||||
response::{Response, ResponseCode, ResponseHeader},
|
response::{Response, ResponseCode, ResponseHeader},
|
||||||
};
|
};
|
||||||
|
|
||||||
use tokio::sync;
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> tokio::io::Result<()> {
|
async fn main() -> tokio::io::Result<()> {
|
||||||
let listener = TcpListener::bind("127.0.0.1:8080").await?;
|
let listener = TcpListener::bind("127.0.0.1:8080").await?;
|
||||||
let (sender, _) = sync::broadcast::channel(16);
|
|
||||||
loop {
|
loop {
|
||||||
let (stream, _) = listener.accept().await?;
|
let (stream, _) = listener.accept().await?;
|
||||||
let receiver = sender.subscribe();
|
|
||||||
let sender = sender.clone();
|
tokio::spawn(handle_connection(stream));
|
||||||
tokio::spawn(handle_connection(stream, receiver, sender));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_connection(
|
async fn handle_connection(stream: TcpStream) -> tokio::io::Result<()> {
|
||||||
stream: TcpStream,
|
|
||||||
receiver: sync::broadcast::Receiver<String>,
|
|
||||||
sender: sync::broadcast::Sender<String>,
|
|
||||||
) -> tokio::io::Result<()> {
|
|
||||||
if let Some(ws) = handle_http_connection(stream).await? {
|
if let Some(ws) = handle_http_connection(stream).await? {
|
||||||
handle_websocket(ws, receiver, sender).await?
|
handle_websocket(ws).await?
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
async fn handle_http_connection(
|
async fn handle_http_connection(
|
||||||
mut stream: TcpStream,
|
mut stream: TcpStream,
|
||||||
) -> tokio::io::Result<Option<(WebsocketRead, WebsocketWrite)>> {
|
) -> tokio::io::Result<Option<websoket_connection::WebsocketConnection>> {
|
||||||
let mut timeout = 500;
|
let mut timeout = 500;
|
||||||
loop {
|
loop {
|
||||||
let req = match time::timeout(
|
let req = match time::timeout(
|
||||||
|
|
@ -62,8 +54,6 @@ async fn handle_http_connection(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
println!("{req:?}");
|
|
||||||
|
|
||||||
let matchable = req.path.to_matchable();
|
let matchable = req.path.to_matchable();
|
||||||
|
|
||||||
let response = match matchable.as_slice() {
|
let response = match matchable.as_slice() {
|
||||||
|
|
@ -73,7 +63,7 @@ async fn handle_http_connection(
|
||||||
Err(_) => Response::new().with_code(ResponseCode::NotFound),
|
Err(_) => Response::new().with_code(ResponseCode::NotFound),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
["websocket"] => match websoket_connection::initialize_connection(req, stream).await {
|
["websocket"] => match WebsocketConnection::initialize_connection(req, stream).await {
|
||||||
Ok(ws) => {
|
Ok(ws) => {
|
||||||
return Ok(Some(ws));
|
return Ok(Some(ws));
|
||||||
}
|
}
|
||||||
|
|
@ -101,45 +91,15 @@ async fn handle_http_connection(
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_websocket(
|
async fn handle_websocket(mut web_socket: WebsocketConnection) -> tokio::io::Result<()> {
|
||||||
mut web_socket: (WebsocketRead, WebsocketWrite),
|
|
||||||
receiver: sync::broadcast::Receiver<String>,
|
|
||||||
sender: sync::broadcast::Sender<String>,
|
|
||||||
) -> tokio::io::Result<()> {
|
|
||||||
tokio::spawn(broadcast_message(web_socket.1, receiver));
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let message = web_socket.0.read_next_message().await?;
|
let message = web_socket.read_next_message().await?;
|
||||||
|
|
||||||
if message.frame_type == FrameType::TextFrame {
|
if message.frame_type == FrameType::TextFrame {
|
||||||
let s = String::from_utf8_lossy(&message.data).to_string();
|
println!("{}", String::from_utf8_lossy(&message.data));
|
||||||
println!("{}", s);
|
web_socket
|
||||||
let _ = sender.send(s);
|
.send_message(FrameType::TextFrame, "message_received".as_bytes())
|
||||||
}
|
.await?;
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
enum BroadcastError {
|
|
||||||
IoError(io::Error),
|
|
||||||
BroadcastError(sync::broadcast::error::RecvError),
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn broadcast_message(
|
|
||||||
mut write: WebsocketWrite,
|
|
||||||
mut receiver: sync::broadcast::Receiver<String>,
|
|
||||||
) -> Result<(), BroadcastError> {
|
|
||||||
loop {
|
|
||||||
let new_message = match receiver.recv().await {
|
|
||||||
Ok(s) => s,
|
|
||||||
Err(e) => return Err(BroadcastError::BroadcastError(e)),
|
|
||||||
};
|
|
||||||
|
|
||||||
match write
|
|
||||||
.send_message(FrameType::TextFrame, new_message.as_bytes())
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(e) => return Err(BroadcastError::IoError(e)),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ impl Response {
|
||||||
output.extend_from_slice(format!("Content-Length: {}", self.data.len()).as_bytes());
|
output.extend_from_slice(format!("Content-Length: {}", self.data.len()).as_bytes());
|
||||||
output.extend_from_slice(b"\r\n\r\n");
|
output.extend_from_slice(b"\r\n\r\n");
|
||||||
output.extend_from_slice(&self.data);
|
output.extend_from_slice(&self.data);
|
||||||
} else if !self.headers.is_empty() {
|
} else {
|
||||||
output.extend_from_slice(b"\r\n");
|
output.extend_from_slice(b"\r\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -97,9 +97,6 @@ impl Response {
|
||||||
Some(a) if a == OsStr::new("css") => {
|
Some(a) if a == OsStr::new("css") => {
|
||||||
ContentType::Text(crate::shared_enums::TextType::Css)
|
ContentType::Text(crate::shared_enums::TextType::Css)
|
||||||
}
|
}
|
||||||
Some(a) if a == OsStr::new("js") => {
|
|
||||||
ContentType::Text(crate::shared_enums::TextType::Javascript)
|
|
||||||
}
|
|
||||||
Some(_) | None => {
|
Some(_) | None => {
|
||||||
return Err(io::Error::new(
|
return Err(io::Error::new(
|
||||||
io::ErrorKind::InvalidInput,
|
io::ErrorKind::InvalidInput,
|
||||||
|
|
|
||||||
|
|
@ -3,21 +3,14 @@ use crate::{
|
||||||
response::Response,
|
response::Response,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
use tokio::{
|
|
||||||
io::{self, AsyncReadExt, AsyncWriteExt},
|
|
||||||
net::tcp::{OwnedReadHalf, OwnedWriteHalf},
|
|
||||||
};
|
|
||||||
|
|
||||||
use base64::prelude::*;
|
use base64::prelude::*;
|
||||||
use sha1::{Digest, Sha1};
|
use sha1::{Digest, Sha1};
|
||||||
|
|
||||||
pub struct WebsocketRead {
|
pub struct WebsocketConnection {
|
||||||
read: OwnedReadHalf,
|
stream: TcpStream,
|
||||||
}
|
|
||||||
|
|
||||||
pub struct WebsocketWrite {
|
|
||||||
write: OwnedWriteHalf,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
struct DataBlock {
|
struct DataBlock {
|
||||||
|
|
@ -48,10 +41,11 @@ pub enum FrameType {
|
||||||
OtherNonControl(u8),
|
OtherNonControl(u8),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WebsocketWrite {
|
impl WebsocketConnection {
|
||||||
pub async fn send_message(&mut self, frame_type: FrameType, data: &[u8]) -> io::Result<()> {
|
pub async fn send_message(&mut self, frame_type: FrameType, data: &[u8]) -> io::Result<()> {
|
||||||
let mut header = Vec::with_capacity(14);
|
let mut header = Vec::with_capacity(14); // Max header size for 127-length payload
|
||||||
|
|
||||||
|
// First byte: FIN (1) + RSV1-3 (000) + opcode
|
||||||
let opcode = match frame_type {
|
let opcode = match frame_type {
|
||||||
FrameType::TextFrame => 0x1,
|
FrameType::TextFrame => 0x1,
|
||||||
FrameType::BinaryFrame => 0x2,
|
FrameType::BinaryFrame => 0x2,
|
||||||
|
|
@ -62,6 +56,7 @@ impl WebsocketWrite {
|
||||||
};
|
};
|
||||||
header.push(0b1000_0000 | opcode); // FIN = 1
|
header.push(0b1000_0000 | opcode); // FIN = 1
|
||||||
|
|
||||||
|
// Second byte: MASK bit = 0 (server -> client frames are NOT masked)
|
||||||
let payload_len = data.len();
|
let payload_len = data.len();
|
||||||
if payload_len < 126 {
|
if payload_len < 126 {
|
||||||
header.push(payload_len as u8);
|
header.push(payload_len as u8);
|
||||||
|
|
@ -73,15 +68,14 @@ impl WebsocketWrite {
|
||||||
header.extend_from_slice(&(payload_len as u64).to_be_bytes());
|
header.extend_from_slice(&(payload_len as u64).to_be_bytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
self.write.write_all(&header).await?;
|
// Send header + payload
|
||||||
self.write.write_all(data).await?;
|
self.stream.write_all(&header).await?;
|
||||||
self.write.flush().await?;
|
self.stream.write_all(data).await?;
|
||||||
|
self.stream.flush().await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl WebsocketRead {
|
|
||||||
pub async fn read_next_message(&mut self) -> io::Result<DataFrame> {
|
pub async fn read_next_message(&mut self) -> io::Result<DataFrame> {
|
||||||
let first_line = self.parse_single_block().await?;
|
let first_line = self.parse_single_block().await?;
|
||||||
|
|
||||||
|
|
@ -120,7 +114,7 @@ impl WebsocketRead {
|
||||||
|
|
||||||
async fn parse_single_block(&mut self) -> io::Result<DataBlock> {
|
async fn parse_single_block(&mut self) -> io::Result<DataBlock> {
|
||||||
let mut first_line: [u8; 2] = [0; 2];
|
let mut first_line: [u8; 2] = [0; 2];
|
||||||
self.read.read_exact(&mut first_line).await?;
|
self.stream.read_exact(&mut first_line).await?;
|
||||||
|
|
||||||
let get_bool = |index: u8, byte: u8| -> bool { byte & (1 << index) != 0 };
|
let get_bool = |index: u8, byte: u8| -> bool { byte & (1 << index) != 0 };
|
||||||
|
|
||||||
|
|
@ -146,15 +140,19 @@ impl WebsocketRead {
|
||||||
let mask = get_bool(7, first_line[1]);
|
let mask = get_bool(7, first_line[1]);
|
||||||
|
|
||||||
let length = match first_line[1] & 0b01111111 {
|
let length = match first_line[1] & 0b01111111 {
|
||||||
126 => self.read.read_u16().await? as u64,
|
126 => self.stream.read_u16().await? as u64,
|
||||||
127 => self.read.read_u64().await?,
|
127 => self.stream.read_u64().await?,
|
||||||
other => other as u64,
|
other => other as u64,
|
||||||
};
|
};
|
||||||
|
|
||||||
let masking_key = if mask { self.read.read_u32().await? } else { 0 };
|
let masking_key = if mask {
|
||||||
|
self.stream.read_u32().await?
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
};
|
||||||
|
|
||||||
let mut message_data = vec![0u8; length as usize];
|
let mut message_data = vec![0u8; length as usize];
|
||||||
self.read.read_exact(&mut message_data).await?;
|
self.stream.read_exact(&mut message_data).await?;
|
||||||
|
|
||||||
if mask {
|
if mask {
|
||||||
Self::unmask_block(&mut message_data, masking_key);
|
Self::unmask_block(&mut message_data, masking_key);
|
||||||
|
|
@ -169,11 +167,11 @@ impl WebsocketRead {
|
||||||
data: message_data,
|
data: message_data,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
|
||||||
pub async fn initialize_connection(
|
pub async fn initialize_connection(
|
||||||
req: Request,
|
req: Request,
|
||||||
mut stream: TcpStream,
|
mut stream: TcpStream,
|
||||||
) -> tokio::io::Result<(WebsocketRead, WebsocketWrite)> {
|
) -> tokio::io::Result<Self> {
|
||||||
let (mut upgrade, mut connection, mut key_exists) = (false, false, false);
|
let (mut upgrade, mut connection, mut key_exists) = (false, false, false);
|
||||||
let mut key_val: Box<str> = "".into();
|
let mut key_val: Box<str> = "".into();
|
||||||
|
|
||||||
|
|
@ -229,12 +227,7 @@ pub async fn initialize_connection(
|
||||||
});
|
});
|
||||||
rep.respond(&mut stream).await?;
|
rep.respond(&mut stream).await?;
|
||||||
|
|
||||||
let (read_halve, write_halve) = stream.into_split();
|
Ok(Self { stream })
|
||||||
|
|
||||||
Ok((
|
|
||||||
WebsocketRead { read: read_halve },
|
|
||||||
WebsocketWrite { write: write_halve },
|
|
||||||
))
|
|
||||||
} else {
|
} else {
|
||||||
Response::new()
|
Response::new()
|
||||||
.with_code(crate::response::ResponseCode::BadRequest)
|
.with_code(crate::response::ResponseCode::BadRequest)
|
||||||
|
|
@ -244,3 +237,4 @@ pub async fn initialize_connection(
|
||||||
Err(io::Error::new(io::ErrorKind::InvalidData, "Wrong request"))
|
Err(io::Error::new(io::ErrorKind::InvalidData, "Wrong request"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue