Skip to content

Commit

Permalink
Add support for multiple primary keys on replicate entities
Browse files Browse the repository at this point in the history
  • Loading branch information
mikedawson committed May 16, 2024
1 parent 938b5f3 commit 098f7a0
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 16 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ plugins {


group 'com.github.UstadMobile.door'
version '0.72'
version '0.73'

ext.localProperties = new Properties()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,11 @@ private fun CodeBlock.Builder.addReplicateEntityMetaDataCode(
add("tableId = %L, \n", repEntityAnnotation?.tableId)
add("entityTableName = %S, \n", entity.entityTableName)
add("receiveViewName = %S, \n", entity.replicationEntityReceiveViewName)
add("entityPrimaryKeyFieldName = %S, \n", entity.entityPrimaryKeyProps.first().simpleName.asString())
add("entityPrimaryKeyFieldNames = listOf(")
entity.entityPrimaryKeyProps.forEach {
add("%S,", it)
}
add("),\n")
add("entityVersionIdFieldName = %S, \n", entity.firstPropWithAnnotation(ReplicateEtag::class).simpleName.asString())
addFieldsCodeBlock(entity).add(",\n")
add("batchSize = ").add("%L", repEntityAnnotation?.batchSize ?: 1000).add(",\n")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,18 @@ class DoorValidatorProcessor(
entity
)
}

val primaryKeys = entity.entityPrimaryKeyProps
if(primaryKeys.size > 2) {
logger.error("@ReplicateEntity ${entity.qualifiedName?.asString()} must have 1 or 2 primary keys", entity)
}

primaryKeys.forEach {
if(it.type.resolve() != resolver.builtIns.longType) {
logger.error("@ReplicateEntity ${entity.qualifiedName?.asString()} primary key field must be Long", it)
}
}

}catch(e: IllegalArgumentException){
logger.error("ReplicateEntity ${entity.qualifiedName?.asString()} must have a tracker entity specified",
entity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,4 @@ data class PendingRepositorySession(
var rsUid: Long = 0,
var remoteNodeId: Long = 0,
var endpointUrl: String? = null
) {
}
)
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ private suspend fun RoomDatabase.selectDoorReplicateEntitiesByTableIdAndPrimaryK
) { stmt ->
primaryKeysList.mapNotNull { primaryKeys ->
stmt.setLong(1, primaryKeys.pk1)
if(primaryKeys.pk2 != 0L)
stmt.setLong(2, primaryKeys.pk2)

stmt.executeQueryAsyncKmp().useResults { result ->
result.mapNextRow(null) { mapResult ->
DoorReplicationEntity(
Expand All @@ -55,7 +58,7 @@ private suspend fun RoomDatabase.selectDoorReplicateEntitiesByTableIdAndPrimaryK

/**
* Select the DoorReplicateEntity (e.g. including the full Json of the entity data) for a list of NodeEvent(s) that
* represent replication events (e.g. something new was inserted into the OutgoingRelpication table). This is
* represent replication events (e.g. something new was inserted into the OutgoingReplication table). This is
* used as part of converting the NodeEvent into a DoorMessage that can be transmitted to another node. This is done
* lazily because the NodeEvent will only be transmitted if there is a sender which wants to transmit this event (e.g.
* probably only when the other node is connected to this node).
Expand Down Expand Up @@ -334,13 +337,20 @@ private fun createChangeMonitorTriggerSql(
): String {
val triggerName = "_d_ch_monitor_${entityMetaData.tableId}_${remoteNodeId.absoluteValue}" +
"_${operation.substring(0, 2).lowercase()}"

//A ReplicateEntity entity may have one or two primary keys (each a 64 bit long).
val orPk2 = if(entityMetaData.entityPrimaryKeyFieldNames.size > 1)
"NEW.${entityMetaData.entityPrimaryKeyFieldNames[1]}"
else
0

return """
CREATE TEMP TRIGGER IF NOT EXISTS $triggerName
AFTER $operation ON ${entityMetaData.entityTableName}
FOR EACH ROW
BEGIN
INSERT INTO OutgoingReplication(destNodeId, orTableId, orPk1, orPk2)
VALUES ($remoteNodeId, ${entityMetaData.tableId}, NEW.${entityMetaData.entityPrimaryKeyFieldName}, 0);
VALUES ($remoteNodeId, ${entityMetaData.tableId}, NEW.${entityMetaData.entityPrimaryKeyFieldNames.first()}, $orPk2);
END
"""
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,14 @@ class ReplicationEntityMetaData(
val tableId: Int,
val entityTableName: String,
val receiveViewName: String,
val entityPrimaryKeyFieldName: String,
val entityPrimaryKeyFieldNames: List<String>,
val entityVersionIdFieldName: String,
val entityFields: List<ReplicationFieldMetaData>,
val batchSize: Int = 1000,
val remoteInsertStrategy: ReplicateEntity.RemoteInsertStrategy,
val triggers: List<Trigger>,
) {

val entityPrimaryKeyFieldType: Int by lazy(LazyThreadSafetyMode.NONE) {
entityFields.first { it.fieldName == entityPrimaryKeyFieldName }.fieldType
}

val versionIdFieldType: Int by lazy(LazyThreadSafetyMode.NONE) {
entityFields.first { it.fieldName == entityVersionIdFieldName }.fieldType
}

/**
* Map of column name to column type for all fields.
*/
Expand All @@ -36,7 +28,7 @@ class ReplicationEntityMetaData(
get() = """
SELECT $entityTableName.*
FROM $entityTableName
WHERE $entityTableName.$entityPrimaryKeyFieldName = ?
WHERE ${entityPrimaryKeyFieldNames.joinToString(separator = " AND ") { "$it = ?" } }
""".trimIndent()


Expand Down
3 changes: 3 additions & 0 deletions door-testdb/src/commonMain/kotlin/db3/ExampleDb3.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import com.ustadmobile.door.room.RoomDatabase
ReplicationOperation::class,
PendingRepositorySession::class,
Badge::class,
StatementEntity::class,
)
)
expect abstract class ExampleDb3: RoomDatabase {
Expand All @@ -28,4 +29,6 @@ expect abstract class ExampleDb3: RoomDatabase {

abstract val badgeDao: BadgeDao

abstract val statementEntityDao: StatementEntityDao

}
39 changes: 39 additions & 0 deletions door-testdb/src/commonMain/kotlin/db3/StatementEntity.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package db3

import androidx.room.Entity
import com.ustadmobile.door.annotation.*
import kotlinx.serialization.Serializable

@Entity(
primaryKeys = arrayOf("uidHi", "uidLo")
)
@ReplicateEntity(
tableId = StatementEntity.TABLE_ID,
remoteInsertStrategy = ReplicateEntity.RemoteInsertStrategy.INSERT_INTO_RECEIVE_VIEW
)

@Triggers(
arrayOf(
Trigger(
name = "statement_remote_insert",
order = Trigger.Order.INSTEAD_OF,
events = arrayOf(Trigger.Event.INSERT),
conditionSql = "SELECT %NEW_LAST_MODIFIED_GREATER_THAN_EXISTING%",
on = Trigger.On.RECEIVEVIEW,
sqlStatements = arrayOf("%UPSERT%"),
)
)
)
@Serializable
data class StatementEntity(
var uidHi: Long = 0,
var uidLo: Long = 0,
@ReplicateLastModified
@ReplicateEtag
var lct: Long = 0,
var name: String? = null,
) {
companion object {
const val TABLE_ID = 12121
}
}
41 changes: 41 additions & 0 deletions door-testdb/src/commonMain/kotlin/db3/StatementEntityDao.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package db3

import androidx.room.Insert
import androidx.room.Query
import com.ustadmobile.door.annotation.DoorDao
import com.ustadmobile.door.annotation.HttpAccessible
import com.ustadmobile.door.annotation.Repository
import kotlinx.coroutines.flow.Flow

@DoorDao
@Repository

expect abstract class StatementEntityDao {

@Insert
abstract suspend fun insertAsync(entity: StatementEntity)

@Query("""
SELECT StatementEntity.*
FROM StatementEntity
""")
abstract suspend fun findAllAsync(): List<StatementEntity>

@Query("""
SELECT StatementEntity.*
FROM StatementEntity
WHERE StatementEntity.uidHi = :uidHi
AND StatementEntity.uidLo = :uidLo
""")
abstract fun findByUidAsFlow(uidHi: Long, uidLo: Long): Flow<StatementEntity?>

@HttpAccessible(clientStrategy = HttpAccessible.ClientStrategy.PULL_REPLICATE_ENTITIES)
@Query("""
SELECT StatementEntity.*
FROM StatementEntity
WHERE StatementEntity.uidHi = :uidHi
AND StatementEntity.uidLo = :uidLo
""")
abstract suspend fun findByUidAsync(uidHi: Long, uidLo: Long): StatementEntity?

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.ustadmobile.door.log.NapierDoorLogger
import com.ustadmobile.door.paging.DoorRepositoryReplicatePullPagingSource
import com.ustadmobile.door.room.InvalidationTrackerObserver
import com.ustadmobile.door.test.initNapierLog
import com.ustadmobile.door.util.systemTimeInMillis
import db3.*
import io.ktor.client.*
import io.ktor.serialization.kotlinx.json.*
Expand All @@ -33,6 +34,7 @@ import kotlinx.serialization.json.Json
import okhttp3.OkHttpClient
import org.junit.Assert
import org.junit.Test
import java.util.*
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals
import kotlin.test.assertNull
Expand Down Expand Up @@ -149,6 +151,28 @@ class PullIntegrationTest {
}
}

@Test
fun givenEntityWithTwoPrimaryKeysOnServer_whenClientMakesPullRequest_repoFunctionReturnsValuesAndEntitiesAreStoredInClientLocalDb() {
initNapierLog()
clientServerIntegrationTest {
val uuid = UUID.randomUUID()
val insertedEntity = StatementEntity(
uidHi = uuid.mostSignificantBits,
uidLo = uuid.leastSignificantBits,
lct = systemTimeInMillis(),
name = "Statement"
)
serverDb.statementEntityDao.insertAsync(insertedEntity)

val clientRepo = makeClientRepo()

val statementInLocalDb = clientRepo.statementEntityDao.findByUidAsync(insertedEntity.uidHi, insertedEntity.uidLo)

Assert.assertEquals(statementInLocalDb, insertedEntity)
clientRepo.close()
}
}

@Test
fun givenEntitiesCreatedOnServer_whenClientUsesHttpWithFallbackFunction_thenWillReturnAnswersAndNotCopyToLocalDatabase() {
clientServerIntegrationTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import com.ustadmobile.door.test.initNapierLog
import com.ustadmobile.door.util.systemTimeInMillis
import db3.ExampleDb3
import db3.ExampleEntity3
import db3.StatementEntity
import io.ktor.client.*
import io.ktor.serialization.kotlinx.json.*
import io.ktor.server.engine.*
Expand All @@ -24,6 +25,7 @@ import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.json.Json
import okhttp3.OkHttpClient
import java.util.*
import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test
Expand Down Expand Up @@ -178,6 +180,43 @@ class PushIntegrationTest {
clientRepo.close()
}

/**
* Test pushing an entity that has two primary keys
*/
@Test(timeout = 10000)
fun givenBlankClientDatabase_whenEntityWithDualPrimaryKeysCreatedOnClientAfterConnection_thenShouldReplicateToServer() {
server.start()
val clientRepo = clientDb.asClientNodeRepository()
val clientRepoClientState = (clientRepo as DoorDatabaseRepository).clientState
runBlocking {
clientRepoClientState.filter { it.initialized }.first()

//This is not ideal, but we want to be sure that the first connection has been made. That isn't something that
// any normal use case would need to know
delay(500)

val uuid = UUID.randomUUID()
val insertedEntity = StatementEntity(
uidHi = uuid.mostSignificantBits,
uidLo = uuid.leastSignificantBits,
lct = systemTimeInMillis(),
name = "Statement"
)

clientRepo.withDoorTransactionAsync {
clientRepo.statementEntityDao.insertAsync(insertedEntity)
}

serverDb.statementEntityDao.findByUidAsFlow(insertedEntity.uidHi, insertedEntity.uidLo).filter {
it != null
}.test(timeout = 5.seconds, name = "Entity created after connection is replicated from client to server as expected") {
assertNotNull(awaitItem())
cancelAndIgnoreRemainingEvents()
}
}
clientRepo.close()
}

@Test
fun givenBlankServerDatabase_whenEntityCreatedOnServerAfterConnection_thenShouldReplicateToClient() {
server.start()
Expand Down

0 comments on commit 098f7a0

Please sign in to comment.