Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to IP reassembly and TCP following #290

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions include/tins/ip_reassembler.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#define TINS_IP_REASSEMBLER_H

#include <vector>
#include <list>
#include <map>
#include <tins/pdu.h>
#include <tins/macros.h>
Expand All @@ -55,10 +56,16 @@ class IPv4Fragment {

}

uint16_t trim(uint16_t amount);

const payload_type& payload() const {
return payload_;
}

size_t size() const {
return payload_.size();
}

uint16_t offset() const {
return offset_;
}
Expand All @@ -71,10 +78,11 @@ class TINS_API IPv4Stream {
public:
IPv4Stream();

void add_fragment(IP* ip);
size_t add_fragment(IP* ip);
bool is_complete() const;
PDU* allocate_pdu() const;
const IP& first_fragment() const;
size_t size() const { return received_size_; }
private:
typedef std::vector<IPv4Fragment> fragments_type;

Expand Down Expand Up @@ -135,7 +143,8 @@ class TINS_API IPv4Reassembler {
* technique to be used.
*/
enum OverlappingTechnique {
NONE
NONE,
BSD
};

/**
Expand Down Expand Up @@ -184,15 +193,25 @@ class TINS_API IPv4Reassembler {
*/
void remove_stream(uint16_t id, IPv4Address addr1, IPv4Address addr2);
private:
static const size_t MAX_BUFFERED_BYTES;
static const size_t BUFFERED_BYTES_LOW_THRESHOLD;

typedef std::pair<IPv4Address, IPv4Address> address_pair;
typedef std::pair<uint16_t, address_pair> key_type;
typedef std::map<key_type, Internals::IPv4Stream> streams_type;
typedef std::pair<key_type, Internals::IPv4Stream*> key_stream_type;
typedef std::list<key_stream_type> ordered_streams_type;
typedef std::map<key_type, ordered_streams_type::iterator> streams_type;

Internals::IPv4Stream& get_stream(key_type key);
void remove_stream(key_type key);
void prune_streams();
key_type make_key(const IP* ip) const;
address_pair make_address_pair(IPv4Address addr1, IPv4Address addr2) const;

streams_type streams_;
ordered_streams_type ordered_streams_;
OverlappingTechnique technique_;
size_t buffered_bytes_;
};

/**
Expand Down
18 changes: 18 additions & 0 deletions include/tins/tcp_ip/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,17 @@ class TINS_API Stream {
*/
void process_packet(PDU& packet);


// calls the connection_closed callback
// to be used *outside* of stream callbacks, just before deletion.
void force_close();

// prevents further callbacks
// to be used *inside* of stream callbacks.
// the stream will be deleted when the current callbacks exits.
void ignore();


/**
* Getter for the client flow
*/
Expand All @@ -154,6 +165,9 @@ class TINS_API Stream {
*/
const Flow& server_flow() const;


bool is_established() const;

/**
* \brief Indicates whether this stream is finished.
*
Expand Down Expand Up @@ -255,6 +269,8 @@ class TINS_API Stream {
*/
const timestamp_type& last_seen() const;

void stream_est_callback(const stream_callback_type& callback);

/**
* \brief Sets the callback to be executed when the stream is closed
*
Expand Down Expand Up @@ -448,6 +464,7 @@ class TINS_API Stream {

Flow client_flow_;
Flow server_flow_;
stream_callback_type on_stream_est_;
stream_callback_type on_stream_closed_;
stream_callback_type on_client_data_callback_;
stream_callback_type on_server_data_callback_;
Expand All @@ -461,6 +478,7 @@ class TINS_API Stream {
bool auto_cleanup_server_;
bool is_partial_stream_;
unsigned directions_recovery_mode_enabled_;
bool is_ignored_;

#ifdef TINS_HAVE_TCP_STREAM_CUSTOM_DATA
boost::any user_data_;
Expand Down
24 changes: 23 additions & 1 deletion include/tins/tcp_ip/stream_follower.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ class TINS_API StreamFollower {
enum TerminationReason {
TIMEOUT, ///< The stream was terminated due to a timeout
BUFFERED_DATA, ///< The stream was terminated because it had too much buffered data
SACKED_SEGMENTS ///< The stream was terminated because it had too many SACKed segments
SACKED_SEGMENTS, ///< The stream was terminated because it had too many SACKed segments
FORCE_DISCARD ///< The stream was discarded by user request
};

/**
Expand All @@ -108,6 +109,9 @@ class TINS_API StreamFollower {
*/
StreamFollower();

// max=0 means no limitation
void max_streams(const size_t new_max);

/**
* \brief Processes a packet
*
Expand Down Expand Up @@ -164,6 +168,10 @@ class TINS_API StreamFollower {
stream_keep_alive_ = keep_alive;
}


size_t n_streams();


/**
* Finds the stream identified by the provided arguments.
*
Expand All @@ -186,6 +194,19 @@ class TINS_API StreamFollower {
Stream& find_stream(const IPv6Address& client_addr, uint16_t client_port,
const IPv6Address& server_addr, uint16_t server_port);


// finds and discards the stream identified by the provided arguments.
// throws stream_not_found if the stream is not found.
// otherwise calls the termination callback.
// to be used *outside* of stream callbacks.
void discard_stream(const IPv4Address& client_addr, uint16_t client_port,
const IPv4Address& server_addr, uint16_t server_port);
void discard_stream(const IPv6Address& client_addr, uint16_t client_port,
const IPv6Address& server_addr, uint16_t server_port);
void discard_stream(const Stream& stream);
void discard_stream(const stream_id& id);


/**
* \brief Indicates whether partial streams should be followed.
*
Expand Down Expand Up @@ -222,6 +243,7 @@ class TINS_API StreamFollower {
streams_type streams_;
stream_callback_type on_new_connection_;
stream_termination_callback_type on_stream_termination_;
size_t max_streams_; // max=0 means no limitation
size_t max_buffered_chunks_;
uint32_t max_buffered_bytes_;
timestamp_type last_cleanup_;
Expand Down
117 changes: 101 additions & 16 deletions src/ip_reassembler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,44 +27,89 @@
*
*/

#include <algorithm>
#include <tins/ip.h>
#include <tins/rawpdu.h>
#include <tins/constants.h>
#include <tins/ip_reassembler.h>
#include <tins/detail/pdu_helpers.h>

using std::make_pair;
using std::out_of_range;

namespace Tins {
namespace Internals {

uint16_t IPv4Fragment::trim(uint16_t amount) {
if (amount > payload_.size()) {
amount = payload_.size();
}
offset_ += amount;
payload_.erase(
payload_.begin(),
payload_.begin() + amount);
return amount; // report deleted bytes
}

IPv4Stream::IPv4Stream()
: received_size_(), total_size_(), received_end_(false) {

}

void IPv4Stream::add_fragment(IP* ip) {
size_t IPv4Stream::add_fragment(IP* ip) {
const size_t before_size = received_size_;
const uint16_t offset = extract_offset(ip);
uint16_t expected_offset = 0;
fragments_type::iterator it = fragments_.begin();
while (it != fragments_.end() && offset > it->offset()) {
expected_offset = static_cast<uint16_t>(it->offset() + it->size());
++it;
}

// No duplicates plx
if (it != fragments_.end() && it->offset() == offset) {
/*if (it != fragments_.end() && it->offset() == offset) {
return;
}*/

// overlap handling
/*fragments_.insert(it, IPv4Fragment(ip->inner_pdu(), offset));*/
IPv4Fragment frag(ip->inner_pdu(), offset);
if (expected_offset > offset) {
frag.trim(expected_offset - offset);
}
size_t frag_size = frag.size();
if(frag_size == 0) {
return 0;
}
fragments_.insert(it, IPv4Fragment(ip->inner_pdu(), offset));
received_size_ += ip->inner_pdu()->size();
if(static_cast<size_t>(frag.offset()) + frag_size > 65535) {
return 0;
}
expected_offset = static_cast<uint16_t>(frag.offset() + frag_size);
while (it != fragments_.end() && it->offset() < expected_offset) {
received_size_ -= it->trim(expected_offset - it->offset());
if (it->size() == 0) {
it = fragments_.erase(it);
}
else {
break;
}
}

// I wonder whether the copying of the payload is/can be optimized away
fragments_.insert(it, frag);
received_size_ += frag_size;
// If the MF flag is off
if ((ip->flags() & IP::MORE_FRAGMENTS) == 0) {
total_size_ = offset + ip->inner_pdu()->size();
total_size_ = expected_offset;
received_end_ = true;
}
if (offset == 0) {
if (frag.offset() == 0) {
// Release the inner PDU, store this first fragment and restore the inner PDU
PDU* inner_pdu = ip->release_inner_pdu();
first_fragment_ = *ip;
ip->inner_pdu(inner_pdu);
}
return received_size_ - before_size;
}

bool IPv4Stream::is_complete() const {
Expand Down Expand Up @@ -106,13 +151,16 @@ uint16_t IPv4Stream::extract_offset(const IP* ip) {

} // Internals

const size_t IPv4Reassembler::MAX_BUFFERED_BYTES = 256*1024;
const size_t IPv4Reassembler::BUFFERED_BYTES_LOW_THRESHOLD = 192*1024;

IPv4Reassembler::IPv4Reassembler()
: technique_(NONE) {
: technique_(BSD), buffered_bytes_() {

}

IPv4Reassembler::IPv4Reassembler(OverlappingTechnique technique)
: technique_(technique) {
: technique_(technique), buffered_bytes_() {

}

Expand All @@ -123,15 +171,15 @@ IPv4Reassembler::PacketStatus IPv4Reassembler::process(PDU& pdu) {
if (ip->is_fragmented()) {
key_type key = make_key(ip);
// Create it or look it up, it's the same
Internals::IPv4Stream& stream = streams_[key];
stream.add_fragment(ip);
Internals::IPv4Stream& stream = get_stream(key);
buffered_bytes_ += stream.add_fragment(ip);
if (stream.is_complete()) {
PDU* pdu = stream.allocate_pdu();
// Use all field values from the first fragment
*ip = stream.first_fragment();

// Erase this stream, since it's already assembled
streams_.erase(key);
remove_stream(key);
// The packet is corrupt
if (!pdu) {
return FRAGMENTED;
Expand All @@ -142,6 +190,9 @@ IPv4Reassembler::PacketStatus IPv4Reassembler::process(PDU& pdu) {
return REASSEMBLED;
}
else {
if (buffered_bytes_ > MAX_BUFFERED_BYTES) {
prune_streams();
}
return FRAGMENTED;
}
}
Expand All @@ -165,17 +216,51 @@ IPv4Reassembler::address_pair IPv4Reassembler::make_address_pair(IPv4Address add
}
}

Internals::IPv4Stream& IPv4Reassembler::get_stream(key_type key) {
Internals::IPv4Stream *stream_ptr;
try {
ordered_streams_type::iterator& it = streams_.at(key); // may throw out_of_range
stream_ptr = it->second;
ordered_streams_.erase(it);
}
catch (out_of_range&) {
stream_ptr = new Internals::IPv4Stream();
}
ordered_streams_type::iterator it2 = ordered_streams_.insert(ordered_streams_.end(), make_pair(key,stream_ptr));
streams_[key] = it2;
return *stream_ptr;
}

void IPv4Reassembler::prune_streams() {
while (buffered_bytes_ > BUFFERED_BYTES_LOW_THRESHOLD) {
remove_stream(ordered_streams_.begin()->first);
}
}

void IPv4Reassembler::clear_streams() {
streams_.clear();
ordered_streams_.clear();
buffered_bytes_ = 0;
}

void IPv4Reassembler::remove_stream(key_type key) {
try {
ordered_streams_type::iterator& it = streams_.at(key); // may throw out_of_range
Internals::IPv4Stream *stream_ptr = it->second;
buffered_bytes_ -= stream_ptr->size();
ordered_streams_.erase(it);
streams_.erase(key);
delete stream_ptr;
}
catch (out_of_range&) { }
}

void IPv4Reassembler::remove_stream(uint16_t id, IPv4Address addr1, IPv4Address addr2) {
streams_.erase(
make_pair(
id,
make_address_pair(addr1, addr2)
)
key_type key = make_pair(
id,
make_address_pair(addr1, addr2)
);
remove_stream(key);
}

} // Tins
2 changes: 1 addition & 1 deletion src/tcp_ip/flow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ void Flow::update_state(const TCP& tcp) {
else if ((tcp.flags() & TCP::RST) != 0) {
state_ = RST_SENT;
}
else if (state_ == SYN_SENT && (tcp.flags() & TCP::ACK) != 0) {
else if (state_ == SYN_SENT && (tcp.flags() & TCP::SYN) == 0 && (tcp.flags() & TCP::ACK) != 0) {
#ifdef TINS_HAVE_ACK_TRACKER
ack_tracker_ = AckTracker(tcp.ack_seq());
#endif // TINS_HAVE_ACK_TRACKER
Expand Down
Loading