From f0829316f3ea8af478e34cd04ed6c550bdd93559 Mon Sep 17 00:00:00 2001 From: "k." Date: Tue, 14 Jan 2025 15:02:34 +0330 Subject: [PATCH] feat: supporting nip-09, accepting 1 filter per req message, using aggregation pipeline on req. (#110) --- config/parameters.go | 1 - delivery/websocket/client_state.go | 2 +- delivery/websocket/config.go | 1 - delivery/websocket/event_handler.go | 60 +++++- delivery/websocket/req_handler.go | 13 +- delivery/websocket/server.go | 2 +- documents/NIPs.md | 2 +- infrastructure/grpc_client/gen/config.pb.go | 110 +++++------ infrastructure/grpc_client/proto/config.proto | 3 +- makefile | 1 - repository/delete.go | 95 ++++++++- repository/event.go | 3 +- repository/handler.go | 75 ++++++- repository/req.go | 185 ++++++------------ types/event/event.go | 6 +- types/message/message.go | 16 +- types/message/message_test.go | 6 +- 17 files changed, 338 insertions(+), 243 deletions(-) diff --git a/config/parameters.go b/config/parameters.go index e59b76e..4609ce6 100644 --- a/config/parameters.go +++ b/config/parameters.go @@ -23,7 +23,6 @@ func (c *Config) LoadParameters(params *mpb.GetParametersResponse) error { c.WebsocketServer.Limitation = &websocket.Limitation{ MaxMessageLength: params.Limitations.MaxMessageLength, MaxSubscriptions: params.Limitations.MaxSubscriptions, - MaxFilters: params.Limitations.MaxFilters, MaxSubidLength: params.Limitations.MaxSubidLength, MinPowDifficulty: params.Limitations.MinPowDifficulty, AuthRequired: params.Limitations.AuthRequired, diff --git a/delivery/websocket/client_state.go b/delivery/websocket/client_state.go index e9a12a1..7c8e6ba 100644 --- a/delivery/websocket/client_state.go +++ b/delivery/websocket/client_state.go @@ -10,6 +10,6 @@ type clientState struct { challenge string pubkey *string isKnown *bool - subs map[string]filter.Filters + subs map[string]filter.Filter *sync.RWMutex } diff --git a/delivery/websocket/config.go b/delivery/websocket/config.go index 650e0cc..8a5ce50 100644 --- a/delivery/websocket/config.go +++ b/delivery/websocket/config.go @@ -5,7 +5,6 @@ import "net/url" type Limitation struct { MaxMessageLength int32 // todo?. MaxSubscriptions int32 - MaxFilters int32 MaxSubidLength int32 MinPowDifficulty int32 AuthRequired bool diff --git a/delivery/websocket/event_handler.go b/delivery/websocket/event_handler.go index 471d529..255b292 100644 --- a/delivery/websocket/event_handler.go +++ b/delivery/websocket/event_handler.go @@ -9,13 +9,15 @@ import ( "github.com/dezh-tech/immortal/infrastructure/redis" "github.com/dezh-tech/immortal/pkg/logger" "github.com/dezh-tech/immortal/pkg/utils" + "github.com/dezh-tech/immortal/types" + "github.com/dezh-tech/immortal/types/filter" "github.com/dezh-tech/immortal/types/message" "github.com/gorilla/websocket" gredis "github.com/redis/go-redis/v9" ) // handleEvent handles new incoming EVENT messages from client. -func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { +func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { //nolint s.mu.Lock() defer s.mu.Unlock() defer measureLatency(s.metrics.EventLatency)() @@ -172,9 +174,59 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { return } - if !msg.Event.Kind.IsEphemeral() { - err := s.handler.HandleEvent(msg.Event) - if err != nil { + if !msg.Event.Kind.IsEphemeral() { //nolint + if msg.Event.Kind == types.KindEventDeletionRequest { + deleteFilterString := msg.Event.Tags.GetValue("filter") + + deleteFilter, err := filter.Decode([]byte(deleteFilterString)) + if err != nil { + okm := message.MakeOK(false, + msg.Event.ID, + fmt.Sprintf("error: parse deletion event: %s", deleteFilterString), + ) + + _ = conn.WriteMessage(1, okm) + + status = invalidFail + + return + } + + // you can only delete events you own. + if len(deleteFilter.Authors) == 1 { + if deleteFilter.Authors[0] != msg.Event.PublicKey { + okm := message.MakeOK(false, + msg.Event.ID, + fmt.Sprintf( + "error: you can request to delete your events only: %s", + deleteFilter.Authors), + ) + + _ = conn.WriteMessage(1, okm) + + status = invalidFail + + return + } + } else { + okm := message.MakeOK(false, + msg.Event.ID, + fmt.Sprintf( + "error: you can request to delete your events only: %s", + deleteFilter.Authors), + ) + + _ = conn.WriteMessage(1, okm) + + status = invalidFail + + return + } + + go s.handler.DeleteByFilter(deleteFilter) //nolint + } + + if err := s.handler.HandleEvent(msg.Event); err != nil { okm := message.MakeOK(false, msg.Event.ID, "error: can't write event to database.", diff --git a/delivery/websocket/req_handler.go b/delivery/websocket/req_handler.go index a9f1c71..e63fb32 100644 --- a/delivery/websocket/req_handler.go +++ b/delivery/websocket/req_handler.go @@ -55,15 +55,6 @@ func (s *Server) handleReq(conn *websocket.Conn, m message.Message) { return } - if len(msg.Filters) >= int(s.config.Limitation.MaxFilters) { - _ = conn.WriteMessage(1, message.MakeNotice(fmt.Sprintf("error: max limit of filters is: %d", - s.config.Limitation.MaxFilters))) - - status = limitsFail - - return - } - if len(msg.SubscriptionID) >= int(s.config.Limitation.MaxSubidLength) { _ = conn.WriteMessage(1, message.MakeNotice(fmt.Sprintf("error: max limit of sub id is: %d", s.config.Limitation.MaxSubidLength))) @@ -82,7 +73,7 @@ func (s *Server) handleReq(conn *websocket.Conn, m message.Message) { return } - res, err := s.handler.HandleReq(msg.Filters) + res, err := s.handler.HandleReq(&msg.Filter) if err != nil { _ = conn.WriteMessage(1, message.MakeNotice(fmt.Sprintf("error: can't process REQ message: %s", err.Error()))) status = databaseFail @@ -99,6 +90,6 @@ func (s *Server) handleReq(conn *websocket.Conn, m message.Message) { client.Lock() s.metrics.Subscriptions.Inc() - client.subs[msg.SubscriptionID] = msg.Filters + client.subs[msg.SubscriptionID] = msg.Filter client.Unlock() } diff --git a/delivery/websocket/server.go b/delivery/websocket/server.go index 388f782..5548ac9 100644 --- a/delivery/websocket/server.go +++ b/delivery/websocket/server.go @@ -82,7 +82,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.conns[conn] = clientState{ pubkey: &pubkey, isKnown: &known, - subs: make(map[string]filter.Filters), + subs: make(map[string]filter.Filter), RWMutex: &sync.RWMutex{}, } diff --git a/documents/NIPs.md b/documents/NIPs.md index ab40ae6..fdf9404 100644 --- a/documents/NIPs.md +++ b/documents/NIPs.md @@ -1,7 +1,7 @@ # Immortal supported NIPs - [X] **NIP-01**: Basic Protocol Flow Description -- [ ] **NIP-09**: Event Deletion Request +- [X] **NIP-09**: Event Deletion Request - [X] **NIP-11**: Relay Information Document - [X] **NIP-13**: Proof of Work - [X] **NIP-40**: Expiration Timestamp diff --git a/infrastructure/grpc_client/gen/config.pb.go b/infrastructure/grpc_client/gen/config.pb.go index 3f2c5a2..9835e63 100644 --- a/infrastructure/grpc_client/gen/config.pb.go +++ b/infrastructure/grpc_client/gen/config.pb.go @@ -27,7 +27,6 @@ type Limitations struct { MaxMessageLength int32 `protobuf:"varint,1,opt,name=max_message_length,json=maxMessageLength,proto3" json:"max_message_length,omitempty"` MaxSubscriptions int32 `protobuf:"varint,2,opt,name=max_subscriptions,json=maxSubscriptions,proto3" json:"max_subscriptions,omitempty"` - MaxFilters int32 `protobuf:"varint,3,opt,name=max_filters,json=maxFilters,proto3" json:"max_filters,omitempty"` MaxSubidLength int32 `protobuf:"varint,4,opt,name=max_subid_length,json=maxSubidLength,proto3" json:"max_subid_length,omitempty"` MinPowDifficulty int32 `protobuf:"varint,5,opt,name=min_pow_difficulty,json=minPowDifficulty,proto3" json:"min_pow_difficulty,omitempty"` AuthRequired bool `protobuf:"varint,6,opt,name=auth_required,json=authRequired,proto3" json:"auth_required,omitempty"` @@ -87,13 +86,6 @@ func (x *Limitations) GetMaxSubscriptions() int32 { return 0 } -func (x *Limitations) GetMaxFilters() int32 { - if x != nil { - return x.MaxFilters - } - return 0 -} - func (x *Limitations) GetMaxSubidLength() int32 { if x != nil { return x.MaxSubidLength @@ -268,65 +260,63 @@ var File_config_proto protoreflect.FileDescriptor var file_config_proto_rawDesc = []byte{ 0x0a, 0x0c, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, - 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x22, 0xf4, 0x04, 0x0a, 0x0b, 0x4c, + 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x22, 0xd3, 0x04, 0x0a, 0x0b, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2c, 0x0a, 0x12, 0x6d, 0x61, 0x78, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x10, 0x6d, 0x61, 0x78, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x4c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x12, 0x2b, 0x0a, 0x11, 0x6d, 0x61, 0x78, 0x5f, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x10, 0x6d, 0x61, 0x78, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x6d, 0x61, 0x78, 0x5f, 0x66, 0x69, 0x6c, - 0x74, 0x65, 0x72, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x6d, 0x61, 0x78, 0x46, - 0x69, 0x6c, 0x74, 0x65, 0x72, 0x73, 0x12, 0x28, 0x0a, 0x10, 0x6d, 0x61, 0x78, 0x5f, 0x73, 0x75, - 0x62, 0x69, 0x64, 0x5f, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, - 0x52, 0x0e, 0x6d, 0x61, 0x78, 0x53, 0x75, 0x62, 0x69, 0x64, 0x4c, 0x65, 0x6e, 0x67, 0x74, 0x68, - 0x12, 0x2c, 0x0a, 0x12, 0x6d, 0x69, 0x6e, 0x5f, 0x70, 0x6f, 0x77, 0x5f, 0x64, 0x69, 0x66, 0x66, - 0x69, 0x63, 0x75, 0x6c, 0x74, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x10, 0x6d, 0x69, - 0x6e, 0x50, 0x6f, 0x77, 0x44, 0x69, 0x66, 0x66, 0x69, 0x63, 0x75, 0x6c, 0x74, 0x79, 0x12, 0x23, - 0x0a, 0x0d, 0x61, 0x75, 0x74, 0x68, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64, 0x18, - 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x61, 0x75, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x69, - 0x72, 0x65, 0x64, 0x12, 0x29, 0x0a, 0x10, 0x70, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x72, - 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0f, 0x70, - 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64, 0x12, 0x2b, - 0x0a, 0x11, 0x72, 0x65, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x65, 0x64, 0x5f, 0x77, 0x72, 0x69, - 0x74, 0x65, 0x73, 0x18, 0x08, 0x20, 0x01, 0x28, 0x08, 0x52, 0x10, 0x72, 0x65, 0x73, 0x74, 0x72, - 0x69, 0x63, 0x74, 0x65, 0x64, 0x57, 0x72, 0x69, 0x74, 0x65, 0x73, 0x12, 0x24, 0x0a, 0x0e, 0x6d, - 0x61, 0x78, 0x5f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x61, 0x67, 0x73, 0x18, 0x09, 0x20, - 0x01, 0x28, 0x05, 0x52, 0x0c, 0x6d, 0x61, 0x78, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x61, 0x67, - 0x73, 0x12, 0x2c, 0x0a, 0x12, 0x6d, 0x61, 0x78, 0x5f, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, - 0x5f, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x05, 0x52, 0x10, 0x6d, - 0x61, 0x78, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x4c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x12, - 0x33, 0x0a, 0x16, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x5f, 0x6c, 0x6f, - 0x77, 0x65, 0x72, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x03, 0x52, - 0x13, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x4c, 0x6f, 0x77, 0x65, 0x72, 0x4c, - 0x69, 0x6d, 0x69, 0x74, 0x12, 0x33, 0x0a, 0x16, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x5f, - 0x61, 0x74, 0x5f, 0x75, 0x70, 0x70, 0x65, 0x72, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x0c, - 0x20, 0x01, 0x28, 0x03, 0x52, 0x13, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x55, - 0x70, 0x70, 0x65, 0x72, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x2e, 0x0a, 0x13, 0x64, 0x65, 0x66, - 0x61, 0x75, 0x6c, 0x74, 0x5f, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, - 0x18, 0x0d, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x11, 0x64, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x51, - 0x75, 0x65, 0x72, 0x79, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x26, 0x0a, 0x0f, 0x6d, 0x61, 0x78, - 0x5f, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x0e, 0x20, 0x01, - 0x28, 0x0d, 0x52, 0x0d, 0x6d, 0x61, 0x78, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4c, 0x69, 0x6d, 0x69, - 0x74, 0x22, 0x16, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, - 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x64, 0x0a, 0x15, 0x47, 0x65, 0x74, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x28, 0x0a, 0x10, 0x6d, 0x61, 0x78, 0x5f, 0x73, 0x75, 0x62, + 0x69, 0x64, 0x5f, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, + 0x0e, 0x6d, 0x61, 0x78, 0x53, 0x75, 0x62, 0x69, 0x64, 0x4c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x12, + 0x2c, 0x0a, 0x12, 0x6d, 0x69, 0x6e, 0x5f, 0x70, 0x6f, 0x77, 0x5f, 0x64, 0x69, 0x66, 0x66, 0x69, + 0x63, 0x75, 0x6c, 0x74, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x10, 0x6d, 0x69, 0x6e, + 0x50, 0x6f, 0x77, 0x44, 0x69, 0x66, 0x66, 0x69, 0x63, 0x75, 0x6c, 0x74, 0x79, 0x12, 0x23, 0x0a, + 0x0d, 0x61, 0x75, 0x74, 0x68, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64, 0x18, 0x06, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x61, 0x75, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x69, 0x72, + 0x65, 0x64, 0x12, 0x29, 0x0a, 0x10, 0x70, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x72, 0x65, + 0x71, 0x75, 0x69, 0x72, 0x65, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0f, 0x70, 0x61, + 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64, 0x12, 0x2b, 0x0a, + 0x11, 0x72, 0x65, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x65, 0x64, 0x5f, 0x77, 0x72, 0x69, 0x74, + 0x65, 0x73, 0x18, 0x08, 0x20, 0x01, 0x28, 0x08, 0x52, 0x10, 0x72, 0x65, 0x73, 0x74, 0x72, 0x69, + 0x63, 0x74, 0x65, 0x64, 0x57, 0x72, 0x69, 0x74, 0x65, 0x73, 0x12, 0x24, 0x0a, 0x0e, 0x6d, 0x61, + 0x78, 0x5f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x61, 0x67, 0x73, 0x18, 0x09, 0x20, 0x01, + 0x28, 0x05, 0x52, 0x0c, 0x6d, 0x61, 0x78, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x61, 0x67, 0x73, + 0x12, 0x2c, 0x0a, 0x12, 0x6d, 0x61, 0x78, 0x5f, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x5f, + 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x05, 0x52, 0x10, 0x6d, 0x61, + 0x78, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x4c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x12, 0x33, + 0x0a, 0x16, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x5f, 0x6c, 0x6f, 0x77, + 0x65, 0x72, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x03, 0x52, 0x13, + 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x4c, 0x6f, 0x77, 0x65, 0x72, 0x4c, 0x69, + 0x6d, 0x69, 0x74, 0x12, 0x33, 0x0a, 0x16, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, + 0x74, 0x5f, 0x75, 0x70, 0x70, 0x65, 0x72, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x0c, 0x20, + 0x01, 0x28, 0x03, 0x52, 0x13, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x55, 0x70, + 0x70, 0x65, 0x72, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x2e, 0x0a, 0x13, 0x64, 0x65, 0x66, 0x61, + 0x75, 0x6c, 0x74, 0x5f, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, + 0x0d, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x11, 0x64, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x51, 0x75, + 0x65, 0x72, 0x79, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x26, 0x0a, 0x0f, 0x6d, 0x61, 0x78, 0x5f, + 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x0e, 0x20, 0x01, 0x28, + 0x0d, 0x52, 0x0d, 0x6d, 0x61, 0x78, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4c, 0x69, 0x6d, 0x69, 0x74, + 0x22, 0x16, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x64, 0x0a, 0x15, 0x47, 0x65, 0x74, 0x50, + 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x39, 0x0a, 0x0b, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, + 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, + 0x0b, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x10, 0x0a, 0x03, + 0x75, 0x72, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x32, 0x62, + 0x0a, 0x0a, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x73, 0x12, 0x54, 0x0a, 0x0d, + 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x73, 0x12, 0x20, 0x2e, + 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x61, + 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x21, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x39, 0x0a, 0x0b, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, - 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x52, 0x0b, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x10, 0x0a, - 0x03, 0x75, 0x72, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x32, - 0x62, 0x0a, 0x0a, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x73, 0x12, 0x54, 0x0a, - 0x0d, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x73, 0x12, 0x20, - 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x50, - 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x21, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, - 0x74, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x42, 0x3a, 0x5a, 0x38, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x64, 0x65, 0x7a, 0x68, 0x2d, 0x74, 0x65, 0x63, 0x68, 0x2f, 0x69, 0x6d, 0x6d, 0x6f, - 0x72, 0x74, 0x61, 0x6c, 0x2f, 0x69, 0x6e, 0x66, 0x72, 0x61, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, - 0x75, 0x72, 0x65, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x65, 0x42, 0x3a, 0x5a, 0x38, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x64, 0x65, 0x7a, 0x68, 0x2d, 0x74, 0x65, 0x63, 0x68, 0x2f, 0x69, 0x6d, 0x6d, 0x6f, 0x72, + 0x74, 0x61, 0x6c, 0x2f, 0x69, 0x6e, 0x66, 0x72, 0x61, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, + 0x72, 0x65, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/infrastructure/grpc_client/proto/config.proto b/infrastructure/grpc_client/proto/config.proto index b361bc0..94122fb 100644 --- a/infrastructure/grpc_client/proto/config.proto +++ b/infrastructure/grpc_client/proto/config.proto @@ -10,8 +10,7 @@ service Parameters { message Limitations { int32 max_message_length = 1; - int32 max_subscriptions = 2; - int32 max_filters = 3; + int32 max_subscriptions = 2; int32 max_subid_length = 4; int32 min_pow_difficulty = 5; bool auth_required = 6; diff --git a/makefile b/makefile index 1c69bf7..abe3406 100644 --- a/makefile +++ b/makefile @@ -13,7 +13,6 @@ devtools: go install mvdan.cc/gofumpt@latest go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.35 go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.5 - go install github.com/pactus-project/protoc-gen-doc/cmd/protoc-gen-doc@v0.0.0-20240815105130-84e89d0170e4 go install github.com/bufbuild/buf/cmd/buf@v1.47 ### Testing diff --git a/repository/delete.go b/repository/delete.go index f0036bc..05bc449 100644 --- a/repository/delete.go +++ b/repository/delete.go @@ -5,23 +5,16 @@ import ( "github.com/dezh-tech/immortal/pkg/logger" "github.com/dezh-tech/immortal/types" + "github.com/dezh-tech/immortal/types/filter" "go.mongodb.org/mongo-driver/bson" ) func (h *Handler) DeleteByID(id string, kind types.Kind) error { - coll := h.db.Client.Database(h.db.DBName).Collection(getCollectionName(kind)) - - ctx, cancel := context.WithTimeout(context.Background(), h.db.QueryTimeout) - defer cancel() - - filter := bson.D{ + deleteFilter := bson.D{ {Key: "id", Value: id}, } update := bson.D{ - {Key: "$set", Value: bson.D{ - {Key: "id", Value: id}, - }}, {Key: "$unset", Value: bson.D{ {Key: "pubkey"}, {Key: "created_at"}, @@ -32,7 +25,13 @@ func (h *Handler) DeleteByID(id string, kind types.Kind) error { }}, } - _, err := coll.UpdateOne(ctx, filter, update) + collName, _ := getCollectionName(kind) + coll := h.db.Client.Database(h.db.DBName).Collection(collName) + + ctx, cancel := context.WithTimeout(context.Background(), h.db.QueryTimeout) + defer cancel() + + _, err := coll.UpdateOne(ctx, deleteFilter, update) if err != nil { _, err := h.grpc.AddLog(context.Background(), "database error while removing event", err.Error()) @@ -45,3 +44,79 @@ func (h *Handler) DeleteByID(id string, kind types.Kind) error { return nil } + +func (h *Handler) DeleteByFilter(f *filter.Filter) error { + // question/todo::: is it possible to run the deletion on all collections with one database call? + // we have an open issue on deletion execution. + // we do the read operation using aggregation pipeline and $unionWith stage which + // helps us ti prevent multiple database calls and it would help us to do the operation faster. + // to do the same thing for deletion we need to filter the documents with $match, then update the + // fields of deleted event to null (expect the `id` since its unique index to prevent overwrites) with $unset + // then we apply them to collection using $merge. + // although we can't use multiple $merge's on one pipeline and we must have + // only one merge at the end of pipeline commands. also, $unionWith is restricted to be used with $merge. + + // notes::: these details may help you to think for solutions better: + // 1. we create a collection for each kind or each group of kinds. + // using this model forces us to make query to all collections corresponding to provided kinds when + // we are dealing with filters since filters contain a list of kinds + // (which can be empty and we are then forced to query all collections) + + // 2. when we delete an event we $unset all fields expect `id`. + // when we make a query to read from database, we ignore fields which + // their fields are null. and when we write new events we prevent overwriting + // events with duplicated `id`. so we can handle the deletion properly. + + // resources::: these links may help you: + // 1. https://www.mongodb.com/docs/manual/reference/operator/aggregation/merge/#restrictions + // 2. https://www.mongodb.com/docs/manual/reference/operator/aggregation/unionWith/#mongodb-pipeline-pipe.-unionWith + // 3. https://www.mongodb.com/docs/manual/reference/operator/aggregation + + queryKinds := make(map[types.Kind]*filter.Filter) + + if len(f.Kinds) != 0 { + uniqueKinds := removeDuplicateKinds(f.Kinds) + for _, k := range uniqueKinds { + queryKinds[k] = f + } + } else { + for k := range types.KindToName { + queryKinds[k] = f + } + } + + update := bson.D{ + {Key: "$unset", Value: bson.D{ + {Key: "pubkey"}, + {Key: "created_at"}, + {Key: "kind"}, + {Key: "tags"}, + {Key: "content"}, + {Key: "sig"}, + }}, + } + + for kind, deleteFilter := range queryKinds { + collectionName, isMultiKindColl := getCollectionName(kind) + + query := filterToMongoQuery(deleteFilter, isMultiKindColl, kind) + + ctx, cancel := context.WithTimeout(context.Background(), h.db.QueryTimeout) + + _, err := h.db.Client.Database(h.db.DBName).Collection(collectionName).UpdateMany(ctx, query, update) + if err != nil { + _, err := h.grpc.AddLog(ctx, + "database error while deleting events", err.Error()) + if err != nil { + logger.Error("can't send log to manager", "err", err) + } + + cancel() + + return err + } + cancel() + } + + return nil +} diff --git a/repository/event.go b/repository/event.go index 6107ee2..2f8eba8 100644 --- a/repository/event.go +++ b/repository/event.go @@ -11,7 +11,8 @@ import ( ) func (h *Handler) HandleEvent(e *event.Event) error { - coll := h.db.Client.Database(h.db.DBName).Collection(getCollectionName(e.Kind)) + collName, _ := getCollectionName(e.Kind) + coll := h.db.Client.Database(h.db.DBName).Collection(collName) ctx, cancel := context.WithTimeout(context.Background(), h.db.QueryTimeout) defer cancel() diff --git a/repository/handler.go b/repository/handler.go index 879618b..cb84ebd 100644 --- a/repository/handler.go +++ b/repository/handler.go @@ -4,6 +4,8 @@ import ( "github.com/dezh-tech/immortal/infrastructure/database" grpcclient "github.com/dezh-tech/immortal/infrastructure/grpc_client" "github.com/dezh-tech/immortal/types" + "github.com/dezh-tech/immortal/types/filter" + "go.mongodb.org/mongo-driver/bson" ) type Handler struct { @@ -20,27 +22,86 @@ func New(cfg Config, db *database.Database, grpc grpcclient.IClient) *Handler { } } -func getCollectionName(k types.Kind) string { +func filterToMongoQuery(f *filter.Filter, isMultiKindColl bool, k types.Kind) bson.D { + query := make(bson.D, 0) + + if isMultiKindColl { + query = append(query, bson.E{Key: "kind", Value: k}) + } + + query = append(query, bson.E{Key: "pubkey", Value: bson.M{ + "$exists": true, + }}) + + if len(f.IDs) > 0 { + query = append(query, bson.E{Key: "id", Value: bson.M{"$in": f.IDs}}) + } + + if len(f.Authors) > 0 { + query = append(query, bson.E{Key: "pubkey", Value: bson.M{"$in": f.Authors}}) + } + + if len(f.Tags) > 0 { + tagQueries := bson.A{} + for tagKey, tagValues := range f.Tags { + qtf := bson.M{ + "tags": bson.M{ + "$elemMatch": bson.M{ + "0": tagKey, + "1": bson.M{"$in": tagValues}, + }, + }, + } + tagQueries = append(tagQueries, qtf) + } + query = append(query, bson.E{Key: "$and", Value: tagQueries}) + } + + if f.Since > 0 { + query = append(query, bson.E{Key: "created_at", Value: bson.M{"$gte": f.Since}}) + } + + if f.Until > 0 { + query = append(query, bson.E{Key: "created_at", Value: bson.M{"$lte": f.Until}}) + } + + return query +} + +func getCollectionName(k types.Kind) (string, bool) { collName, ok := types.KindToName[k] if ok { - return collName + return collName, false } if k >= 9000 && k <= 9030 { - return "groups" + return "groups", true } if k >= 1630 && k <= 1633 { - return "status" + return "status", true } if k >= 39000 && k <= 39009 { - return "groups_metadata" + return "groups_metadata", true } if k >= 5000 && k <= 5999 || k >= 6000 && k <= 6999 || k == 7000 { - return "dvm" + return "dvm", true + } + + return "unknown", true +} + +func removeDuplicateKinds(intSlice []types.Kind) []types.Kind { + allKeys := make(map[types.Kind]bool, len(intSlice)) + list := []types.Kind{} + for _, item := range intSlice { + if _, value := allKeys[item]; !value { + allKeys[item] = true + list = append(list, item) + } } - return "unknown" + return list } diff --git a/repository/req.go b/repository/req.go index 2cf25a6..bc8b88d 100644 --- a/repository/req.go +++ b/repository/req.go @@ -2,7 +2,6 @@ package repository import ( "context" - "errors" "github.com/dezh-tech/immortal/pkg/logger" "github.com/dezh-tech/immortal/types" @@ -10,156 +9,90 @@ import ( "github.com/dezh-tech/immortal/types/filter" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" ) -var possibleKinds = []types.Kind{ - types.KindUserMetadata, - types.KindShortTextNote, - types.KindZap, - types.KindRelayListMetadata, -} - -type filterQuery struct { - Tags map[string][]string +func (h *Handler) HandleReq(f *filter.Filter) ([]event.Event, error) { + queryKinds := make(map[types.Kind]*filter.Filter) - Authors []string - IDs []string + if len(f.Kinds) != 0 { + uniqueKinds := removeDuplicateKinds(f.Kinds) + for _, k := range uniqueKinds { + queryKinds[k] = f + } + } else { + for k := range types.KindToName { + queryKinds[k] = f + } + } - Since int64 - Until int64 - Limit uint32 -} + var pipeline mongo.Pipeline -func (h *Handler) HandleReq(fs filter.Filters) ([]event.Event, error) { - ctx, cancel := context.WithTimeout(context.Background(), h.db.QueryTimeout) - defer cancel() + for kind, filter := range queryKinds { + collectionName, isMultiKindColl := getCollectionName(kind) - queryKinds := make(map[types.Kind][]filterQuery) + query := filterToMongoQuery(filter, isMultiKindColl, kind) - for _, f := range fs { - qf := filterQuery{ - Tags: f.Tags, - Authors: f.Authors, - IDs: f.IDs, - Since: f.Since, - Until: f.Until, - Limit: f.Limit, + matchStage := bson.D{ + {Key: "$match", Value: query}, } - if len(f.Kinds) != 0 { - uniqueKinds := removeDuplicateKind(f.Kinds) - for _, k := range uniqueKinds { - queryKinds[k] = append(queryKinds[k], qf) - } - } else { - // ! it makes query to the most requested kinds if there is no kind provided. - // ? fix::: any better way? - for _, k := range possibleKinds { - queryKinds[k] = append(queryKinds[k], qf) - } + unionStage := bson.D{ + {Key: "$unionWith", Value: bson.D{ + {Key: "coll", Value: collectionName}, + {Key: "pipeline", Value: mongo.Pipeline{ + matchStage, + }}, + }}, } - } - - var finalResult []event.Event - for kind, filters := range queryKinds { - // todo::: querying database in goroutines. - collection := h.db.Client.Database(h.db.DBName).Collection(getCollectionName(kind)) - for _, f := range filters { - query, opts, err := h.FilterToQuery(&f) - if err != nil { - continue - } - - cursor, err := collection.Find(ctx, query, opts) - if err != nil { - if !errors.Is(err, mongo.ErrNoDocuments) { - _, err := h.grpc.AddLog(context.Background(), - "database error while making query", err.Error()) - if err != nil { - logger.Error("can't send log to manager", "err", err) - } - } - - return nil, err - } - - var result []event.Event - if err := cursor.All(ctx, &result); err != nil { - return nil, err - } - - finalResult = append(finalResult, result...) - } + pipeline = append(pipeline, unionStage) } - return finalResult, nil -} - -func removeDuplicateKind(intSlice []types.Kind) []types.Kind { - allKeys := make(map[types.Kind]bool) - list := []types.Kind{} - for _, item := range intSlice { - if _, value := allKeys[item]; !value { - allKeys[item] = true - list = append(list, item) - } + sortStage := bson.D{ + {Key: "$sort", Value: bson.D{ + {Key: "created_at", Value: -1}, + {Key: "id", Value: 1}, + }}, } - return list -} - -func (h *Handler) FilterToQuery(fq *filterQuery) (bson.D, *options.FindOptions, error) { - query := make(bson.D, 0) - opts := options.Find() - - query = append(query, bson.E{Key: "pubkey", Value: bson.M{ - "$exists": true, - }}) + pipeline = append(pipeline, sortStage) - if len(fq.IDs) > 0 { - query = append(query, bson.E{Key: "id", Value: bson.M{"$in": fq.IDs}}) + finalLimit := h.config.DefaultQueryLimit + if f.Limit > 0 && f.Limit < h.config.MaxQueryLimit { + finalLimit = f.Limit } - if len(fq.Authors) > 0 { - query = append(query, bson.E{Key: "pubkey", Value: bson.M{"$in": fq.Authors}}) + limitStage := bson.D{ + {Key: "$limit", Value: finalLimit}, } - if len(fq.Tags) > 0 { - tagQueries := bson.A{} - for tagKey, tagValues := range fq.Tags { - qtf := bson.M{ - "tags": bson.M{ - "$elemMatch": bson.M{ - "0": tagKey, - "1": bson.M{"$in": tagValues}, - }, - }, - } - tagQueries = append(tagQueries, qtf) + pipeline = append(pipeline, limitStage) + + ctx, cancel := context.WithTimeout(context.Background(), h.db.QueryTimeout) + defer cancel() + + cursor, err := h.db.Client.Database(h.db.DBName).Collection("empty").Aggregate(ctx, pipeline) + if err != nil { + _, err := h.grpc.AddLog(context.Background(), + "database error while adding new event", err.Error()) + if err != nil { + logger.Error("can't send log to manager", "err", err) } - query = append(query, bson.E{Key: "$and", Value: tagQueries}) - } - if fq.Since > 0 { - query = append(query, bson.E{Key: "created_at", Value: bson.M{"$gte": fq.Since}}) + return nil, err } + defer cursor.Close(ctx) - if fq.Until > 0 { - query = append(query, bson.E{Key: "created_at", Value: bson.M{"$lte": fq.Since}}) - } + var finalResult []event.Event + if err := cursor.All(ctx, &finalResult); err != nil { + _, err := h.grpc.AddLog(context.Background(), + "database error while adding new event", err.Error()) + if err != nil { + logger.Error("can't send log to manager", "err", err) + } - if fq.Limit > 0 && fq.Limit < h.config.MaxQueryLimit { - opts.SetLimit(int64(fq.Limit)) - } else { - opts.SetLimit(int64(h.config.DefaultQueryLimit)) + return nil, err } - opts.SetSort(bson.D{ - {Key: "created_at", Value: -1}, - {Key: "id", Value: 1}, - }) - - return query, opts, nil + return finalResult, nil } diff --git a/types/event/event.go b/types/event/event.go index a8ef76b..9efed9e 100644 --- a/types/event/event.go +++ b/types/event/event.go @@ -99,14 +99,14 @@ func (e *Event) IsValid(id [32]byte) bool { return false } - // TODO::: replace with libsecp256k1 (C++ version). + // todo::: replace with libsecp256k1 (C++ version). return sig.Verify(id[:], pubkey) } -// IsProtected checks is ["-"] is present, look nip-70 for more. +// IsProtected checks if ["-"] tag is present, check nip-70 for more. func (e *Event) IsProtected() bool { for _, t := range e.Tags { - if len(t) < 1 { + if len(t) != 1 { continue } diff --git a/types/message/message.go b/types/message/message.go index e78a7c5..f18ddfc 100644 --- a/types/message/message.go +++ b/types/message/message.go @@ -126,7 +126,7 @@ func (em Event) EncodeToJSON() ([]byte, error) { // Req represents a NIP-01 REQ message. type Req struct { SubscriptionID string - filter.Filters + filter.Filter } func (Req) Type() string { return "REQ" } @@ -140,21 +140,17 @@ func (rm *Req) DecodeFromJSON(data []byte) error { } } rm.SubscriptionID = arr[1].Str - rm.Filters = make(filter.Filters, len(arr)-2) - f := 0 - for i := 2; i < len(arr); i++ { - if err := easyjson.Unmarshal([]byte(arr[i].Raw), &rm.Filters[f]); err != nil { - return types.DecodeError{ - Reason: fmt.Sprintf("REQ message: %s", err.Error()), - } + rm.Filter = filter.Filter{} + if err := easyjson.Unmarshal([]byte(arr[2].Raw), &rm.Filter); err != nil { + return types.DecodeError{ + Reason: fmt.Sprintf("REQ message: %s", err.Error()), } - f++ } return nil } -func (rm Req) EncodeToJSON() ([]byte, error) { +func (rm *Req) EncodeToJSON() ([]byte, error) { return nil, nil } diff --git a/types/message/message_test.go b/types/message/message_test.go index e05d43a..eee371b 100644 --- a/types/message/message_test.go +++ b/types/message/message_test.go @@ -34,13 +34,13 @@ var testCases = []testCase{ }, { Name: "REQ message", - Message: []byte(`["REQ","million", {"kinds": [1]}, {"kinds": [30023 ], "#d": ["buteko", "batuke"]}]`), + Message: []byte(`["REQ","million", {"kinds": [30023 ], "#d": ["buteko", "batuke"]}]`), ExpectedEnvelope: &message.Req{ SubscriptionID: "million", - Filters: filter.Filters{{Kinds: []types.Kind{1}}, { + Filter: filter.Filter{ Kinds: []types.Kind{30023}, Tags: map[string][]string{"d": {"buteko", "batuke"}}, - }}, + }, }, }, {