From 1893eb059902935e54e0b589d4d61f3a96ec5c91 Mon Sep 17 00:00:00 2001 From: maxstrb Date: Tue, 4 Nov 2025 22:41:01 +0100 Subject: [PATCH] websocket and data: --- main.rs | 0 src/main.rs | 56 ++++++++++++++------------------------ src/request.rs | 2 -- src/response.rs | 12 ++++++-- src/websoket_connection.rs | 45 +++++++++++++++++++++++------- 5 files changed, 65 insertions(+), 50 deletions(-) create mode 100644 main.rs diff --git a/main.rs b/main.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/main.rs b/src/main.rs index 0eca6ad..003a02b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,7 +10,7 @@ use tokio::io::AsyncWriteExt; use tokio::net::{TcpListener, TcpStream}; use tokio::time; -use crate::websoket_connection::WebsocketConnection; +use crate::websoket_connection::{FrameType, WebsocketConnection}; use crate::{ request::{Connection, ServerPath}, response::{Response, ResponseCode, ResponseHeader}, @@ -50,52 +50,34 @@ async fn handle_http_connection( break; } Err(_) => { - println!("Timed out"); break; } }; - println!("{req:?}"); - // DEBUG: Print the path matching let matchable = req.path.to_matchable(); - println!("Path matchable: {:?}", matchable); - println!("Path as slice: {:?}", matchable.as_slice()); let response = match matchable.as_slice() { ["public", file] => { - println!("Matched public file: {}", file); match Response::from_file(Path::new(format!("./public/{file}").as_str())) { Ok(resp) => resp, Err(_) => Response::new().with_code(ResponseCode::NotFound), } } - ["websocket"] => { - println!("WebSocket path matched!"); - println!("Initializing WebSocket connection..."); - match WebsocketConnection::initialize_connection(req, stream).await { - Ok(ws) => { - println!("WebSocket connection established successfully!"); - return Ok(Some(ws)); - } - Err(e) => { - println!("WebSocket initialization failed: {}", e); - return Err(e); - } + ["websocket"] => match WebsocketConnection::initialize_connection(req, stream).await { + Ok(ws) => { + return Ok(Some(ws)); } - } - [] => { - println!("Matched root path, redirecting"); - Response::new() - .with_code(ResponseCode::PermanentRedirect) - .with_header(ResponseHeader::Connection(Connection::KeepAlive)) - .with_header(ResponseHeader::Location( - ServerPath::from_str("/public/index.html").unwrap(), - )) - } - other => { - println!("No match, path was: {:?}", other); - Response::new().with_code(ResponseCode::NotFound) - } + Err(e) => { + return Err(e); + } + }, + [] => Response::new() + .with_code(ResponseCode::PermanentRedirect) + .with_header(ResponseHeader::Connection(Connection::KeepAlive)) + .with_header(ResponseHeader::Location( + ServerPath::from_str("/public/index.html").unwrap(), + )), + _ => Response::new().with_code(ResponseCode::NotFound), }; response.respond(&mut stream).await?; stream.flush().await?; @@ -103,7 +85,6 @@ async fn handle_http_connection( if req.headers.contains(&request::RequestHeader::Connection( request::Connection::Close, )) { - println!("Connection closed"); break; } } @@ -114,6 +95,11 @@ async fn handle_websocket(mut web_socket: WebsocketConnection) -> tokio::io::Res loop { let message = web_socket.read_next_message().await?; - println!("{:?}", message.data); + if message.frame_type == FrameType::TextFrame { + println!("{}", String::from_utf8_lossy(&message.data)); + web_socket + .send_message(FrameType::TextFrame, "message_received".as_bytes()) + .await?; + } } } diff --git a/src/request.rs b/src/request.rs index eaa5bc6..cdc7884 100644 --- a/src/request.rs +++ b/src/request.rs @@ -66,8 +66,6 @@ impl Upgrade { impl FromStr for Upgrade { type Err = tokio::io::Error; fn from_str(s: &str) -> Result { - println!("{s}"); - match s.split('/').collect::>().as_slice() { [protocol, version] => Ok(Self { protocol: Protocol::from_str(protocol)?, diff --git a/src/response.rs b/src/response.rs index e3d9780..7cb2fc6 100644 --- a/src/response.rs +++ b/src/response.rs @@ -8,6 +8,7 @@ use crate::{ use tokio::io::{self, AsyncWriteExt}; use tokio::net::TcpStream; +#[derive(Debug)] pub struct Response { http_version: Box, code: ResponseCode, @@ -19,22 +20,24 @@ impl Response { pub async fn respond(self, stream: &mut TcpStream) -> Result<(), io::Error> { let binding = self.to_str(); let mut output = binding.as_bytes().to_vec(); - output.extend_from_slice(b"\r\n"); if !self.data.is_empty() { output.extend_from_slice(format!("Content-Length: {}", self.data.len()).as_bytes()); output.extend_from_slice(b"\r\n\r\n"); output.extend_from_slice(&self.data); + } else { + output.extend_from_slice(b"\r\n"); } stream.write_all(output.as_slice()).await?; + stream.flush().await?; Ok(()) } - fn to_str(&self) -> Box { + pub fn to_str(&self) -> Box { format!( - "{} {}\r\n{}", + "{} {}\r\n{}\r\n", self.http_version, self.code.to_code(), self.headers @@ -111,6 +114,7 @@ impl Response { } } +#[derive(Debug)] pub enum ResponseCode { Continue, SwitchingProtocols, @@ -244,6 +248,7 @@ impl ResponseCode { } } +#[derive(Debug)] pub enum ResponseHeader { ContentType(Content), CacheControl(CacheControl), @@ -273,6 +278,7 @@ impl ResponseHeader { } } +#[derive(Debug)] pub enum CacheControl { NoStore, } diff --git a/src/websoket_connection.rs b/src/websoket_connection.rs index b98da90..de2be4e 100644 --- a/src/websoket_connection.rs +++ b/src/websoket_connection.rs @@ -42,15 +42,43 @@ pub enum FrameType { } impl WebsocketConnection { - pub async fn send_message(&self) -> io::Result<()> { - todo!() + pub async fn send_message(&mut self, frame_type: FrameType, data: &[u8]) -> io::Result<()> { + let mut header = Vec::with_capacity(14); // Max header size for 127-length payload + + // First byte: FIN (1) + RSV1-3 (000) + opcode + let opcode = match frame_type { + FrameType::TextFrame => 0x1, + FrameType::BinaryFrame => 0x2, + FrameType::Ping => 0x9, + FrameType::Pong => 0xA, + FrameType::ConnectionClose => 0x8, + _ => panic!("No other type should by passed to this function"), + }; + header.push(0b1000_0000 | opcode); // FIN = 1 + + // Second byte: MASK bit = 0 (server -> client frames are NOT masked) + let payload_len = data.len(); + if payload_len < 126 { + header.push(payload_len as u8); + } else if payload_len <= u16::MAX as usize { + header.push(126); + header.extend_from_slice(&(payload_len as u16).to_be_bytes()); + } else { + header.push(127); + header.extend_from_slice(&(payload_len as u64).to_be_bytes()); + } + + // Send header + payload + self.stream.write_all(&header).await?; + self.stream.write_all(data).await?; + self.stream.flush().await?; + + Ok(()) } pub async fn read_next_message(&mut self) -> io::Result { let first_line = self.parse_single_block().await?; - println!("Block read"); - let mut data = first_line.data; let frame_type = first_line.message_type; @@ -184,7 +212,7 @@ impl WebsocketConnection { let result = hasher.finalize(); let result = BASE64_STANDARD.encode(result); - Response::new() + let rep = Response::new() .with_code(crate::response::ResponseCode::SwitchingProtocols) .with_header(crate::response::ResponseHeader::Upgrade(Upgrade { protocol: Protocol::Websocket, @@ -196,11 +224,8 @@ impl WebsocketConnection { .with_header(crate::response::ResponseHeader::Other { header_name: "Sec-WebSocket-Accept".into(), header_value: result.into(), - }) - .respond(&mut stream) - .await?; - - stream.flush().await?; + }); + rep.respond(&mut stream).await?; Ok(Self { stream }) } else {