Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ wintun = { version = "0.5", default-features = false }
[build-dependencies]
serde_json = "1"

[features]
udp_packet = []

[profile.release]
opt-level = 's' # Optimize for size (with loop vectorization enabled).
lto = true # Enable Link Time Optimization
Expand Down
45 changes: 45 additions & 0 deletions examples/tun.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
log::info!("#{number1} TCP closed, session count {c}");
});
}
#[cfg(not(feature = "udp_packet"))]
IpStackStream::Udp(mut udp) => {
let mut s = match UdpStream::connect(server_addr).await {
Ok(s) => s,
Expand All @@ -159,6 +160,50 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
log::info!("#{number2} UDP closed, session count {c}");
});
}
#[cfg(feature = "udp_packet")]
IpStackStream::Udp(mut endpoint) => {
let c = count.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
let number2 = number;
log::info!("#{number2} UDP Packet Endpoint starting, session count {c}");

tokio::spawn(async move {
loop {
tokio::select! {
res = endpoint.recv() => {
match res {
Some((_src_addr, _dst_addr, _payload)) => {


}
None => {
log::info!("#{number2} UDP Packet Endpoint 底层通道已关闭");
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log line includes non-English text ("底层通道已关闭"), while the rest of the repository examples/logs are in English. Consider replacing it with an English message so the example output is consistent and understandable for all users.

Suggested change
log::info!("#{number2} UDP Packet Endpoint 底层通道已关闭");
log::info!("#{number2} UDP Packet Endpoint underlying channel has been closed");

Copilot uses AI. Check for mistakes.
break;
}
}
}
// res = app.readpacket() => {
// match res {
// Ok(Some((remote_player_addr, my_local_addr, payload))) => {
// log::trace!("#{number2} [down] {} -> {} ({} bytes)", remote_player_addr, my_local_addr, payload.len());

//
// if let Err(e) = endpoint.send(remote_player_addr, my_local_addr, payload) {
// log::warn!("#{number2} faild to send packet: {}", e);
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spelling/grammar in the (commented) log line: "faild" should be "failed" (and "shut down" is typically two words). Even though it’s commented out, it’s part of the example users will copy/paste.

Suggested change
// log::warn!("#{number2} faild to send packet: {}", e);
// log::warn!("#{number2} failed to send packet: {}", e);

Copilot uses AI. Check for mistakes.
// }
// }
// Ok(None) | Err(_) => {
//
// break;
// }
// }
// }

}
}
let c = count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed) - 1;
log::info!("#{number2} UDP Packet Endpoint closed, session count {c}");
});
}
IpStackStream::UnknownTransport(u) => {
let n = number;
if u.src_addr().is_ipv4() && u.ip_protocol() == IpNumber::ICMP {
Expand Down
149 changes: 143 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use ahash::AHashMap;
use packet::{NetworkPacket, NetworkTuple, TransportHeader};

use std::{sync::Arc, time::Duration};
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
Expand All @@ -23,6 +24,15 @@ pub use self::stream::{IpStackStream, IpStackTcpStream, IpStackUdpStream, IpStac
pub use self::stream::{TcpConfig, TcpOptions};
pub use etherparse::IpNumber;

#[cfg(feature = "udp_packet")]
pub use self::stream::IpStackUdpPacketEndpoint;
#[cfg(feature = "udp_packet")]
use std::net::SocketAddr;
#[cfg(feature = "udp_packet")]
use std::sync::atomic::{AtomicU64, Ordering};
#[cfg(feature = "udp_packet")]
use std::time::{SystemTime, UNIX_EPOCH};

#[cfg(unix)]
const TTL: u8 = 64;

Expand Down Expand Up @@ -304,50 +314,92 @@ impl Drop for IpStack {
}
}

#[cfg(feature = "udp_packet")]
type UdpEdpSender = tokio::sync::mpsc::UnboundedSender<(std::net::SocketAddr, std::net::SocketAddr, Vec<u8>)>;

#[cfg(feature = "udp_packet")]
type UdpEdpInfo = (UdpEdpSender, std::sync::Arc<std::sync::atomic::AtomicU64>);

fn run<Device: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
config: IpStackConfig,
mut device: Device,
accept_sender: UnboundedSender<IpStackStream>,
) -> JoinHandle<Result<()>> {
let mut sessions: SessionCollection = AHashMap::new();
//UDPendpoints
#[cfg(feature = "udp_packet")]
let mut packet_endpoints: AHashMap<SocketAddr, UdpEdpInfo> = AHashMap::new();
let (session_remove_tx, mut session_remove_rx) = mpsc::unbounded_channel::<NetworkTuple>();
//udp endpoints rm channel
#[cfg(feature = "udp_packet")]
let (edp_remove_tx, mut edp_remove_rx) = mpsc::unbounded_channel::<SocketAddr>();
let pi = config.packet_information;
let offset = if pi && cfg!(unix) { 4 } else { 0 };
let mut buffer = vec![0_u8; config.mtu as usize + offset];
let (up_pkt_sender, mut up_pkt_receiver) = mpsc::unbounded_channel::<NetworkPacket>();

tokio::spawn(async move {
#[cfg(feature = "udp_packet")]
return tokio::spawn(async move {
loop {
select! {
Ok(n) = device.read(&mut buffer) => {
if let Err(e) = process_device_read(&buffer[offset..n], &mut sessions, &session_remove_tx, &up_pkt_sender, &config, &accept_sender).await {
if let Err(e) = process_device_read(&buffer[offset..n], &mut sessions, &session_remove_tx, &edp_remove_tx, &up_pkt_sender, &config, &accept_sender, &mut packet_endpoints).await {
let io_err: std::io::Error = e.into();
if io_err.kind() == std::io::ErrorKind::ConnectionRefused {
log::trace!("Received junk data: {io_err}");
} else {
log::warn!("process_device_read error: {io_err}");
}
}
}
},
Some(src_addr) = edp_remove_rx.recv() => {
packet_endpoints.remove(&src_addr);
log::debug!("Packet endpoint destroyed and removed: {}", src_addr);
},
Some(network_tuple) = session_remove_rx.recv() => {
sessions.remove(&network_tuple);
log::debug!("session destroyed: {network_tuple}");
},
Some(packet) = up_pkt_receiver.recv() => {
process_upstream_recv(packet, &mut device, #[cfg(unix)]pi).await?;
}
}
}
});
#[cfg(not(feature = "udp_packet"))]
return tokio::spawn(async move {
loop {
select! {
Ok(n) = device.read(&mut buffer) => {
if let Err(e) = process_device_read(&buffer[offset..n], &mut sessions, &session_remove_tx, &up_pkt_sender, &config, &accept_sender).await {
let io_err: std::io::Error = e.into();
if io_err.kind() == std::io::ErrorKind::ConnectionRefused {
log::trace!("Received junk data: {io_err}");
} else {
log::warn!("process_device_read error: {io_err}");
}
}
},
Some(network_tuple) = session_remove_rx.recv() => {
sessions.remove(&network_tuple);
log::debug!("session destroyed: {network_tuple}");
},
Some(packet) = up_pkt_receiver.recv() => {
process_upstream_recv(packet, &mut device, #[cfg(unix)]pi).await?;
}
}
}
})
});
}

#[allow(clippy::too_many_arguments)]
async fn process_device_read(
data: &[u8],
sessions: &mut SessionCollection,
session_remove_tx: &UnboundedSender<NetworkTuple>,
#[cfg(feature = "udp_packet")] edp_remove_tx: &tokio::sync::mpsc::UnboundedSender<SocketAddr>,
up_pkt_sender: &PacketSender,
config: &IpStackConfig,
accept_sender: &UnboundedSender<IpStackStream>,
#[cfg(feature = "udp_packet")] packet_endpoints: &mut AHashMap<SocketAddr, UdpEdpInfo>,
) -> Result<()> {
let Ok(packet) = NetworkPacket::parse(data) else {
let stream = IpStackStream::UnknownNetwork(data.to_owned());
Expand All @@ -368,6 +420,79 @@ async fn process_device_read(
return Ok(());
}

//UDP packet
#[cfg(feature = "udp_packet")]
if let TransportHeader::Udp(_udp_header) = packet.transport_header() {
let src_addr = packet.src_addr();
let dst_addr = packet.dst_addr();
let payload = packet.payload.unwrap_or_default();

match packet_endpoints.entry(src_addr) {
std::collections::hash_map::Entry::Occupied(entry) => {
let (tx, last_activity) = entry.get();

if let Err(e) = tx.send((src_addr, dst_addr, payload)) {
log::warn!("Failed to send to packet endpoint for {}: {}", src_addr, e);
// Receiver was dropped; remove stale endpoint so a new one can be created.
entry.remove();
} else {
last_activity.store(now_secs(), Ordering::Relaxed);
}
}

std::collections::hash_map::Entry::Vacant(entry) => {
//announce to destroy the channel when timeout or application layer take out
let (destroy_tx, mut destroy_rx) = tokio::sync::oneshot::channel::<()>();

let last_activity = Arc::new(AtomicU64::new(now_secs()));
let last_activity_clone = last_activity.clone();

let timeout_secs = config.udp_timeout.as_secs();

let edp_remove_tx_clone = edp_remove_tx.clone();
let src_addr_clone = src_addr;

tokio::spawn(async move {
loop {
let elapsed = now_secs() - last_activity_clone.load(Ordering::Relaxed);
if elapsed >= timeout_secs {
log::info!("removing channel for {} because no data for {}s", src_addr_clone, elapsed);
break;
}

let sleep_duration = std::time::Duration::from_secs(timeout_secs - elapsed);

tokio::select! {

//sleep until timeout
_ = tokio::time::sleep(sleep_duration) => {}

// application layer take out
_ = &mut destroy_rx => {
log::debug!("application layer Endpoint:{} removed the channel", src_addr_clone);
break;
}
}
}

let _ = edp_remove_tx_clone.send(src_addr_clone);
});
//ipstack to application layer channel
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

let endpoint = IpStackUdpPacketEndpoint::new(rx, up_pkt_sender.clone(), src_addr, config.mtu, destroy_tx);

accept_sender.send(IpStackStream::Udp(endpoint)).map_err(std::io::Error::other)?;

entry.insert((tx.clone(), last_activity));
if let Err(e) = tx.send((src_addr, dst_addr, payload)) {
log::warn!("Failed to send to packet endpoint: {}", e);
}
}
}
return Ok(());
}

let network_tuple = packet.network_tuple();
match sessions.entry(network_tuple) {
std::collections::hash_map::Entry::Occupied(entry) => {
Expand Down Expand Up @@ -407,11 +532,14 @@ fn create_stream(
let stream = IpStackTcpStream::new(src_addr, dst_addr, h.clone(), up_pkt_sender, cfg.mtu, msgr, cfg.tcp_config.clone())?;
Ok(IpStackStream::Tcp(stream))
}
#[cfg(not(feature = "udp_packet"))]
TransportHeader::Udp(_) => {
let payload = packet.payload.unwrap_or_default();
let stream = IpStackUdpStream::new(src_addr, dst_addr, payload, up_pkt_sender, cfg.mtu, cfg.udp_timeout, msgr);
Ok(IpStackStream::Udp(stream))
}
#[cfg(feature = "udp_packet")]
TransportHeader::Udp(_) => Err(IpStackError::UnsupportedTransportProtocol),
TransportHeader::Unknown => Err(IpStackError::UnsupportedTransportProtocol),
}
}
Expand Down Expand Up @@ -439,3 +567,12 @@ async fn process_upstream_recv<Device: AsyncWrite + Unpin + 'static>(

Ok(())
}

//time
#[cfg(feature = "udp_packet")]
fn now_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| Duration::from_secs(0))
.as_secs()
}
15 changes: 14 additions & 1 deletion src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6};

pub use self::tcp::IpStackTcpStream;
pub use self::tcp::{TcpConfig, TcpOptions};
#[cfg(feature = "udp_packet")]
pub use self::udp::IpStackUdpPacketEndpoint;

pub use self::udp::IpStackUdpStream;
pub use self::unknown::IpStackUnknownTransport;

mod seqnum;
mod tcb;
mod tcp;
Expand All @@ -26,7 +28,11 @@ pub enum IpStackStream {
/// A TCP connection stream.
Tcp(IpStackTcpStream),
/// A UDP stream.
#[cfg(not(feature = "udp_packet"))]
Udp(IpStackUdpStream),
/// UDP PACKET.
#[cfg(feature = "udp_packet")]
Udp(IpStackUdpPacketEndpoint),
/// A stream for unknown transport protocols.
UnknownTransport(IpStackUnknownTransport),
/// Raw network packets that couldn't be parsed.
Expand All @@ -51,7 +57,10 @@ impl IpStackStream {
pub fn local_addr(&self) -> SocketAddr {
match self {
IpStackStream::Tcp(tcp) => tcp.local_addr(),
#[cfg(not(feature = "udp_packet"))]
IpStackStream::Udp(udp) => udp.local_addr(),
#[cfg(feature = "udp_packet")]
IpStackStream::Udp(udp_edp) => udp_edp.local_addr(),
IpStackStream::UnknownNetwork(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)),
IpStackStream::UnknownTransport(unknown) => match unknown.src_addr() {
IpAddr::V4(addr) => SocketAddr::V4(SocketAddrV4::new(addr, 0)),
Expand All @@ -77,7 +86,10 @@ impl IpStackStream {
pub fn peer_addr(&self) -> SocketAddr {
match self {
IpStackStream::Tcp(tcp) => tcp.peer_addr(),
#[cfg(not(feature = "udp_packet"))]
IpStackStream::Udp(udp) => udp.peer_addr(),
#[cfg(feature = "udp_packet")]
IpStackStream::Udp(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)),
IpStackStream::UnknownNetwork(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)),
IpStackStream::UnknownTransport(unknown) => match unknown.dst_addr() {
IpAddr::V4(addr) => SocketAddr::V4(SocketAddrV4::new(addr, 0)),
Expand All @@ -89,6 +101,7 @@ impl IpStackStream {
pub(crate) fn stream_sender(&self) -> Result<crate::PacketSender, std::io::Error> {
match self {
IpStackStream::Tcp(tcp) => Ok(tcp.stream_sender()),
#[cfg(not(feature = "udp_packet"))]
IpStackStream::Udp(udp) => Ok(udp.stream_sender()),
_ => Err(std::io::Error::other("Unknown transport stream does not have a sender")),
}
Expand Down
Loading
Loading