Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle null and zero vector insertion to pinecone #20

Open
wants to merge 1 commit into
base: pgvr/main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/remote/clients/pinecone/pinecone.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ char* pinecone_create_host_from_spec(int dimensions, VectorMetric metric, char*
char* pinecone_index_name = palloc(20);
sprintf(pinecone_index_name, "pgvr-%u", index->rd_id);
// TODO: remote index name
create_response = remote_create_index(pinecone_api_key, pinecone_index_name, dimensions, remote_metric_name, spec_json);
create_response = pinecone_create_index(pinecone_api_key, pinecone_index_name, dimensions, remote_metric_name, spec_json);
host = cJSON_GetStringValue(cJSON_GetObjectItemCaseSensitive(create_response, "host"));
// now we wait until the remote index is done initializing
// todo: timeout and error handling
// we don't want to ping pinecone.io with describe_index because pinecone.io may be aware of the index before
// the host is actually live. We poll the host for liveness with get_index_stats instead.
index_stats = remote_get_index_stats(pinecone_api_key, host);
while (index_stats == NULL) {
while (index_stats == NULL || cJSON_GetObjectItemCaseSensitive(index_stats, "totalVectorCount")==NULL) {
elog(DEBUG1, "Waiting for remote index to initialize...");
pg_usleep(1000000); // 1 second
index_stats = remote_get_index_stats(pinecone_api_key, host);
Expand Down Expand Up @@ -232,11 +232,13 @@ ItemPointerData* pinecone_query_with_fetch(char* host, int top_k, PreparedQuery
int n_results;
ItemPointerData* fetched_ctids = pinecone_extract_ctids_from_fetch_response(fetch_response, &n_results);
// 2. return the best (first) checkpoint which is among the fetched
for (int i = 0; i < n_checkpoints; i++) {
bool br=false;
for (int i = 0; i < n_checkpoints && !br; i++) {
RemoteCheckpoint checkpoint = checkpoints[i];
for (int j = 0; j < n_results; j++) {
if (ItemPointerEquals(&checkpoint.tid, &fetched_ctids[j])) {
*best_checkpoint_return = checkpoint;
br=true;
break;
}
}
Expand Down Expand Up @@ -268,6 +270,7 @@ void pinecone_append_prepare_bulk_insert(PreparedBulkInsert prepared_vectors, Tu
cJSON* json_vectors = (cJSON*) prepared_vectors;
char* vector_id = pinecone_id_from_heap_tid(*ctid);
cJSON* json_vector = tuple_get_remote_vector(tupdesc, values, nulls, vector_id);
if(json_vector==NULL) return;
cJSON_AddItemToArray(json_vectors, json_vector);
}
void pinecone_end_prepare_bulk_insert(PreparedBulkInsert prepared_vectors) {
Expand Down
4 changes: 2 additions & 2 deletions src/remote/clients/pinecone/pinecone_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ cJSON* describe_index(const char *api_key, const char *index_name) {
cJSON* remote_get_index_stats(const char *api_key, const char *index_host) {
cJSON* resp;
char url[100] = "https://"; strcat(url, index_host); strcat(url, "/describe_index_stats");
resp = generic_remote_request(api_key, url, "GET", NULL, true);
resp = generic_remote_request(api_key, url, "GET", NULL, false);
return resp;
}

Expand Down Expand Up @@ -170,7 +170,7 @@ cJSON* remote_list_vectors(const char *api_key, const char *index_host, int limi
* pod: environment, replicas, pod_type, pods, shards, metadata_config
* Refer to https://docs.pinecone.io/reference/create_index
*/
cJSON* remote_create_index(const char *api_key, const char *index_name, const int dimension, const char *metric, cJSON *spec) {
cJSON* pinecone_create_index(const char *api_key, const char *index_name, const int dimension, const char *metric, cJSON *spec) {
cJSON *request = cJSON_CreateObject();
cJSON_AddItemToObject(request, "name", cJSON_CreateString(index_name));
cJSON_AddItemToObject(request, "dimension", cJSON_CreateNumber(dimension));
Expand Down
2 changes: 1 addition & 1 deletion src/remote/clients/pinecone/pinecone_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ cJSON* remote_delete_vectors(const char *api_key, const char *index_host, cJSON
cJSON* remote_delete_index(const char *api_key, const char *index_name);
cJSON* remote_delete_all(const char *api_key, const char *index_host);
cJSON* remote_list_vectors(const char *api_key, const char *index_host, int limit, char* pagination_token);
cJSON* remote_create_index(const char *api_key, const char *index_name, const int dimension, const char *metric, cJSON *spec);
cJSON* pinecone_create_index(const char *api_key, const char *index_name, const int dimension, const char *metric, cJSON *spec);
cJSON** remote_query_with_fetch(const char *api_key, const char *index_host, cJSON* request_body, bool with_fetch, cJSON* fetch_ids);
cJSON* remote_bulk_upsert(const char *api_key, const char *index_host, cJSON *vectors, int batch_size);
CURL* get_remote_query_handle(const char *api_key, const char *index_host, cJSON* request_body, ResponseData* response_data);
Expand Down
8 changes: 8 additions & 0 deletions src/remote/clients/pinecone/pinecone_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,15 @@ cJSON* tuple_get_remote_vector(TupleDesc tup_desc, Datum *values, bool *isnull,
cJSON *metadata = cJSON_CreateObject();
Vector *vector;
cJSON *json_values;
if(isnull[0]){
elog(DEBUG1, "Cannot insert NULL vectors to pinecone.");
return NULL;
}
vector = DatumGetVector(values[0]);
if(vector_eq_zero_internal(vector)) {
elog(DEBUG1, "Cannot insert zero vectors to pinecone.");
return NULL;
}
json_values = cJSON_CreateFloatArray(vector->x, vector->dim);
// prepare metadata
for (int i = 1; i < tup_desc->natts; i++) // skip the first column which is the vector
Expand Down
2 changes: 1 addition & 1 deletion src/remote/remote_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ bool remote_gettuple(IndexScanDesc scan, ScanDirection dir)
if (ItemPointerCompare(tid, new_tid) == 0) {
// if the next tuple is the same as the current tuple, then we need to skip
// N.B. we don't need a loop because the same tid appears at most twice in the sortstate
elog(DEBUG1, "Skipping duplicate tuple, found in both local buffer and remote index");
elog(DEBUG1, "Skipping duplicate tuple %d:%d, found in both local buffer and remote index", ItemPointerGetBlockNumber(tid),ItemPointerGetOffsetNumber(tid));
so->more_tuples = tuplesort_gettupleslot(so->tid_sortstate, true, false, so->tid_sortslot, NULL);
}
}
Expand Down