Skip to content

Commit

Permalink
refactor: GetUserChats pipeline in chat_repository.go
Browse files Browse the repository at this point in the history
  • Loading branch information
tahadostifam committed Apr 21, 2024
1 parent 80c1f26 commit de9e941
Showing 1 changed file with 30 additions and 43 deletions.
73 changes: 30 additions & 43 deletions internal/repository/chat/chat_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,70 +75,57 @@ func (repo *chatRepository) FindOne(filter bson.M) (*chat.Chat, error) {
return model, nil
}

// Using mongodb aggregation pipeline to fetch user-chats.
// This process is a little special because we do not fetch all of the messages because it's really heavy query!
// Then, only last 3 messages are going to be fetched by pipeline.
func (repo *chatRepository) GetUserChats(userStaticID primitive.ObjectID) ([]chat.Chat, error) {
ctx := context.TODO()

memberMatchStage := bson.M{"chat_detail.members": bson.M{"$in": bson.A{userStaticID}}}
sidesMatchStage := bson.D{{
Key: "$match",
Value: bson.D{{
Key: "chat_detail.sides",
Value: bson.D{{
Key: "$in",
Value: bson.A{userStaticID},
}},
}},
}}

// Define the pipeline for direct chats (with aggregation for sides)
directPipeline := mongo.Pipeline{
sidesMatchStage,
{{
Key: "$lookup",
Value: bson.M{
pipeline := []bson.M{
{
"$match": bson.M{
"$or": []interface{}{
bson.M{"chat_detail.sides": bson.M{"$in": []interface{}{userStaticID}}},
bson.M{"chat_detail.members": bson.M{"$in": []interface{}{userStaticID}}},
},
},
},
{
"$lookup": bson.M{
"from": "users",
"localField": "chat_detail.sides",
"foreignField": "id",
"as": "chat_detail.fetchedUsers",
"pipeline": []interface{}{
bson.M{
"$match": bson.M{
"$expr": bson.M{"$eq": []interface{}{"$chat_type", "direct"}}},
},
},
},
}},
{{
Key: "$project",
Value: bson.M{
},
{
"$project": bson.M{
"chat_id": 1,
"chat_type": 1,
"chat_detail": 1,
"messages": bson.M{"$slice": []interface{}{"$messages", -3}},
},
}},
},
}

normalCursor, err := repo.chatsCollection.Find(context.TODO(), memberMatchStage)
cursor, err := repo.chatsCollection.Aggregate(ctx, pipeline)
if err != nil {
return []chat.Chat{}, err
}
defer normalCursor.Close(ctx)

// Aggregate for direct chats
directCursor, err := repo.chatsCollection.Aggregate(ctx, directPipeline)
if err != nil {
return nil, err
}
defer directCursor.Close(ctx)

// Decode and merge results
var allChats []chat.Chat
if err := normalCursor.All(ctx, &allChats); err != nil {
return nil, err
}
defer cursor.Close(ctx)

var directChats []chat.Chat
if err := directCursor.All(ctx, &directChats); err != nil {
var chatsList []chat.Chat
if err := cursor.All(ctx, &chatsList); err != nil {
return nil, err
}

allChats = append(allChats, directChats...)

return allChats, nil
return chatsList, nil
}

func (repo *chatRepository) FindByID(staticID primitive.ObjectID) (*chat.Chat, error) {
Expand Down

0 comments on commit de9e941

Please sign in to comment.