Skip to content

Commit

Permalink
Merge pull request #27 from zilliztech/dev
Browse files Browse the repository at this point in the history
update code to use MilvusClientV2 API
  • Loading branch information
nianliuu authored Jun 20, 2024
2 parents 258358b + ed68979 commit 461630d
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 171 deletions.
2 changes: 1 addition & 1 deletion README_OSS.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Note: Make sure the schema on both sides match each other. In the schema, there
```properties
key.converter.schemas.enable=false
value.converter.schemas.enable=false
plugin.path=libs/zilliz-kafka-connect-milvus-0.1.0
plugin.path=libs/zilliz-kafka-connect-milvus-xxx
```
4. create and configure a `milvus-sink-connector.properties` file in the `config` directory of your Kafka installation.
```properties
Expand Down
20 changes: 2 additions & 18 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<url>https://github.com/zilliztech/kafka-connect-milvus</url>

<properties>
<project-version>0.1.2</project-version>
<project-version>0.1.3</project-version>
<confluent.maven.repo>https://packages.confluent.io/maven/</confluent.maven.repo>
</properties>

Expand Down Expand Up @@ -49,27 +49,11 @@
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<!-- grpc dependence-->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.46.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-grpclb</artifactId>
<version>1.59.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.59.0</version>
</dependency>
<!-- update dependence to resolve CVE issues -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>3.6.0</version>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
Expand Down
108 changes: 61 additions & 47 deletions src/main/java/com/milvus/io/kafka/MilvusSinkTask.java
Original file line number Diff line number Diff line change
@@ -1,36 +1,34 @@
package com.milvus.io.kafka;

import com.alibaba.fastjson.JSONObject;
import com.milvus.io.kafka.helper.MilvusClientHelper;
import com.milvus.io.kafka.utils.DataConverter;
import com.milvus.io.kafka.utils.Utils;
import com.milvus.io.kafka.utils.VersionUtil;
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.CollectionSchema;
import io.milvus.grpc.DescribeCollectionResponse;
import io.milvus.grpc.GetLoadStateResponse;
import io.milvus.grpc.LoadState;
import io.milvus.param.R;
import io.milvus.param.collection.DescribeCollectionParam;
import io.milvus.param.collection.GetLoadStateParam;
import io.milvus.param.dml.InsertParam;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import io.milvus.v2.service.collection.request.DescribeCollectionReq;
import io.milvus.v2.service.collection.request.GetLoadStateReq;
import io.milvus.v2.service.collection.request.HasCollectionReq;
import io.milvus.v2.service.collection.response.DescribeCollectionResp;
import io.milvus.v2.service.vector.request.InsertReq;
import io.milvus.v2.service.vector.request.UpsertReq;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.*;

import static com.milvus.io.kafka.MilvusSinkConnectorConfig.TOKEN;

public class MilvusSinkTask extends SinkTask {

private static final Logger log = LoggerFactory.getLogger(MilvusSinkTask.class);
private MilvusSinkConnectorConfig config;
private MilvusServiceClient myMilvusClient;
private MilvusClientV2 myMilvusClient;
private DataConverter converter;
private CollectionSchema collectionSchema;
private DescribeCollectionResp response;

@Override
public String version() {
Expand All @@ -43,64 +41,80 @@ public void start(Map<String, String> props) {
}

// make visible for test
protected void start(Map<String, String> props, MilvusServiceClient milvusClient) {
protected void start(Map<String, String> props, MilvusClientV2 milvusClient) {
log.info("Starting MilvusSinkTask.");
props.put(TOKEN, Utils.encryptToken(props.get(TOKEN)));
this.config = new MilvusSinkConnectorConfig(props);
this.converter = new DataConverter(config);
this.myMilvusClient = milvusClient == null ? new MilvusClientHelper().createMilvusClient(config) : milvusClient;
this.collectionSchema = GetCollectionInfo(config.getCollectionName());

log.info("Started MilvusSinkTask, Connecting to Zilliz Cluster:" + config.getUrl());
preValidate();
}

private void preValidate() {
// check if the collection exists
if (!myMilvusClient.hasCollection(HasCollectionReq.builder().collectionName(config.getCollectionName()).build())) {
log.error("Collection not exist");
throw new RuntimeException("Collection not exist" + config.getCollectionName());
}
// check if the collection is loaded
if (!myMilvusClient.getLoadState(GetLoadStateReq.builder().collectionName(config.getCollectionName()).build())){
log.error("Collection not loaded");
throw new RuntimeException("Collection not loaded" + config.getCollectionName());
}
this.response = myMilvusClient.describeCollection(DescribeCollectionReq.builder().collectionName(config.getCollectionName()).build());
}

@Override
public void put(Collection<SinkRecord> records) {
log.info("Putting {} records to Milvus.", records.size());
if(records.isEmpty()) {
log.info("No records to put.");
return;
}

// not support dynamic schema for now, for dynamic schema, we need to put the data into a JSONObject
List<JSONObject> datas = new ArrayList<>();
for (SinkRecord record : records) {
log.debug("Writing {} to Milvus.", record);
WriteRecord(record, collectionSchema);
if(record.value() == null) {
log.warn("Skipping record with null value.");
continue;
}
try {
JSONObject data = converter.convertRecord(record, response.getCollectionSchema());
datas.add(data);
}catch (Exception e){
log.error("Failed to convert record to JSONObject, skip it", e);
}
}
}

protected CollectionSchema GetCollectionInfo(String collectionName) {
// check if the collection exists
R<DescribeCollectionResponse> response = myMilvusClient.describeCollection(DescribeCollectionParam.newBuilder()
.withCollectionName(collectionName).build());
if (response.getData() == null) {
log.error("Collection not exist");
throw new RuntimeException("Collection not exist" + collectionName);
if(!response.getAutoID()){
// default to use upsert
UpsertReq upsertReq = UpsertReq.builder()
.collectionName(config.getCollectionName())
.data(datas)
.build();
log.info("Upserting data to collection: {} with datas: {}", config.getCollectionName(), datas);
myMilvusClient.upsert(upsertReq);
}else {
GetLoadStateParam getLoadStateParam = GetLoadStateParam.newBuilder()
.withCollectionName(collectionName)
InsertReq insertReq = InsertReq.builder()
.collectionName(config.getCollectionName())
.data(datas)
.build();
R<GetLoadStateResponse> loadState = myMilvusClient.getLoadState(getLoadStateParam);
if (loadState.getData().getState() != LoadState.LoadStateLoaded){
log.error("Collection not loaded");
throw new RuntimeException("Collection not loaded" + collectionName);
}
log.info("Inserting data to collection: {} with fields: {}", config.getCollectionName(), datas.get(0).keySet());
myMilvusClient.insert(insertReq);
}
return response.getData().getSchema();
}

protected void WriteRecord(SinkRecord record, CollectionSchema collectionSchema) {
// not support dynamic schema for now, for dynamic schema, we need to put the data into a JSONObject
List<InsertParam.Field> fields = converter.convertRecord(record, collectionSchema);
InsertParam insertParam = InsertParam.newBuilder()
.withCollectionName(config.getCollectionName())
.withFields(fields)
.build();

log.info("Inserting data to collection: " + config.getCollectionName() + " with fields: " +
insertParam.getFields());
myMilvusClient.insert(insertParam);
}

@Override
public void stop() {
log.info("Stopping Milvus client.");
myMilvusClient.close();
try {
myMilvusClient.close(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
12 changes: 7 additions & 5 deletions src/main/java/com/milvus/io/kafka/helper/MilvusClientHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import com.milvus.io.kafka.utils.Utils;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.ConnectParam;
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;

public class MilvusClientHelper {
public MilvusServiceClient createMilvusClient(MilvusSinkConnectorConfig config) {
ConnectParam connectParam = ConnectParam.newBuilder()
.withUri(config.getUrl())
.withToken(Utils.decryptToken(config.getToken().value()))
public MilvusClientV2 createMilvusClient(MilvusSinkConnectorConfig config) {
ConnectConfig connectConfig = ConnectConfig.builder()
.uri(config.getUrl())
.token(Utils.decryptToken(config.getToken().value()))
.build();
return new MilvusServiceClient(connectParam);
return new MilvusClientV2(connectConfig);
}
}
59 changes: 19 additions & 40 deletions src/main/java/com/milvus/io/kafka/utils/DataConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.milvus.io.kafka.MilvusSinkConnectorConfig;
import io.milvus.grpc.CollectionSchema;
import io.milvus.grpc.DataType;
import io.milvus.grpc.FieldSchema;
import io.milvus.param.dml.InsertParam;
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
Expand All @@ -27,9 +26,9 @@ public DataConverter(MilvusSinkConnectorConfig config) {
this.config = config;
}
/*
* Convert SinkRecord to List<InsertParam.Field>
* Convert SinkRecord to JSONObject
*/
public List<InsertParam.Field> convertRecord(SinkRecord sr, CollectionSchema collectionSchema) {
public JSONObject convertRecord(SinkRecord sr, CreateCollectionReq.CollectionSchema collectionSchema) {
// parse sinkRecord to get filed name and value
if(sr.value() instanceof Struct) {
return parseValue((Struct)sr.value(), collectionSchema);
Expand All @@ -40,35 +39,29 @@ public List<InsertParam.Field> convertRecord(SinkRecord sr, CollectionSchema col
}
}

private List<InsertParam.Field> parseValue(HashMap<?, ?> mapValue, CollectionSchema collectionSchema) {
List<InsertParam.Field> fields = new ArrayList<>();
// convert collectionSchema.getFieldsList: Filed's Name and DataType to a Map
Map<String, DataType> fieldType = collectionSchema.getFieldsList().stream().collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getDataType));
mapValue.forEach((key1, value) -> {
// for each field, create a InsertParam.Field
if(fieldType.containsKey(key1.toString())){
private JSONObject parseValue(HashMap<?, ?> mapValue, CreateCollectionReq.CollectionSchema collectionSchema) {
JSONObject fields = new JSONObject();
mapValue.forEach((field, value) -> {
if(collectionSchema.getField(field.toString())!=null){
// if the key exists in the collection, store the value by collectionSchema DataType
fields.add(new InsertParam.Field(key1.toString(), Collections.singletonList(castValueToType(value, fieldType.get(key1.toString())))));
}else if(collectionSchema.getEnableDynamicField()){
// if the key not exists in the collection and the collection is dynamic, store the value directly
fields.add(new InsertParam.Field(key1.toString(), Collections.singletonList(value)));
fields.put(field.toString(), castValueToType(value, collectionSchema.getField(field.toString()).getDataType()));
}else {
log.warn("Field {} not exists in collection", field);
}

});
return fields;
}

private List<InsertParam.Field> parseValue(Struct structValue, CollectionSchema collectionSchema) {
List<InsertParam.Field> fields = new ArrayList<>();
// convert collectionSchema.getFieldsList: Filed's Name and DataType to a Map
Map<String, DataType> fieldType = collectionSchema.getFieldsList().stream().collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getDataType));
private JSONObject parseValue(Struct structValue, CreateCollectionReq.CollectionSchema collectionSchema) {
JSONObject fields = new JSONObject();

structValue.schema().fields().forEach(field -> {
// for each field, create a InsertParam.Field
if(fieldType.containsKey(field.name())){
if(collectionSchema.getField(field.name()) != null){
// if the key exists in the collection, store the value by collectionSchema DataType
fields.add(new InsertParam.Field(field.name(), Collections.singletonList(castValueToType(structValue.get(field.name()), fieldType.get(field.name())))));
}else if(collectionSchema.getEnableDynamicField()){
// if the key not exists in the collection and the collection is dynamic, store the value directly
fields.add(new InsertParam.Field(field.name(), Collections.singletonList(structValue.get(field.name()))));
fields.put(field.toString(), castValueToType(structValue.get(field.name()), collectionSchema.getField(field.name()).getDataType()));
}else {
log.warn("Field {} not exists in collection", field);
}
});

Expand Down Expand Up @@ -141,18 +134,4 @@ protected ByteBuffer parseBinaryVectorField(String vectors){
throw new RuntimeException("parse binary vector field error: " + e.getMessage() + vectors);
}
}

public List<JSONObject> convertRecordWithDynamicSchema(SinkRecord sr, CollectionSchema collectionSchema) {
List<InsertParam.Field> fields = convertRecord(sr, collectionSchema);
List<JSONObject> jsonObjects = new ArrayList<>();
int rows = fields.get(0).getValues().size();
for (int i = 0; i < rows; i++) {
JSONObject jsonObject = new JSONObject();
for (InsertParam.Field field : fields) {
jsonObject.put(field.getName(), field.getValues().get(i));
}
jsonObjects.add(jsonObject);
}
return jsonObjects;
}
}
Loading

0 comments on commit 461630d

Please sign in to comment.