Skip to content

Commit

Permalink
Merge pull request #3532 from Thushani-Jayasekera/choreo-ws
Browse files Browse the repository at this point in the history
[Choreo - WebsSocket Feature] Update wasm module to fix memory leaks
  • Loading branch information
renuka-fernando authored Jun 3, 2024
2 parents 9621c3d + fc6d6e6 commit 7c4a8f9
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,14 @@ public static void addObserver(String streamId, WebSocketResponseObserver observ
}

public static void removeObserver(String streamId) {
responseObservers.remove(streamId);
// As per the class java doc, the map of observers is maintained to keep track of open grpc streams.
// The map is updated when the first message from the router arrives at the enforcer. If a stream
// was created, yet no WebSocketFrameRequests arrives, and the stream gets deleted, the class variable
// streamId will not be set (streamId will be null) and an observer will not be added to the map.
// Ex: when a backend does not exist.
if (streamId != null) {
responseObservers.remove(streamId);
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ wasm_cc_binary(
name = "mgw-websocket.wasm",
srcs = ["filter.cc", "filter.h", "handler_impl.h", "handler_impl.cc", "handler.h"],
deps = [
"@proxy_wasm_cpp_sdk//:proxy_wasm_intrinsics",
"@mgw_wasm_websocket_api//:api_cc_proto",
"@proxy_wasm_cpp_sdk//:proxy_wasm_intrinsics_lite"
"@proxy_wasm_cpp_sdk//:proxy_wasm_intrinsics_full"
],
)
151 changes: 90 additions & 61 deletions envoy-filters/mgw-source/filters/http/mgw-wasm-websocket/filter.cc

Large diffs are not rendered by default.

14 changes: 11 additions & 3 deletions envoy-filters/mgw-source/filters/http/mgw-wasm-websocket/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@
#include "handler_impl.h"
#include "handler.h"

#define X_REQUEST_ID "x-request-id"

#define STATUS_HEADER ":status"
#define STATUS_101 "101"
#define ENFORCER_NOT_REACHABLE_ERROR_CODE 102500


using envoy::extensions::filters::http::mgw_wasm_websocket::v3::Metadata;

enum class ThrottleState {UnderLimit, OverLimit, FailureModeAllowed, FailureModeBlocked};
Expand Down Expand Up @@ -39,7 +46,7 @@ class MgwWebSocketContext : public Context ,
void updateFilterState(ResponseStatus status) override;
void updateHandlerState(HandlerState state) override;
void updateThrottlePeriod(const int throttle_period) override;

~MgwWebSocketContext() override;

private:
MgwGrpcStreamHandler* stream_handler_{};
Expand All @@ -49,9 +56,10 @@ class MgwWebSocketContext : public Context ,
bool failure_mode_deny_;
std::unique_ptr<Metadata> metadata_{new Metadata};
int throttle_period_;
int apim_error_code_;
std::string x_request_id_;

bool isDataFrame(const std::string_view data);
void establishNewStream();

void sendEnforcerRequest(MgwWebSocketContext* websocContext, WebSocketFrameRequest request);
};

Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ class HandlerCallbacks{
public:
virtual ~HandlerCallbacks() = default;

virtual void updateFilterState(ResponseStatus status);
virtual void updateFilterState(ResponseStatus status){};

virtual void updateHandlerState(HandlerState state);
virtual void updateHandlerState(HandlerState state){};

virtual void updateThrottlePeriod(const int throttle_period);
virtual void updateThrottlePeriod(const int throttle_period){};
};

class StreamHanlderClient{
public:
virtual ~StreamHanlderClient() = default;

virtual bool sendMessage(WebSocketFrameRequest request);
virtual void sendMessage(WebSocketFrameRequest request){};
};

using StreamHanlderClientPtr = std::unique_ptr<StreamHanlderClient>;
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ MgwGrpcStreamHandler::MgwGrpcStreamHandler(HandlerCallbacks *callbacks){
}

MgwGrpcStreamHandler::~MgwGrpcStreamHandler(){
LOG_TRACE("Handler destructed");
LOG_TRACE("Handler destructed.");
}

void MgwGrpcStreamHandler::onReceive(size_t body_size){
Expand All @@ -36,6 +36,7 @@ void MgwGrpcStreamHandler::onReceive(size_t body_size){
const WebSocketFrameResponse& frame_response = message->proto<WebSocketFrameResponse>();
LOG_TRACE(WebSocketFrameResponse_Code_Name(frame_response.throttle_state()));
if(frame_response.throttle_state() == WebSocketFrameResponse_Code_OK){
LOG_TRACE("gRPC streaming onReceive");
this->callbacks_->updateFilterState(ResponseStatus::OK);
} else if (frame_response.throttle_state() == WebSocketFrameResponse_Code_OVER_LIMIT){
this->callbacks_->updateThrottlePeriod(frame_response.throttle_period());
Expand All @@ -50,11 +51,11 @@ void MgwGrpcStreamHandler::onRemoteClose(GrpcStatus status){
this->callbacks_->updateHandlerState(HandlerState::Error);
};

bool MgwGrpcStreamHandler::sendMessage(WebSocketFrameRequest request){
void MgwGrpcStreamHandler::sendMessage(WebSocketFrameRequest request){
auto res = send(request, true);
if(res != WasmResult::Ok){
return false;
LOG_WARN(std::string("Error sending message"));
}else{
return true;
LOG_TRACE(std::string("Message sent successfully"));
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class MgwGrpcStreamHandler : public GrpcStreamHandler<WebSocketFrameRequest, Web

void onRemoteClose(GrpcStatus status) override;

bool sendMessage(WebSocketFrameRequest request) override;
void sendMessage(WebSocketFrameRequest request) override;

private:
HandlerCallbacks *callbacks_;
Expand Down

0 comments on commit 7c4a8f9

Please sign in to comment.