diff --git a/include/tins/ip_reassembler.h b/include/tins/ip_reassembler.h index 3fd0524f..236d43fb 100644 --- a/include/tins/ip_reassembler.h +++ b/include/tins/ip_reassembler.h @@ -31,6 +31,7 @@ #define TINS_IP_REASSEMBLER_H #include +#include #include #include #include @@ -55,10 +56,16 @@ class IPv4Fragment { } + uint16_t trim(uint16_t amount); + const payload_type& payload() const { return payload_; } + size_t size() const { + return payload_.size(); + } + uint16_t offset() const { return offset_; } @@ -71,10 +78,11 @@ class TINS_API IPv4Stream { public: IPv4Stream(); - void add_fragment(IP* ip); + size_t add_fragment(IP* ip); bool is_complete() const; PDU* allocate_pdu() const; const IP& first_fragment() const; + size_t size() const { return received_size_; } private: typedef std::vector fragments_type; @@ -135,7 +143,8 @@ class TINS_API IPv4Reassembler { * technique to be used. */ enum OverlappingTechnique { - NONE + NONE, + BSD }; /** @@ -184,15 +193,25 @@ class TINS_API IPv4Reassembler { */ void remove_stream(uint16_t id, IPv4Address addr1, IPv4Address addr2); private: + static const size_t MAX_BUFFERED_BYTES; + static const size_t BUFFERED_BYTES_LOW_THRESHOLD; + typedef std::pair address_pair; typedef std::pair key_type; - typedef std::map streams_type; + typedef std::pair key_stream_type; + typedef std::list ordered_streams_type; + typedef std::map streams_type; + Internals::IPv4Stream& get_stream(key_type key); + void remove_stream(key_type key); + void prune_streams(); key_type make_key(const IP* ip) const; address_pair make_address_pair(IPv4Address addr1, IPv4Address addr2) const; streams_type streams_; + ordered_streams_type ordered_streams_; OverlappingTechnique technique_; + size_t buffered_bytes_; }; /** diff --git a/include/tins/tcp_ip/stream.h b/include/tins/tcp_ip/stream.h index 3cd10d83..30f1f860 100644 --- a/include/tins/tcp_ip/stream.h +++ b/include/tins/tcp_ip/stream.h @@ -134,6 +134,17 @@ class TINS_API Stream { */ void process_packet(PDU& packet); + + // calls the connection_closed callback + // to be used *outside* of stream callbacks, just before deletion. + void force_close(); + + // prevents further callbacks + // to be used *inside* of stream callbacks. + // the stream will be deleted when the current callbacks exits. + void ignore(); + + /** * Getter for the client flow */ @@ -154,6 +165,9 @@ class TINS_API Stream { */ const Flow& server_flow() const; + + bool is_established() const; + /** * \brief Indicates whether this stream is finished. * @@ -255,6 +269,8 @@ class TINS_API Stream { */ const timestamp_type& last_seen() const; + void stream_est_callback(const stream_callback_type& callback); + /** * \brief Sets the callback to be executed when the stream is closed * @@ -448,6 +464,7 @@ class TINS_API Stream { Flow client_flow_; Flow server_flow_; + stream_callback_type on_stream_est_; stream_callback_type on_stream_closed_; stream_callback_type on_client_data_callback_; stream_callback_type on_server_data_callback_; @@ -461,6 +478,7 @@ class TINS_API Stream { bool auto_cleanup_server_; bool is_partial_stream_; unsigned directions_recovery_mode_enabled_; + bool is_ignored_; #ifdef TINS_HAVE_TCP_STREAM_CUSTOM_DATA boost::any user_data_; diff --git a/include/tins/tcp_ip/stream_follower.h b/include/tins/tcp_ip/stream_follower.h index e6b7a1da..e4bbfd6b 100644 --- a/include/tins/tcp_ip/stream_follower.h +++ b/include/tins/tcp_ip/stream_follower.h @@ -93,7 +93,8 @@ class TINS_API StreamFollower { enum TerminationReason { TIMEOUT, ///< The stream was terminated due to a timeout BUFFERED_DATA, ///< The stream was terminated because it had too much buffered data - SACKED_SEGMENTS ///< The stream was terminated because it had too many SACKed segments + SACKED_SEGMENTS, ///< The stream was terminated because it had too many SACKed segments + FORCE_DISCARD ///< The stream was discarded by user request }; /** @@ -108,6 +109,9 @@ class TINS_API StreamFollower { */ StreamFollower(); + // max=0 means no limitation + void max_streams(const size_t new_max); + /** * \brief Processes a packet * @@ -164,6 +168,10 @@ class TINS_API StreamFollower { stream_keep_alive_ = keep_alive; } + + size_t n_streams(); + + /** * Finds the stream identified by the provided arguments. * @@ -186,6 +194,19 @@ class TINS_API StreamFollower { Stream& find_stream(const IPv6Address& client_addr, uint16_t client_port, const IPv6Address& server_addr, uint16_t server_port); + + // finds and discards the stream identified by the provided arguments. + // throws stream_not_found if the stream is not found. + // otherwise calls the termination callback. + // to be used *outside* of stream callbacks. + void discard_stream(const IPv4Address& client_addr, uint16_t client_port, + const IPv4Address& server_addr, uint16_t server_port); + void discard_stream(const IPv6Address& client_addr, uint16_t client_port, + const IPv6Address& server_addr, uint16_t server_port); + void discard_stream(const Stream& stream); + void discard_stream(const stream_id& id); + + /** * \brief Indicates whether partial streams should be followed. * @@ -222,6 +243,7 @@ class TINS_API StreamFollower { streams_type streams_; stream_callback_type on_new_connection_; stream_termination_callback_type on_stream_termination_; + size_t max_streams_; // max=0 means no limitation size_t max_buffered_chunks_; uint32_t max_buffered_bytes_; timestamp_type last_cleanup_; diff --git a/src/ip_reassembler.cpp b/src/ip_reassembler.cpp index 744b4c30..bc80d047 100644 --- a/src/ip_reassembler.cpp +++ b/src/ip_reassembler.cpp @@ -27,44 +27,89 @@ * */ +#include #include +#include #include #include #include using std::make_pair; +using std::out_of_range; namespace Tins { namespace Internals { +uint16_t IPv4Fragment::trim(uint16_t amount) { + if (amount > payload_.size()) { + amount = payload_.size(); + } + offset_ += amount; + payload_.erase( + payload_.begin(), + payload_.begin() + amount); + return amount; // report deleted bytes +} + IPv4Stream::IPv4Stream() : received_size_(), total_size_(), received_end_(false) { } -void IPv4Stream::add_fragment(IP* ip) { +size_t IPv4Stream::add_fragment(IP* ip) { + const size_t before_size = received_size_; const uint16_t offset = extract_offset(ip); + uint16_t expected_offset = 0; fragments_type::iterator it = fragments_.begin(); while (it != fragments_.end() && offset > it->offset()) { + expected_offset = static_cast(it->offset() + it->size()); ++it; } + // No duplicates plx - if (it != fragments_.end() && it->offset() == offset) { + /*if (it != fragments_.end() && it->offset() == offset) { return; + }*/ + + // overlap handling + /*fragments_.insert(it, IPv4Fragment(ip->inner_pdu(), offset));*/ + IPv4Fragment frag(ip->inner_pdu(), offset); + if (expected_offset > offset) { + frag.trim(expected_offset - offset); + } + size_t frag_size = frag.size(); + if(frag_size == 0) { + return 0; } - fragments_.insert(it, IPv4Fragment(ip->inner_pdu(), offset)); - received_size_ += ip->inner_pdu()->size(); + if(static_cast(frag.offset()) + frag_size > 65535) { + return 0; + } + expected_offset = static_cast(frag.offset() + frag_size); + while (it != fragments_.end() && it->offset() < expected_offset) { + received_size_ -= it->trim(expected_offset - it->offset()); + if (it->size() == 0) { + it = fragments_.erase(it); + } + else { + break; + } + } + + // I wonder whether the copying of the payload is/can be optimized away + fragments_.insert(it, frag); + received_size_ += frag_size; // If the MF flag is off if ((ip->flags() & IP::MORE_FRAGMENTS) == 0) { - total_size_ = offset + ip->inner_pdu()->size(); + total_size_ = expected_offset; received_end_ = true; } - if (offset == 0) { + if (frag.offset() == 0) { // Release the inner PDU, store this first fragment and restore the inner PDU PDU* inner_pdu = ip->release_inner_pdu(); first_fragment_ = *ip; ip->inner_pdu(inner_pdu); } + return received_size_ - before_size; } bool IPv4Stream::is_complete() const { @@ -106,13 +151,16 @@ uint16_t IPv4Stream::extract_offset(const IP* ip) { } // Internals +const size_t IPv4Reassembler::MAX_BUFFERED_BYTES = 256*1024; +const size_t IPv4Reassembler::BUFFERED_BYTES_LOW_THRESHOLD = 192*1024; + IPv4Reassembler::IPv4Reassembler() -: technique_(NONE) { +: technique_(BSD), buffered_bytes_() { } IPv4Reassembler::IPv4Reassembler(OverlappingTechnique technique) -: technique_(technique) { +: technique_(technique), buffered_bytes_() { } @@ -123,15 +171,15 @@ IPv4Reassembler::PacketStatus IPv4Reassembler::process(PDU& pdu) { if (ip->is_fragmented()) { key_type key = make_key(ip); // Create it or look it up, it's the same - Internals::IPv4Stream& stream = streams_[key]; - stream.add_fragment(ip); + Internals::IPv4Stream& stream = get_stream(key); + buffered_bytes_ += stream.add_fragment(ip); if (stream.is_complete()) { PDU* pdu = stream.allocate_pdu(); // Use all field values from the first fragment *ip = stream.first_fragment(); // Erase this stream, since it's already assembled - streams_.erase(key); + remove_stream(key); // The packet is corrupt if (!pdu) { return FRAGMENTED; @@ -142,6 +190,9 @@ IPv4Reassembler::PacketStatus IPv4Reassembler::process(PDU& pdu) { return REASSEMBLED; } else { + if (buffered_bytes_ > MAX_BUFFERED_BYTES) { + prune_streams(); + } return FRAGMENTED; } } @@ -165,17 +216,51 @@ IPv4Reassembler::address_pair IPv4Reassembler::make_address_pair(IPv4Address add } } +Internals::IPv4Stream& IPv4Reassembler::get_stream(key_type key) { + Internals::IPv4Stream *stream_ptr; + try { + ordered_streams_type::iterator& it = streams_.at(key); // may throw out_of_range + stream_ptr = it->second; + ordered_streams_.erase(it); + } + catch (out_of_range&) { + stream_ptr = new Internals::IPv4Stream(); + } + ordered_streams_type::iterator it2 = ordered_streams_.insert(ordered_streams_.end(), make_pair(key,stream_ptr)); + streams_[key] = it2; + return *stream_ptr; +} + +void IPv4Reassembler::prune_streams() { + while (buffered_bytes_ > BUFFERED_BYTES_LOW_THRESHOLD) { + remove_stream(ordered_streams_.begin()->first); + } +} + void IPv4Reassembler::clear_streams() { streams_.clear(); + ordered_streams_.clear(); + buffered_bytes_ = 0; +} + +void IPv4Reassembler::remove_stream(key_type key) { + try { + ordered_streams_type::iterator& it = streams_.at(key); // may throw out_of_range + Internals::IPv4Stream *stream_ptr = it->second; + buffered_bytes_ -= stream_ptr->size(); + ordered_streams_.erase(it); + streams_.erase(key); + delete stream_ptr; + } + catch (out_of_range&) { } } void IPv4Reassembler::remove_stream(uint16_t id, IPv4Address addr1, IPv4Address addr2) { - streams_.erase( - make_pair( - id, - make_address_pair(addr1, addr2) - ) + key_type key = make_pair( + id, + make_address_pair(addr1, addr2) ); + remove_stream(key); } } // Tins diff --git a/src/tcp_ip/flow.cpp b/src/tcp_ip/flow.cpp index de4fdba2..45624d2c 100644 --- a/src/tcp_ip/flow.cpp +++ b/src/tcp_ip/flow.cpp @@ -133,7 +133,7 @@ void Flow::update_state(const TCP& tcp) { else if ((tcp.flags() & TCP::RST) != 0) { state_ = RST_SENT; } - else if (state_ == SYN_SENT && (tcp.flags() & TCP::ACK) != 0) { + else if (state_ == SYN_SENT && (tcp.flags() & TCP::SYN) == 0 && (tcp.flags() & TCP::ACK) != 0) { #ifdef TINS_HAVE_ACK_TRACKER ack_tracker_ = AckTracker(tcp.ack_seq()); #endif // TINS_HAVE_ACK_TRACKER diff --git a/src/tcp_ip/stream.cpp b/src/tcp_ip/stream.cpp index 48e7cfcc..bbd1050c 100644 --- a/src/tcp_ip/stream.cpp +++ b/src/tcp_ip/stream.cpp @@ -53,7 +53,7 @@ Stream::Stream(PDU& packet, const timestamp_type& ts) : client_flow_(extract_client_flow(packet)), server_flow_(extract_server_flow(packet)), create_time_(ts), last_seen_(ts), auto_cleanup_client_(true), auto_cleanup_server_(true), - is_partial_stream_(false), directions_recovery_mode_enabled_(0) { + is_partial_stream_(false), directions_recovery_mode_enabled_(0), is_ignored_(false) { const EthernetII* eth = packet.find_pdu(); if (eth) { client_hw_addr_ = eth->src_addr(); @@ -72,8 +72,18 @@ void Stream::process_packet(PDU& packet, const timestamp_type& ts) { else if (server_flow_.packet_belongs(packet)) { server_flow_.process_packet(packet); } - if (is_finished() && on_stream_closed_) { - on_stream_closed_(*this); + + if (is_established()) { + if (on_stream_est_) { + on_stream_est_(*this); + on_stream_est_ = NULL; + } + } + if (is_finished()) { + if (on_stream_closed_) { + on_stream_closed_(*this); + on_stream_closed_ = NULL; + } } } @@ -81,6 +91,30 @@ void Stream::process_packet(PDU& packet) { return process_packet(packet, timestamp_type(0)); } + +// calls the connection_closed callback +// to be used *outside* of stream callbacks, just before deletion. +void Stream::force_close() { + if (on_stream_closed_) + on_stream_closed_(*this); + ignore(); +} + +// prevents further callbacks +// to be used *inside* of stream callbacks. +// the stream will be deleted when the current callbacks exits. +void Stream::ignore() { + is_ignored_ = true; + ignore_client_data(); + ignore_server_data(); + stream_est_callback(NULL); + client_data_callback(NULL); + server_data_callback(NULL); + client_out_of_order_callback(NULL); + server_out_of_order_callback(NULL); + stream_closed_callback(NULL); +} + Flow& Stream::client_flow() { return client_flow_; } @@ -97,6 +131,10 @@ const Flow& Stream::server_flow() const { return server_flow_; } +void Stream::stream_est_callback(const stream_callback_type& callback) { + on_stream_est_ = callback; +} + void Stream::stream_closed_callback(const stream_callback_type& callback) { on_stream_closed_ = callback; } @@ -125,7 +163,18 @@ void Stream::ignore_server_data() { server_flow().ignore_data_packets(); } +bool Stream::is_established() const { + const Flow::State client_state = client_flow_.state(); + const Flow::State server_state = server_flow_.state(); + if (client_state == Flow::ESTABLISHED && server_state == Flow::SYN_SENT) { + return true; + } + return false; +} + bool Stream::is_finished() const { + if (is_ignored_) + return true; const Flow::State client_state = client_flow_.state(); const Flow::State server_state = server_flow_.state(); // If either peer sent a RST then the stream is done diff --git a/src/tcp_ip/stream_follower.cpp b/src/tcp_ip/stream_follower.cpp index b683219e..86b3373c 100644 --- a/src/tcp_ip/stream_follower.cpp +++ b/src/tcp_ip/stream_follower.cpp @@ -58,12 +58,16 @@ const uint32_t StreamFollower::DEFAULT_MAX_BUFFERED_BYTES = 3 * 1024 * 1024; // const StreamFollower::timestamp_type StreamFollower::DEFAULT_KEEP_ALIVE = minutes(5); StreamFollower::StreamFollower() -: max_buffered_chunks_(DEFAULT_MAX_BUFFERED_CHUNKS), +: max_streams_(0), max_buffered_chunks_(DEFAULT_MAX_BUFFERED_CHUNKS), max_buffered_bytes_(DEFAULT_MAX_BUFFERED_BYTES), last_cleanup_(0), stream_keep_alive_(DEFAULT_KEEP_ALIVE), attach_to_flows_(false) { } +void StreamFollower::max_streams(const size_t new_max) { + max_streams_ = new_max; +} + void StreamFollower::process_packet(PDU& packet) { // Use current time const system_clock::duration ts = system_clock::now().time_since_epoch(); @@ -84,7 +88,8 @@ void StreamFollower::process_packet(PDU& packet, const timestamp_type& ts) { if (iter == streams_.end()) { // Start tracking if they're either SYNs or they contain data (attach // to an already running flow). - if (tcp->flags() == TCP::SYN || (attach_to_flows_ && tcp->find_pdu() != 0)) { + if ((tcp->flags() == TCP::SYN || (attach_to_flows_ && tcp->find_pdu() != 0)) && + (max_streams_ == 0 || (streams_.size() < max_streams_))) { iter = streams_.insert(make_pair(identifier, Stream(packet, ts))).first; iter->second.setup_flows_callbacks(); if (on_new_connection_) { @@ -149,6 +154,10 @@ void StreamFollower::stream_termination_callback(const stream_termination_callba on_stream_termination_ = callback; } +size_t StreamFollower::n_streams() { + return streams_.size(); +} + Stream& StreamFollower::find_stream(const IPv4Address& client_addr, uint16_t client_port, const IPv4Address& server_addr, uint16_t server_port) { stream_id identifier(stream_id::serialize(client_addr), client_port, @@ -173,6 +182,39 @@ Stream& StreamFollower::find_stream(const stream_id& id) { } } +void StreamFollower::discard_stream(const IPv4Address& client_addr, uint16_t client_port, + const IPv4Address& server_addr, uint16_t server_port) { + stream_id identifier(stream_id::serialize(client_addr), client_port, + stream_id::serialize(server_addr), server_port); + discard_stream(identifier); +} + +void StreamFollower::discard_stream(const IPv6Address& client_addr, uint16_t client_port, + const IPv6Address& server_addr, uint16_t server_port) { + stream_id identifier(stream_id::serialize(client_addr), client_port, + stream_id::serialize(server_addr), server_port); + discard_stream(identifier); +} + +void StreamFollower::discard_stream(const Stream& stream) { + stream_id identifier = stream_id::make_identifier(stream); + discard_stream(identifier); +} + +void StreamFollower::discard_stream(const stream_id& id) { + streams_type::iterator iter = streams_.find(id); + if (iter == streams_.end()) { + throw stream_not_found(); + } + else { + // If we have a termination callback, execute it + if (on_stream_termination_) { + on_stream_termination_(iter->second, FORCE_DISCARD); + } + streams_.erase(iter); + } +} + void StreamFollower::follow_partial_streams(bool value) { attach_to_flows_ = value; }