diff --git a/Cargo.toml b/Cargo.toml index 00afd28..9ef2ebb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,12 +21,11 @@ arrayref = "~0.3" nom = "~4.1" failure = "~0.1" failure_derive = "~0.1" -futures = "~0.1" +futures-preview = { version ="0.3.0-alpha.11", features = ["compat"] } log = "~0.4" [dev-dependencies] env_logger = "0.6.0" hex-slice = "~0.1" -tokio = "~0.1" regex = "~1" hex = "~0.3.2" diff --git a/src/flow.rs b/src/extraction/flow.rs similarity index 98% rename from src/flow.rs rename to src/extraction/flow.rs index 9f5dada..04f79ab 100644 --- a/src/flow.rs +++ b/src/extraction/flow.rs @@ -4,7 +4,7 @@ use crate::{ common::{MacAddress, Vlan, MAC_LENGTH}, errors::Error, layer2::{ethernet::Ethernet, Layer2FlowInfo}, - record::PcapRecord, + parse::record::PcapRecord, }; use std::{self, convert::TryFrom}; @@ -134,7 +134,7 @@ impl std::fmt::Display for Flow { #[cfg(test)] mod tests { - use super::super::{layer2, layer3, layer4}; + use crate::{layer2, layer3, layer4}; use super::*; #[test] diff --git a/src/extraction/mod.rs b/src/extraction/mod.rs new file mode 100644 index 0000000..d4e75d0 --- /dev/null +++ b/src/extraction/mod.rs @@ -0,0 +1,4 @@ +pub mod flow; +pub mod stream; + +pub use self::stream::{ExtractionStream as ExtractionStream, WithExtraction}; diff --git a/src/stream/flow.rs b/src/extraction/stream.rs similarity index 59% rename from src/stream/flow.rs rename to src/extraction/stream.rs index 6f44da3..3baa8a0 100644 --- a/src/stream/flow.rs +++ b/src/extraction/stream.rs @@ -1,13 +1,14 @@ use crate::{ errors::Error, - flow::{Flow, FlowExtraction}, + extraction::flow::{Flow, FlowExtraction}, }; -use futures::{self, try_ready, Async, Poll, Stream}; +use futures::{self, Poll, stream::Stream}; use log::*; use std::{ self, convert::{From, TryInto}, + pin::Pin }; pub struct FlowRecord @@ -18,7 +19,7 @@ where flow: Flow, } -pub struct FlowStream +pub struct ExtractionStream where S: Stream, S::Item: FlowExtraction, @@ -26,48 +27,50 @@ where inner: S, } -impl FlowStream +impl ExtractionStream where S: Stream, S::Item: FlowExtraction, { - pub fn new(inner: S) -> FlowStream { - FlowStream { inner: inner } + pub fn new(inner: S) -> ExtractionStream { + ExtractionStream { inner: inner } } } -impl Stream for FlowStream +impl Stream for ExtractionStream where - S: Stream, + S: Stream + Unpin, S::Item: FlowExtraction, { type Item = FlowRecord; - type Error = S::Error; - fn poll(&mut self) -> Poll, Self::Error> { + fn poll_next(mut self: std::pin::Pin<&mut Self>, lw: &std::task::LocalWaker) -> futures::Poll> { + let this = &mut *self; loop { - if let Some(mut v) = try_ready!(self.inner.poll()) { - match v.extract_flow() { - Err(e) => debug!("Failed to convert value: {:?}", e), - Ok(f) => { - let res = FlowRecord { record: v, flow: f }; - return Ok(Async::Ready(Some(res))); + match Pin::new(&mut this.inner).poll_next(lw) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(None), + Poll::Ready(Some(mut v)) => { + match v.extract_flow() { + Err(e) => debug!("Failed to convert value: {:?}", e), + Ok(f) => { + let res = FlowRecord { record: v, flow: f }; + return Poll::Ready(Some(res)); + } } } - } else { - return Ok(Async::Ready(None)); } } } } pub trait WithExtraction: Stream { - fn extract<'a>(self) -> FlowStream + fn extract(self) -> ExtractionStream where Self: Stream + Sized, Self::Item: FlowExtraction, { - FlowStream::new(self) + ExtractionStream::new(self) } } @@ -79,10 +82,10 @@ mod tests { use super::*; - use crate::{record::PcapRecord, CaptureParser}; + use crate::{parse::record::PcapRecord, CaptureParser}; use self::test::Bencher; - use futures::{stream as futures_stream, Future}; + use futures::{stream as futures_stream, Future, StreamExt}; use nom::Endianness; use std::{io::Read, path::PathBuf}; @@ -108,11 +111,14 @@ mod tests { assert_eq!(header.endianness(), Endianness::Little); assert_eq!(records.len(), 246137); - let fut_flows = futures_stream::iter_ok::, Error>(records) - .extract() - .collect(); + let fut_flows = async { + let flows: Vec<_> = await!(futures_stream::iter(records) + .extract() + .collect()); + flows + }; - let flows = fut_flows.wait().expect("Failed to run"); + let flows = futures::executor::block_on(fut_flows); assert_eq!(flows.len(), 236527); } @@ -140,11 +146,14 @@ mod tests { assert_eq!(header.endianness(), Endianness::Little); assert_eq!(records.len(), 246137); - let fut_flows = futures_stream::iter_ok::, Error>(records) - .extract() - .collect(); + let fut_flows = async { + let flows: Vec<_> = await!(futures_stream::iter(records) + .extract() + .collect()); + flows + }; - let flows = fut_flows.wait().expect("Failed to run"); + let flows = futures::executor::block_on(fut_flows); assert_eq!(flows.len(), 236527); }); diff --git a/src/layer4/tcp.rs b/src/layer4/tcp.rs index dc42382..34c12bf 100644 --- a/src/layer4/tcp.rs +++ b/src/layer4/tcp.rs @@ -1,5 +1,5 @@ use crate::{ - flow, + extraction::flow, layer4::Layer4FlowInfo, }; diff --git a/src/layer4/vxlan.rs b/src/layer4/vxlan.rs index dd7039f..580f380 100644 --- a/src/layer4/vxlan.rs +++ b/src/layer4/vxlan.rs @@ -1,5 +1,5 @@ use crate::{ - record::{ + parse::record::{ PcapRecord }, layer4::udp::Udp, diff --git a/src/lib.rs b/src/lib.rs index c9b3936..37b5ff1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ #![allow(unused)] -#![feature(test, try_from)] +#![feature(test, try_from, futures_api, async_await, await_macro)] ///! net-parser-rs ///! ///! Network packet parser, also capable of parsing packet capture files (e.g. libpcap) and the @@ -8,25 +8,28 @@ pub mod common; pub mod errors; -pub mod flow; -pub mod global_header; +pub mod extraction; pub mod layer2; pub mod layer3; pub mod layer4; pub mod nom_error; -pub mod record; -pub mod stream; +pub mod parse; use log::*; use nom::*; +use crate::parse::{ + global_header::GlobalHeader, + record::PcapRecord +}; + /// /// Primary utility for parsing packet captures, either from file, bytes, or interfaces. /// /// ```text /// #![feature(try_from)] /// -/// use net_parser_rs::NetworkParser; +/// use net_parser_rs::CaptureParser; /// use std::*; /// /// //Parse a file with global header and packet records @@ -42,7 +45,7 @@ use nom::*; /// //Convert a packet into stream information /// use net_parser_rs::convert::*; /// -/// let stream = Flow::try_from(packet).expect("Could not convert packet"); +/// let stream = netparser_rs::extraction::Flow::try_from(packet).expect("Could not convert packet"); ///``` /// pub struct CaptureParser; @@ -51,16 +54,16 @@ impl CaptureParser { /// /// Parse a slice of bytes that start with libpcap file format header (https://wiki.wireshark.org/Development/LibpcapFileFormat) /// - pub fn parse_file<'a>( - input: &'a [u8], + pub fn parse_file( + input: &[u8], ) -> IResult< - &'a [u8], + &[u8], ( - global_header::GlobalHeader, - std::vec::Vec>, + GlobalHeader, + std::vec::Vec, ), > { - let header_res = global_header::GlobalHeader::parse(input); + let header_res = GlobalHeader::parse(input); header_res.and_then(|r| { let (rem, header) = r; @@ -87,17 +90,17 @@ impl CaptureParser { /// header (https://wiki.wireshark.org/Development/LibpcapFileFormat). Endianness of the byte /// slice must be known. /// - pub fn parse_records<'a>( - input: &'a [u8], + pub fn parse_records( + input: &[u8], endianness: Endianness, - ) -> IResult<&'a [u8], std::vec::Vec>> { - let mut records: std::vec::Vec = vec![]; + ) -> IResult<&[u8], std::vec::Vec> { + let mut records: std::vec::Vec<_> = vec![]; let mut current = input; trace!("{} bytes left for record parsing", current.len()); loop { - match record::PcapRecord::parse(current, endianness) { + match PcapRecord::parse(current, endianness) { Ok((rem, r)) => { current = rem; trace!("{} bytes left for record parsing", current.len()); @@ -124,11 +127,11 @@ impl CaptureParser { /// /// Parse a slice of bytes as a single record. Endianness must be known. /// - pub fn parse_record<'a>( - input: &'a [u8], + pub fn parse_record( + input: &[u8], endianness: Endianness, - ) -> IResult<&'a [u8], record::PcapRecord<'a>> { - record::PcapRecord::parse(input, endianness) + ) -> IResult<&[u8], PcapRecord> { + PcapRecord::parse(input, endianness) } } @@ -137,7 +140,7 @@ pub mod tests { extern crate test; use self::test::Bencher; - use crate::{flow::FlowExtraction, record::PcapRecord, CaptureParser}; + use crate::{extraction::flow::FlowExtraction, parse::record::PcapRecord, CaptureParser}; use nom::Endianness; use std::io::prelude::*; use std::path::PathBuf; diff --git a/src/global_header.rs b/src/parse/global_header.rs similarity index 100% rename from src/global_header.rs rename to src/parse/global_header.rs diff --git a/src/parse/mod.rs b/src/parse/mod.rs new file mode 100644 index 0000000..cc6f685 --- /dev/null +++ b/src/parse/mod.rs @@ -0,0 +1,5 @@ +pub mod global_header; +pub mod record; +pub mod stream; + +pub use self::stream::{RecordStream as RecordStream}; \ No newline at end of file diff --git a/src/record.rs b/src/parse/record.rs similarity index 88% rename from src/record.rs rename to src/parse/record.rs index 8b34b03..0530ab7 100644 --- a/src/record.rs +++ b/src/parse/record.rs @@ -1,5 +1,5 @@ use crate::{ - flow::{Device, Flow, FlowExtraction}, + extraction::flow::{Device, Flow, FlowExtraction}, layer2::{ethernet::Ethernet, Layer2, Layer2FlowInfo}, }; @@ -11,27 +11,15 @@ use std::{self, convert::TryFrom}; /// /// Pcap record associated with a libpcap capture /// -pub struct PcapRecord<'a> { +pub struct PcapRecord { timestamp: std::time::SystemTime, actual_length: u32, original_length: u32, - payload: &'a [u8], + payload: Vec, flow: Option, } -impl<'a> Default for PcapRecord<'a> { - fn default() -> Self { - PcapRecord { - timestamp: std::time::SystemTime::UNIX_EPOCH, - actual_length: 0, - original_length: 0, - payload: &[0u8; 0], - flow: None, - } - } -} - -impl<'a> PcapRecord<'a> { +impl PcapRecord { pub fn timestamp(&self) -> &std::time::SystemTime { &self.timestamp } @@ -42,7 +30,7 @@ impl<'a> PcapRecord<'a> { self.original_length } pub fn payload(&self) -> &[u8] { - &self.payload + self.payload.as_ref() } /// @@ -57,9 +45,9 @@ impl<'a> PcapRecord<'a> { /// /// Utility function to convert a vector of records to flows, unless an error is encountered in stream conversion /// - pub fn convert_records<'b>( - records: std::vec::Vec>, - ) -> std::vec::Vec> { + pub fn convert_records( + records: std::vec::Vec, + ) -> std::vec::Vec { let mut records = records; let mut results = vec![]; @@ -82,25 +70,25 @@ impl<'a> PcapRecord<'a> { results } - pub fn new( + pub fn new>>( timestamp: std::time::SystemTime, actual_length: u32, original_length: u32, - payload: &'a [u8], - ) -> PcapRecord<'a> { + payload: T, + ) -> PcapRecord { PcapRecord { timestamp: timestamp, actual_length: actual_length, original_length: original_length, - payload: payload, + payload: payload.into(), flow: None, } } - pub fn parse<'b>( - input: &'b [u8], + pub fn parse( + input: &[u8], endianness: nom::Endianness, - ) -> nom::IResult<&'b [u8], PcapRecord<'b>> { + ) -> nom::IResult<&[u8], PcapRecord> { do_parse!( input, ts_seconds: u32!(endianness) @@ -112,20 +100,20 @@ impl<'a> PcapRecord<'a> { timestamp: PcapRecord::convert_packet_time(ts_seconds, ts_microseconds), actual_length: actual_length, original_length: original_length, - payload: payload, + payload: payload.into(), flow: None }) ) } } -impl<'a> FlowExtraction for PcapRecord<'a> { +impl FlowExtraction for PcapRecord { fn payload(&self) -> &[u8] { - self.payload + self.payload.as_ref() } } -impl<'a> std::fmt::Display for PcapRecord<'a> { +impl std::fmt::Display for PcapRecord { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { self.timestamp .duration_since(std::time::UNIX_EPOCH) diff --git a/src/parse/stream.rs b/src/parse/stream.rs new file mode 100644 index 0000000..422e8d6 --- /dev/null +++ b/src/parse/stream.rs @@ -0,0 +1,184 @@ +use crate::{ + errors::Error, + parse::{ + global_header::GlobalHeader, + record::PcapRecord + } +}; + +use futures::{self, io::AsyncRead, io::AsyncReadExt, Poll, stream::Stream}; +use log::*; +use std::{ + self, + convert::{From, TryInto}, + pin::Pin +}; + +pub struct RecordStream { + inner: S, + endianness: Option, + buffer: Vec, + outstanding: Vec +} + +impl RecordStream { + pub fn new( + inner: S, + endianness: Option, + capacity: usize + ) -> RecordStream { + RecordStream { + inner: inner, + endianness: endianness, + buffer: Vec::with_capacity(capacity), + outstanding: vec![] + } + } +} + +impl From for RecordStream where S: AsyncRead { + fn from(v: S) -> Self { + RecordStream::new(v, None, 10_000_000) + } +} + +impl Stream for RecordStream + where S: AsyncRead + Unpin { + + type Item=Vec; + + fn poll_next(mut self: std::pin::Pin<&mut Self>, lw: &std::task::LocalWaker) -> futures::Poll> { + let this = &mut *self; + + this.buffer.resize(this.buffer.capacity(), 0u8); + + match Pin::new(&mut this.inner).poll_read(lw, &mut this.buffer) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => { + error!("Failed to read: {:?}", e); + Poll::Ready(None) + }, + Poll::Ready(Ok(bytes_read)) => { + debug!("Read {} bytes", bytes_read); + if bytes_read == 0 { + return Poll::Ready(None); + } + + this.outstanding.extend(this.buffer.drain(0..bytes_read)); + + let (rem, records) = match this.endianness { + None => { + //parse global header + match crate::CaptureParser::parse_file(&this.outstanding) { + Err(e) => { + error!("Failed to parse: {:?}", e); + return Poll::Ready(None); + } + Ok((rem, (gh, records))) => { + this.endianness = Some(gh.endianness()); + (rem, records) + } + } + } + Some(ref e) => { + //parse records + match crate::CaptureParser::parse_records(&this.outstanding, e.clone()) { + Err(e) => { + error!("Failed to parse: {:?}", e); + return Poll::Ready(None); + } + Ok(t) => { + t + } + } + } + }; + + let unread_position = this.outstanding.len() - rem.len(); + this.outstanding = this.outstanding.drain(unread_position..).collect(); + + Poll::Ready(Some(records)) + } + } + } +} + +#[cfg(test)] +mod tests { + extern crate test; + + use super::*; + + use crate::{parse::record::PcapRecord, CaptureParser}; + + use self::test::Bencher; + use futures::{ + StreamExt + }; + use nom::Endianness; + use std::{io::Read, path::PathBuf}; + + struct FileWrapper { + inner: std::fs::File + } + + impl AsyncRead for FileWrapper { + fn poll_read(&mut self, lw: &std::task::LocalWaker, buf: &mut [u8]) + -> Poll> { + Poll::Ready(self.inner.read(buf)) + } + } + + #[test] + fn create_records_from_file() { + let _ = env_logger::try_init(); + + let pcap_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("resources") + .join("4SICS-GeekLounge-151020.pcap"); + + let bytes_stream = std::fs::File::open(pcap_path.clone()) + .expect(&format!("Failed to open pcap path {:?}", pcap_path)); + + let fut_records = async { + let reader: RecordStream<_> = FileWrapper { inner: bytes_stream }.into(); + let r: Vec<_> = await!( + reader.collect() + ); + r + }; + + let vec_of_records = futures::executor::block_on(fut_records); + let records: Vec<_> = vec_of_records.iter().flatten().collect(); + + assert_eq!(records.len(), 246137); + } + + #[bench] + fn bench_create_stream_from_file(b: &mut Bencher) { + let _ = env_logger::try_init(); + + let pcap_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("resources") + .join("4SICS-GeekLounge-151020.pcap"); + + b.iter(|| { + let bytes_stream = std::fs::File::open(pcap_path.clone()) + .expect(&format!("Failed to open pcap path {:?}", pcap_path)); + + let fut_records = async { + let reader: RecordStream<_> = FileWrapper { inner: bytes_stream }.into(); + let r: Vec<_> = await!(reader + .collect() + ); + r + }; + + let vec_of_records = futures::executor::block_on(fut_records); + let records: Vec<_> = vec_of_records.iter().flatten().collect(); + + assert_eq!(records.len(), 246137); + }); + } +} + diff --git a/src/stream/mod.rs b/src/stream/mod.rs deleted file mode 100644 index 7101dfd..0000000 --- a/src/stream/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod flow; - -pub use self::flow::{FlowStream, WithExtraction};