Skip to content

Commit

Permalink
chore: fix clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
richardshiue committed Jan 7, 2025
1 parent f8ccce8 commit 26a2df2
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 64 deletions.
33 changes: 8 additions & 25 deletions frontend/rust-lib/flowy-ai/src/ai_manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::chat::Chat;
use crate::entities::{
ChatInfoPB, ChatMessageListPB, ChatMessagePB, ChatSettingsPB, FilePB, PredefinedFormatPB,
RepeatedRelatedQuestionPB,
RepeatedRelatedQuestionPB, StreamMessageParams,
};
use crate::local_ai::local_llm_chat::LocalAIController;
use crate::middleware::chat_service_mw::AICloudServiceMiddleware;
Expand All @@ -10,9 +10,7 @@ use std::collections::HashMap;

use appflowy_plugin::manager::PluginManager;
use dashmap::DashMap;
use flowy_ai_pub::cloud::{
ChatCloudService, ChatMessageMetadata, ChatMessageType, ChatSettings, UpdateChatParams,
};
use flowy_ai_pub::cloud::{ChatCloudService, ChatSettings, UpdateChatParams};
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::kv::KVStorePreferences;
use flowy_sqlite::DBConnection;
Expand Down Expand Up @@ -213,30 +211,15 @@ impl AIManager {
Ok(chat)
}

pub async fn stream_chat_message(
&self,
chat_id: &str,
message: &str,
message_type: ChatMessageType,
answer_stream_port: i64,
question_stream_port: i64,
format: Option<PredefinedFormatPB>,
metadata: Vec<ChatMessageMetadata>,
pub async fn stream_chat_message<'a>(
&'a self,
params: &'a StreamMessageParams<'a>,
) -> Result<ChatMessagePB, FlowyError> {
let chat = self.get_or_create_chat_instance(chat_id).await?;
let question = chat
.stream_chat_message(
message,
message_type,
answer_stream_port,
question_stream_port,
format,
metadata,
)
.await?;
let chat = self.get_or_create_chat_instance(params.chat_id).await?;
let question = chat.stream_chat_message(params).await?;
let _ = self
.external_service
.notify_did_send_message(chat_id, message)
.notify_did_send_message(params.chat_id, params.message)
.await;
Ok(question)
}
Expand Down
40 changes: 17 additions & 23 deletions frontend/rust-lib/flowy-ai/src/chat.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::ai_manager::AIUserService;
use crate::entities::{
ChatMessageErrorPB, ChatMessageListPB, ChatMessagePB, PredefinedFormatPB,
RepeatedRelatedQuestionPB,
RepeatedRelatedQuestionPB, StreamMessageParams,
};
use crate::middleware::chat_service_mw::AICloudServiceMiddleware;
use crate::notification::{chat_notification_builder, ChatNotification};
Expand All @@ -12,8 +12,7 @@ use crate::persistence::{
use crate::stream_message::StreamMessage;
use allo_isolate::Isolate;
use flowy_ai_pub::cloud::{
ChatCloudService, ChatMessage, ChatMessageMetadata, ChatMessageType, MessageCursor,
QuestionStreamValue, ResponseFormat,
ChatCloudService, ChatMessage, MessageCursor, QuestionStreamValue, ResponseFormat,
};
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::DBConnection;
Expand Down Expand Up @@ -82,22 +81,17 @@ impl Chat {
}

#[instrument(level = "info", skip_all, err)]
pub async fn stream_chat_message(
&self,
message: &str,
message_type: ChatMessageType,
answer_stream_port: i64,
question_stream_port: i64,
format: Option<PredefinedFormatPB>,
metadata: Vec<ChatMessageMetadata>,
pub async fn stream_chat_message<'a>(
&'a self,
params: &'a StreamMessageParams<'a>,
) -> Result<ChatMessagePB, FlowyError> {
trace!(
"[Chat] stream chat message: chat_id={}, message={}, message_type={:?}, metadata={:?}, format={:?}",
self.chat_id,
message,
message_type,
metadata,
format,
params.message,
params.message_type,
params.metadata,
params.format,
);

// clear
Expand All @@ -106,22 +100,22 @@ impl Chat {
.store(false, std::sync::atomic::Ordering::SeqCst);
self.stream_buffer.lock().await.clear();

let mut question_sink = IsolateSink::new(Isolate::new(question_stream_port));
let mut question_sink = IsolateSink::new(Isolate::new(params.question_stream_port));
let answer_stream_buffer = self.stream_buffer.clone();
let uid = self.user_service.user_id()?;
let workspace_id = self.user_service.workspace_id()?;

let _ = question_sink
.send(StreamMessage::Text(message.to_string()).to_string())
.send(StreamMessage::Text(params.message.to_string()).to_string())
.await;
let question = self
.chat_service
.create_question(
&workspace_id,
&self.chat_id,
message,
message_type,
&metadata,
params.message,
params.message_type.clone(),
&params.metadata,
)
.await
.map_err(|err| {
Expand All @@ -134,7 +128,7 @@ impl Chat {
.await;
if let Err(err) = self
.chat_service
.index_message_metadata(&self.chat_id, &metadata, &mut question_sink)
.index_message_metadata(&self.chat_id, &params.metadata, &mut question_sink)
.await
{
error!("Failed to index file: {}", err);
Expand All @@ -144,10 +138,10 @@ impl Chat {
// Save message to disk
save_and_notify_message(uid, &self.chat_id, &self.user_service, question.clone())?;

let format = format.unwrap_or_default().into();
let format = params.format.clone().unwrap_or_default().into();

self.stream_response(
answer_stream_port,
params.answer_stream_port,
answer_stream_buffer,
uid,
workspace_id,
Expand Down
15 changes: 13 additions & 2 deletions frontend/rust-lib/flowy-ai/src/entities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::collections::HashMap;

use crate::local_ai::local_llm_resource::PendingResource;
use flowy_ai_pub::cloud::{
ChatMessage, LLMModel, OutputContent, OutputLayout, RelatedQuestion, RepeatedChatMessage,
RepeatedRelatedQuestion, ResponseFormat,
ChatMessage, ChatMessageMetadata, ChatMessageType, LLMModel, OutputContent, OutputLayout,
RelatedQuestion, RepeatedChatMessage, RepeatedRelatedQuestion, ResponseFormat,
};
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
use lib_infra::validator_fn::required_not_empty_str;
Expand Down Expand Up @@ -75,6 +75,17 @@ pub struct StreamChatPayloadPB {
pub metadata: Vec<ChatMessageMetaPB>,
}

#[derive(Default, Debug)]
pub struct StreamMessageParams<'a> {
pub chat_id: &'a str,
pub message: &'a str,
pub message_type: ChatMessageType,
pub answer_stream_port: i64,
pub question_stream_port: i64,
pub format: Option<PredefinedFormatPB>,
pub metadata: Vec<ChatMessageMetadata>,
}

#[derive(Default, ProtoBuf, Validate, Clone, Debug)]
pub struct RegenerateResponsePB {
#[pb(index = 1)]
Expand Down
38 changes: 24 additions & 14 deletions frontend/rust-lib/flowy-ai/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,22 @@ pub(crate) async fn stream_chat_message_handler(
let data = data.into_inner();
data.validate()?;

let message_type = match data.message_type {
let StreamChatPayloadPB {
chat_id,
message,
message_type,
answer_stream_port,
question_stream_port,
format,
metadata,
} = data;

let message_type = match message_type {
ChatMessageTypePB::System => ChatMessageType::System,
ChatMessageTypePB::User => ChatMessageType::User,
};

let metadata = data
.metadata
let metadata = metadata
.into_iter()
.map(|metadata| {
let (content_type, content_len) = match metadata.loader_type {
Expand All @@ -63,18 +72,19 @@ pub(crate) async fn stream_chat_message_handler(
.collect::<Vec<_>>();

trace!("Stream chat message with metadata: {:?}", metadata);

let params = StreamMessageParams {
chat_id: &chat_id,
message: &message,
message_type,
answer_stream_port,
question_stream_port,
format,
metadata,
};

let ai_manager = upgrade_ai_manager(ai_manager)?;
let result = ai_manager
.stream_chat_message(
&data.chat_id,
&data.message,
message_type,
data.answer_stream_port,
data.question_stream_port,
data.format,
metadata,
)
.await?;
let result = ai_manager.stream_chat_message(&params).await?;
data_result_ok(result)
}

Expand Down

0 comments on commit 26a2df2

Please sign in to comment.