Best Kotest code snippet using io.kotest.matchers.future.matchers.completed
RateLimitedDispatcherTest.kt
Source:RateLimitedDispatcherTest.kt  
...29import ru.fix.dynamic.property.api.DynamicProperty30import java.lang.Thread.sleep31import java.time.Duration32import java.util.concurrent.*33import java.util.concurrent.CompletableFuture.completedFuture34import java.util.concurrent.atomic.AtomicBoolean35import java.util.concurrent.atomic.AtomicInteger36@TestInstance(TestInstance.Lifecycle.PER_METHOD)37@Execution(ExecutionMode.CONCURRENT)38class RateLimitedDispatcherTest {39    private companion object : KLogging() {40        const val DISPATCHER_NAME = "dispatcher-name"41        const val DISPATCHER_METRICS_PREFIX = "RateLimiterDispatcher.$DISPATCHER_NAME"42    }43    @Test44    fun `dispatch async operation with user defined async result type, operation invoked and it's result returned`() {45        class UserAsyncResult {46            fun whenComplete(callback: () -> Unit) {47                callback()48            }49        }50        val dispatcher = createDispatcher()51        val asyncResultInstance = UserAsyncResult()52        fun userAsyncOperation(): UserAsyncResult {53            return asyncResultInstance54        }55        val delayedSubmission = dispatcher.compose(56                { userAsyncOperation() },57                { asyncResult, callback -> asyncResult.whenComplete { callback.onAsyncResultCompleted() } }58        )59        (asyncResultInstance === delayedSubmission.get()).shouldBeTrue()60        dispatcher.close()61    }62    @Test63    fun `dispatch async operation with successfull CompletableFuture, operation invoked and it's result returned`() {64        val dispatcher = createDispatcher()65        val operationResult = Object()66        fun userAsyncOperation(): CompletableFuture<Any> {67            return completedFuture(operationResult)68        }69        val delayedSubmissionFuture = dispatcher.compose { userAsyncOperation() }70        (operationResult === delayedSubmissionFuture.get()).shouldBeTrue()71        dispatcher.close()72    }73    @Test74    fun `dispatch async 
operation with exceptional CompletableFuture, operation invoked and it's result returned`() {75        val dispatcher = createDispatcher()76        val asyncOperationException = Exception("some error")77        fun userAsyncOperation(): CompletableFuture<Any> {78            return CompletableFuture<Any>().apply {79                completeExceptionally(asyncOperationException)80            }81        }82        val delayedSubmissionFuture = dispatcher.compose { userAsyncOperation() }83        await().atMost(Duration.ofSeconds(10)).until {84            delayedSubmissionFuture.isCompletedExceptionally85        }86        val actualException = shouldThrow<Exception> { delayedSubmissionFuture.get() }87        actualException.cause.shouldNotBeNull()88        actualException.cause.shouldBe(asyncOperationException)89        dispatcher.close()90    }91    @Test92    fun `if windows size is 0, then restricted only by limiter `() {93        `async operations are restricted by limiter limit `(0)94    }95    @Test96    fun `if window size is not empty and quite big, restricted by limiter`() {97        `async operations are restricted by limiter limit `(100_000)98    }99    private fun `async operations are restricted by limiter limit `(windowSize: Int) {100        val RATE_PER_SECOND = 500101        val ITERATIONS = 5 * RATE_PER_SECOND102        val report = `submit series of operations`(103                ratePerSecond = RATE_PER_SECOND,104                interations = ITERATIONS,105                windowSize = DynamicProperty.of(windowSize))106        val operationReport = report.profilerCallReports.single { it.identity.name == "operation" }107        logger.info("Throughput " + operationReport.stopThroughputAvg)108        operationReport.stopThroughputAvg.shouldBeBetween(109                RATE_PER_SECOND.toDouble(),110                RATE_PER_SECOND.toDouble(),111                RATE_PER_SECOND.toDouble() * 0.25)112    }113    private fun `submit series of operations`(114   
         ratePerSecond: Int,115            interations: Int,116            windowSize: DynamicProperty<Int>): ProfilerReport {117        val profiler = AggregatingProfiler()118        val dispatcher = createDispatcher(119                rateLimitRequestPerSecond = ratePerSecond,120                window = windowSize,121                profiler = profiler122        )123        val counter = AtomicInteger(0)124        val profilerReporter = profiler.createReporter()125        val profiledCall = profiler.profiledCall("operation")126        val features = List(interations) {127            dispatcher.compose {128                profiledCall.profile<CompletableFuture<Int>> {129                    completedFuture(counter.incrementAndGet())130                }131            }132        }133        logger.info("Submit $interations operations.")134        features.forEach { it.join() }135        counter.get().shouldBe(interations)136        features.map { it.join() }.toSet().containsAll((1..interations).toList())137        val report = profilerReporter.buildReportAndReset()138        dispatcher.close()139        return report;140    }141    @Test142    fun `when window of uncompleted operations is full no new operation is dispatched`() {143        val dispatcher = TrackableDispatcher()144        dispatcher.windowProperty.set(10)145        dispatcher.submitTasks(1..11)146        sleep(4000)147        dispatcher.isSubmittedTaskInvoked(1..10).shouldBeTrue()148        dispatcher.isSubmittedTaskInvoked(11).shouldBeFalse()149        dispatcher.completeTask(4)150        await().atMost(Duration.ofSeconds(10)).until {151            dispatcher.isSubmittedTaskInvoked(11)152        }153        dispatcher.completeAllAndClose()154    }155    @Test156    fun `'queue_wait', 'acquire_limit', 'acquire_window', 'supplied_operation', 'queue_size', 'active_async_operations' metrics gathered during execution`() {157        val RATE_PER_SECOND = 500158        val ITERATIONS = 5 * 
RATE_PER_SECOND159        val report = `submit series of operations`(160                ratePerSecond = RATE_PER_SECOND,161                interations = ITERATIONS,162                windowSize = DynamicProperty.of(100))163        report.profilerCallReports.single { it.identity.name == "$DISPATCHER_METRICS_PREFIX.queue_wait" }164                .stopSum.shouldBe(ITERATIONS)165        report.profilerCallReports.single { it.identity.name == "$DISPATCHER_METRICS_PREFIX.acquire_window" }166                .stopSum.shouldBe(ITERATIONS)167        report.profilerCallReports.single { it.identity.name == "$DISPATCHER_METRICS_PREFIX.acquire_limit" }168                .stopSum.shouldBe(ITERATIONS)169        report.profilerCallReports.single { it.identity.name == "$DISPATCHER_METRICS_PREFIX.supply_operation" }170                .stopSum.shouldBe(ITERATIONS)171        report.indicators.map { it.key.name }.shouldContain("$DISPATCHER_METRICS_PREFIX.queue_size")172        report.indicators.map { it.key.name }.shouldContain("$DISPATCHER_METRICS_PREFIX.active_async_operations")173        logger.info(report.toString())174    }175    @Test176    fun `indicators 'queue_size' and 'active_async_operations' adjusted according to number of queued and active operations`() {177        val profiler = AggregatingProfiler()178        val reporter = profiler.createReporter()179        val trackableDispatcher = TrackableDispatcher(profiler)180        trackableDispatcher.windowProperty.set(10)181        trackableDispatcher.submitTasks(1..12)182        await().atMost(1, TimeUnit.SECONDS).until {183            trackableDispatcher.isSubmittedTaskInvoked(1..10)184        }185        reporter.buildReportAndReset().assertSoftly {186            indicators.mapKeys { it.key.name }.assertSoftly {187                it["$DISPATCHER_METRICS_PREFIX.queue_size"] shouldBe 1188                it["$DISPATCHER_METRICS_PREFIX.active_async_operations"] shouldBe 10189            }190            
profilerCallReports.single { it.identity.name == "$DISPATCHER_METRICS_PREFIX.acquire_window" }191                    .activeCallsCountMax shouldBe 1192        }193        trackableDispatcher.completeTasks(1..10)194        await().atMost(1, TimeUnit.SECONDS).until {195            trackableDispatcher.isSubmittedTaskInvoked(11..12)196        }197        reporter.buildReportAndReset().indicators.mapKeys { it.key.name }.assertSoftly {198            it["$DISPATCHER_METRICS_PREFIX.queue_size"] shouldBe 0199            it["$DISPATCHER_METRICS_PREFIX.active_async_operations"] shouldBe 2200        }201        trackableDispatcher.completeTasks(11..12)202        reporter.buildReportAndReset().indicators.mapKeys { it.key.name }.assertSoftly {203            it["$DISPATCHER_METRICS_PREFIX.queue_size"] shouldBe 0204            it["$DISPATCHER_METRICS_PREFIX.active_async_operations"] shouldBe 0205        }206        trackableDispatcher.completeAllAndClose()207    }208    @Test209    fun `WHEN many fast CompletableFuture tasks completed THEN 'active_async_operations' is 0`() {210        val report = `submit series of operations`(500, 4000, DynamicProperty.of(1000))211        logger.info { report }212        report.indicators.mapKeys { it.key.name }.assertSoftly {213            it["$DISPATCHER_METRICS_PREFIX.active_async_operations"] shouldBe 0214        }215    }216    @Test217    fun `WHEN completed futures arrived THEN indicators are correctly adjusted`() {218        val profiler = AggregatingProfiler()219        val reporter = profiler.createReporter()220        val trackableDispatcher = TrackableDispatcher(profiler)221        trackableDispatcher.windowProperty.set(5)222        trackableDispatcher.submitCompletedTasks(1..4)223        trackableDispatcher.submitTasks(5..6)224        await().atMost(1, TimeUnit.SECONDS).until {225            trackableDispatcher.isSubmittedTaskInvoked(1..6)226        }227        reporter.buildReportAndReset().indicators.mapKeys { it.key.name 
}.assertSoftly {228            it["$DISPATCHER_METRICS_PREFIX.queue_size"] shouldBe 0229            it["$DISPATCHER_METRICS_PREFIX.active_async_operations"] shouldBe 2230        }231        trackableDispatcher.submitCompletedTasks(7..10)232        await().atMost(1, TimeUnit.SECONDS).until {233            trackableDispatcher.isSubmittedTaskInvoked(7..10)234        }235        reporter.buildReportAndReset().indicators.mapKeys { it.key.name }.assertSoftly {236            it["$DISPATCHER_METRICS_PREFIX.queue_size"] shouldBe 0237            it["$DISPATCHER_METRICS_PREFIX.active_async_operations"] shouldBe 2238        }239        trackableDispatcher.completeTasks(5..6)240        reporter.buildReportAndReset().indicators.mapKeys { it.key.name }.assertSoftly {241            it["$DISPATCHER_METRICS_PREFIX.queue_size"] shouldBe 0242            it["$DISPATCHER_METRICS_PREFIX.active_async_operations"] shouldBe 0243        }244        trackableDispatcher.completeAllAndClose()245    }246    @Test247    fun `increasing window size allows to submit new operations up to the new limit`() {248        val trackableDispatcher = TrackableDispatcher()249        trackableDispatcher.windowProperty.set(10)250        trackableDispatcher.submitTasks(1..11)251        sleep(4000)252        trackableDispatcher.isSubmittedTaskInvoked(1..10).shouldBeTrue()253        trackableDispatcher.isSubmittedTaskInvoked(11).shouldBeFalse()254        trackableDispatcher.windowProperty.set(11)255        trackableDispatcher.submitTask(12)256        trackableDispatcher.completeTask(1)257        await().atMost(Duration.ofSeconds(10)).until {258            trackableDispatcher.isSubmittedTaskInvoked(1..12)259        }260        trackableDispatcher.completeAllAndClose();261    }262    @Test263    fun `decreasing window size reduces limit`() {264        val trackableDispatcher = TrackableDispatcher()265        trackableDispatcher.windowProperty.set(10)266        trackableDispatcher.submitTasks(1..10)267        
sleep(4000)268        trackableDispatcher.isSubmittedTaskInvoked(1..10).shouldBeTrue()269        trackableDispatcher.completeTasks(1..10)270        trackableDispatcher.windowProperty.set(4)271        trackableDispatcher.submitTasks(11..15)272        sleep(4000)273        trackableDispatcher.isSubmittedTaskInvoked(11..14).shouldBeTrue()274        trackableDispatcher.isSubmittedTaskInvoked(15).shouldBeFalse()275        trackableDispatcher.completeTask(11)276        await().atMost(Duration.ofSeconds(10)).until {277            trackableDispatcher.isSubmittedTaskInvoked(15)278        }279        trackableDispatcher.completeAllAndClose();280    }281    /**282     * When dispatcher closingTimeout is enough for pending tasks to complete283     * such tasks will complete normally284     */285    @Test286    fun `on shutdown fast tasks complete normally`() {287        val dispatch = createDispatcher(closingTimeout = 5_000)288        assertTimeoutPreemptively(Duration.ofSeconds(10)) {289            val blockingTaskIsStarted = CountDownLatch(1)290            dispatch.compose {291                blockingTaskIsStarted.countDown()292                //Due to blocking nature of dispatch.close we hae to use sleep293                Thread.sleep(1000)294                completedFuture(true)295            }296            val futures = List(3) {297                dispatch.compose {298                    completedFuture(true)299                }300            }301            blockingTaskIsStarted.await()302            dispatch.close()303            CompletableFuture.allOf(*futures.toTypedArray()).exceptionally { null }.join()304            futures.forEach { future: CompletableFuture<*> ->305                future.isDone.shouldBeTrue()306                future.isCompletedExceptionally.shouldBeFalse()307            }308        }309    }310    @Test311    fun `on shutdown slow tasks complete exceptionally`() {312        val dispatch = createDispatcher(closingTimeout = 0)313        
assertTimeoutPreemptively(Duration.ofSeconds(5)) {314            val blockingTaskIsStarted = CountDownLatch(1)315            dispatch.compose {316                blockingTaskIsStarted.countDown()317                //Due to blocking nature of dispatch.close we hae to use sleep318                Thread.sleep(1000)319                completedFuture(true)320            }321            val futures = ArrayList<CompletableFuture<*>>()322            for (i in 1..3) {323                futures.add(dispatch.compose {324                    completedFuture(true)325                })326            }327            blockingTaskIsStarted.await()328            dispatch.close()329            CompletableFuture.allOf(*futures.toTypedArray()).exceptionally { null }.join()330            futures.forEach { future: CompletableFuture<*> ->331                future.isDone.shouldBeTrue()332                future.isCompletedExceptionally.shouldBeTrue()333                shouldThrow<ExecutionException> { future.get() }334                        .cause.shouldBeInstanceOf<RejectedExecutionException>()335            }336        }337    }338    @Test339    fun `task, submitted in closed dispatcher, is rejected with exception`() {340        val dispatcher = createDispatcher()341        dispatcher.close()342        val result = dispatcher.compose {343            completedFuture(true)344        }345        await().atMost(Duration.ofSeconds(2)).until {346            result.isCompletedExceptionally347        }348        shouldThrow<ExecutionException> { result.get() }349                .cause.shouldBeInstanceOf<RejectedExecutionException>()350    }351    fun createDispatcher(352            rateLimitRequestPerSecond: Int = 500,353            window: DynamicProperty<Int> = DynamicProperty.of(0),354            closingTimeout: Int = 5000,355            profiler: Profiler = NoopProfiler()) =356            RateLimitedDispatcher(357                    DISPATCHER_NAME,358                    
ConfigurableRateLimiter("rate-limiter-name", rateLimitRequestPerSecond),359                    profiler,360                    window,361                    DynamicProperty.of(closingTimeout.toLong())362            )363    inner class TrackableDispatcher(364            profiler: Profiler = NoopProfiler()365    ) {366        val windowProperty = AtomicProperty(0)367        val dispatcher = createDispatcher(profiler = profiler, window = windowProperty)368        val submittedTasksResults = HashMap<Int, CompletableFuture<Any?>>()369        val isSubmittedTaskInvoked = HashMap<Int, AtomicBoolean>()370        fun submitCompletedTasks(tasks: IntRange) {371            for (task in tasks) {372                submitCompletedTask(task)373            }374        }375        fun submitTasks(tasks: IntRange) {376            for (task in tasks) {377                submitTask(task)378            }379        }380        fun submitCompletedTask(taskIndex: Int) = submitTask(taskIndex, completedFuture(taskIndex))381        fun submitTask(taskIndex: Int) = submitTask(taskIndex, CompletableFuture())382        private fun submitTask(taskIndex: Int, future: CompletableFuture<Any?>) {383            submittedTasksResults[taskIndex] = future384            isSubmittedTaskInvoked[taskIndex] = AtomicBoolean(false)385            dispatcher.compose {386                isSubmittedTaskInvoked[taskIndex]!!.set(true)387                future388            }389        }390        fun completeTask(taskIndex: Int) {391            submittedTasksResults[taskIndex]!!.complete(taskIndex)392        }393        fun completeTasks(range: IntRange) {394            for (task in range) {...GrpcEventSubscriberTest.kt
Source:GrpcEventSubscriberTest.kt  
...23        "Test that we correctly generate event subscriptions" {24            forAll(Arb.long(-1L..1000L), Arb.long(0L..1000L)) { hwm, eventnumber ->25                val category = UUID.randomUUID().toString()26                val eventStoreMock = mockk<EventStoreDBClient>(relaxed = true) {27                    every { readStream(any(), any(), any())} returns CompletableFuture.completedFuture(ReadResult(listOf()))28                    every { subscribeToStream(any(), any(), any()) } returns CompletableFuture.completedFuture(mockk())29                }30                GrpcEventSubscriberFactory(31                        eventStoreDBClient = eventStoreMock,32                        category = category,33                        serdes = mockk()34                ).createSubscriber(subscriber = "aSubscriber", onEvent = { run {} }, fromEvent = hwm)35                verify(exactly = 1) { eventStoreMock.subscribeToStream("\$ce-$category", any(), any()) }36                true37            }38        }39        "On close propagates reason" {40            val category = UUID.randomUUID().toString()41            val subscriptionListener = slot<SubscriptionListener>()42            val subscription: CompletableFuture<Subscription> = CompletableFuture.completedFuture(mockk<Subscription> {43                every { subscriptionId } returns UUID.randomUUID().toString()44            })45            val eventStoreMock = mockk<EventStoreDBClient> {46                every { readStream(any(), any(), any())} returns CompletableFuture.completedFuture(ReadResult(listOf()))47                every { subscribeToStream("\$ce-$category", capture(subscriptionListener), any()) } returns subscription48            }49            var catchedException: Exception? 
= null50            GrpcEventSubscriberFactory(51                    eventStoreDBClient = eventStoreMock,52                    category = category,53                    serdes = mockk()54            ).createSubscriber(subscriber = "aSubscriber", onEvent = { run {} }, fromEvent = 1, onClose = {55                catchedException = it56            })57            val reason = "connection closed"58            subscriptionListener.captured.onError(subscription.get(), ConnectionShutdownException())59            (catchedException!! as GrpcSubscriptionDroppedException).run {60                reason shouldBe reason61                message shouldBe "Subscription was dropped. Reason: $ConnectionShutDown"62                cause should beInstanceOf<ConnectionShutdownException>()63            }64            verify(exactly = 1) { eventStoreMock.subscribeToStream("\$ce-$category", any(), any()) }65            verify(exactly = 1) { subscription.get().subscriptionId }66            confirmVerified(subscription.get())67        }68        "Create event subsription starting on MIN_VALUE" {69            val hwm = Long.MIN_VALUE70            val category = UUID.randomUUID().toString()71            val eventStoreMock = mockk<EventStoreDBClient>(relaxed = true)72            shouldThrowExactly<IllegalStateException> {73                GrpcEventSubscriberFactory(74                        eventStoreDBClient = eventStoreMock,75                        category = category,76                        serdes = mockk()77                ).createSubscriber(subscriber = "aSubscriber", onEvent = { run {} }, fromEvent = hwm)78            }.message shouldBe "the from-event $hwm is invalid, must be a number equal to or larger than -1"79        }80        "Create event subscriptions using different borderline highwater marks" {81            io.kotest.data.forAll(82                row(-1L, StreamRevision.START),83                row(0L, StreamRevision(0L)),84                row(1L, StreamRevision(1L)),85      
          row(37999L, StreamRevision(37999L)),86                row(Long.MAX_VALUE, StreamRevision(Long.MAX_VALUE)))87            { hwm, revision ->88                val category = UUID.randomUUID().toString()89                val streamName = "\$ce-$category"90                val eventStoreMock = mockk<EventStoreDBClient> {91                    every { readStream(any(), any(), any())} returns CompletableFuture.completedFuture(ReadResult(listOf()))92                    every { subscribeToStream(streamName, any(), any()) } returns CompletableFuture.completedFuture(mockk())93                }94                GrpcEventSubscriberFactory(95                        eventStoreDBClient = eventStoreMock,96                        category = category,97                        serdes = mockk()98                ).createSubscriber(subscriber = "aSubscriber", onEvent = { run {} }, fromEvent = hwm)99                verify(exactly = 1) { eventStoreMock.subscribeToStream("\$ce-$category", any(), withArg { it.startingRevision shouldBe revision }) }100            }101        }102        "onLive should be called before events received if stream is empty" {103            val category = UUID.randomUUID().toString()104            val streamId = "\$ce-$category"105            val eventStoreMock = mockk<EventStoreDBClient>(relaxed = true) {106                every { readStream(any(), any(), any())} returns CompletableFuture.completedFuture(ReadResult(listOf()))107                every {108                    hint(CompletableFuture::class)109                    subscribeToStream(streamId, any(), any())110                } returns CompletableFuture.completedFuture(mockk<Subscription>(relaxed = true))111            }112            val subscriberFactory = GrpcEventSubscriberFactory(113                eventStoreDBClient = eventStoreMock,114                category = category,115                serdes = mockk()116            )117            var onLiveCalled = 0118            
subscriberFactory.createSubscriber("subscriber",119                fromEvent = 0,120                onEvent = { },121                onLive = { onLiveCalled += 1 })122            onLiveCalled shouldBe 1123        }124        "onLive should be called before events received if we start at the last event" {125            val category = UUID.randomUUID().toString()126            val streamId = "\$ce-$category"127            val eventStoreMock = mockk<EventStoreDBClient>(relaxed = true) {128                every { readStream(any(), any(), any())} returns CompletableFuture.completedFuture(ReadResult(listOf()))129                every {130                    hint(CompletableFuture::class)131                    readStream(streamId, 1, ReadStreamOptions.get().backwards().fromEnd().notResolveLinkTos())132                } returns mockk<CompletableFuture<ReadResult>>(relaxed = true) {133                    every {134                        get().events.first().originalEvent.streamRevision.valueUnsigned135                    } returns 42136                }137                every {138                    hint(CompletableFuture::class)139                    subscribeToStream(streamId, any(), any())140                } returns CompletableFuture.completedFuture(mockk<Subscription>(relaxed = true))141            }142            val subscriberFactory = GrpcEventSubscriberFactory(143                eventStoreDBClient = eventStoreMock,144                category = category,145                serdes = mockk()146            )147            var onLiveCalled = 0148            subscriberFactory.createSubscriber("subscriber",149                fromEvent = 42,150                onEvent = { },151                onLive = { onLiveCalled += 1 })152            onLiveCalled shouldBe 1153        }154        "onLive should be called when subscription receives last event of stream" {155            val category = UUID.randomUUID().toString()156            val streamId = "\$ce-$category"157            
val lastEvent = 42L158            val subscribeFrom = 41L159            val listener = slot<SubscriptionListener>()160            val eventStoreMock = mockk<EventStoreDBClient>(relaxed = true) {161                every {162                    hint(CompletableFuture::class)163                    readStream(streamId, 1, match { it.direction == Direction.Backwards })164                } returns mockk<CompletableFuture<ReadResult>>(relaxed = true) {165                    every {166                        get().events.first().originalEvent.streamRevision.valueUnsigned167                    } returns lastEvent168                }169                every {170                    hint(CompletableFuture::class)171                    subscribeToStream(streamId, capture(listener), match { it.startingRevision.valueUnsigned == subscribeFrom })172                } returns CompletableFuture.completedFuture(mockk<Subscription>(relaxed = true))173            }174            val subscriberFactory = GrpcEventSubscriberFactory(175                eventStoreDBClient = eventStoreMock,176                category = category,177                serdes = mockk(relaxed = true) {178                    every {179                        deserialize(any(), any())180                    } returns mockk() { every { upgrade() } returns null }181                }182            )183            var onLiveCalled = 0184            var onEventCalled = 0185            subscriberFactory.createSubscriber("subscriber",186                fromEvent = subscribeFrom,...RPKNotificationServiceTest.kt
Source:RPKNotificationServiceTest.kt  
...
import io.kotest.matchers.shouldBe
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import java.util.concurrent.CompletableFuture.completedFuture

/**
 * Behaviour spec for [RPKNotificationServiceImpl].
 *
 * The service is built on top of a mocked [Database] whose
 * [RPKNotificationTable] is also a mock; every table call is stubbed to
 * return an already-completed future, so each example can simply `join()`
 * the service result.
 */
class RPKNotificationServiceTest : WordSpec({
    // Shared collaborators: all mocks, wired once for the whole spec.
    val notificationTable = mockk<RPKNotificationTable>()
    val database = mockk<Database> {
        every { getTable(RPKNotificationTable::class.java) } returns notificationTable
    }
    val plugin = mockk<RPKNotificationsBukkit>()
    val recipient = mockk<RPKProfile>()
    val notificationService = RPKNotificationServiceImpl(plugin, database)

    "Notification service" should {
        "return notifications for a profile" {
            val storedNotifications = listOf(mockk<RPKNotification>())
            every { notificationTable.get(recipient) } returns completedFuture(storedNotifications)

            val loaded = notificationService.getNotifications(recipient).join()

            loaded shouldBe storedNotifications
        }

        "return notification by id" {
            val notificationId = RPKNotificationId(1)
            val storedNotification = mockk<RPKNotification>()
            every { notificationTable.get(notificationId) } returns completedFuture(storedNotification)

            val loaded = notificationService.getNotification(notificationId).join()

            loaded shouldBe storedNotification
        }

        "delete notification" {
            val doomed = mockk<RPKNotification>()
            every { notificationTable.delete(doomed) } returns completedFuture(null)

            notificationService.removeNotification(doomed).join()

            verify { notificationTable.delete(doomed) }
        }

        "create notification" {
            every { notificationTable.insert(any()) } returns completedFuture(null)

            val created = notificationService.createNotification(
                recipient,
                "test title",
                "test content",
            ).join()

            // A freshly created notification carries the supplied fields and starts unread.
            created should { notification ->
                notification.recipient shouldBe recipient
                notification.title shouldBe "test title"
                notification.content shouldBe "test content"
                notification.read shouldBe false
            }
        }

        "add notification" {
            every { notificationTable.insert(any()) } returns completedFuture(null)
            val incoming = mockk<RPKNotification>()

            notificationService.addNotification(incoming)

            verify { notificationTable.insert(incoming) }
        }
    }
})
...QueueServiceImplTest.kt
Source:QueueServiceImplTest.kt  
...38                val receiveMessageResponse = with(ReceiveMessageResponse.builder()) {39                    messages(message)40                    build()41                }42                val queueAttributesFuture = CompletableFuture.completedFuture(queueAttributesResponse)43                val noMessagesFuture = CompletableFuture.completedFuture(noMessagesResponse)44                val receiveMessageFuture = CompletableFuture.completedFuture(receiveMessageResponse)45                val sqs = mock<SqsAsyncClient> {46                    on { getQueueAttributes(any<GetQueueAttributesRequest>()) } doReturnConsecutively47                            List(10) { noMessagesFuture } + listOf(queueAttributesFuture, noMessagesFuture)48                    on { receiveMessage(any<ReceiveMessageRequest>()) } doReturn receiveMessageFuture49                }50                val queueService = QueueServiceImpl(sqs, SQS_QUEUE_URL, SQS_CHECK_INTERVAL)51                withTimeoutOrNull(100) {52                    queueService.incomingPrefixes().collect { (receiptHandle, keys) ->53                        receiptHandle shouldBe SQS_RECEIPT_HANDLE54                        keys shouldContainInOrder listOf(S3_OBJECT_KEY_1, S3_OBJECT_KEY_2)55                        verify(sqs, times(1)).receiveMessage(any<ReceiveMessageRequest>())56                    }57                }58            }...FlowBalanceServiceTest.kt
Source:FlowBalanceServiceTest.kt  
...26    val flowApiMock = mockk<AsyncFlowAccessApi> {27        every {28            executeScriptAtLatestBlock(any(), any())29        } returns30            CompletableFuture.completedFuture(FlowScriptResponse(balanceResponse))31    }32    val balanceRepository = mockk<BalanceRepository> {33        every {34            save(any())35        } answers { Mono.just(arg(0)) }36    }37    val service = FlowBalanceService(38        FlowChainId.TESTNET,39        flowApiMock,40        balanceRepository41    )42    test("should init balances and save") {43        service.initBalances(44            FlowAddress("0x1c0a1528f6966cb8"),...matchers.kt
Source:matchers.kt  
...
import io.kotest.matchers.shouldBe
import io.kotest.matchers.shouldNot
import io.kotest.matchers.shouldNotBe
import java.util.concurrent.CompletableFuture

/** Asserts that this future reports [CompletableFuture.isCompletedExceptionally]. */
fun <T> CompletableFuture<T>.shouldBeCompletedExceptionally() = this shouldBe completedExceptionally<T>()

/** Asserts that this future does not report [CompletableFuture.isCompletedExceptionally]. */
fun <T> CompletableFuture<T>.shouldNotBeCompletedExceptionally() = this shouldNotBe completedExceptionally<T>()

/** Matcher that passes when the tested future's `isCompletedExceptionally` flag is `true`. */
fun <T> completedExceptionally() = object : Matcher<CompletableFuture<T>> {
   override fun test(value: CompletableFuture<T>): MatcherResult =
      MatcherResult(
         value.isCompletedExceptionally,
         { "Future should be completed exceptionally" },
         { "Future should not be completed exceptionally" }
      )
}

/** Asserts that this future reports [CompletableFuture.isDone]. */
fun <T> CompletableFuture<T>.shouldBeCompleted() = this shouldBe completed<T>()

/** Asserts that this future does not report [CompletableFuture.isDone]. */
fun <T> CompletableFuture<T>.shouldNotBeCompleted() = this shouldNotBe completed<T>()

/**
 * Matcher that passes when the tested future's `isDone` flag is `true`.
 *
 * The check is `isDone` only, so it does not distinguish a normal result
 * from an exceptional or cancelled completion.
 */
fun <T> completed() = object : Matcher<CompletableFuture<T>> {
   override fun test(value: CompletableFuture<T>): MatcherResult =
      MatcherResult(
         value.isDone,
         { "Future should be completed" },
         { "Future should not be completed" }
      )
}

/** Asserts that this future reports [CompletableFuture.isCancelled]. */
fun <T> CompletableFuture<T>.shouldBeCancelled() = this shouldBe cancelled<T>()

/** Asserts that this future does not report [CompletableFuture.isCancelled]. */
fun <T> CompletableFuture<T>.shouldNotBeCancelled() = this shouldNotBe cancelled<T>()

/** Matcher that passes when the tested future's `isCancelled` flag is `true`. */
fun <T> cancelled() = object : Matcher<CompletableFuture<T>> {
   override fun test(value: CompletableFuture<T>): MatcherResult =
      MatcherResult(
         value.isCancelled,
         // The messages name cancellation, the condition actually checked;
         // previously they repeated the "completed" wording of completed().
         { "Future should be cancelled" },
         { "Future should not be cancelled" }
      )
}

/** Asserts that this future matches `completeExceptionallyWith(throwable)`. */
infix fun CompletableFuture<*>.shouldCompleteExceptionallyWith(throwable: Throwable) =
   this should completeExceptionallyWith(throwable)

/** Asserts that this future does not match `completeExceptionallyWith(throwable)`. */
infix fun CompletableFuture<*>.shouldNotCompleteExceptionallyWith(throwable: Throwable) =
   this shouldNot completeExceptionallyWith(throwable)

// The declaration below continues on the following line of this excerpt.
internal fun
completeExceptionallyWith(throwable: Throwable) = object : Matcher<CompletableFuture<*>> {47   override fun test(value: CompletableFuture<*>): MatcherResult {48      val exception = value.runCatching { get() }.exceptionOrNull()49      return MatcherResult(50         exception != null && exception.cause == throwable,51         { errorMessageForTestFailure(exception?.cause, throwable) },52         { "Expected future not to fail with ${exception?.cause}, but it did fail with it." }53      )...EsjcEventWriterTest.kt
Source:EsjcEventWriterTest.kt  
...25                Event(aggregateId = aggregateId, eventData = SomeEventData(aggregateId))26            val capturedEventData = slot<List<EventData>>()27            val eventStoreMock = mockk<EventStore>().apply {28                every { appendToStream("ks.fiks.$eventAggregateType.${event.aggregateId}", 0L, capture(capturedEventData)) } returns29                        CompletableFuture.completedFuture(WriteResult(0, Position(0L, 0L)))30            }31            val deserializer = mockk<EventSerdes>()32                    .apply {33                        every { isJson() } returns true34                        every { serialize(event.eventData) } returns "foo".toByteArray()35                        every { getSerializationId(any<KClass<no.ks.kes.lib.EventData<*>>>()) } answers { firstArg<KClass<no.ks.kes.lib.EventData<*>>>().simpleName!! }36                    }37            val esjcEventWriter = EsjcAggregateRepository(38                    eventStore = eventStoreMock,39                    streamIdGenerator = { t: String, id: UUID -> "ks.fiks.$t.$id" },40                    serdes = deserializer)41            esjcEventWriter.append(eventAggregateType, event.aggregateId, ExpectedEventNumber.Exact(0L), listOf(event))42            with(capturedEventData.captured.single()) {43                Assertions.assertArrayEquals( "foo".toByteArray(), this.data!!)...GrpcEventWriterTest.kt
Source:GrpcEventWriterTest.kt  
...22                Event(aggregateId = aggregateId, eventData = SomeEventData(aggregateId))23            val capturedEventData = slot<Iterator<com.eventstore.dbclient.EventData>>()24            val eventStoreMock = mockk<EventStoreDBClient>().apply {25                every { appendToStream("ks.fiks.$eventAggregateType.${event.aggregateId}", any(), capture(capturedEventData)) } returns26                        CompletableFuture.completedFuture(WriteResult(StreamRevision.START, Position.START))27            }28            val serializer = mockk<EventSerdes>()29                    .apply {30                        every { isJson() } returns true31                        every { serialize(event.eventData) } returns "foo".toByteArray()32                        every { getSerializationId(any<KClass<no.ks.kes.lib.EventData<*>>>()) } answers { firstArg<KClass<no.ks.kes.lib.EventData<*>>>().simpleName!! }33                    }34            val grpcEventWriter = GrpcAggregateRepository(35                    eventStoreDBClient = eventStoreMock,36                    streamIdGenerator = { t: String, id: UUID -> "ks.fiks.$t.$id" },37                    serdes = serializer)38            grpcEventWriter.append(eventAggregateType, event.aggregateId, ExpectedEventNumber.Exact(0L), listOf(event))39            with(capturedEventData.captured.next()) {40                Assertions.assertArrayEquals( "foo".toByteArray(), this.eventData!!)...completed
Using AI Code Generation
// Machine-generated sample tests (the page labels this section "Using AI Code Generation").
// NOTE(review): the matchers used below (`completed(value)`, `failed`, `isCompleted`,
// `isCompletedWithValue`, `isNotCompleted`, ...) are not defined anywhere in this excerpt;
// confirm they exist before relying on these samples.

@DisplayName("test completed matcher")
@Test
fun testCompleted() {
    val subject = CompletableFuture.completedFuture("completed")
    subject should completed("completed")
}

@DisplayName("test failed matcher")
@Test
fun testFailed() {
    val subject = CompletableFuture.failedFuture(IllegalStateException("failed"))
    subject should failed(IllegalStateException("failed"))
}

@DisplayName("test instanceOf matcher")
@Test
fun testInstanceOf() {
    val subject = CompletableFuture.completedFuture("completed")
    subject should instanceOf(String::class)
}

@DisplayName("test isCompleted matcher")
@Test
fun testIsCompleted() {
    val subject = CompletableFuture.completedFuture("completed")
    subject should isCompleted()
}

@DisplayName("test isCompletedWithValue matcher")
@Test
fun testIsCompletedWithValue() {
    val subject = CompletableFuture.completedFuture("completed")
    subject should isCompletedWithValue("completed")
}

@DisplayName("test isNotCompleted matcher")
@Test
fun testIsNotCompleted() {
    val subject = CompletableFuture.completedFuture("completed")
    subject should isNotCompleted()
}

@DisplayName("test isNotCompletedWithValue matcher")
@Test
fun testIsNotCompletedWithValue() {
    val subject = CompletableFuture.completedFuture("completed")
    subject should isNotCompletedWithValue("completed")
}

@DisplayName("test isNotCompletedWithValueInstanceOf matcher")
@Test
fun testIsNotCompletedWithValueInstanceOf() {
    val subject = CompletableFuture.completedFuture("completed")
    subject should isNotCompletedWithValueInstanceOf(String::class)
}

// Sample 9 is cut off in the excerpt: @DisplayName ( "test isNotcompleted
Using AI Code Generation
1@kotlin . test . Test fun `should complete with given value` () { val future = future { 1 } future shouldCompleteWith 1 }2@kotlin . test . Test fun `should fail with given exception` () { val future = future { throw IllegalArgumentException ( "some exception" ) } future shouldFailWith IllegalArgumentException :: class }3@kotlin . test . Test fun `should fail with given exception and message` () { val future = future { throw IllegalArgumentException ( "some exception" ) } future shouldFailWith IllegalArgumentException :: class andMessage "some exception" }4@kotlin . test . Test fun `should fail with given message` () { val future = future { throw IllegalArgumentException ( "some exception" ) } future shouldFailWithMessage "some exception" }5@kotlin . test . Test fun `should be cancelled` () { val future = future { 1 } future . cancel ( true ) future shouldBeCancelled }6@kotlin . test . Test fun `should be cancelled with given message` () { val future = future { 1 } future . cancel ( true ) future shouldBeCancelledWithMessage "CancellationException: Future.cancel() was called." }7@kotlin . test . Test fun `should be completed with given value` () { val future = future { 1 } future shouldBeCompletedWith 1 }8@kotlin . test . Test fun `should be failed with given exception` () { val future = future { throw IllegalArgumentException ( "some exception" ) } future shouldBeFailedWith IllegalArgumentException :: class }9@kotlin . test . Test fun `should be failed with given exception and message` () { val future = future
Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
