Skip to content

Commit

Permalink
Idempotent requests all around
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Nov 7, 2024
1 parent 5f10a39 commit fb737ce
Show file tree
Hide file tree
Showing 23 changed files with 208 additions and 142 deletions.
3 changes: 2 additions & 1 deletion src/main/kotlin/dev/restate/sdktesting/tests/AwaitTimeout.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class AwaitTimeout {
TestUtilsServiceClient.fromClient(ingressClient)
.createAwakeableAndAwaitIt(
CreateAwakeableAndAwaitItRequest(
UUID.randomUUID().toString(), timeout.toMillis())))
UUID.randomUUID().toString(), timeout.toMillis()),
idempotentCallOptions()))
.isEqualTo(TimeoutResponse)
}
}
5 changes: 3 additions & 2 deletions src/main/kotlin/dev/restate/sdktesting/tests/CallOrdering.kt
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ class CallOrdering {
} else {
ManyCallRequest(proxyRequest, false, true)
}
})
},
idempotentCallOptions())

assertThat(ListObjectClient.fromClient(ingressClient, listName).clear())
assertThat(ListObjectClient.fromClient(ingressClient, listName).clear(idempotentCallOptions()))
.containsExactly("0", "1", "2")
}
}
32 changes: 21 additions & 11 deletions src/main/kotlin/dev/restate/sdktesting/tests/CancelInvocation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ import java.util.*
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.*
import kotlinx.coroutines.test.runTest
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.kotlin.await
import org.awaitility.kotlin.until
import org.awaitility.kotlin.withAlias
import org.junit.jupiter.api.extension.RegisterExtension
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
Expand Down Expand Up @@ -49,24 +50,33 @@ class CancelInvocation {
val cancelTestClient = CancelTestRunnerClient.fromClient(ingressClient, key)
val blockingServiceClient = CancelTestBlockingServiceClient.fromClient(ingressClient, key)

val id = cancelTestClient.send().startTest(blockingOperation).invocationId
val id =
cancelTestClient.send().startTest(blockingOperation, idempotentCallOptions()).invocationId

val awakeableHolderClient = AwakeableHolderClient.fromClient(ingressClient, "cancel")

await until { runBlocking { awakeableHolderClient.hasAwakeable() } }

awakeableHolderClient.unlock("cancel")
await withAlias
"awakeable is registered" untilAsserted
{
assertThat(awakeableHolderClient.hasAwakeable()).isTrue()
}
awakeableHolderClient.unlock("cancel", idempotentCallOptions())

val client = InvocationApi(ApiClient().setHost(metaURL.host).setPort(metaURL.port))

// The termination signal might arrive before the blocking call to the cancel singleton was
// made, so we need to retry.
await.ignoreException(TimeoutCancellationException::class.java).until {
client.terminateInvocation(id, TerminationMode.CANCEL)
runBlocking { withTimeout(1.seconds) { cancelTestClient.verifyTest() } }
}
await.ignoreException(TimeoutCancellationException::class.java) withAlias
"verify test" untilAsserted
{
client.terminateInvocation(id, TerminationMode.CANCEL)
withTimeout(1.seconds) { cancelTestClient.verifyTest() }
}

// Check that the singleton service is unlocked
blockingServiceClient.isUnlocked()
await withAlias
"blocking service is unlocked" untilAsserted
{
blockingServiceClient.isUnlocked()
}
}
}
39 changes: 20 additions & 19 deletions src/main/kotlin/dev/restate/sdktesting/tests/Ingress.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ import dev.restate.sdktesting.infra.*
import java.net.URL
import java.util.*
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.kotlin.await
import org.awaitility.kotlin.until
import org.awaitility.kotlin.untilAsserted
import org.awaitility.kotlin.withAlias
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
Expand Down Expand Up @@ -86,17 +84,20 @@ class Ingress {

// Await until the idempotency id is cleaned up and the next idempotency call updates the
// counter again
await untilAsserted
await withAlias
"cleanup of the previous idempotent request" untilAsserted
{
runBlocking {
assertThat(counterClient.add(2, requestOptions))
.returns(2, CounterUpdateResponse::oldValue)
.returns(4, CounterUpdateResponse::newValue)
}
assertThat(counterClient.add(2, requestOptions))
.returns(2, CounterUpdateResponse::oldValue)
.returns(4, CounterUpdateResponse::newValue)
}

// State in the counter service is now equal to 4
assertThat(counterClient.get()).isEqualTo(4L)
await withAlias
"Get returns 4 now" untilAsserted
{
assertThat(counterClient.get()).isEqualTo(4L)
}
}

@Test
Expand Down Expand Up @@ -128,10 +129,10 @@ class Ingress {
requestOptions)

// Wait for get
await untilAsserted { runBlocking { assertThat(counterClient.get()).isEqualTo(2) } }
await untilAsserted { assertThat(counterClient.get()).isEqualTo(2) }

// Without request options this should be executed immediately and return 4
assertThat(counterClient.add(2))
assertThat(counterClient.add(2, idempotentCallOptions()))
.returns(2, CounterUpdateResponse::oldValue)
.returns(4, CounterUpdateResponse::newValue)
}
Expand Down Expand Up @@ -159,10 +160,10 @@ class Ingress {
.isEqualTo(secondInvocationSendStatus.invocationId)

// Wait for get
await untilAsserted { runBlocking { assertThat(counterClient.get()).isEqualTo(2) } }
await untilAsserted { assertThat(counterClient.get()).isEqualTo(2) }

// Without request options this should be executed immediately and return 4
assertThat(counterClient.add(2))
assertThat(counterClient.add(2, idempotentCallOptions()))
.returns(2, CounterUpdateResponse::oldValue)
.returns(4, CounterUpdateResponse::newValue)
}
Expand Down Expand Up @@ -201,8 +202,8 @@ class Ingress {

// Unblock
val awakeableHolderClient = AwakeableHolderClient.fromClient(ingressClient, awakeableKey)
await until { runBlocking { awakeableHolderClient.hasAwakeable() } }
awakeableHolderClient.unlock(response)
await untilAsserted { assertThat(awakeableHolderClient.hasAwakeable()).isTrue }
awakeableHolderClient.unlock(response, idempotentCallOptions())

// Attach should be completed
assertThat(blockedFut.get()).isEqualTo(AwakeableResultResponse(response))
Expand Down Expand Up @@ -248,8 +249,8 @@ class Ingress {

// Unblock
val awakeableHolderClient = AwakeableHolderClient.fromClient(ingressClient, awakeableKey)
await until { runBlocking { awakeableHolderClient.hasAwakeable() } }
awakeableHolderClient.unlock(response)
await untilAsserted { assertThat(awakeableHolderClient.hasAwakeable()).isTrue }
awakeableHolderClient.unlock(response, idempotentCallOptions())

// Attach should be completed
assertThat(blockedFut.get()).isEqualTo(AwakeableResultResponse(response))
Expand All @@ -267,7 +268,7 @@ class Ingress {

assertThat(
TestUtilsServiceClient.fromClient(ingressClient)
.echoHeaders(CallRequestOptions().withHeader(headerName, headerValue)))
.echoHeaders(idempotentCallOptions().withHeader(headerName, headerValue)))
.containsEntry(headerName, headerValue)
}
}
23 changes: 8 additions & 15 deletions src/main/kotlin/dev/restate/sdktesting/tests/KafkaIngress.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,15 @@ import dev.restate.sdktesting.infra.runtimeconfig.KafkaClusterOptions
import dev.restate.sdktesting.infra.runtimeconfig.RestateConfigSchema
import java.net.URL
import java.util.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.kotlin.await
import org.awaitility.kotlin.matches
import org.awaitility.kotlin.untilCallTo
import org.awaitility.kotlin.withAlias
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.RegisterExtension
import org.junit.jupiter.api.parallel.Execution
Expand Down Expand Up @@ -88,13 +87,10 @@ class KafkaIngress {
COUNTER_TOPIC,
listOf(counter to "1", counter to "2", counter to "3"))

// Now wait for the update to be visible
await untilCallTo
await withAlias
"Updates from Kafka are visible in the counter" untilAsserted
{
runBlocking { CounterClient.fromClient(ingressClient, counter).get() }
} matches
{ num ->
num!! == 6L
assertThat(CounterClient.fromClient(ingressClient, counter).get()).isEqualTo(6L)
}
}

Expand Down Expand Up @@ -144,13 +140,10 @@ class KafkaIngress {
Json.encodeToString(3).encodeToByteArray())),
))

// Now wait for the update to be visible
await untilCallTo
await withAlias
"Updates from Kafka are visible in the counter" untilAsserted
{
runBlocking { CounterClient.fromClient(ingressClient, counter).get() }
} matches
{ num ->
num!! == 6L
assertThat(CounterClient.fromClient(ingressClient, counter).get()).isEqualTo(6L)
}
}
}
Expand Down
21 changes: 13 additions & 8 deletions src/main/kotlin/dev/restate/sdktesting/tests/KillInvocation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import dev.restate.sdk.client.Client
import dev.restate.sdktesting.contracts.*
import dev.restate.sdktesting.infra.*
import java.net.URL
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.kotlin.await
import org.awaitility.kotlin.until
import org.awaitility.kotlin.withAlias
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.RegisterExtension

Expand All @@ -40,17 +40,22 @@ class KillInvocation {
fun kill(@InjectClient ingressClient: Client, @InjectMetaURL metaURL: URL) = runTest {
val id = KillTestRunnerClient.fromClient(ingressClient).send().startCallTree().invocationId
val awakeableHolderClient = AwakeableHolderClient.fromClient(ingressClient, "kill")

// Await until AwakeableHolder has an awakeable and then complete it.
// With this synchronization point we make sure the call tree has been built before killing it.
await until { runBlocking { awakeableHolderClient.hasAwakeable() } }
awakeableHolderClient.unlock("")
await withAlias
"awakeable is registered" untilAsserted
{
assertThat(awakeableHolderClient.hasAwakeable()).isTrue()
}
awakeableHolderClient.unlock("cancel", idempotentCallOptions())

// Kill the invocation
val client = InvocationApi(ApiClient().setHost(metaURL.host).setPort(metaURL.port))
client.terminateInvocation(id, TerminationMode.KILL)

// Check that the singleton service is unlocked after killing the call tree
KillTestSingletonClient.fromClient(ingressClient, "").isUnlocked()
await withAlias
"singleton service is unlocked after killing the call tree" untilAsserted
{
KillTestSingletonClient.fromClient(ingressClient, "").isUnlocked()
}
}
}
4 changes: 2 additions & 2 deletions src/main/kotlin/dev/restate/sdktesting/tests/KillRuntime.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class KillRuntime {
) = runTest {
var counterClient = CounterClient.fromClient(ingressClient, "my-key")

val res1 = counterClient.add(1)
val res1 = counterClient.add(1, idempotentCallOptions())
assertThat(res1.oldValue).isEqualTo(0)
assertThat(res1.newValue).isEqualTo(1)

Expand All @@ -52,7 +52,7 @@ class KillRuntime {
counterClient =
CounterClient.fromClient(
Client.connect("http://127.0.0.1:${runtimeHandle.getMappedPort(8080)!!}"), "my-key")
val res2 = counterClient.add(2)
val res2 = counterClient.add(2, idempotentCallOptions())
assertThat(res2.oldValue).isEqualTo(1)
assertThat(res2.newValue).isEqualTo(3)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import dev.restate.sdktesting.contracts.NonDeterministicDefinitions
import dev.restate.sdktesting.infra.*
import kotlinx.coroutines.test.runTest
import org.assertj.core.api.Assertions.*
import org.awaitility.kotlin.await
import org.awaitility.kotlin.withAlias
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.extension.RegisterExtension
import org.junit.jupiter.api.parallel.Execution
Expand Down Expand Up @@ -51,19 +53,19 @@ class NonDeterminismErrors {
fun method(handlerName: String, @InjectClient ingressClient: Client) = runTest {
// Increment the count first, this makes sure that the counter service is there.
val c = CounterClient.fromClient(ingressClient, handlerName)
c.add(1)
c.add(1, idempotentCallOptions())

assertThatThrownBy {
ingressClient.call(
Target.virtualObject(
NonDeterministicDefinitions.SERVICE_NAME, handlerName, handlerName),
Serde.VOID,
Serde.VOID,
null)
null,
idempotentCallOptions())
}
.isNotNull()

// Assert the counter was not incremented
assertThat(CounterClient.fromClient(ingressClient, handlerName).get()).isEqualTo(1)
await withAlias "counter was not incremented" untilAsserted { assertThat(c.get()).isEqualTo(1) }
}
}
23 changes: 17 additions & 6 deletions src/main/kotlin/dev/restate/sdktesting/tests/PrivateService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import dev.restate.sdktesting.contracts.*
import dev.restate.sdktesting.infra.*
import java.net.URL
import java.util.*
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import kotlinx.serialization.encodeToString
Expand All @@ -25,7 +26,8 @@ import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.assertj.core.api.InstanceOfAssertFactories
import org.awaitility.kotlin.await
import org.awaitility.kotlin.untilAsserted
import org.awaitility.kotlin.withAlias
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.RegisterExtension

Expand All @@ -42,6 +44,8 @@ class PrivateService {
}

@Test
@DisplayName(
"Make a handler ingress private and try to call it both directly and through a proxy service")
fun privateService(
@InjectMetaURL metaURL: URL,
@InjectClient ingressClient: Client,
Expand All @@ -50,16 +54,18 @@ class PrivateService {
val counterId = UUID.randomUUID().toString()
val counterClient = CounterClient.fromClient(ingressClient, counterId)

counterClient.add(1)
counterClient.add(1, idempotentCallOptions())

// Make the service private
adminServiceClient.modifyService(
CounterDefinitions.SERVICE_NAME, ModifyServiceRequest()._public(false))

// Wait for the service to be private
await untilAsserted
await withAlias
"the service becomes private" untilAsserted
{
assertThatThrownBy { runBlocking { counterClient.get() } }
val ctx = currentCoroutineContext()
assertThatThrownBy { runBlocking(ctx) { counterClient.get() } }
.asInstanceOf(InstanceOfAssertFactories.type(IngressException::class.java))
.returns(400, IngressException::getStatusCode)
}
Expand All @@ -71,13 +77,18 @@ class PrivateService {
CounterDefinitions.SERVICE_NAME,
counterId,
"add",
Json.encodeToString(1).encodeToByteArray()))
Json.encodeToString(1).encodeToByteArray()),
idempotentCallOptions())

// Make the service public again
adminServiceClient.modifyService(
CounterDefinitions.SERVICE_NAME, ModifyServiceRequest()._public(true))

// Wait to get the correct count
await untilAsserted { runBlocking { assertThat(counterClient.get()).isEqualTo(2L) } }
await withAlias
"the service becomes public again" untilAsserted
{
assertThat(counterClient.get()).isEqualTo(2L)
}
}
}
Loading

0 comments on commit fb737ce

Please sign in to comment.