package org.neo4j.driver.internal.reactive;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Stream;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Query;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.Values;
import org.neo4j.driver.internal.InternalRecord;
import org.neo4j.driver.internal.async.NetworkSession;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
import org.neo4j.driver.internal.bolt.api.TelemetryApi;
import org.neo4j.driver.internal.cursor.RxResultCursor;
import org.neo4j.driver.internal.cursor.RxResultCursorImpl;
import org.neo4j.driver.internal.telemetry.ApiTelemetryWork;
import org.neo4j.driver.internal.util.FixedRetryLogic;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.internal.value.IntegerValue;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.reactive.RxTransaction;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

/* loaded from: input_file:org/neo4j/driver/internal/reactive/InternalRxSessionTest.class */
/**
 * Unit tests for {@link InternalRxSession}.
 *
 * <p>Every test wraps a Mockito mock of {@link NetworkSession} in an {@link InternalRxSession} and
 * checks which calls reach the mock and what the reactive API hands back to the caller.
 */
class InternalRxSessionTest {

    /**
     * Supplies every {@code RxSession.run(...)} overload exercised by the run tests.
     *
     * @return one function per overload, each starting a query on the given session
     */
    private static Stream<Function<RxSession, RxResult>> allSessionRunMethods() {
        return Stream.of(
                rxSession -> rxSession.run("RETURN 1"),
                rxSession -> rxSession.run("RETURN $x", Values.parameters("x", 1)),
                rxSession -> rxSession.run("RETURN $x", Collections.singletonMap("x", 1)),
                rxSession -> rxSession.run(
                        "RETURN $x",
                        new InternalRecord(Collections.singletonList("x"), new Value[] {new IntegerValue(1L)})),
                rxSession -> rxSession.run(new Query("RETURN $x", Values.parameters("x", 1))),
                rxSession -> rxSession.run(
                        new Query("RETURN $x", Values.parameters("x", 1)), TransactionConfig.empty()),
                rxSession -> rxSession.run(
                        "RETURN $x", Collections.singletonMap("x", 1), TransactionConfig.empty()),
                rxSession -> rxSession.run("RETURN 1", TransactionConfig.empty()));
    }

    /**
     * Supplies both {@code RxSession.beginTransaction(...)} overloads.
     *
     * @return one function per overload, each returning the transaction publisher
     */
    private static Stream<Function<RxSession, Publisher<RxTransaction>>> allBeginTxMethods() {
        return Stream.of(
                RxSession::beginTransaction,
                rxSession -> rxSession.beginTransaction(TransactionConfig.empty()));
    }

    /**
     * Supplies the read/write transaction-function overloads, with and without a
     * {@link TransactionConfig}. Each unit of work emits the single element {@code "a"}.
     *
     * @return one function per overload, each returning the work's publisher
     */
    private static Stream<Function<RxSession, Publisher<String>>> allRunTxMethods() {
        return Stream.of(
                rxSession -> rxSession.readTransaction(tx -> Flux.just("a")),
                rxSession -> rxSession.writeTransaction(tx -> Flux.just("a")),
                rxSession -> rxSession.readTransaction(tx -> Flux.just("a"), TransactionConfig.empty()),
                rxSession -> rxSession.writeTransaction(tx -> Flux.just("a"), TransactionConfig.empty()));
    }

    /** A run call is forwarded to {@code NetworkSession.runRx} and yields the cursor it completes with. */
    @ParameterizedTest
    @MethodSource("allSessionRunMethods")
    void shouldDelegateRun(Function<RxSession, RxResult> function) {
        // Given: the underlying session completes the run with a cursor
        NetworkSession networkSession = Mockito.mock(NetworkSession.class);
        RxResultCursor cursor = Mockito.mock(RxResultCursorImpl.class);
        Mockito.when(networkSession.runRx(
                        ArgumentMatchers.any(Query.class),
                        ArgumentMatchers.any(TransactionConfig.class),
                        ArgumentMatchers.any()))
                .thenReturn(CompletableFuture.completedFuture(cursor));
        InternalRxSession rxSession = new InternalRxSession(networkSession);

        // When: the supplier is invoked, which is what actually triggers the run
        // NOTE(review): cursorFutureSupplier() is called on the value exactly as returned by the
        // run method - confirm it is declared on RxResult and not only on the implementation class.
        CompletionStage<RxResultCursor> cursorStage =
                function.apply(rxSession).cursorFutureSupplier().get();

        // Then
        Mockito.verify(networkSession)
                .runRx(
                        ArgumentMatchers.any(Query.class),
                        ArgumentMatchers.any(TransactionConfig.class),
                        ArgumentMatchers.any());
        MatcherAssert.assertThat(Futures.getNow(cursorStage), CoreMatchers.equalTo(cursor));
    }

    /** A failed run surfaces the original error as the cause and releases the connection. */
    @ParameterizedTest
    @MethodSource("allSessionRunMethods")
    void shouldReleaseConnectionIfFailedToRun(Function<RxSession, RxResult> function) {
        // Given: the underlying session fails the run
        RuntimeException error = new RuntimeException("Hi there");
        NetworkSession networkSession = Mockito.mock(NetworkSession.class);
        Mockito.when(networkSession.runRx(
                        ArgumentMatchers.any(Query.class),
                        ArgumentMatchers.any(TransactionConfig.class),
                        ArgumentMatchers.any()))
                .thenReturn(CompletableFuture.failedFuture(error));
        Mockito.when(networkSession.releaseConnectionAsync()).thenReturn(Futures.completedWithNull());
        InternalRxSession rxSession = new InternalRxSession(networkSession);

        // When
        CompletionStage<RxResultCursor> cursorStage =
                function.apply(rxSession).cursorFutureSupplier().get();

        // Then
        Mockito.verify(networkSession)
                .runRx(
                        ArgumentMatchers.any(Query.class),
                        ArgumentMatchers.any(TransactionConfig.class),
                        ArgumentMatchers.any());
        CompletionException thrown =
                Assertions.assertThrows(CompletionException.class, () -> Futures.getNow(cursorStage));
        MatcherAssert.assertThat(thrown.getCause(), CoreMatchers.equalTo(error));
        Mockito.verify(networkSession).releaseConnectionAsync();
    }

    /** Beginning a transaction is forwarded with a null second argument and unmanaged-transaction telemetry. */
    @ParameterizedTest
    @MethodSource("allBeginTxMethods")
    void shouldDelegateBeginTx(Function<RxSession, Publisher<RxTransaction>> function) {
        // Given
        NetworkSession networkSession = Mockito.mock(NetworkSession.class);
        UnmanagedTransaction tx = Mockito.mock(UnmanagedTransaction.class);
        ApiTelemetryWork telemetry = new ApiTelemetryWork(TelemetryApi.UNMANAGED_TRANSACTION);
        Mockito.when(networkSession.beginTransactionAsync(
                        ArgumentMatchers.any(TransactionConfig.class),
                        ArgumentMatchers.<String>isNull(),
                        ArgumentMatchers.eq(telemetry)))
                .thenReturn(CompletableFuture.completedFuture(tx));
        InternalRxSession rxSession = new InternalRxSession(networkSession);

        // When
        Publisher<RxTransaction> txPublisher = function.apply(rxSession);

        // Then: exactly one transaction is emitted before completion
        StepVerifier.create(Mono.from(txPublisher)).expectNextCount(1L).verifyComplete();
        Mockito.verify(networkSession)
                .beginTransactionAsync(
                        ArgumentMatchers.any(TransactionConfig.class),
                        ArgumentMatchers.<String>isNull(),
                        ArgumentMatchers.eq(telemetry));
    }

    /** A failed begin surfaces the original error as the cause and releases the connection. */
    @ParameterizedTest
    @MethodSource("allBeginTxMethods")
    void shouldReleaseConnectionIfFailedToBeginTx(Function<RxSession, Publisher<RxTransaction>> function) {
        // Given: the underlying session fails to begin a transaction
        RuntimeException error = new RuntimeException("Hi there");
        NetworkSession networkSession = Mockito.mock(NetworkSession.class);
        ApiTelemetryWork telemetry = new ApiTelemetryWork(TelemetryApi.UNMANAGED_TRANSACTION);
        Mockito.when(networkSession.beginTransactionAsync(
                        ArgumentMatchers.any(TransactionConfig.class),
                        ArgumentMatchers.<String>isNull(),
                        ArgumentMatchers.eq(telemetry)))
                .thenReturn(CompletableFuture.failedFuture(error));
        Mockito.when(networkSession.releaseConnectionAsync()).thenReturn(Futures.completedWithNull());
        InternalRxSession rxSession = new InternalRxSession(networkSession);

        // When: converting to a future subscribes to the publisher
        CompletableFuture<RxTransaction> txFuture =
                Mono.from(function.apply(rxSession)).toFuture();

        // Then
        Mockito.verify(networkSession)
                .beginTransactionAsync(
                        ArgumentMatchers.any(TransactionConfig.class),
                        ArgumentMatchers.<String>isNull(),
                        ArgumentMatchers.eq(telemetry));
        CompletionException thrown =
                Assertions.assertThrows(CompletionException.class, () -> Futures.getNow(txFuture));
        MatcherAssert.assertThat(thrown.getCause(), CoreMatchers.equalTo(error));
        Mockito.verify(networkSession).releaseConnectionAsync();
    }

    /** A transaction function begins one transaction with managed-transaction telemetry and closes it with {@code true}. */
    @ParameterizedTest
    @MethodSource("allRunTxMethods")
    void shouldDelegateRunTx(Function<RxSession, Publisher<String>> function) {
        // Given
        NetworkSession networkSession = Mockito.mock(NetworkSession.class);
        UnmanagedTransaction tx = Mockito.mock(UnmanagedTransaction.class);
        ApiTelemetryWork telemetry = new ApiTelemetryWork(TelemetryApi.MANAGED_TRANSACTION);
        Mockito.when(tx.closeAsync(true)).thenReturn(Futures.completedWithNull());
        Mockito.when(networkSession.beginTransactionAsync(
                        ArgumentMatchers.any(AccessMode.class),
                        ArgumentMatchers.any(TransactionConfig.class),
                        ArgumentMatchers.eq(telemetry)))
                .thenReturn(CompletableFuture.completedFuture(tx));
        Mockito.when(networkSession.retryLogic()).thenReturn(new FixedRetryLogic(1));
        InternalRxSession rxSession = new InternalRxSession(networkSession);

        // When
        Publisher<String> results = function.apply(rxSession);

        // Then
        StepVerifier.create(Flux.from(results)).expectNext("a").verifyComplete();
        Mockito.verify(networkSession)
                .beginTransactionAsync(
                        ArgumentMatchers.any(AccessMode.class),
                        ArgumentMatchers.any(TransactionConfig.class),
                        ArgumentMatchers.eq(telemetry));
        Mockito.verify(tx).closeAsync(true);
    }

    /**
     * Work that always fails ends in an error after {@code retryCount + 1} transactions have been
     * begun, each of them closed with {@code false}.
     */
    @Test
    void shouldRetryOnError() {
        // Given
        int retryCount = 2;
        int expectedAttempts = retryCount + 1;
        NetworkSession networkSession = Mockito.mock(NetworkSession.class);
        UnmanagedTransaction tx = Mockito.mock(UnmanagedTransaction.class);
        Mockito.when(tx.closeAsync(false)).thenReturn(Futures.completedWithNull());
        ApiTelemetryWork telemetry = new ApiTelemetryWork(TelemetryApi.MANAGED_TRANSACTION);
        Mockito.when(networkSession.beginTransactionAsync(
                        ArgumentMatchers.any(AccessMode.class),
                        ArgumentMatchers.any(TransactionConfig.class),
                        ArgumentMatchers.eq(telemetry)))
                .thenReturn(CompletableFuture.completedFuture(tx));
        Mockito.when(networkSession.retryLogic()).thenReturn(new FixedRetryLogic(retryCount));
        InternalRxSession rxSession = new InternalRxSession(networkSession);

        // When: every attempt errors after its single element has been consumed
        Publisher<String> results = rxSession.readTransaction(
                rxTransaction -> Flux.just("a").then(Mono.error(new RuntimeException("Errored"))));

        // Then
        StepVerifier.create(Flux.from(results)).expectError(RuntimeException.class).verify();
        Mockito.verify(networkSession, Mockito.times(expectedAttempts))
                .beginTransactionAsync(
                        ArgumentMatchers.any(AccessMode.class),
                        ArgumentMatchers.any(TransactionConfig.class),
                        ArgumentMatchers.eq(telemetry));
        Mockito.verify(tx, Mockito.times(expectedAttempts)).closeAsync(false);
    }

    /**
     * Work that fails {@code retryCount} times and then succeeds delivers its result: the failed
     * attempts are closed with {@code false} and the successful one with {@code true}.
     */
    @Test
    void shouldObtainResultIfRetrySucceed() {
        // Given
        int retryCount = 2;
        NetworkSession networkSession = Mockito.mock(NetworkSession.class);
        UnmanagedTransaction tx = Mockito.mock(UnmanagedTransaction.class);
        ApiTelemetryWork telemetry = new ApiTelemetryWork(TelemetryApi.MANAGED_TRANSACTION);
        Mockito.when(tx.closeAsync(false)).thenReturn(Futures.completedWithNull());
        Mockito.when(tx.closeAsync(true)).thenReturn(Futures.completedWithNull());
        Mockito.when(networkSession.beginTransactionAsync(
                        ArgumentMatchers.any(AccessMode.class),
                        ArgumentMatchers.any(TransactionConfig.class),
                        ArgumentMatchers.eq(telemetry)))
                .thenReturn(CompletableFuture.completedFuture(tx));
        Mockito.when(networkSession.retryLogic()).thenReturn(new FixedRetryLogic(retryCount));
        InternalRxSession rxSession = new InternalRxSession(networkSession);
        AtomicInteger attempts = new AtomicInteger();

        // When: only the attempt numbered retryCount (zero-based) succeeds
        Publisher<String> results = rxSession.readTransaction(rxTransaction -> {
            if (attempts.getAndIncrement() == retryCount) {
                return Flux.just("a");
            }
            return Flux.just("a").then(Mono.error(new RuntimeException("Errored")));
        });

        // Then
        StepVerifier.create(Flux.from(results)).expectNext("a").verifyComplete();
        Mockito.verify(networkSession, Mockito.times(retryCount + 1))
                .beginTransactionAsync(
                        ArgumentMatchers.any(AccessMode.class),
                        ArgumentMatchers.any(TransactionConfig.class),
                        ArgumentMatchers.eq(telemetry));
        Mockito.verify(tx, Mockito.times(retryCount)).closeAsync(false);
        Mockito.verify(tx).closeAsync(true);
    }

    /** {@code lastBookmark()} results in a single {@code lastBookmarks()} call on the underlying session. */
    @Test
    void shouldDelegateBookmark() {
        NetworkSession networkSession = Mockito.mock(NetworkSession.class);
        InternalRxSession rxSession = new InternalRxSession(networkSession);

        rxSession.lastBookmark();

        Mockito.verify(networkSession).lastBookmarks();
        Mockito.verifyNoMoreInteractions(networkSession);
    }

    /** {@code lastBookmarks()} is forwarded to the underlying session and nothing else is touched. */
    @Test
    void shouldDelegateBookmarks() {
        NetworkSession networkSession = Mockito.mock(NetworkSession.class);
        InternalRxSession rxSession = new InternalRxSession(networkSession);

        rxSession.lastBookmarks();

        Mockito.verify(networkSession).lastBookmarks();
        Mockito.verifyNoMoreInteractions(networkSession);
    }

    /** Subscribing to {@code close()} calls {@code closeAsync()} on the underlying session and completes. */
    @Test
    void shouldDelegateClose() {
        NetworkSession networkSession = Mockito.mock(NetworkSession.class);
        Mockito.when(networkSession.closeAsync()).thenReturn(Futures.completedWithNull());
        InternalRxSession rxSession = new InternalRxSession(networkSession);

        StepVerifier.create(rxSession.close()).verifyComplete();

        Mockito.verify(networkSession).closeAsync();
        Mockito.verifyNoMoreInteractions(networkSession);
    }
}
