Skip to content

Commit

Permalink
Hubspot integration sync errors & better error handling (#1503)
Browse files Browse the repository at this point in the history
  • Loading branch information
epipav authored Sep 18, 2023
1 parent 245a2ac commit 2d68ecf
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 65 deletions.
9 changes: 6 additions & 3 deletions services/apps/data_sink_worker/src/service/member.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,8 @@ export default class MemberService extends LoggerBase {
(!member.identities || member.identities.length === 0)
) {
const errorMessage = `Member can't be enriched. It is missing both emails and identities fields.`
this.log.error(errorMessage)
throw new Error(errorMessage)
this.log.warn(errorMessage)
return
}

await this.store.transactionally(async (txStore) => {
Expand All @@ -305,7 +305,10 @@ export default class MemberService extends LoggerBase {
const segmentId = dbIntegration.segmentId

// first try finding the member using the identity
const identity = singleOrDefault(member.identities, (i) => i.platform === platform)
const identity = singleOrDefault(
member.identities,
(i) => i.platform === platform && i.sourceId !== null,
)
let dbMember = await txRepo.findMember(tenantId, segmentId, platform, identity.username)

if (!dbMember && member.emails && member.emails.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,8 @@ export class OrganizationService extends LoggerBase {
const txRepo = new OrganizationRepository(txStore, this.log)
const txIntegrationRepo = new IntegrationRepository(txStore, this.log)

const txService = new OrganizationService(txStore, this.log)

const dbIntegration = await txIntegrationRepo.findById(integrationId)
const segmentId = dbIntegration.segmentId

Expand Down Expand Up @@ -381,7 +383,7 @@ export class OrganizationService extends LoggerBase {
organization.identities.unshift(...existingIdentities)
}

await this.findOrCreate(tenantId, segmentId, integrationId, organization)
await txService.findOrCreate(tenantId, segmentId, integrationId, organization)
} else {
this.log.debug(
'No organization found for enriching. This organization enrich process had no affect.',
Expand Down
5 changes: 5 additions & 0 deletions services/apps/integration_sync_worker/src/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export const integrationNotFound = (integrationId: string): string =>
`Integration ${integrationId} not found!`

export const automationNotFound = (automationId: string): string =>
`Automation ${automationId} not found!`
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
import { IDbIntegration } from '@/repo/integration.data'
import { AutomationRepository } from '@/repo/automation.repo'
import { AutomationExecutionRepository } from '@/repo/automationExecution.repo'
import { automationNotFound, integrationNotFound } from '@/errors'

export class MemberSyncService extends LoggerBase {
private readonly memberRepo: MemberRepository
Expand Down Expand Up @@ -45,14 +46,33 @@ export class MemberSyncService extends LoggerBase {
): Promise<void> {
const integration = await this.integrationRepo.findById(integrationId)

if (!integration) {
const message = integrationNotFound(integrationId)
this.log.warn(message)
return
}

const member = await this.memberRepo.findMember(memberId)

if (!member) {
this.log.warn(`Member ${memberId} is not found for syncing remote!`)
return
}

const syncRemote = await this.memberRepo.findSyncRemoteById(syncRemoteId)

const membersToCreate = []
const membersToUpdate = []

if (syncRemote.sourceId) {
member.attributes = {
...member.attributes,
sourceId: {
...(member.attributes.sourceId || {}),
[integration.platform]: syncRemote.sourceId,
},
}

membersToUpdate.push(member)
} else {
membersToCreate.push(member)
Expand Down Expand Up @@ -96,6 +116,12 @@ export class MemberSyncService extends LoggerBase {
): Promise<void> {
const integration: IDbIntegration = await this.integrationRepo.findById(integrationId)

if (!integration) {
const message = integrationNotFound(integrationId)
this.log.warn(message)
return
}

const platforms = await this.memberRepo.getExistingPlatforms(tenantId)

const attributes = await this.memberRepo.getTenantMemberAttributes(tenantId)
Expand Down Expand Up @@ -173,6 +199,7 @@ export class MemberSyncService extends LoggerBase {
translatedMembers[0].attributes = {
...translatedMembers[0].attributes,
sourceId: {
...(translatedMembers[0].attributes.sourceId || {}),
[integration.platform]: memberToSync.sourceId,
},
}
Expand Down Expand Up @@ -228,8 +255,20 @@ export class MemberSyncService extends LoggerBase {
batchSize = 50,
) {
const integration: IDbIntegration = await this.integrationRepo.findById(integrationId)

if (!integration) {
const message = integrationNotFound(integrationId)
this.log.warn(message)
return
}
const automation = await this.automationRepo.findById(automationId)

if (!automation) {
const message = automationNotFound(automationId)
this.log.warn(message)
return
}

const platforms = await this.memberRepo.getExistingPlatforms(tenantId)

const attributes = await this.memberRepo.getTenantMemberAttributes(tenantId)
Expand Down Expand Up @@ -362,6 +401,7 @@ export class MemberSyncService extends LoggerBase {
memberToSync.attributes = {
...memberToSync.attributes,
sourceId: {
...(memberToSync.attributes.sourceId || {}),
[integration.platform]: syncRemote.sourceId,
},
}
Expand Down Expand Up @@ -453,8 +493,20 @@ export class MemberSyncService extends LoggerBase {
) {
const integration: IDbIntegration = await this.integrationRepo.findById(integrationId)

if (!integration) {
const message = integrationNotFound(integrationId)
this.log.warn(message)
return
}

const automation = await this.automationRepo.findById(automationId)

if (!automation) {
const message = automationNotFound(automationId)
this.log.warn(message)
return
}

let organizationMembers
let offset

Expand Down Expand Up @@ -495,6 +547,7 @@ export class MemberSyncService extends LoggerBase {
member.attributes = {
...member.attributes,
sourceId: {
...(member.attributes.sourceId || {}),
[integration.platform]: syncRemote.sourceId,
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ export class OrganizationSyncService extends LoggerBase {
const organizationsToUpdate = []

if (syncRemote.sourceId) {
organization.attributes = {
...organization.attributes,
sourceId: {
...(organization.attributes.sourceId || {}),
[integration.platform]: syncRemote.sourceId,
},
}
organizationsToUpdate.push(organization)
} else {
oranizationsToCreate.push(organization)
Expand Down Expand Up @@ -121,6 +128,7 @@ export class OrganizationSyncService extends LoggerBase {
organization.attributes = {
...organization.attributes,
sourceId: {
...(organization.attributes.sourceId || {}),
[integration.platform]: organizationToSync.sourceId,
},
}
Expand Down Expand Up @@ -235,6 +243,7 @@ export class OrganizationSyncService extends LoggerBase {
organization.attributes = {
...organization.attributes,
sourceId: {
...(organization.attributes.sourceId || {}),
[integration.platform]: syncRemote.sourceId,
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,49 +26,51 @@ export const batchUpdateMembers = async (
const hubspotMembers = []

for (const member of members) {
const hubspotSourceId = member.attributes?.sourceId?.hubspot

if (!hubspotSourceId) {
ctx.log.warn(
`Member ${member.id} can't be updated in hubspot! Member doesn't have a hubspot sourceId in attributes.`,
)
} else {
const hsMember = {
id: hubspotSourceId,
properties: {},
} as any

const fields = memberMapper.getAllCrowdFields()

for (const crowdField of fields) {
const hubspotField = memberMapper.getHubspotFieldName(crowdField)
if (crowdField.startsWith('attributes')) {
const attributeName = crowdField.split('.')[1] || null

if (
attributeName &&
hubspotField &&
member.attributes[attributeName]?.default !== undefined
) {
hsMember.properties[hubspotField] = member.attributes[attributeName].default
if (member) {
const hubspotSourceId = member.attributes?.sourceId?.hubspot

if (!hubspotSourceId) {
ctx.log.warn(
`Member ${member.id} can't be updated in hubspot! Member doesn't have a hubspot sourceId in attributes.`,
)
} else {
const hsMember = {
id: hubspotSourceId,
properties: {},
} as any

const fields = memberMapper.getAllCrowdFields()

for (const crowdField of fields) {
const hubspotField = memberMapper.getHubspotFieldName(crowdField)
if (crowdField.startsWith('attributes')) {
const attributeName = crowdField.split('.')[1] || null

if (
attributeName &&
hubspotField &&
member.attributes[attributeName]?.default !== undefined
) {
hsMember.properties[hubspotField] = member.attributes[attributeName].default
}
} else if (crowdField.startsWith('identities')) {
const identityPlatform = crowdField.split('.')[1] || null

const identityFound = member.identities.find((i) => i.platform === identityPlatform)

if (identityPlatform && hubspotField && identityFound) {
hsMember.properties[hubspotField] = identityFound.username
}
} else if (crowdField === 'organizationName') {
// send latest org of member as value
} else if (hubspotField && member[crowdField] !== undefined) {
hsMember.properties[hubspotField] = memberMapper.getHubspotValue(member, crowdField)
}
} else if (crowdField.startsWith('identities')) {
const identityPlatform = crowdField.split('.')[1] || null

const identityFound = member.identities.find((i) => i.platform === identityPlatform)

if (identityPlatform && hubspotField && identityFound) {
hsMember.properties[hubspotField] = identityFound.username
}
} else if (crowdField === 'organizationName') {
// send latest org of member as value
} else if (hubspotField && member[crowdField] !== undefined) {
hsMember.properties[hubspotField] = memberMapper.getHubspotValue(member, crowdField)
}
}

if (Object.keys(hsMember.properties).length > 0) {
hubspotMembers.push(hsMember)
if (Object.keys(hsMember.properties).length > 0) {
hubspotMembers.push(hsMember)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,35 @@ export const batchUpdateOrganizations = async (
const hubspotCompanies = []

for (const organization of organizations) {
const hubspotSourceId = organization.attributes?.sourceId?.hubspot
if (organization) {
const hubspotSourceId = organization.attributes?.sourceId?.hubspot

if (!hubspotSourceId) {
ctx.log.warn(
`Organization ${organization.id} can't be updated in hubspot! Organization doesn't have a hubspot sourceId.`,
)
} else {
const hubspotCompany = {
id: hubspotSourceId,
properties: {},
} as any
if (!hubspotSourceId) {
ctx.log.warn(
`Organization ${organization.id} can't be updated in hubspot! Organization doesn't have a hubspot sourceId.`,
)
} else {
const hubspotCompany = {
id: hubspotSourceId,
properties: {},
} as any

const fields = organizationMapper.getAllCrowdFields()
const fields = organizationMapper.getAllCrowdFields()

for (const crowdField of fields) {
const hubspotField = organizationMapper.getHubspotFieldName(crowdField)
for (const crowdField of fields) {
const hubspotField = organizationMapper.getHubspotFieldName(crowdField)

if (hubspotField && organization[crowdField] !== undefined) {
hubspotCompany.properties[hubspotField] = organizationMapper.getHubspotValue(
organization,
crowdField,
)
if (hubspotField && organization[crowdField] !== undefined) {
hubspotCompany.properties[hubspotField] = organizationMapper.getHubspotValue(
organization,
crowdField,
)
}
}
}

if (Object.keys(hubspotCompany.properties).length > 0) {
hubspotCompanies.push(hubspotCompany)
if (Object.keys(hubspotCompany.properties).length > 0) {
hubspotCompanies.push(hubspotCompany)
}
}
}
}
Expand Down

0 comments on commit 2d68ecf

Please sign in to comment.