From 92eacb471fdcc9cdce636b67c56ac603f7992796 Mon Sep 17 00:00:00 2001 From: Taha Dostifam Date: Mon, 23 Sep 2024 18:43:11 +0330 Subject: [PATCH] refactor: direct chat creation business logic improved --- database/repo_mongo/chat_repository.go | 26 +++--- delivery/grpc/handlers/auth_handler.go | 2 +- delivery/grpc/handlers/chat_handler.go | 15 ++-- delivery/grpc/handlers/message_handler.go | 2 +- delivery/grpc/handlers/search_handler.go | 2 +- .../proto_model_transformer/chat_model.go | 0 .../model}/proto_model_transformer/errors.go | 0 .../proto_model_transformer/message_model.go | 0 .../proto_model_transformer/user_model.go | 0 internal/repository/chat_repository.go | 4 +- internal/service/chat/chat_service.go | 79 +++++++++++++------ internal/service/common_errors.go | 7 ++ internal/service/message/message_service.go | 2 +- 13 files changed, 89 insertions(+), 50 deletions(-) rename {delivery/grpc => internal/model}/proto_model_transformer/chat_model.go (100%) rename {delivery/grpc => internal/model}/proto_model_transformer/errors.go (100%) rename {delivery/grpc => internal/model}/proto_model_transformer/message_model.go (100%) rename {delivery/grpc => internal/model}/proto_model_transformer/user_model.go (100%) create mode 100644 internal/service/common_errors.go diff --git a/database/repo_mongo/chat_repository.go b/database/repo_mongo/chat_repository.go index ebc412d..78d8f5c 100644 --- a/database/repo_mongo/chat_repository.go +++ b/database/repo_mongo/chat_repository.go @@ -21,7 +21,7 @@ func NewChatMongoRepository(db *mongo.Database) repository.ChatRepository { return &chatRepository{db.Collection(database.UsersCollection), db.Collection(database.ChatsCollection)} } -func (repo *chatRepository) JoinChat(ctx context.Context, chatType string, userID string, chatID primitive.ObjectID) error { +func (repo *chatRepository) AddToUsersChatsList(ctx context.Context, userID string, chatID primitive.ObjectID) error { userFilter := bson.M{"user_id": userID} userUpdate := bson.M{ "$addToSet": bson.M{ @@ -33,18 +33,20 @@ func (repo *chatRepository) JoinChat(ctx context.Context, chatType string, userI return err } - if chatType != "direct" { - chatFilter := bson.M{"_id": chatID} - chatUpdate := bson.M{ - "$addToSet": bson.M{ - "chat_detail.members": userID, - }, - } + return nil +} - _, err = repo.chatsCollection.UpdateOne(ctx, chatFilter, chatUpdate) - if err != nil { - return err - } +func (repo *chatRepository) JoinChat(ctx context.Context, chatType string, userID string, chatID primitive.ObjectID) error { + chatFilter := bson.M{"_id": chatID} + chatUpdate := bson.M{ + "$addToSet": bson.M{ + "chat_detail.members": userID, + }, + } + + _, err := repo.chatsCollection.UpdateOne(ctx, chatFilter, chatUpdate) + if err != nil { + return err } return nil diff --git a/delivery/grpc/handlers/auth_handler.go b/delivery/grpc/handlers/auth_handler.go index cf6bf21..1efe385 100644 --- a/delivery/grpc/handlers/auth_handler.go +++ b/delivery/grpc/handlers/auth_handler.go @@ -5,8 +5,8 @@ import ( "connectrpc.com/connect" grpc_helpers "github.com/kavkaco/Kavka-Core/delivery/grpc/helpers" - "github.com/kavkaco/Kavka-Core/delivery/grpc/proto_model_transformer" + "github.com/kavkaco/Kavka-Core/internal/model/proto_model_transformer" "github.com/kavkaco/Kavka-Core/internal/service/auth" authv1 "github.com/kavkaco/Kavka-ProtoBuf/gen/go/protobuf/auth/v1" diff --git a/delivery/grpc/handlers/chat_handler.go b/delivery/grpc/handlers/chat_handler.go index 84727bb..8ff5f54 100644 --- a/delivery/grpc/handlers/chat_handler.go +++ b/delivery/grpc/handlers/chat_handler.go @@ -8,8 +8,9 @@ import ( grpc_helpers "github.com/kavkaco/Kavka-Core/delivery/grpc/helpers" "github.com/kavkaco/Kavka-Core/delivery/grpc/interceptor" - "github.com/kavkaco/Kavka-Core/delivery/grpc/proto_model_transformer" "github.com/kavkaco/Kavka-Core/internal/model" + "github.com/kavkaco/Kavka-Core/internal/model/proto_model_transformer" + "github.com/kavkaco/Kavka-Core/internal/service" "github.com/kavkaco/Kavka-Core/internal/service/chat" "github.com/kavkaco/Kavka-Core/log" chatv1 "github.com/kavkaco/Kavka-ProtoBuf/gen/go/protobuf/chat/v1" @@ -38,7 +39,7 @@ func (h chatHandler) GetDirectChat(ctx context.Context, req *connect.Request[cha chatProto, err := proto_model_transformer.ChatToProto(*chatDto) if err != nil { - return nil, connect.NewError(connect.CodeInternal, err) + return nil, connect.NewError(connect.CodeInternal, service.ErrProtoMarshaling) } res := connect.NewResponse(&chatv1.GetDirectChatResponse{ @@ -61,7 +62,7 @@ func (h chatHandler) CreateChannel(ctx context.Context, req *connect.Request[cha chatProto, err := proto_model_transformer.ChatToProto(*chat) if err != nil { - return nil, connect.NewError(connect.CodeInternal, err) + return nil, connect.NewError(connect.CodeInternal, service.ErrProtoMarshaling) } res := connect.NewResponse(&chatv1.CreateChannelResponse{ @@ -88,7 +89,7 @@ func (h chatHandler) CreateDirect(ctx context.Context, req *connect.Request[chat chatProto, err := proto_model_transformer.ChatToProto(*chat) if err != nil { - return nil, connect.NewError(connect.CodeInternal, err) + return nil, connect.NewError(connect.CodeInternal, service.ErrProtoMarshaling) } res := connect.NewResponse(&chatv1.CreateDirectResponse{ @@ -111,7 +112,7 @@ func (h chatHandler) CreateGroup(ctx context.Context, req *connect.Request[chatv chatProto, err := proto_model_transformer.ChatToProto(*chat) if err != nil { - return nil, connect.NewError(connect.CodeInternal, err) + return nil, connect.NewError(connect.CodeInternal, service.ErrProtoMarshaling) } res := connect.NewResponse(&chatv1.CreateGroupResponse{ @@ -140,7 +141,7 @@ func (h chatHandler) GetChat(ctx context.Context, req *connect.Request[chatv1.Ge chatGetter := model.NewChatDTO(chat) chatProto, err := proto_model_transformer.ChatToProto(*chatGetter) if err != nil { - return nil, connect.NewError(connect.CodeInternal, err) + return nil, connect.NewError(connect.CodeInternal, service.ErrProtoMarshaling) } res := &connect.Response[chatv1.GetChatResponse]{ @@ -199,7 +200,7 @@ func (h chatHandler) JoinChat(ctx context.Context, req *connect.Request[chatv1.J protoChat, err := proto_model_transformer.ChatToProto(*joinResult.UpdatedChat) if err != nil { - return nil, grpc_helpers.GrpcVarror(varror, connect.CodeInternal) + return nil, connect.NewError(connect.CodeInternal, service.ErrProtoMarshaling) } res := &connect.Response[chatv1.JoinChatResponse]{Msg: &chatv1.JoinChatResponse{ diff --git a/delivery/grpc/handlers/message_handler.go b/delivery/grpc/handlers/message_handler.go index 58328ab..94a5861 100644 --- a/delivery/grpc/handlers/message_handler.go +++ b/delivery/grpc/handlers/message_handler.go @@ -7,10 +7,10 @@ import ( grpc_helpers "github.com/kavkaco/Kavka-Core/delivery/grpc/helpers" "github.com/kavkaco/Kavka-Core/delivery/grpc/interceptor" "github.com/kavkaco/Kavka-Core/internal/model" + "github.com/kavkaco/Kavka-Core/internal/model/proto_model_transformer" "github.com/kavkaco/Kavka-Core/internal/service/message" "github.com/kavkaco/Kavka-Core/log" - "github.com/kavkaco/Kavka-Core/delivery/grpc/proto_model_transformer" "github.com/kavkaco/Kavka-Core/utils/vali" messagev1 "github.com/kavkaco/Kavka-ProtoBuf/gen/go/protobuf/message/v1" "github.com/kavkaco/Kavka-ProtoBuf/gen/go/protobuf/message/v1/messagev1connect" diff --git a/delivery/grpc/handlers/search_handler.go b/delivery/grpc/handlers/search_handler.go index c918522..a5ab8ba 100644 --- a/delivery/grpc/handlers/search_handler.go +++ b/delivery/grpc/handlers/search_handler.go @@ -5,7 +5,7 @@ import ( "connectrpc.com/connect" grpc_helpers "github.com/kavkaco/Kavka-Core/delivery/grpc/helpers" - "github.com/kavkaco/Kavka-Core/delivery/grpc/proto_model_transformer" + "github.com/kavkaco/Kavka-Core/internal/model/proto_model_transformer" "github.com/kavkaco/Kavka-Core/internal/service/search" "github.com/kavkaco/Kavka-Core/log" searchv1 "github.com/kavkaco/Kavka-ProtoBuf/gen/go/protobuf/search/v1" diff --git a/delivery/grpc/proto_model_transformer/chat_model.go b/internal/model/proto_model_transformer/chat_model.go similarity index 100% rename from delivery/grpc/proto_model_transformer/chat_model.go rename to internal/model/proto_model_transformer/chat_model.go diff --git a/delivery/grpc/proto_model_transformer/errors.go b/internal/model/proto_model_transformer/errors.go similarity index 100% rename from delivery/grpc/proto_model_transformer/errors.go rename to internal/model/proto_model_transformer/errors.go diff --git a/delivery/grpc/proto_model_transformer/message_model.go b/internal/model/proto_model_transformer/message_model.go similarity index 100% rename from delivery/grpc/proto_model_transformer/message_model.go rename to internal/model/proto_model_transformer/message_model.go diff --git a/delivery/grpc/proto_model_transformer/user_model.go b/internal/model/proto_model_transformer/user_model.go similarity index 100% rename from delivery/grpc/proto_model_transformer/user_model.go rename to internal/model/proto_model_transformer/user_model.go diff --git a/internal/repository/chat_repository.go b/internal/repository/chat_repository.go index 60eb1e8..5f1e1aa 100644 --- a/internal/repository/chat_repository.go +++ b/internal/repository/chat_repository.go @@ -4,7 +4,6 @@ import ( "context" "github.com/kavkaco/Kavka-Core/internal/model" - "go.mongodb.org/mongo-driver/bson/primitive" ) type ChatRepository interface { @@ -14,5 +13,6 @@ type ChatRepository interface { GetUserChats(ctx context.Context, userID model.UserID, chatIDs []model.ChatID) ([]model.ChatDTO, error) GetDirectChat(ctx context.Context, userID model.UserID, recipientUserID model.UserID) (*model.Chat, error) GetChatMembers(chatID model.ChatID) []model.Member - JoinChat(ctx context.Context, chatType string, userID string, chatID primitive.ObjectID) error + JoinChat(ctx context.Context, chatType string, userID string, chatID model.ChatID) error + AddToUsersChatsList(ctx context.Context, userID string, chatID model.ChatID) error } diff --git a/internal/service/chat/chat_service.go b/internal/service/chat/chat_service.go index fec82e2..2e1ba1d 100644 --- a/internal/service/chat/chat_service.go +++ b/internal/service/chat/chat_service.go @@ -6,10 +6,14 @@ import ( "github.com/kavkaco/Kavka-Core/infra/stream" "github.com/kavkaco/Kavka-Core/internal/model" + "github.com/kavkaco/Kavka-Core/internal/model/proto_model_transformer" "github.com/kavkaco/Kavka-Core/internal/repository" + "github.com/kavkaco/Kavka-Core/internal/service" "github.com/kavkaco/Kavka-Core/log" "github.com/kavkaco/Kavka-Core/utils" "github.com/kavkaco/Kavka-Core/utils/vali" + eventsv1 "github.com/kavkaco/Kavka-ProtoBuf/gen/go/protobuf/events/v1" + "google.golang.org/protobuf/proto" ) const SubjChats = "chats" @@ -133,7 +137,12 @@ func (s *ChatService) CreateDirect(ctx context.Context, userID model.UserID, rec return nil, &vali.Varror{Error: ErrMessageStoreCreation} } - err = s.chatRepo.JoinChat(ctx, createdChat.ChatType, userID, createdChat.ChatID) + err = s.chatRepo.AddToUsersChatsList(ctx, userID, createdChat.ChatID) + if err != nil { + return nil, &vali.Varror{Error: ErrUnableToAddChatToUsersList} + } + + err = s.chatRepo.AddToUsersChatsList(ctx, recipientUserID, createdChat.ChatID) if err != nil { return nil, &vali.Varror{Error: ErrUnableToAddChatToUsersList} } @@ -156,6 +165,34 @@ func (s *ChatService) CreateDirect(ctx context.Context, userID model.UserID, rec Recipient: recipient, } + chatProto, err := proto_model_transformer.ChatToProto(*chatDTO) + if err != nil { + return nil, &vali.Varror{Error: service.ErrProtoMarshaling} + } + + // Let's tell the recipient that this user created a direct chat with you + payloadProtoBuf, marshalErr := proto.Marshal(&eventsv1.SubscribeEventsStreamResponse{ + Name: "add-chat", + Type: eventsv1.SubscribeEventsStreamResponse_TYPE_ADD_CHAT, + Payload: &eventsv1.SubscribeEventsStreamResponse_AddChat{ + AddChat: &eventsv1.AddChat{ + Chat: chatProto, + }, + }, + }, + ) + if marshalErr != nil { + return nil, &vali.Varror{Error: service.ErrProtoMarshaling} + } + + s.eventPublisher.Publish(&eventsv1.StreamEvent{ + SenderUserId: userID, + ReceiversUserId: []model.UserID{ + finalRecipientUserID, + }, + Payload: payloadProtoBuf, + }) + return chatDTO, nil } @@ -183,19 +220,15 @@ func (s *ChatService) CreateGroup(ctx context.Context, userID model.UserID, titl Text: "Group created", }, userID) - go func() { - createErr := s.messageRepo.Create(context.TODO(), savedChat.ChatID) - if createErr != nil { - s.logger.Error("message store creation failed: " + createErr.Error()) - return - } + err = s.messageRepo.Create(context.TODO(), savedChat.ChatID) + if err != nil { + return nil, &vali.Varror{Error: ErrJoinDirectChat} + } - _, createErr = s.messageRepo.Insert(context.TODO(), savedChat.ChatID, messageModel) - if createErr != nil { - s.logger.Error("failed to insert message in group creation: " + createErr.Error()) - return - } - }() + _, err = s.messageRepo.Insert(context.TODO(), savedChat.ChatID, messageModel) + if err != nil { + return nil, &vali.Varror{Error: ErrJoinDirectChat} + } err = s.chatRepo.JoinChat(ctx, savedChat.ChatType, userID, savedChat.ChatID) if err != nil { @@ -232,19 +265,15 @@ func (s *ChatService) CreateChannel(ctx context.Context, userID model.UserID, ti Text: "Channel created", }, userID) - go func() { - createError := s.messageRepo.Create(context.TODO(), savedChat.ChatID) - if createError != nil { - s.logger.Error("message store creation failed: " + createError.Error()) - return - } + err = s.messageRepo.Create(context.TODO(), savedChat.ChatID) + if err != nil { + return nil, &vali.Varror{Error: ErrMessageStoreCreation} + } - _, createError = s.messageRepo.Insert(context.TODO(), savedChat.ChatID, messageModel) - if createError != nil { - s.logger.Error("failed to insert message in channel creation: " + createError.Error()) - return - } - }() + _, err = s.messageRepo.Insert(context.TODO(), savedChat.ChatID, messageModel) + if err != nil { + return nil, &vali.Varror{Error: ErrMessageStoreCreation} + } err = s.chatRepo.JoinChat(ctx, savedChat.ChatType, userID, savedChat.ChatID) if err != nil { diff --git a/internal/service/common_errors.go b/internal/service/common_errors.go new file mode 100644 index 0000000..95bcd05 --- /dev/null +++ b/internal/service/common_errors.go @@ -0,0 +1,7 @@ +package service + +import "errors" + +var ( + ErrProtoMarshaling = errors.New("proto marshaling failed") +) diff --git a/internal/service/message/message_service.go b/internal/service/message/message_service.go index 81e1c7d..c3cda49 100644 --- a/internal/service/message/message_service.go +++ b/internal/service/message/message_service.go @@ -3,9 +3,9 @@ package message import ( "context" - "github.com/kavkaco/Kavka-Core/delivery/grpc/proto_model_transformer" "github.com/kavkaco/Kavka-Core/infra/stream" "github.com/kavkaco/Kavka-Core/internal/model" + "github.com/kavkaco/Kavka-Core/internal/model/proto_model_transformer" "github.com/kavkaco/Kavka-Core/internal/repository" "github.com/kavkaco/Kavka-Core/log" "github.com/kavkaco/Kavka-Core/utils/vali"