Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parse Stream #15

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ arrayref = "~0.3"
nom = "~4.1"
failure = "~0.1"
failure_derive = "~0.1"
futures = "~0.1"
futures-preview = { version ="0.3.0-alpha.11", features = ["compat"] }
log = "~0.4"

[dev-dependencies]
env_logger = "0.6.0"
hex-slice = "~0.1"
tokio = "~0.1"
regex = "~1"
hex = "~0.3.2"
4 changes: 2 additions & 2 deletions src/flow.rs → src/extraction/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
common::{MacAddress, Vlan, MAC_LENGTH},
errors::Error,
layer2::{ethernet::Ethernet, Layer2FlowInfo},
record::PcapRecord,
parse::record::PcapRecord,
};

use std::{self, convert::TryFrom};
Expand Down Expand Up @@ -134,7 +134,7 @@ impl std::fmt::Display for Flow {

#[cfg(test)]
mod tests {
use super::super::{layer2, layer3, layer4};
use crate::{layer2, layer3, layer4};
use super::*;

#[test]
Expand Down
4 changes: 4 additions & 0 deletions src/extraction/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod flow;
pub mod stream;

pub use self::stream::{ExtractionStream as ExtractionStream, WithExtraction};
69 changes: 39 additions & 30 deletions src/stream/flow.rs → src/extraction/stream.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::{
errors::Error,
flow::{Flow, FlowExtraction},
extraction::flow::{Flow, FlowExtraction},
};

use futures::{self, try_ready, Async, Poll, Stream};
use futures::{self, Poll, stream::Stream};
use log::*;
use std::{
self,
convert::{From, TryInto},
pin::Pin
};

pub struct FlowRecord<R>
Expand All @@ -18,56 +19,58 @@ where
flow: Flow,
}

pub struct FlowStream<S>
pub struct ExtractionStream<S>
where
S: Stream,
S::Item: FlowExtraction,
{
inner: S,
}

impl<S> FlowStream<S>
impl<S> ExtractionStream<S>
where
S: Stream,
S::Item: FlowExtraction,
{
pub fn new(inner: S) -> FlowStream<S> {
FlowStream { inner: inner }
pub fn new(inner: S) -> ExtractionStream<S> {
ExtractionStream { inner: inner }
}
}

impl<S> Stream for FlowStream<S>
impl<S> Stream for ExtractionStream<S>
where
S: Stream,
S: Stream + Unpin,
S::Item: FlowExtraction,
{
type Item = FlowRecord<S::Item>;
type Error = S::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
fn poll_next(mut self: std::pin::Pin<&mut Self>, lw: &std::task::LocalWaker) -> futures::Poll<Option<Self::Item>> {
let this = &mut *self;
loop {
if let Some(mut v) = try_ready!(self.inner.poll()) {
match v.extract_flow() {
Err(e) => debug!("Failed to convert value: {:?}", e),
Ok(f) => {
let res = FlowRecord { record: v, flow: f };
return Ok(Async::Ready(Some(res)));
match Pin::new(&mut this.inner).poll_next(lw) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(mut v)) => {
match v.extract_flow() {
Err(e) => debug!("Failed to convert value: {:?}", e),
Ok(f) => {
let res = FlowRecord { record: v, flow: f };
return Poll::Ready(Some(res));
}
}
}
} else {
return Ok(Async::Ready(None));
}
}
}
}

pub trait WithExtraction: Stream {
fn extract<'a>(self) -> FlowStream<Self>
fn extract(self) -> ExtractionStream<Self>
where
Self: Stream + Sized,
Self::Item: FlowExtraction,
{
FlowStream::new(self)
ExtractionStream::new(self)
}
}

Expand All @@ -79,10 +82,10 @@ mod tests {

use super::*;

use crate::{record::PcapRecord, CaptureParser};
use crate::{parse::record::PcapRecord, CaptureParser};

use self::test::Bencher;
use futures::{stream as futures_stream, Future};
use futures::{stream as futures_stream, Future, StreamExt};
use nom::Endianness;
use std::{io::Read, path::PathBuf};

Expand All @@ -108,11 +111,14 @@ mod tests {
assert_eq!(header.endianness(), Endianness::Little);
assert_eq!(records.len(), 246137);

let fut_flows = futures_stream::iter_ok::<Vec<PcapRecord>, Error>(records)
.extract()
.collect();
let fut_flows = async {
let flows: Vec<_> = await!(futures_stream::iter(records)
.extract()
.collect());
flows
};

let flows = fut_flows.wait().expect("Failed to run");
let flows = futures::executor::block_on(fut_flows);

assert_eq!(flows.len(), 236527);
}
Expand Down Expand Up @@ -140,11 +146,14 @@ mod tests {
assert_eq!(header.endianness(), Endianness::Little);
assert_eq!(records.len(), 246137);

let fut_flows = futures_stream::iter_ok::<Vec<PcapRecord>, Error>(records)
.extract()
.collect();
let fut_flows = async {
let flows: Vec<_> = await!(futures_stream::iter(records)
.extract()
.collect());
flows
};

let flows = fut_flows.wait().expect("Failed to run");
let flows = futures::executor::block_on(fut_flows);

assert_eq!(flows.len(), 236527);
});
Expand Down
2 changes: 1 addition & 1 deletion src/layer4/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
flow,
extraction::flow,
layer4::Layer4FlowInfo,
};

Expand Down
2 changes: 1 addition & 1 deletion src/layer4/vxlan.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
record::{
parse::record::{
PcapRecord
},
layer4::udp::Udp,
Expand Down
49 changes: 26 additions & 23 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![allow(unused)]
#![feature(test, try_from)]
#![feature(test, try_from, futures_api, async_await, await_macro)]
///! net-parser-rs
///!
///! Network packet parser, also capable of parsing packet capture files (e.g. libpcap) and the
Expand All @@ -8,25 +8,28 @@

pub mod common;
pub mod errors;
pub mod flow;
pub mod global_header;
pub mod extraction;
pub mod layer2;
pub mod layer3;
pub mod layer4;
pub mod nom_error;
pub mod record;
pub mod stream;
pub mod parse;

use log::*;
use nom::*;

use crate::parse::{
global_header::GlobalHeader,
record::PcapRecord
};

///
/// Primary utility for parsing packet captures, either from file, bytes, or interfaces.
///
/// ```text
/// #![feature(try_from)]
///
/// use net_parser_rs::NetworkParser;
/// use net_parser_rs::CaptureParser;
/// use std::*;
///
/// //Parse a file with global header and packet records
Expand All @@ -42,7 +45,7 @@ use nom::*;
/// //Convert a packet into stream information
/// use net_parser_rs::convert::*;
///
/// let stream = Flow::try_from(packet).expect("Could not convert packet");
/// let stream = netparser_rs::extraction::Flow::try_from(packet).expect("Could not convert packet");
///```
///
pub struct CaptureParser;
Expand All @@ -51,16 +54,16 @@ impl CaptureParser {
///
/// Parse a slice of bytes that start with libpcap file format header (https://wiki.wireshark.org/Development/LibpcapFileFormat)
///
pub fn parse_file<'a>(
input: &'a [u8],
pub fn parse_file(
input: &[u8],
) -> IResult<
&'a [u8],
&[u8],
(
global_header::GlobalHeader,
std::vec::Vec<record::PcapRecord<'a>>,
GlobalHeader,
std::vec::Vec<PcapRecord>,
),
> {
let header_res = global_header::GlobalHeader::parse(input);
let header_res = GlobalHeader::parse(input);

header_res.and_then(|r| {
let (rem, header) = r;
Expand All @@ -87,17 +90,17 @@ impl CaptureParser {
/// header (https://wiki.wireshark.org/Development/LibpcapFileFormat). Endianness of the byte
/// slice must be known.
///
pub fn parse_records<'a>(
input: &'a [u8],
pub fn parse_records(
input: &[u8],
endianness: Endianness,
) -> IResult<&'a [u8], std::vec::Vec<record::PcapRecord<'a>>> {
let mut records: std::vec::Vec<record::PcapRecord> = vec![];
) -> IResult<&[u8], std::vec::Vec<PcapRecord>> {
let mut records: std::vec::Vec<_> = vec![];
let mut current = input;

trace!("{} bytes left for record parsing", current.len());

loop {
match record::PcapRecord::parse(current, endianness) {
match PcapRecord::parse(current, endianness) {
Ok((rem, r)) => {
current = rem;
trace!("{} bytes left for record parsing", current.len());
Expand All @@ -124,11 +127,11 @@ impl CaptureParser {
///
/// Parse a slice of bytes as a single record. Endianness must be known.
///
pub fn parse_record<'a>(
input: &'a [u8],
pub fn parse_record(
input: &[u8],
endianness: Endianness,
) -> IResult<&'a [u8], record::PcapRecord<'a>> {
record::PcapRecord::parse(input, endianness)
) -> IResult<&[u8], PcapRecord> {
PcapRecord::parse(input, endianness)
}
}

Expand All @@ -137,7 +140,7 @@ pub mod tests {
extern crate test;

use self::test::Bencher;
use crate::{flow::FlowExtraction, record::PcapRecord, CaptureParser};
use crate::{extraction::flow::FlowExtraction, parse::record::PcapRecord, CaptureParser};
use nom::Endianness;
use std::io::prelude::*;
use std::path::PathBuf;
Expand Down
File renamed without changes.
5 changes: 5 additions & 0 deletions src/parse/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod global_header;
pub mod record;
pub mod stream;

pub use self::stream::{RecordStream as RecordStream};
Loading