From 1644b6de3919603aca3da24a6b2d74c43f1453cb Mon Sep 17 00:00:00 2001 From: unx0 <0x@unx0.cc> Date: Sat, 28 Feb 2026 17:49:40 +0800 Subject: [PATCH 01/11] implement UDP packet mode endpoint --- examples/tun.rs | 44 ++++++++++++++ examples/tun_wintun.rs | 44 ++++++++++++++ src/lib.rs | 119 ++++++++++++++++++++++++++++++++++++- src/stream/mod.rs | 6 +- src/stream/udp.rs | 131 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 340 insertions(+), 4 deletions(-) diff --git a/examples/tun.rs b/examples/tun.rs index 37364da..ed276e2 100644 --- a/examples/tun.rs +++ b/examples/tun.rs @@ -159,6 +159,50 @@ async fn main() -> Result<(), Box> { log::info!("#{number2} UDP closed, session count {c}"); }); } + + IpStackStream::UdpEdp(mut endpoint) => { + let c = count.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; + let number2 = number; + log::info!("#{number2} UDP Packet Endpoint starting, session count {c}"); + + tokio::spawn(async move { + loop { + tokio::select! { + res = endpoint.recv() => { + match res { + Some((_src_addr, _dst_addr, _payload)) => { + + + } + None => { + log::info!("#{number2} UDP Packet Endpoint 底层通道已关闭"); + break; + } + } + } + // res = app.readpacket() => { + // match res { + // Ok(Some((remote_player_addr, my_local_addr, payload))) => { + // log::trace!("#{number2} [down] {} -> {} ({} bytes)", remote_player_addr, my_local_addr, payload.len()); + + // + // if let Err(e) = endpoint.send(remote_player_addr, my_local_addr, payload) { + // log::warn!("#{number2} faild to send packet: {}", e); + // } + // } + // Ok(None) | Err(_) => { + // + // break; + // } + // } + // } + + } + } + let c = count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed) - 1; + log::info!("#{number2} UDP Packet Endpoint closed, session count {c}"); + }); + } IpStackStream::UnknownTransport(u) => { let n = number; if u.src_addr().is_ipv4() && u.ip_protocol() == IpNumber::ICMP { diff --git a/examples/tun_wintun.rs b/examples/tun_wintun.rs index 4d6649a..eba8ff0 100644 --- a/examples/tun_wintun.rs +++ b/examples/tun_wintun.rs @@ -85,6 +85,50 @@ async fn main() -> Result<(), Box> { println!("==== end UDP connection ===="); }); } + IpStackStream::UdpEdp(mut endpoint) => { + + tokio::spawn(async move { + loop { + tokio::select! { + res = endpoint.recv() => { + match res { + Some((_src_addr, _dst_addr, _payload)) => { + + + } + None => { + log::info!(" UDP Packet Endpoint the channel have been shutdown"); + break; + } + } + } + // res = app.readpacket() => { + // match res { + // Ok(Some((remote_player_addr, my_local_addr, payload))) => { + // log::trace!("#{number2} [down] {} -> {} ({} bytes)", remote_player_addr, my_local_addr, payload.len()); + + // + // if let Err(e) = endpoint.send(remote_player_addr, my_local_addr, payload) { + // log::warn!("#{number2} faild to send packet: {}", e); + // } + // } + // Ok(None) | Err(_) => { + // + // break; + // } + // } + // } + + } + } + + + }); + } + + + + IpStackStream::UnknownTransport(u) => { if u.src_addr().is_ipv4() && u.ip_protocol() == IpNumber::ICMP { let (icmp_header, req_payload) = Icmpv4Header::from_slice(u.payload())?; diff --git a/src/lib.rs b/src/lib.rs index e67072a..53a5e2a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,10 @@ use tokio::{ sync::mpsc::{self, UnboundedReceiver, UnboundedSender}, task::JoinHandle, }; +use std::net::SocketAddr; + +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{SystemTime, UNIX_EPOCH}; pub(crate) type PacketSender = UnboundedSender; pub(crate) type PacketReceiver = UnboundedReceiver; @@ -19,9 +23,10 @@ mod packet; mod stream; pub use self::error::{IpStackError, Result}; -pub use self::stream::{IpStackStream, IpStackTcpStream, IpStackUdpStream, IpStackUnknownTransport}; +pub use self::stream::{IpStackStream, IpStackTcpStream, IpStackUdpStream, IpStackUnknownTransport, IpStackUdpPacketEndpoint}; pub use self::stream::{TcpConfig, TcpOptions}; -pub use etherparse::IpNumber; +pub use etherparse::IpNumber; + #[cfg(unix)] const TTL: u8 = 64; @@ -75,6 +80,8 @@ pub struct IpStackConfig { /// Timeout for UDP connections. /// Default is 30 seconds. pub udp_timeout: Duration, + + pub udp_packet_mode: bool, } impl Default for IpStackConfig { @@ -84,6 +91,7 @@ impl Default for IpStackConfig { packet_information: false, tcp_config: Arc::new(TcpConfig::default()), udp_timeout: Duration::from_secs(30), + udp_packet_mode: false, } } } @@ -177,6 +185,12 @@ impl IpStackConfig { self.packet_information = packet_information; self } + + /// Enable or disable UDP packet mode. + pub fn udp_packet_mode(&mut self, udp_packet_mode: bool) -> &mut Self { + self.udp_packet_mode = udp_packet_mode; + self + } } /// The main IP stack instance. @@ -310,7 +324,14 @@ fn run( accept_sender: UnboundedSender, ) -> JoinHandle> { let mut sessions: SessionCollection = AHashMap::new(); + //UDPendpoints + let mut packet_endpoints: AHashMap< + SocketAddr, + (mpsc::UnboundedSender<(SocketAddr, SocketAddr, Vec)>, Arc) + > = AHashMap::new(); let (session_remove_tx, mut session_remove_rx) = mpsc::unbounded_channel::(); + //udp endpoints rm channel + let (edp_remove_tx, mut edp_remove_rx) = mpsc::unbounded_channel::(); let pi = config.packet_information; let offset = if pi && cfg!(unix) { 4 } else { 0 }; let mut buffer = vec![0_u8; config.mtu as usize + offset]; @@ -320,7 +341,7 @@ fn run( loop { select! { Ok(n) = device.read(&mut buffer) => { - if let Err(e) = process_device_read(&buffer[offset..n], &mut sessions, &session_remove_tx, &up_pkt_sender, &config, &accept_sender).await { + if let Err(e) = process_device_read(&buffer[offset..n], &mut sessions, &session_remove_tx,&edp_remove_tx, &up_pkt_sender, &config, &accept_sender, &mut packet_endpoints).await { let io_err: std::io::Error = e.into(); if io_err.kind() == std::io::ErrorKind::ConnectionRefused { log::trace!("Received junk data: {io_err}"); @@ -329,6 +350,11 @@ fn run( } } } + //udp endpoint remove + Some(src_addr) = edp_remove_rx.recv() => { + packet_endpoints.remove(&src_addr); + log::debug!("Packet endpoint destroyed and removed: {}", src_addr); + } Some(network_tuple) = session_remove_rx.recv() => { sessions.remove(&network_tuple); log::debug!("session destroyed: {network_tuple}"); @@ -345,9 +371,11 @@ async fn process_device_read( data: &[u8], sessions: &mut SessionCollection, session_remove_tx: &UnboundedSender, + edp_remove_tx: &tokio::sync::mpsc::UnboundedSender, up_pkt_sender: &PacketSender, config: &IpStackConfig, accept_sender: &UnboundedSender, + packet_endpoints: &mut AHashMap)>, Arc)>, ) -> Result<()> { let Ok(packet) = NetworkPacket::parse(data) else { let stream = IpStackStream::UnknownNetwork(data.to_owned()); @@ -368,6 +396,85 @@ async fn process_device_read( return Ok(()); } + + //UDP packet + if let TransportHeader::Udp(_udp_header) = packet.transport_header() { + if config.udp_packet_mode { + let src_addr = packet.src_addr(); + let dst_addr = packet.dst_addr(); + let payload = packet.payload.unwrap_or_default(); + + match packet_endpoints.entry(src_addr) { + + std::collections::hash_map::Entry::Occupied(entry) => { + let (tx, last_activity) = entry.get(); + last_activity.store(now_secs(), Ordering::Relaxed); + + if let Err(e) = tx.send((src_addr, dst_addr, payload)) { + log::warn!("Failed to send to packet endpoint: {}", e); + } + } + + + std::collections::hash_map::Entry::Vacant(entry) => { + + //announce to destroy the channel when timeout or application layer take out + let (destroy_tx, mut destroy_rx) = tokio::sync::oneshot::channel::<()>(); + + + let last_activity = Arc::new(AtomicU64::new(now_secs())); + let last_activity_clone = last_activity.clone(); + + let timeout_secs = config.udp_timeout.as_secs(); + + let edp_remove_tx_clone = edp_remove_tx.clone(); + let src_addr_clone = src_addr; + + + tokio::spawn(async move { + loop { + let elapsed = now_secs() - last_activity_clone.load(Ordering::Relaxed); + if elapsed >= timeout_secs { + log::info!("remove the channel of {} because not data in {} ", src_addr_clone, elapsed); + break; + } + + + let sleep_duration = std::time::Duration::from_secs(timeout_secs - elapsed); + + tokio::select! { + + //sleep until timeout + _ = tokio::time::sleep(sleep_duration) => {} + + // application layer take out + _ = &mut destroy_rx => { + log::debug!("application layer Endpoint:{} removed the channel", src_addr_clone); + break; + } + } + } + + + let _ = edp_remove_tx_clone.send(src_addr_clone); + }); + //ipstack to application layer channel + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + + let endpoint = IpStackUdpPacketEndpoint::new( + rx, up_pkt_sender.clone(), src_addr, config.mtu, destroy_tx + ); + + accept_sender.send(IpStackStream::UdpEdp(endpoint)).unwrap(); + + entry.insert((tx.clone(), last_activity)); + tx.send((src_addr, dst_addr, payload)).unwrap(); + } + } + return Ok(()); + } + } + let network_tuple = packet.network_tuple(); match sessions.entry(network_tuple) { std::collections::hash_map::Entry::Occupied(entry) => { @@ -439,3 +546,9 @@ async fn process_upstream_recv( Ok(()) } + + +//time +fn now_secs() -> u64 { + SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() +} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 777053f..f979925 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -4,7 +4,7 @@ pub use self::tcp::IpStackTcpStream; pub use self::tcp::{TcpConfig, TcpOptions}; pub use self::udp::IpStackUdpStream; pub use self::unknown::IpStackUnknownTransport; - +pub use self::udp::IpStackUdpPacketEndpoint; mod seqnum; mod tcb; mod tcp; @@ -27,6 +27,8 @@ pub enum IpStackStream { Tcp(IpStackTcpStream), /// A UDP stream. Udp(IpStackUdpStream), + /// UDP PACKET. + UdpEdp(IpStackUdpPacketEndpoint), /// A stream for unknown transport protocols. UnknownTransport(IpStackUnknownTransport), /// Raw network packets that couldn't be parsed. @@ -52,6 +54,7 @@ impl IpStackStream { match self { IpStackStream::Tcp(tcp) => tcp.local_addr(), IpStackStream::Udp(udp) => udp.local_addr(), + IpStackStream::UdpEdp(udp_edp) => udp_edp.local_addr(), IpStackStream::UnknownNetwork(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)), IpStackStream::UnknownTransport(unknown) => match unknown.src_addr() { IpAddr::V4(addr) => SocketAddr::V4(SocketAddrV4::new(addr, 0)), @@ -78,6 +81,7 @@ impl IpStackStream { match self { IpStackStream::Tcp(tcp) => tcp.peer_addr(), IpStackStream::Udp(udp) => udp.peer_addr(), + IpStackStream::UdpEdp(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)), IpStackStream::UnknownNetwork(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)), IpStackStream::UnknownTransport(unknown) => match unknown.dst_addr() { IpAddr::V4(addr) => SocketAddr::V4(SocketAddrV4::new(addr, 0)), diff --git a/src/stream/udp.rs b/src/stream/udp.rs index 87925b1..cb4e911 100644 --- a/src/stream/udp.rs +++ b/src/stream/udp.rs @@ -213,3 +213,134 @@ impl Drop for IpStackUdpStream { } } } + + + + + + + + + + + +pub struct IpStackUdpPacketEndpoint { + //receive from TUN: (src, dst, payload) + receiver: mpsc::UnboundedReceiver<(SocketAddr, SocketAddr, Vec)>, + + //send to TUN: raw packet + up_pkt_sender: crate::PacketSender, + + local_addr: SocketAddr, + + mtu: u16, + + _destroy_messenger: tokio::sync::oneshot::Sender<()>, +} + +impl IpStackUdpPacketEndpoint { + pub fn new( + receiver: mpsc::UnboundedReceiver<(SocketAddr, SocketAddr, Vec)>, + up_pkt_sender: crate::PacketSender, + local_addr: SocketAddr, + mtu: u16, + _destroy_messenger: tokio::sync::oneshot::Sender<()>, + ) -> Self { + Self { receiver, up_pkt_sender, local_addr, mtu, _destroy_messenger } + } + + /// recv from TUN: (src, dst, payload) + pub async fn recv(&mut self) -> Option<(SocketAddr, SocketAddr, Vec)> { + self.receiver.recv().await + } + + /// send to TUN: raw packet + pub fn send(&self, src: SocketAddr, dst: SocketAddr, payload: Vec) -> std::io::Result<()> { + let raw_packet = build_raw_udp_packet(src, dst, payload, self.mtu)?; + + self.up_pkt_sender.send(raw_packet).map_err(|_| { + std::io::Error::new(std::io::ErrorKind::BrokenPipe, "ipstack up_pkt_sender closed") + }) + } + + pub fn local_addr(&self) -> SocketAddr { + self.local_addr + } + +} + + + + + + + + + + + + + + + + + +pub fn build_raw_udp_packet( + src_addr: SocketAddr, + dst_addr: SocketAddr, + mut payload: Vec, + mtu: u16, +) -> std::io::Result { + const UHS: usize = 8; + let ttl = 64; + + match (src_addr.ip(), dst_addr.ip()) { + (std::net::IpAddr::V4(src), std::net::IpAddr::V4(dst)) => { + + let mut ip_h = Ipv4Header::new(0, ttl, IpNumber::UDP, src.octets(), dst.octets()) + .map_err(IpStackError::from)?; + + + let line_buffer = mtu.saturating_sub((ip_h.header_len() + UHS) as u16); + payload.truncate(line_buffer as usize); + + + ip_h.set_payload_len(payload.len() + UHS).map_err(IpStackError::from)?; + + + let udp_header = UdpHeader::with_ipv4_checksum(src_addr.port(), dst_addr.port(), &ip_h, &payload) + .map_err(IpStackError::from)?; + + Ok(NetworkPacket { + ip: IpHeader::Ipv4(ip_h), + transport: TransportHeader::Udp(udp_header), + payload: Some(payload), + }) + } + (std::net::IpAddr::V6(src), std::net::IpAddr::V6(dst)) => { + + let mut ip_h = Ipv6Header { + traffic_class: 0, + flow_label: Ipv6FlowLabel::ZERO, + payload_length: 0, + next_header: IpNumber::UDP, + hop_limit: ttl, + source: src.octets(), + destination: dst.octets(), + }; + let line_buffer = mtu.saturating_sub((ip_h.header_len() + UHS) as u16); + payload.truncate(line_buffer as usize); + ip_h.payload_length = (payload.len() + UHS) as u16; + let udp_header = UdpHeader::with_ipv6_checksum(src_addr.port(), dst_addr.port(), &ip_h, &payload) + .map_err(IpStackError::from)?; + + Ok(NetworkPacket { + ip: IpHeader::Ipv6(ip_h), + transport: TransportHeader::Udp(udp_header), + payload: Some(payload), + }) + } + _ => Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "IP version mismatch")), + } +} + From 6a9a2a063ac1a1198ab8fb3203b9dca081981517 Mon Sep 17 00:00:00 2001 From: unx0 <0x@unx0.cc> Date: Sun, 1 Mar 2026 15:52:29 +0800 Subject: [PATCH 02/11] format code with cargo fmt --- examples/tun.rs | 12 +++--- examples/tun_wintun.rs | 19 +++------ src/lib.rs | 54 ++++++++++---------------- src/stream/mod.rs | 4 +- src/stream/udp.rs | 87 +++++++++++++----------------------------- 5 files changed, 61 insertions(+), 115 deletions(-) diff --git a/examples/tun.rs b/examples/tun.rs index ed276e2..d0f58b3 100644 --- a/examples/tun.rs +++ b/examples/tun.rs @@ -171,12 +171,12 @@ async fn main() -> Result<(), Box> { res = endpoint.recv() => { match res { Some((_src_addr, _dst_addr, _payload)) => { - + } None => { log::info!("#{number2} UDP Packet Endpoint 底层通道已关闭"); - break; + break; } } } @@ -184,15 +184,15 @@ async fn main() -> Result<(), Box> { // match res { // Ok(Some((remote_player_addr, my_local_addr, payload))) => { // log::trace!("#{number2} [down] {} -> {} ({} bytes)", remote_player_addr, my_local_addr, payload.len()); - - // + + // // if let Err(e) = endpoint.send(remote_player_addr, my_local_addr, payload) { // log::warn!("#{number2} faild to send packet: {}", e); // } // } // Ok(None) | Err(_) => { - // - // break; + // + // break; // } // } // } diff --git a/examples/tun_wintun.rs b/examples/tun_wintun.rs index eba8ff0..55aa101 100644 --- a/examples/tun_wintun.rs +++ b/examples/tun_wintun.rs @@ -86,19 +86,17 @@ async fn main() -> Result<(), Box> { }); } IpStackStream::UdpEdp(mut endpoint) => { - tokio::spawn(async move { loop { tokio::select! { res = endpoint.recv() => { match res { Some((_src_addr, _dst_addr, _payload)) => { - - + //your logic to process the packet } None => { log::info!(" UDP Packet Endpoint the channel have been shutdown"); - break; + break; } } } @@ -106,29 +104,24 @@ async fn main() -> Result<(), Box> { // match res { // Ok(Some((remote_player_addr, my_local_addr, payload))) => { // log::trace!("#{number2} [down] {} -> {} ({} bytes)", remote_player_addr, my_local_addr, payload.len()); - - // + + // // if let Err(e) = endpoint.send(remote_player_addr, my_local_addr, payload) { // log::warn!("#{number2} faild to send packet: {}", e); // } // } // Ok(None) | Err(_) => { - // - // break; + // + // break; // } // } // } } } - - }); } - - - IpStackStream::UnknownTransport(u) => { if u.src_addr().is_ipv4() && u.ip_protocol() == IpNumber::ICMP { let (icmp_header, req_payload) = Icmpv4Header::from_slice(u.payload())?; diff --git a/src/lib.rs b/src/lib.rs index 53a5e2a..2785976 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ use ahash::AHashMap; use packet::{NetworkPacket, NetworkTuple, TransportHeader}; +use std::net::SocketAddr; use std::{sync::Arc, time::Duration}; use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, @@ -9,7 +10,6 @@ use tokio::{ sync::mpsc::{self, UnboundedReceiver, UnboundedSender}, task::JoinHandle, }; -use std::net::SocketAddr; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{SystemTime, UNIX_EPOCH}; @@ -23,10 +23,9 @@ mod packet; mod stream; pub use self::error::{IpStackError, Result}; -pub use self::stream::{IpStackStream, IpStackTcpStream, IpStackUdpStream, IpStackUnknownTransport, IpStackUdpPacketEndpoint}; +pub use self::stream::{IpStackStream, IpStackTcpStream, IpStackUdpPacketEndpoint, IpStackUdpStream, IpStackUnknownTransport}; pub use self::stream::{TcpConfig, TcpOptions}; -pub use etherparse::IpNumber; - +pub use etherparse::IpNumber; #[cfg(unix)] const TTL: u8 = 64; @@ -325,10 +324,8 @@ fn run( ) -> JoinHandle> { let mut sessions: SessionCollection = AHashMap::new(); //UDPendpoints - let mut packet_endpoints: AHashMap< - SocketAddr, - (mpsc::UnboundedSender<(SocketAddr, SocketAddr, Vec)>, Arc) - > = AHashMap::new(); + let mut packet_endpoints: AHashMap)>, Arc)> = + AHashMap::new(); let (session_remove_tx, mut session_remove_rx) = mpsc::unbounded_channel::(); //udp endpoints rm channel let (edp_remove_tx, mut edp_remove_rx) = mpsc::unbounded_channel::(); @@ -396,16 +393,14 @@ async fn process_device_read( return Ok(()); } - - //UDP packet + //UDP packet if let TransportHeader::Udp(_udp_header) = packet.transport_header() { if config.udp_packet_mode { - let src_addr = packet.src_addr(); - let dst_addr = packet.dst_addr(); + let src_addr = packet.src_addr(); + let dst_addr = packet.dst_addr(); let payload = packet.payload.unwrap_or_default(); match packet_endpoints.entry(src_addr) { - std::collections::hash_map::Entry::Occupied(entry) => { let (tx, last_activity) = entry.get(); last_activity.store(now_secs(), Ordering::Relaxed); @@ -414,31 +409,26 @@ async fn process_device_read( log::warn!("Failed to send to packet endpoint: {}", e); } } - std::collections::hash_map::Entry::Vacant(entry) => { - //announce to destroy the channel when timeout or application layer take out let (destroy_tx, mut destroy_rx) = tokio::sync::oneshot::channel::<()>(); - - + let last_activity = Arc::new(AtomicU64::new(now_secs())); let last_activity_clone = last_activity.clone(); - - let timeout_secs = config.udp_timeout.as_secs(); - + + let timeout_secs = config.udp_timeout.as_secs(); + let edp_remove_tx_clone = edp_remove_tx.clone(); let src_addr_clone = src_addr; - tokio::spawn(async move { loop { let elapsed = now_secs() - last_activity_clone.load(Ordering::Relaxed); if elapsed >= timeout_secs { log::info!("remove the channel of {} because not data in {} ", src_addr_clone, elapsed); - break; + break; } - let sleep_duration = std::time::Duration::from_secs(timeout_secs - elapsed); @@ -446,7 +436,7 @@ async fn process_device_read( //sleep until timeout _ = tokio::time::sleep(sleep_duration) => {} - + // application layer take out _ = &mut destroy_rx => { log::debug!("application layer Endpoint:{} removed the channel", src_addr_clone); @@ -454,24 +444,21 @@ async fn process_device_read( } } } - - + let _ = edp_remove_tx_clone.send(src_addr_clone); }); //ipstack to application layer channel let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - - let endpoint = IpStackUdpPacketEndpoint::new( - rx, up_pkt_sender.clone(), src_addr, config.mtu, destroy_tx - ); - + + let endpoint = IpStackUdpPacketEndpoint::new(rx, up_pkt_sender.clone(), src_addr, config.mtu, destroy_tx); + accept_sender.send(IpStackStream::UdpEdp(endpoint)).unwrap(); - + entry.insert((tx.clone(), last_activity)); tx.send((src_addr, dst_addr, payload)).unwrap(); } } - return Ok(()); + return Ok(()); } } @@ -547,7 +534,6 @@ async fn process_upstream_recv( Ok(()) } - //time fn now_secs() -> u64 { SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() diff --git a/src/stream/mod.rs b/src/stream/mod.rs index f979925..891dfab 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -2,9 +2,9 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; pub use self::tcp::IpStackTcpStream; pub use self::tcp::{TcpConfig, TcpOptions}; +pub use self::udp::IpStackUdpPacketEndpoint; pub use self::udp::IpStackUdpStream; pub use self::unknown::IpStackUnknownTransport; -pub use self::udp::IpStackUdpPacketEndpoint; mod seqnum; mod tcb; mod tcp; @@ -27,7 +27,7 @@ pub enum IpStackStream { Tcp(IpStackTcpStream), /// A UDP stream. Udp(IpStackUdpStream), - /// UDP PACKET. + /// UDP PACKET. UdpEdp(IpStackUdpPacketEndpoint), /// A stream for unknown transport protocols. UnknownTransport(IpStackUnknownTransport), diff --git a/src/stream/udp.rs b/src/stream/udp.rs index cb4e911..216c0a9 100644 --- a/src/stream/udp.rs +++ b/src/stream/udp.rs @@ -214,25 +214,15 @@ impl Drop for IpStackUdpStream { } } - - - - - - - - - - pub struct IpStackUdpPacketEndpoint { //receive from TUN: (src, dst, payload) receiver: mpsc::UnboundedReceiver<(SocketAddr, SocketAddr, Vec)>, - + //send to TUN: raw packet up_pkt_sender: crate::PacketSender, - local_addr: SocketAddr, - + local_addr: SocketAddr, + mtu: u16, _destroy_messenger: tokio::sync::oneshot::Sender<()>, @@ -246,7 +236,13 @@ impl IpStackUdpPacketEndpoint { mtu: u16, _destroy_messenger: tokio::sync::oneshot::Sender<()>, ) -> Self { - Self { receiver, up_pkt_sender, local_addr, mtu, _destroy_messenger } + Self { + receiver, + up_pkt_sender, + local_addr, + mtu, + _destroy_messenger, + } } /// recv from TUN: (src, dst, payload) @@ -257,60 +253,33 @@ impl IpStackUdpPacketEndpoint { /// send to TUN: raw packet pub fn send(&self, src: SocketAddr, dst: SocketAddr, payload: Vec) -> std::io::Result<()> { let raw_packet = build_raw_udp_packet(src, dst, payload, self.mtu)?; - - self.up_pkt_sender.send(raw_packet).map_err(|_| { - std::io::Error::new(std::io::ErrorKind::BrokenPipe, "ipstack up_pkt_sender closed") - }) + + self.up_pkt_sender + .send(raw_packet) + .map_err(|_| std::io::Error::new(std::io::ErrorKind::BrokenPipe, "ipstack up_pkt_sender closed")) } pub fn local_addr(&self) -> SocketAddr { self.local_addr } - } - - - - - - - - - - - - - - - - -pub fn build_raw_udp_packet( - src_addr: SocketAddr, - dst_addr: SocketAddr, - mut payload: Vec, - mtu: u16, -) -> std::io::Result { - const UHS: usize = 8; - let ttl = 64; +pub fn build_raw_udp_packet(src_addr: SocketAddr, dst_addr: SocketAddr, mut payload: Vec, mtu: u16) -> std::io::Result { + const UHS: usize = 8; + let ttl = 64; match (src_addr.ip(), dst_addr.ip()) { (std::net::IpAddr::V4(src), std::net::IpAddr::V4(dst)) => { - - let mut ip_h = Ipv4Header::new(0, ttl, IpNumber::UDP, src.octets(), dst.octets()) - .map_err(IpStackError::from)?; - - + let mut ip_h = Ipv4Header::new(0, ttl, IpNumber::UDP, src.octets(), dst.octets()).map_err(IpStackError::from)?; + let line_buffer = mtu.saturating_sub((ip_h.header_len() + UHS) as u16); payload.truncate(line_buffer as usize); - - + ip_h.set_payload_len(payload.len() + UHS).map_err(IpStackError::from)?; - - - let udp_header = UdpHeader::with_ipv4_checksum(src_addr.port(), dst_addr.port(), &ip_h, &payload) - .map_err(IpStackError::from)?; - + + let udp_header = + UdpHeader::with_ipv4_checksum(src_addr.port(), dst_addr.port(), &ip_h, &payload).map_err(IpStackError::from)?; + Ok(NetworkPacket { ip: IpHeader::Ipv4(ip_h), transport: TransportHeader::Udp(udp_header), @@ -318,7 +287,6 @@ pub fn build_raw_udp_packet( }) } (std::net::IpAddr::V6(src), std::net::IpAddr::V6(dst)) => { - let mut ip_h = Ipv6Header { traffic_class: 0, flow_label: Ipv6FlowLabel::ZERO, @@ -331,9 +299,9 @@ pub fn build_raw_udp_packet( let line_buffer = mtu.saturating_sub((ip_h.header_len() + UHS) as u16); payload.truncate(line_buffer as usize); ip_h.payload_length = (payload.len() + UHS) as u16; - let udp_header = UdpHeader::with_ipv6_checksum(src_addr.port(), dst_addr.port(), &ip_h, &payload) - .map_err(IpStackError::from)?; - + let udp_header = + UdpHeader::with_ipv6_checksum(src_addr.port(), dst_addr.port(), &ip_h, &payload).map_err(IpStackError::from)?; + Ok(NetworkPacket { ip: IpHeader::Ipv6(ip_h), transport: TransportHeader::Udp(udp_header), @@ -343,4 +311,3 @@ pub fn build_raw_udp_packet( _ => Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "IP version mismatch")), } } - From c13c127e340cb3dac988851dcf57abf85b374352 Mon Sep 17 00:00:00 2001 From: Uxx0 <154650582+Uxx0@users.noreply.github.com> Date: Mon, 2 Mar 2026 13:29:41 +0800 Subject: [PATCH 03/11] Update src/lib.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/lib.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 2785976..e634aeb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -452,10 +452,14 @@ async fn process_device_read( let endpoint = IpStackUdpPacketEndpoint::new(rx, up_pkt_sender.clone(), src_addr, config.mtu, destroy_tx); - accept_sender.send(IpStackStream::UdpEdp(endpoint)).unwrap(); + accept_sender + .send(IpStackStream::UdpEdp(endpoint)) + .map_err(std::io::Error::other)?; entry.insert((tx.clone(), last_activity)); - tx.send((src_addr, dst_addr, payload)).unwrap(); + if let Err(e) = tx.send((src_addr, dst_addr, payload)) { + log::warn!("Failed to send to packet endpoint: {}", e); + } } } return Ok(()); From 51328fa3b2cb6b9dc536e9d898f3afe4336cfdf5 Mon Sep 17 00:00:00 2001 From: Uxx0 <154650582+Uxx0@users.noreply.github.com> Date: Mon, 2 Mar 2026 13:30:56 +0800 Subject: [PATCH 04/11] Update src/lib.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/lib.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e634aeb..92b1540 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -401,12 +401,15 @@ async fn process_device_read( let payload = packet.payload.unwrap_or_default(); match packet_endpoints.entry(src_addr) { - std::collections::hash_map::Entry::Occupied(entry) => { + std::collections::hash_map::Entry::Occupied(mut entry) => { let (tx, last_activity) = entry.get(); - last_activity.store(now_secs(), Ordering::Relaxed); if let Err(e) = tx.send((src_addr, dst_addr, payload)) { - log::warn!("Failed to send to packet endpoint: {}", e); + log::warn!("Failed to send to packet endpoint for {}: {}", src_addr, e); + // Receiver was dropped; remove stale endpoint so a new one can be created. + entry.remove(); + } else { + last_activity.store(now_secs(), Ordering::Relaxed); } } From 3d05550f2ddd9257c1c4e84737d27a6f773561df Mon Sep 17 00:00:00 2001 From: Uxx0 <154650582+Uxx0@users.noreply.github.com> Date: Mon, 2 Mar 2026 13:33:16 +0800 Subject: [PATCH 05/11] Update src/lib.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/lib.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 92b1540..c2c79b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -543,5 +543,8 @@ async fn process_upstream_recv( //time fn now_secs() -> u64 { - SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_else(|_| Duration::from_secs(0)) + .as_secs() } From 77f0261a85af5753041e87d8e1fa66dca256a529 Mon Sep 17 00:00:00 2001 From: Uxx0 <154650582+Uxx0@users.noreply.github.com> Date: Mon, 2 Mar 2026 13:35:56 +0800 Subject: [PATCH 06/11] Update src/lib.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index c2c79b7..0dc6b43 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -429,7 +429,7 @@ async fn process_device_read( loop { let elapsed = now_secs() - last_activity_clone.load(Ordering::Relaxed); if elapsed >= timeout_secs { - log::info!("remove the channel of {} because not data in {} ", src_addr_clone, elapsed); + log::info!("removing channel for {} because no data for {}s", src_addr_clone, elapsed); break; } From 6f59da927273cb190bd9432ed6913bf520bd4f0e Mon Sep 17 00:00:00 2001 From: Uxx0 <154650582+Uxx0@users.noreply.github.com> Date: Mon, 2 Mar 2026 13:37:18 +0800 Subject: [PATCH 07/11] Update examples/tun_wintun.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- examples/tun_wintun.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/tun_wintun.rs b/examples/tun_wintun.rs index 55aa101..c90dfdf 100644 --- a/examples/tun_wintun.rs +++ b/examples/tun_wintun.rs @@ -95,7 +95,7 @@ async fn main() -> Result<(), Box> { //your logic to process the packet } None => { - log::info!(" UDP Packet Endpoint the channel have been shutdown"); + log::info!("UDP packet endpoint: the channel has been shut down"); break; } } From e2b5208d731f3a534774c1219c93c82c79f429a2 Mon Sep 17 00:00:00 2001 From: Uxx0 <154650582+Uxx0@users.noreply.github.com> Date: Mon, 2 Mar 2026 13:39:41 +0800 Subject: [PATCH 08/11] Update src/lib.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/lib.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 0dc6b43..66002eb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -80,6 +80,9 @@ pub struct IpStackConfig { /// Default is 30 seconds. pub udp_timeout: Duration, + /// When enabled, UDP datagrams are accepted as packet endpoints + /// (`IpStackUdpPacketEndpoint`) instead of `IpStackUdpStream` connections. + /// Default is `false`. pub udp_packet_mode: bool, } From ea826cf52b3d1c600fcf5732e3a99d1b12d9af6f Mon Sep 17 00:00:00 2001 From: Uxx0 <154650582+Uxx0@users.noreply.github.com> Date: Mon, 2 Mar 2026 13:40:12 +0800 Subject: [PATCH 09/11] Update examples/tun_wintun.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- examples/tun_wintun.rs | 37 ------------------------------------- 1 file changed, 37 deletions(-) diff --git a/examples/tun_wintun.rs b/examples/tun_wintun.rs index c90dfdf..4d6649a 100644 --- a/examples/tun_wintun.rs +++ b/examples/tun_wintun.rs @@ -85,43 +85,6 @@ async fn main() -> Result<(), Box> { println!("==== end UDP connection ===="); }); } - IpStackStream::UdpEdp(mut endpoint) => { - tokio::spawn(async move { - loop { - tokio::select! { - res = endpoint.recv() => { - match res { - Some((_src_addr, _dst_addr, _payload)) => { - //your logic to process the packet - } - None => { - log::info!("UDP packet endpoint: the channel has been shut down"); - break; - } - } - } - // res = app.readpacket() => { - // match res { - // Ok(Some((remote_player_addr, my_local_addr, payload))) => { - // log::trace!("#{number2} [down] {} -> {} ({} bytes)", remote_player_addr, my_local_addr, payload.len()); - - // - // if let Err(e) = endpoint.send(remote_player_addr, my_local_addr, payload) { - // log::warn!("#{number2} faild to send packet: {}", e); - // } - // } - // Ok(None) | Err(_) => { - // - // break; - // } - // } - // } - - } - } - }); - } - IpStackStream::UnknownTransport(u) => { if u.src_addr().is_ipv4() && u.ip_protocol() == IpNumber::ICMP { let (icmp_header, req_payload) = Icmpv4Header::from_slice(u.payload())?; From 9cc591b4735bcec9ec7ad3f8b4eecc85d71dcabc Mon Sep 17 00:00:00 2001 From: unx0 <0x@unx0.cc> Date: Mon, 2 Mar 2026 16:50:50 +0800 Subject: [PATCH 10/11] refactor: use feature flag for udp packet mode --- Cargo.toml | 3 + examples/tun.rs | 5 +- src/lib.rs | 181 ++++++++++++++++++++++++++-------------------- src/stream/mod.rs | 15 +++- src/stream/udp.rs | 7 +- 5 files changed, 123 insertions(+), 88 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fc3384c..cca7f95 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,9 @@ wintun = { version = "0.5", default-features = false } [build-dependencies] serde_json = "1" +[features] +udp_packet = [] + [profile.release] opt-level = 's' # Optimize for size (with loop vectorization enabled). lto = true # Enable Link Time Optimization diff --git a/examples/tun.rs b/examples/tun.rs index d0f58b3..5126259 100644 --- a/examples/tun.rs +++ b/examples/tun.rs @@ -136,6 +136,7 @@ async fn main() -> Result<(), Box> { log::info!("#{number1} TCP closed, session count {c}"); }); } + #[cfg(not(feature = "udp_packet"))] IpStackStream::Udp(mut udp) => { let mut s = match UdpStream::connect(server_addr).await { Ok(s) => s, @@ -159,8 +160,8 @@ async fn main() -> Result<(), Box> { log::info!("#{number2} UDP closed, session count {c}"); }); } - - IpStackStream::UdpEdp(mut endpoint) => { + #[cfg(feature = "udp_packet")] + IpStackStream::Udp(mut endpoint) => { let c = count.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; let number2 = number; log::info!("#{number2} UDP Packet Endpoint starting, session count {c}"); diff --git a/src/lib.rs b/src/lib.rs index 66002eb..42da852 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,7 @@ use ahash::AHashMap; use packet::{NetworkPacket, NetworkTuple, TransportHeader}; -use std::net::SocketAddr; + use std::{sync::Arc, time::Duration}; use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, @@ -11,9 +11,6 @@ use tokio::{ task::JoinHandle, }; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::time::{SystemTime, UNIX_EPOCH}; - pub(crate) type PacketSender = UnboundedSender; pub(crate) type PacketReceiver = UnboundedReceiver; pub(crate) type SessionCollection = AHashMap; @@ -23,10 +20,19 @@ mod packet; mod stream; pub use self::error::{IpStackError, Result}; -pub use self::stream::{IpStackStream, IpStackTcpStream, IpStackUdpPacketEndpoint, IpStackUdpStream, IpStackUnknownTransport}; +pub use self::stream::{IpStackStream, IpStackTcpStream, IpStackUdpStream, IpStackUnknownTransport}; pub use self::stream::{TcpConfig, TcpOptions}; pub use etherparse::IpNumber; +#[cfg(feature = "udp_packet")] +pub use self::stream::IpStackUdpPacketEndpoint; +#[cfg(feature = "udp_packet")] +use std::net::SocketAddr; +#[cfg(feature = "udp_packet")] +use std::sync::atomic::{AtomicU64, Ordering}; +#[cfg(feature = "udp_packet")] +use std::time::{SystemTime, UNIX_EPOCH}; + #[cfg(unix)] const TTL: u8 = 64; @@ -79,11 +85,6 @@ pub struct IpStackConfig { /// Timeout for UDP connections. /// Default is 30 seconds. pub udp_timeout: Duration, - - /// When enabled, UDP datagrams are accepted as packet endpoints - /// (`IpStackUdpPacketEndpoint`) instead of `IpStackUdpStream` connections. - /// Default is `false`. - pub udp_packet_mode: bool, } impl Default for IpStackConfig { @@ -93,7 +94,6 @@ impl Default for IpStackConfig { packet_information: false, tcp_config: Arc::new(TcpConfig::default()), udp_timeout: Duration::from_secs(30), - udp_packet_mode: false, } } } @@ -187,12 +187,6 @@ impl IpStackConfig { self.packet_information = packet_information; self } - - /// Enable or disable UDP packet mode. - pub fn udp_packet_mode(&mut self, udp_packet_mode: bool) -> &mut Self { - self.udp_packet_mode = udp_packet_mode; - self - } } /// The main IP stack instance. @@ -327,21 +321,23 @@ fn run( ) -> JoinHandle> { let mut sessions: SessionCollection = AHashMap::new(); //UDPendpoints + #[cfg(feature = "udp_packet")] let mut packet_endpoints: AHashMap)>, Arc)> = AHashMap::new(); let (session_remove_tx, mut session_remove_rx) = mpsc::unbounded_channel::(); //udp endpoints rm channel + #[cfg(feature = "udp_packet")] let (edp_remove_tx, mut edp_remove_rx) = mpsc::unbounded_channel::(); let pi = config.packet_information; let offset = if pi && cfg!(unix) { 4 } else { 0 }; let mut buffer = vec![0_u8; config.mtu as usize + offset]; let (up_pkt_sender, mut up_pkt_receiver) = mpsc::unbounded_channel::(); - - tokio::spawn(async move { + #[cfg(feature = "udp_packet")] + return tokio::spawn(async move { loop { select! { Ok(n) = device.read(&mut buffer) => { - if let Err(e) = process_device_read(&buffer[offset..n], &mut sessions, &session_remove_tx,&edp_remove_tx, &up_pkt_sender, &config, &accept_sender, &mut packet_endpoints).await { + if let Err(e) = process_device_read(&buffer[offset..n], &mut sessions, &session_remove_tx, &edp_remove_tx, &up_pkt_sender, &config, &accept_sender, &mut packet_endpoints).await { let io_err: std::io::Error = e.into(); if io_err.kind() == std::io::ErrorKind::ConnectionRefused { log::trace!("Received junk data: {io_err}"); @@ -349,33 +345,59 @@ fn run( log::warn!("process_device_read error: {io_err}"); } } - } - //udp endpoint remove + }, Some(src_addr) = edp_remove_rx.recv() => { packet_endpoints.remove(&src_addr); log::debug!("Packet endpoint destroyed and removed: {}", src_addr); - } + }, Some(network_tuple) = session_remove_rx.recv() => { sessions.remove(&network_tuple); log::debug!("session destroyed: {network_tuple}"); + }, + Some(packet) = up_pkt_receiver.recv() => { + process_upstream_recv(packet, &mut device, #[cfg(unix)]pi).await?; } + } + } + }); + #[cfg(not(feature = "udp_packet"))] + return tokio::spawn(async move { + loop { + select! { + Ok(n) = device.read(&mut buffer) => { + if let Err(e) = process_device_read(&buffer[offset..n], &mut sessions, &session_remove_tx, &up_pkt_sender, &config, &accept_sender).await { + let io_err: std::io::Error = e.into(); + if io_err.kind() == std::io::ErrorKind::ConnectionRefused { + log::trace!("Received junk data: {io_err}"); + } else { + log::warn!("process_device_read error: {io_err}"); + } + } + }, + Some(network_tuple) = session_remove_rx.recv() => { + sessions.remove(&network_tuple); + log::debug!("session destroyed: {network_tuple}"); + }, Some(packet) = up_pkt_receiver.recv() => { process_upstream_recv(packet, &mut device, #[cfg(unix)]pi).await?; } } } - }) + }); } async fn process_device_read( data: &[u8], sessions: &mut SessionCollection, session_remove_tx: &UnboundedSender, - edp_remove_tx: &tokio::sync::mpsc::UnboundedSender, + #[cfg(feature = "udp_packet")] edp_remove_tx: &tokio::sync::mpsc::UnboundedSender, up_pkt_sender: &PacketSender, config: &IpStackConfig, accept_sender: &UnboundedSender, - packet_endpoints: &mut AHashMap)>, Arc)>, + #[cfg(feature = "udp_packet")] packet_endpoints: &mut AHashMap< + SocketAddr, + (mpsc::UnboundedSender<(SocketAddr, SocketAddr, Vec)>, Arc), + >, ) -> Result<()> { let Ok(packet) = NetworkPacket::parse(data) else { let stream = IpStackStream::UnknownNetwork(data.to_owned()); @@ -397,79 +419,76 @@ async fn process_device_read( } //UDP packet + #[cfg(feature = "udp_packet")] if let TransportHeader::Udp(_udp_header) = packet.transport_header() { - if config.udp_packet_mode { - let src_addr = packet.src_addr(); - let dst_addr = packet.dst_addr(); - let payload = packet.payload.unwrap_or_default(); - - match packet_endpoints.entry(src_addr) { - std::collections::hash_map::Entry::Occupied(mut entry) => { - let (tx, last_activity) = entry.get(); - - if let Err(e) = tx.send((src_addr, dst_addr, payload)) { - log::warn!("Failed to send to packet endpoint for {}: {}", src_addr, e); - // Receiver was dropped; remove stale endpoint so a new one can be created. - entry.remove(); - } else { - last_activity.store(now_secs(), Ordering::Relaxed); - } + let src_addr = packet.src_addr(); + let dst_addr = packet.dst_addr(); + let payload = packet.payload.unwrap_or_default(); + + match packet_endpoints.entry(src_addr) { + std::collections::hash_map::Entry::Occupied(entry) => { + let (tx, last_activity) = entry.get(); + + if let Err(e) = tx.send((src_addr, dst_addr, payload)) { + log::warn!("Failed to send to packet endpoint for {}: {}", src_addr, e); + // Receiver was dropped; remove stale endpoint so a new one can be created. + entry.remove(); + } else { + last_activity.store(now_secs(), Ordering::Relaxed); } + } - std::collections::hash_map::Entry::Vacant(entry) => { - //announce to destroy the channel when timeout or application layer take out - let (destroy_tx, mut destroy_rx) = tokio::sync::oneshot::channel::<()>(); + std::collections::hash_map::Entry::Vacant(entry) => { + //announce to destroy the channel when timeout or application layer take out + let (destroy_tx, mut destroy_rx) = tokio::sync::oneshot::channel::<()>(); - let last_activity = Arc::new(AtomicU64::new(now_secs())); - let last_activity_clone = last_activity.clone(); + let last_activity = Arc::new(AtomicU64::new(now_secs())); + let last_activity_clone = last_activity.clone(); - let timeout_secs = config.udp_timeout.as_secs(); + let timeout_secs = config.udp_timeout.as_secs(); - let edp_remove_tx_clone = edp_remove_tx.clone(); - let src_addr_clone = src_addr; + let edp_remove_tx_clone = edp_remove_tx.clone(); + let src_addr_clone = src_addr; - tokio::spawn(async move { - loop { - let elapsed = now_secs() - last_activity_clone.load(Ordering::Relaxed); - if elapsed >= timeout_secs { - log::info!("removing channel for {} because no data for {}s", src_addr_clone, elapsed); - break; - } + tokio::spawn(async move { + loop { + let elapsed = now_secs() - last_activity_clone.load(Ordering::Relaxed); + if elapsed >= timeout_secs { + log::info!("removing channel for {} because no data for {}s", src_addr_clone, elapsed); + break; + } - let sleep_duration = std::time::Duration::from_secs(timeout_secs - elapsed); + let sleep_duration = std::time::Duration::from_secs(timeout_secs - elapsed); - tokio::select! { + tokio::select! { - //sleep until timeout - _ = tokio::time::sleep(sleep_duration) => {} + //sleep until timeout + _ = tokio::time::sleep(sleep_duration) => {} - // application layer take out - _ = &mut destroy_rx => { - log::debug!("application layer Endpoint:{} removed the channel", src_addr_clone); - break; - } + // application layer take out + _ = &mut destroy_rx => { + log::debug!("application layer Endpoint:{} removed the channel", src_addr_clone); + break; } } + } - let _ = edp_remove_tx_clone.send(src_addr_clone); - }); - //ipstack to application layer channel - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let _ = edp_remove_tx_clone.send(src_addr_clone); + }); + //ipstack to application layer channel + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let endpoint = IpStackUdpPacketEndpoint::new(rx, up_pkt_sender.clone(), src_addr, config.mtu, destroy_tx); + let endpoint = IpStackUdpPacketEndpoint::new(rx, up_pkt_sender.clone(), src_addr, config.mtu, destroy_tx); - accept_sender - .send(IpStackStream::UdpEdp(endpoint)) - .map_err(std::io::Error::other)?; + accept_sender.send(IpStackStream::Udp(endpoint)).map_err(std::io::Error::other)?; - entry.insert((tx.clone(), last_activity)); - if let Err(e) = tx.send((src_addr, dst_addr, payload)) { - log::warn!("Failed to send to packet endpoint: {}", e); - } + entry.insert((tx.clone(), last_activity)); + if let Err(e) = tx.send((src_addr, dst_addr, payload)) { + log::warn!("Failed to send to packet endpoint: {}", e); } } - return Ok(()); } + return Ok(()); } let network_tuple = packet.network_tuple(); @@ -511,11 +530,14 @@ fn create_stream( let stream = IpStackTcpStream::new(src_addr, dst_addr, h.clone(), up_pkt_sender, cfg.mtu, msgr, cfg.tcp_config.clone())?; Ok(IpStackStream::Tcp(stream)) } + #[cfg(not(feature = "udp_packet"))] TransportHeader::Udp(_) => { let payload = packet.payload.unwrap_or_default(); let stream = IpStackUdpStream::new(src_addr, dst_addr, payload, up_pkt_sender, cfg.mtu, cfg.udp_timeout, msgr); Ok(IpStackStream::Udp(stream)) } + #[cfg(feature = "udp_packet")] + TransportHeader::Udp(_) => Err(IpStackError::UnsupportedTransportProtocol), TransportHeader::Unknown => Err(IpStackError::UnsupportedTransportProtocol), } } @@ -545,6 +567,7 @@ async fn process_upstream_recv( } //time +#[cfg(feature = "udp_packet")] fn now_secs() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 891dfab..d54b5fd 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -2,7 +2,9 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; pub use self::tcp::IpStackTcpStream; pub use self::tcp::{TcpConfig, TcpOptions}; +#[cfg(feature = "udp_packet")] pub use self::udp::IpStackUdpPacketEndpoint; + pub use self::udp::IpStackUdpStream; pub use self::unknown::IpStackUnknownTransport; mod seqnum; @@ -26,9 +28,11 @@ pub enum IpStackStream { /// A TCP connection stream. Tcp(IpStackTcpStream), /// A UDP stream. + #[cfg(not(feature = "udp_packet"))] Udp(IpStackUdpStream), /// UDP PACKET. - UdpEdp(IpStackUdpPacketEndpoint), + #[cfg(feature = "udp_packet")] + Udp(IpStackUdpPacketEndpoint), /// A stream for unknown transport protocols. UnknownTransport(IpStackUnknownTransport), /// Raw network packets that couldn't be parsed. @@ -53,8 +57,10 @@ impl IpStackStream { pub fn local_addr(&self) -> SocketAddr { match self { IpStackStream::Tcp(tcp) => tcp.local_addr(), + #[cfg(not(feature = "udp_packet"))] IpStackStream::Udp(udp) => udp.local_addr(), - IpStackStream::UdpEdp(udp_edp) => udp_edp.local_addr(), + #[cfg(feature = "udp_packet")] + IpStackStream::Udp(udp_edp) => udp_edp.local_addr(), IpStackStream::UnknownNetwork(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)), IpStackStream::UnknownTransport(unknown) => match unknown.src_addr() { IpAddr::V4(addr) => SocketAddr::V4(SocketAddrV4::new(addr, 0)), @@ -80,8 +86,10 @@ impl IpStackStream { pub fn peer_addr(&self) -> SocketAddr { match self { IpStackStream::Tcp(tcp) => tcp.peer_addr(), + #[cfg(not(feature = "udp_packet"))] IpStackStream::Udp(udp) => udp.peer_addr(), - IpStackStream::UdpEdp(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)), + #[cfg(feature = "udp_packet")] + IpStackStream::Udp(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)), IpStackStream::UnknownNetwork(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)), IpStackStream::UnknownTransport(unknown) => match unknown.dst_addr() { IpAddr::V4(addr) => SocketAddr::V4(SocketAddrV4::new(addr, 0)), @@ -93,6 +101,7 @@ impl IpStackStream { pub(crate) fn stream_sender(&self) -> Result { match self { IpStackStream::Tcp(tcp) => Ok(tcp.stream_sender()), + #[cfg(not(feature = "udp_packet"))] IpStackStream::Udp(udp) => Ok(udp.stream_sender()), _ => Err(std::io::Error::other("Unknown transport stream does not have a sender")), } diff --git a/src/stream/udp.rs b/src/stream/udp.rs index 216c0a9..5605337 100644 --- a/src/stream/udp.rs +++ b/src/stream/udp.rs @@ -79,7 +79,6 @@ impl IpStackUdpStream { pub(crate) fn stream_sender(&self) -> PacketSender { self.stream_sender.clone() } - fn create_rev_packet(&self, ttl: u8, mut payload: Vec) -> std::io::Result { const UHS: usize = 8; // udp header size is 8 match (self.dst_addr.ip(), self.src_addr.ip()) { @@ -213,7 +212,7 @@ impl Drop for IpStackUdpStream { } } } - +#[cfg(feature = "udp_packet")] pub struct IpStackUdpPacketEndpoint { //receive from TUN: (src, dst, payload) receiver: mpsc::UnboundedReceiver<(SocketAddr, SocketAddr, Vec)>, @@ -227,7 +226,7 @@ pub struct IpStackUdpPacketEndpoint { _destroy_messenger: tokio::sync::oneshot::Sender<()>, } - +#[cfg(feature = "udp_packet")] impl IpStackUdpPacketEndpoint { pub fn new( receiver: mpsc::UnboundedReceiver<(SocketAddr, SocketAddr, Vec)>, @@ -263,7 +262,7 @@ impl IpStackUdpPacketEndpoint { self.local_addr } } - +#[cfg(feature = "udp_packet")] pub fn build_raw_udp_packet(src_addr: SocketAddr, dst_addr: SocketAddr, mut payload: Vec, mtu: u16) -> std::io::Result { const UHS: usize = 8; let ttl = 64; From f58e0825719c81510f9f7b6c37b71d0f3215a134 Mon Sep 17 00:00:00 2001 From: unx0 <0x@unx0.cc> Date: Mon, 2 Mar 2026 17:15:53 +0800 Subject: [PATCH 11/11] style: resolve clippy warnings for dead code, complex types, and argument count --- src/lib.rs | 16 +++++++++------- src/stream/udp.rs | 3 ++- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 42da852..f8ab252 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -314,6 +314,12 @@ impl Drop for IpStack { } } +#[cfg(feature = "udp_packet")] +type UdpEdpSender = tokio::sync::mpsc::UnboundedSender<(std::net::SocketAddr, std::net::SocketAddr, Vec)>; + +#[cfg(feature = "udp_packet")] +type UdpEdpInfo = (UdpEdpSender, std::sync::Arc); + fn run( config: IpStackConfig, mut device: Device, @@ -322,8 +328,7 @@ fn run( let mut sessions: SessionCollection = AHashMap::new(); //UDPendpoints #[cfg(feature = "udp_packet")] - let mut packet_endpoints: AHashMap)>, Arc)> = - AHashMap::new(); + let mut packet_endpoints: AHashMap = AHashMap::new(); let (session_remove_tx, mut session_remove_rx) = mpsc::unbounded_channel::(); //udp endpoints rm channel #[cfg(feature = "udp_packet")] @@ -385,7 +390,7 @@ fn run( } }); } - +#[allow(clippy::too_many_arguments)] async fn process_device_read( data: &[u8], sessions: &mut SessionCollection, @@ -394,10 +399,7 @@ async fn process_device_read( up_pkt_sender: &PacketSender, config: &IpStackConfig, accept_sender: &UnboundedSender, - #[cfg(feature = "udp_packet")] packet_endpoints: &mut AHashMap< - SocketAddr, - (mpsc::UnboundedSender<(SocketAddr, SocketAddr, Vec)>, Arc), - >, + #[cfg(feature = "udp_packet")] packet_endpoints: &mut AHashMap, ) -> Result<()> { let Ok(packet) = NetworkPacket::parse(data) else { let stream = IpStackStream::UnknownNetwork(data.to_owned()); diff --git a/src/stream/udp.rs b/src/stream/udp.rs index 5605337..a3a0c44 100644 --- a/src/stream/udp.rs +++ b/src/stream/udp.rs @@ -40,6 +40,7 @@ use tokio::{ pub struct IpStackUdpStream { src_addr: SocketAddr, dst_addr: SocketAddr, + #[allow(dead_code)] stream_sender: PacketSender, stream_receiver: PacketReceiver, up_pkt_sender: PacketSender, @@ -75,7 +76,7 @@ impl IpStackUdpStream { destroy_messenger, } } - + #[allow(dead_code)] pub(crate) fn stream_sender(&self) -> PacketSender { self.stream_sender.clone() }