Skip to content

Commit

Permalink
Introduce DeserializedMessage for carrying schema information into th…
Browse files Browse the repository at this point in the history
…e writers
  • Loading branch information
rtyler committed Jan 7, 2024
1 parent c511f50 commit d0f1ea3
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 62 deletions.
13 changes: 9 additions & 4 deletions src/coercions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use serde_json::Value;
use std::collections::HashMap;
use std::str::FromStr;

use crate::serialization::DeserializedMessage;

#[derive(Debug, Clone, PartialEq)]
#[allow(unused)]
enum CoercionNode {
Expand Down Expand Up @@ -72,7 +74,7 @@ fn build_coercion_node(data_type: &DataType) -> Option<CoercionNode> {

/// Applies all data coercions specified by the [`CoercionTree`] to the [`Value`].
/// Though it does not currently, this function should approximate or improve on the coercions applied by [Spark's `from_json`](https://spark.apache.org/docs/latest/api/sql/index.html#from_json)
pub(crate) fn coerce(value: &mut Value, coercion_tree: &CoercionTree) {
pub(crate) fn coerce(value: &mut DeserializedMessage, coercion_tree: &CoercionTree) {
if let Some(context) = value.as_object_mut() {
for (field_name, coercion) in coercion_tree.root.iter() {
if let Some(value) = context.get_mut(field_name) {
Expand Down Expand Up @@ -322,7 +324,7 @@ mod tests {

let coercion_tree = create_coercion_tree(&delta_schema);

let mut messages = vec![
let mut messages: Vec<DeserializedMessage> = vec![
json!({
"level1_string": "a",
"level1_integer": 0,
Expand Down Expand Up @@ -380,7 +382,10 @@ mod tests {
// This is valid epoch micros, but typed as a string on the way in. We WON'T coerce it.
"level1_timestamp": "1636668718000000",
}),
];
]
.into_iter()
.map(|f| f.into())
.collect();

for message in messages.iter_mut() {
coerce(message, &coercion_tree);
Expand Down Expand Up @@ -447,7 +452,7 @@ mod tests {
];

for i in 0..messages.len() {
assert_eq!(messages[i], expected[i]);
assert_eq!(messages[i].clone().message(), expected[i]);
}
}
}
15 changes: 10 additions & 5 deletions src/dead_letters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;

use crate::serialization::DeserializedMessage;
use crate::{transforms::TransformError, writer::*};

#[cfg(feature = "s3")]
Expand Down Expand Up @@ -55,11 +56,11 @@ impl DeadLetter {

/// Creates a dead letter from a failed transform.
/// `base64_bytes` will always be `None`.
pub(crate) fn from_failed_transform(value: &Value, err: TransformError) -> Self {
pub(crate) fn from_failed_transform(value: &DeserializedMessage, err: TransformError) -> Self {
let timestamp = Utc::now();
Self {
base64_bytes: None,
json_string: Some(value.to_string()),
json_string: Some(value.clone().message().to_string()),
error: Some(err.to_string()),
timestamp: timestamp
.timestamp_nanos_opt()
Expand Down Expand Up @@ -286,9 +287,10 @@ impl DeadLetterQueue for DeltaSinkDeadLetterQueue {
.map(|dl| {
serde_json::to_value(dl)
.map_err(|e| DeadLetterQueueError::SerdeJson { source: e })
.and_then(|mut v| {
.and_then(|v| {
self.transformer
.transform(&mut v, None as Option<&BorrowedMessage>)?;
// TODO: this can't be right, shouldn't this function take a DeserializedMessage?
.transform(&mut v.clone().into(), None as Option<&BorrowedMessage>)?;
Ok(v)
})
})
Expand All @@ -297,7 +299,10 @@ impl DeadLetterQueue for DeltaSinkDeadLetterQueue {

let version = self
.delta_writer
.insert_all(&mut self.table, values)
.insert_all(
&mut self.table,
values.into_iter().map(|v| v.into()).collect(),
)
.await?;

if self.write_checkpoints {
Expand Down
11 changes: 7 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ mod dead_letters;
mod delta_helpers;
mod metrics;
mod offsets;
mod serialization;
#[allow(missing_docs)]
pub mod serialization;
mod transforms;
mod value_buffers;
/// Doc
Expand All @@ -56,6 +57,7 @@ use crate::value_buffers::{ConsumedBuffers, ValueBuffers};
use crate::{
dead_letters::*,
metrics::*,
serialization::*,
transforms::*,
writer::{DataWriter, DataWriterError},
};
Expand Down Expand Up @@ -207,8 +209,9 @@ pub enum IngestError {
}

/// Formats for message parsing
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub enum MessageFormat {
#[default]
/// Parses messages as json and uses the inferred schema
DefaultJson,

Expand Down Expand Up @@ -733,7 +736,7 @@ struct IngestProcessor {
coercion_tree: CoercionTree,
table: DeltaTable,
delta_writer: DataWriter,
value_buffers: ValueBuffers,
value_buffers: ValueBuffers<DeserializedMessage>,
delta_partition_offsets: HashMap<DataTypePartition, Option<DataTypeOffset>>,
latency_timer: Instant,
dlq: Box<dyn DeadLetterQueue>,
Expand Down Expand Up @@ -864,7 +867,7 @@ impl IngestProcessor {
async fn deserialize_message<M>(
&mut self,
msg: &M,
) -> Result<Value, MessageDeserializationError>
) -> Result<DeserializedMessage, MessageDeserializationError>
where
M: Message + Send + Sync,
{
Expand Down
97 changes: 88 additions & 9 deletions src/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,53 @@ use serde_json::Value;

use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat};

use deltalake_core::arrow::datatypes::Schema as ArrowSchema;

/// Structure which contains the [serde_json::Value] and the inferred schema of the message
///
/// The [ArrowSchema] helps with schema evolution
///
/// Both fields are private; callers go through the accessor methods or build an instance
/// with `.into()` from a [Value].
#[derive(Clone, Debug, Default, PartialEq)]
pub struct DeserializedMessage {
// The JSON payload of the message.
message: Value,
// Arrow schema associated with `message`; `None` when no schema is available.
schema: Option<ArrowSchema>,
}

impl DeserializedMessage {
pub fn schema(&self) -> &Option<ArrowSchema> {
&self.schema
}
pub fn message(self) -> Value {
self.message
}
pub fn get(&self, key: &str) -> Option<&Value> {
self.message.get(key)
}
pub fn as_object_mut(&mut self) -> Option<&mut serde_json::Map<String, Value>> {
self.message.as_object_mut()
}
}

/// Allow for `.into()` on [Value] for ease of use
///
/// The schema is inferred from this single message. When inference fails the error is
/// discarded and the resulting [DeserializedMessage] carries no schema.
impl From<Value> for DeserializedMessage {
    fn from(message: Value) -> Self {
        // XXX: This seems wasteful, this function should go away, and the deserializers should
        // infer straight from the buffer stream
        //
        // Lend the message to the inference routine through a one-shot iterator instead of
        // deep-cloning the JSON tree into a temporary `Vec`.
        let schema = deltalake_core::arrow::json::reader::infer_json_schema_from_iterator(
            std::iter::once(Ok(&message)),
        )
        .ok();
        Self { message, schema }
    }
}

#[async_trait]
pub(crate) trait MessageDeserializer {
async fn deserialize(
&mut self,
message_bytes: &[u8],
) -> Result<Value, MessageDeserializationError>;
) -> Result<DeserializedMessage, MessageDeserializationError>;
}

pub(crate) struct MessageDeserializerFactory {}
Expand Down Expand Up @@ -80,11 +121,15 @@ impl MessageDeserializerFactory {
}
}

/// Stateless deserializer whose `MessageDeserializer` implementation parses the raw payload
/// bytes as JSON.
#[derive(Clone, Debug, Default)]
struct DefaultDeserializer {}

#[async_trait]
impl MessageDeserializer for DefaultDeserializer {
async fn deserialize(&mut self, payload: &[u8]) -> Result<Value, MessageDeserializationError> {
async fn deserialize(
&mut self,
payload: &[u8],
) -> Result<DeserializedMessage, MessageDeserializationError> {
let value: Value = match serde_json::from_slice(payload) {
Ok(v) => v,
Err(e) => {
Expand All @@ -94,7 +139,41 @@ impl MessageDeserializer for DefaultDeserializer {
}
};

Ok(value)
Ok(value.into())
}
}

#[cfg(test)]
mod default_tests {
    use super::*;

    /// A trivially valid JSON document must come back with a schema attached.
    #[tokio::test]
    async fn deserialize_with_schema() {
        let payload = r#"{"hello" : "world"}"#.as_bytes();
        let mut deserializer = DefaultDeserializer::default();

        let decoded = deserializer
            .deserialize(payload)
            .await
            .expect("Failed to deserialize trivial JSON");

        let has_schema = decoded.schema().is_some();
        assert!(
            has_schema,
            "The DeserializedMessage doesn't have a schema!"
        );
    }

    /// The wrapped JSON value must still round-trip into a plain serde struct.
    #[tokio::test]
    async fn deserialize_simple_json() {
        #[derive(serde::Deserialize)]
        struct Greeting {
            hello: String,
        }

        let payload = r#"{"hello" : "world"}"#.as_bytes();
        let mut deserializer = DefaultDeserializer::default();

        let decoded = deserializer
            .deserialize(payload)
            .await
            .expect("Failed to deserialize trivial JSON");

        // Unwrap the JSON value through the public accessor before handing it to serde.
        let greeting: Greeting =
            serde_json::from_value(decoded.message()).expect("Failed to coerce");
        assert_eq!("world", greeting.hello);
    }
}

Expand All @@ -116,11 +195,11 @@ impl MessageDeserializer for AvroDeserializer {
async fn deserialize(
&mut self,
message_bytes: &[u8],
) -> Result<Value, MessageDeserializationError> {
) -> Result<DeserializedMessage, MessageDeserializationError> {
match self.decoder.decode_with_schema(Some(message_bytes)).await {
Ok(drs) => match drs {
Some(v) => match Value::try_from(v.value) {
Ok(v) => Ok(v),
Ok(v) => Ok(v.into()),
Err(e) => Err(MessageDeserializationError::AvroDeserialization {
dead_letter: DeadLetter::from_failed_deserialization(
message_bytes,
Expand All @@ -147,7 +226,7 @@ impl MessageDeserializer for AvroSchemaDeserializer {
async fn deserialize(
&mut self,
message_bytes: &[u8],
) -> Result<Value, MessageDeserializationError> {
) -> Result<DeserializedMessage, MessageDeserializationError> {
let reader_result = match &self.schema {
None => apache_avro::Reader::new(Cursor::new(message_bytes)),
Some(schema) => apache_avro::Reader::with_schema(schema, Cursor::new(message_bytes)),
Expand All @@ -162,7 +241,7 @@ impl MessageDeserializer for AvroSchemaDeserializer {
};

return match v {
Ok(value) => Ok(value),
Ok(value) => Ok(value.into()),
Err(e) => Err(MessageDeserializationError::AvroDeserialization {
dead_letter: DeadLetter::from_failed_deserialization(
message_bytes,
Expand Down Expand Up @@ -221,11 +300,11 @@ impl MessageDeserializer for JsonDeserializer {
async fn deserialize(
&mut self,
message_bytes: &[u8],
) -> Result<Value, MessageDeserializationError> {
) -> Result<DeserializedMessage, MessageDeserializationError> {
let decoder = self.decoder.borrow_mut();
match decoder.decode(Some(message_bytes)).await {
Ok(drs) => match drs {
Some(v) => Ok(v.value),
Some(v) => Ok(v.value.into()),
None => return Err(MessageDeserializationError::EmptyPayload),
},
Err(e) => {
Expand Down
17 changes: 11 additions & 6 deletions src/transforms.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::serialization::DeserializedMessage;
use chrono::prelude::*;
use jmespatch::{
functions::{ArgumentType, CustomFunction, Signature},
Expand Down Expand Up @@ -348,13 +349,13 @@ impl Transformer {
/// The optional `kafka_message` must be provided to include well known Kafka properties in the value.
pub(crate) fn transform<M>(
&self,
value: &mut Value,
value: &mut DeserializedMessage,
kafka_message: Option<&M>,
) -> Result<(), TransformError>
where
M: Message,
{
let data = Variable::try_from(value.clone())?;
let data = Variable::try_from(value.clone().message())?;

match value.as_object_mut() {
Some(map) => {
Expand All @@ -378,7 +379,7 @@ impl Transformer {
Ok(())
}
_ => Err(TransformError::ValueNotAnObject {
value: value.to_owned(),
value: value.clone().message(),
}),
}
}
Expand Down Expand Up @@ -510,7 +511,7 @@ mod tests {

#[test]
fn transforms_with_substr() {
let mut test_value = json!({
let test_value = json!({
"name": "A",
"modified": "2021-03-16T14:38:58Z",
});
Expand All @@ -524,6 +525,7 @@ mod tests {
0,
None,
);
let mut test_value: DeserializedMessage = test_value.into();

let mut transforms = HashMap::new();

Expand All @@ -540,6 +542,7 @@ mod tests {

let name = test_value.get("name").unwrap().as_str().unwrap();
let modified = test_value.get("modified").unwrap().as_str().unwrap();
println!("TEST: {test_value:?}");
let modified_date = test_value.get("modified_date").unwrap().as_str().unwrap();

assert_eq!("A", name);
Expand Down Expand Up @@ -567,7 +570,7 @@ mod tests {
fn test_transforms_with_epoch_seconds_to_iso8601() {
let expected_iso = "2021-07-20T23:18:18Z";

let mut test_value = json!({
let test_value = json!({
"name": "A",
"epoch_seconds_float": 1626823098.51995,
"epoch_seconds_int": 1626823098,
Expand All @@ -584,6 +587,7 @@ mod tests {
0,
None,
);
let mut test_value: DeserializedMessage = test_value.into();

let mut transforms = HashMap::new();
transforms.insert(
Expand Down Expand Up @@ -640,7 +644,7 @@ mod tests {

#[test]
fn test_transforms_with_kafka_meta() {
let mut test_value = json!({
let test_value = json!({
"name": "A",
"modified": "2021-03-16T14:38:58Z",
});
Expand All @@ -655,6 +659,7 @@ mod tests {
None,
);

let mut test_value: DeserializedMessage = test_value.into();
let mut transforms = HashMap::new();

transforms.insert("_kafka_offset".to_string(), "kafka.offset".to_string());
Expand Down
Loading

0 comments on commit d0f1ea3

Please sign in to comment.