websocket not working

This commit is contained in:
maxstrb 2025-11-04 18:02:59 +01:00
parent 46ad5b3bf7
commit 964c90d2f2
5 changed files with 136 additions and 54 deletions

View file

@ -4,6 +4,17 @@
<link rel="stylesheet" href="/public/index.css"> <link rel="stylesheet" href="/public/index.css">
<meta charset="UTF-8"/> <meta charset="UTF-8"/>
<title>Hello World!</title> <title>Hello World!</title>
<script>
const socket = new WebSocket("ws://localhost:8080/websocket");
socket.addEventListener("open", (event) => {
socket.send("Hello Server!");
});
socket.addEventListener("message", (event) => {
console.log("Message from server ", event.data);
});
</script>
</head> </head>
<body> <body>

View file

@ -33,7 +33,6 @@ async fn handle_connection(stream: TcpStream) -> tokio::io::Result<()> {
Ok(()) Ok(())
} }
async fn handle_http_connection( async fn handle_http_connection(
mut stream: TcpStream, mut stream: TcpStream,
) -> tokio::io::Result<Option<websoket_connection::WebsocketConnection>> { ) -> tokio::io::Result<Option<websoket_connection::WebsocketConnection>> {
@ -46,8 +45,8 @@ async fn handle_http_connection(
.await .await
{ {
Ok(Ok(r)) => r, Ok(Ok(r)) => r,
Ok(Err(_)) => { Ok(Err(e)) => {
println!("Wrong request"); println!("Wrong request: {e}");
break; break;
} }
Err(_) => { Err(_) => {
@ -55,37 +54,52 @@ async fn handle_http_connection(
break; break;
} }
}; };
println!("{req:?}"); println!("{req:?}");
let response = match req.path.to_matchable().as_slice() { // DEBUG: Print the path matching
let matchable = req.path.to_matchable();
println!("Path matchable: {:?}", matchable);
println!("Path as slice: {:?}", matchable.as_slice());
let response = match matchable.as_slice() {
["public", file] => { ["public", file] => {
println!("Matched public file: {}", file);
match Response::from_file(Path::new(format!("./public/{file}").as_str())) { match Response::from_file(Path::new(format!("./public/{file}").as_str())) {
Ok(resp) => resp, Ok(resp) => resp,
Err(_) => Response::new().with_code(ResponseCode::NotFound), Err(_) => Response::new().with_code(ResponseCode::NotFound),
} }
} }
["websocket"] => { ["websocket"] => {
return Ok(Some( println!("WebSocket path matched!");
WebsocketConnection::initialize_connection(req, stream).await?, println!("Initializing WebSocket connection...");
)); match WebsocketConnection::initialize_connection(req, stream).await {
Ok(ws) => {
println!("WebSocket connection established successfully!");
return Ok(Some(ws));
} }
[] => Response::new() Err(e) => {
println!("WebSocket initialization failed: {}", e);
return Err(e);
}
}
}
[] => {
println!("Matched root path, redirecting");
Response::new()
.with_code(ResponseCode::PermanentRedirect) .with_code(ResponseCode::PermanentRedirect)
.with_header(ResponseHeader::Connection(Connection::KeepAlive)) .with_header(ResponseHeader::Connection(Connection::KeepAlive))
.with_header(ResponseHeader::Location( .with_header(ResponseHeader::Location(
ServerPath::from_str("/public/index.html").unwrap(), ServerPath::from_str("/public/index.html").unwrap(),
)), ))
}
_ => Response::new().with_code(ResponseCode::NotFound), other => {
println!("No match, path was: {:?}", other);
Response::new().with_code(ResponseCode::NotFound)
}
}; };
response.respond(&mut stream).await?; response.respond(&mut stream).await?;
stream.flush().await?; stream.flush().await?;
timeout = 5000; timeout = 5000;
if req.headers.contains(&request::RequestHeader::Connection( if req.headers.contains(&request::RequestHeader::Connection(
request::Connection::Close, request::Connection::Close,
)) { )) {
@ -93,10 +107,13 @@ async fn handle_http_connection(
break; break;
} }
} }
Ok(None) Ok(None)
} }
async fn handle_websocket(mut web_socket: WebsocketConnection) -> tokio::io::Result<()> { async fn handle_websocket(mut web_socket: WebsocketConnection) -> tokio::io::Result<()> {
todo!() loop {
let message = web_socket.read_next_message().await?;
println!("{:?}", message.data);
}
} }

View file

@ -1,6 +1,6 @@
use std::str::FromStr; use std::str::FromStr;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader, Take}; use tokio::io::{self, AsyncBufReadExt, AsyncReadExt, BufReader, Take};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use crate::shared_enums::Content; use crate::shared_enums::Content;
@ -51,35 +51,64 @@ impl Connection {
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub struct Upgrade { pub struct Upgrade {
pub protocol: Protocol, pub protocol: Protocol,
pub version: Box<str>, pub version: Option<Box<str>>,
} }
impl Upgrade { impl Upgrade {
pub fn to_str(&self) -> Box<str> { pub fn to_str(&self) -> Box<str> {
match self.version.as_ref() { match self.version.as_ref() {
"" => self.protocol.to_str().into(), None => self.protocol.to_str().into(),
_ => format!("{}/{}", self.protocol.to_str(), self.version).into(), Some(version) => format!("{}/{}", self.protocol.to_str(), version).into(),
} }
} }
} }
impl FromStr for Upgrade { impl FromStr for Upgrade {
type Err = tokio::io::Error; type Err = tokio::io::Error;
fn from_str(_s: &str) -> Result<Self, Self::Err> { fn from_str(s: &str) -> Result<Self, Self::Err> {
todo!() println!("{s}");
match s.split('/').collect::<Vec<&str>>().as_slice() {
[protocol, version] => Ok(Self {
protocol: Protocol::from_str(protocol)?,
version: Some((*version).into()),
}),
[protocol] => Ok(Self {
protocol: Protocol::from_str(protocol)?,
version: None,
}),
_ => Err(tokio::io::Error::new(
io::ErrorKind::InvalidData,
"invalid upgrade error",
)),
}
} }
} }
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum Protocol { pub enum Protocol {
HTTP, Http,
Websocket, Websocket,
} }
impl FromStr for Protocol {
type Err = tokio::io::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"http" => Ok(Self::Http),
"websocket" => Ok(Self::Websocket),
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid protocol",
)),
}
}
}
impl Protocol { impl Protocol {
pub fn to_str(&self) -> &'static str { pub fn to_str(&self) -> &'static str {
match self { match self {
Self::HTTP => "HTTP", Self::Http => "HTTP",
Self::Websocket => "websocket", Self::Websocket => "websocket",
} }
} }

View file

@ -224,6 +224,7 @@ impl FromStr for ContentType {
["application", "json"] => Ok(ContentType::Aplication(ApplicationType::Json)), ["application", "json"] => Ok(ContentType::Aplication(ApplicationType::Json)),
["application", "xhtml+xml"] => Ok(ContentType::Aplication(ApplicationType::XhtmlXml)), ["application", "xhtml+xml"] => Ok(ContentType::Aplication(ApplicationType::XhtmlXml)),
["application", "xml"] => Ok(ContentType::Aplication(ApplicationType::Xml)), ["application", "xml"] => Ok(ContentType::Aplication(ApplicationType::Xml)),
["application", "signed-exchange"] => Ok(ContentType::Aplication(ApplicationType::Any)),
["application", "*"] => Ok(ContentType::Aplication(ApplicationType::Any)), ["application", "*"] => Ok(ContentType::Aplication(ApplicationType::Any)),
["image", "png"] => Ok(ContentType::Image(Image::Png)), ["image", "png"] => Ok(ContentType::Image(Image::Png)),

View file

@ -21,21 +21,16 @@ struct DataBlock {
message_type: FrameType, message_type: FrameType,
is_masked: bool,
length: u64,
mask_key: Option<u32>,
data: Vec<u8>, data: Vec<u8>,
} }
struct DataFrame { pub struct DataFrame {
frame_type: FrameType, pub frame_type: FrameType,
data: Box<[u8]>, pub data: Box<[u8]>,
} }
enum FrameType { #[derive(PartialEq, Eq)]
pub enum FrameType {
Continuation, Continuation,
TextFrame, TextFrame,
BinaryFrame, BinaryFrame,
@ -52,12 +47,28 @@ impl WebsocketConnection {
} }
pub async fn read_next_message(&mut self) -> io::Result<DataFrame> { pub async fn read_next_message(&mut self) -> io::Result<DataFrame> {
let mut data;
let frame_type;
{
let first_line = self.parse_single_block().await?; let first_line = self.parse_single_block().await?;
data = first_line.data;
frame_type = first_line.message_type; println!("Block read");
let mut data = first_line.data;
let frame_type = first_line.message_type;
if !first_line.is_final {
let mut current_line = self.parse_single_block().await?;
while !current_line.is_final {
if current_line.message_type != FrameType::Continuation {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"That is not how websocket works!!!",
));
}
data.extend_from_slice(&current_line.data);
current_line = self.parse_single_block().await?;
}
data.extend_from_slice(&current_line.data);
} }
Ok(DataFrame { Ok(DataFrame {
@ -66,6 +77,13 @@ impl WebsocketConnection {
}) })
} }
fn unmask_block(data: &mut [u8], mask: u32) {
let mask_bytes = mask.to_be_bytes();
for (i, e) in data.iter_mut().enumerate() {
*e ^= mask_bytes[i % 4];
}
}
async fn parse_single_block(&mut self) -> io::Result<DataBlock> { async fn parse_single_block(&mut self) -> io::Result<DataBlock> {
let mut first_line: [u8; 2] = [0; 2]; let mut first_line: [u8; 2] = [0; 2];
self.stream.read_exact(&mut first_line).await?; self.stream.read_exact(&mut first_line).await?;
@ -100,24 +118,24 @@ impl WebsocketConnection {
}; };
let masking_key = if mask { let masking_key = if mask {
Some(self.stream.read_u32().await?) self.stream.read_u32().await?
} else { } else {
None 0
}; };
let mut message_data = Vec::<u8>::with_capacity(length as usize); let mut message_data = vec![0u8; length as usize];
self.stream.read_exact(&mut message_data).await?; self.stream.read_exact(&mut message_data).await?;
if mask {
Self::unmask_block(&mut message_data, masking_key);
}
Ok(DataBlock { Ok(DataBlock {
is_final, is_final,
e1: extension_bit_1, e1: extension_bit_1,
e2: extension_bit_2, e2: extension_bit_2,
e3: extension_bit_3, e3: extension_bit_3,
message_type, message_type,
is_masked: mask,
length,
mask_key: masking_key,
data: message_data, data: message_data,
}) })
} }
@ -141,6 +159,10 @@ impl WebsocketConnection {
RequestHeader::Connection(con) => { RequestHeader::Connection(con) => {
if con == Connection::Upgrade { if con == Connection::Upgrade {
connection = true; connection = true;
} else if let Connection::Other(c) = con
&& c.contains("Upgrade")
{
connection = true;
} }
} }
RequestHeader::Other { name, value } => { RequestHeader::Other { name, value } => {
@ -166,7 +188,7 @@ impl WebsocketConnection {
.with_code(crate::response::ResponseCode::SwitchingProtocols) .with_code(crate::response::ResponseCode::SwitchingProtocols)
.with_header(crate::response::ResponseHeader::Upgrade(Upgrade { .with_header(crate::response::ResponseHeader::Upgrade(Upgrade {
protocol: Protocol::Websocket, protocol: Protocol::Websocket,
version: "".into(), version: None,
})) }))
.with_header(crate::response::ResponseHeader::Connection( .with_header(crate::response::ResponseHeader::Connection(
Connection::Upgrade, Connection::Upgrade,
@ -178,6 +200,8 @@ impl WebsocketConnection {
.respond(&mut stream) .respond(&mut stream)
.await?; .await?;
stream.flush().await?;
Ok(Self { stream }) Ok(Self { stream })
} else { } else {
Response::new() Response::new()