package org.neo4j.driver.integration.reactive;

import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.neo4j.driver.Config;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.TransactionTerminatedException;
import org.neo4j.driver.internal.reactivestreams.InternalReactiveTransaction;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.neo4j.driver.reactivestreams.ReactiveSession;
import org.neo4j.driver.reactivestreams.ReactiveTransaction;
import org.neo4j.driver.testutil.DatabaseExtension;
import org.neo4j.driver.testutil.ParallelizableIT;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@ParallelizableIT
/* loaded from: input_file:org/neo4j/driver/integration/reactive/ReactiveTransactionIT.class */
class ReactiveTransactionIT {

    @RegisterExtension
    static final DatabaseExtension neo4j = new DatabaseExtension();

    ReactiveTransactionIT() {
    }

    @Test
    void shouldPreventPullAfterTransactionTermination() {
        ReactiveTransaction reactiveTransaction = (ReactiveTransaction) Mono.fromDirect(neo4j.driver().session(ReactiveSession.class).beginTransaction()).block();
        Assertions.assertNotNull(reactiveTransaction);
        long fetchSize = Config.defaultConfig().fetchSize() + 1;
        ReactiveResult reactiveResult = (ReactiveResult) Mono.fromDirect(reactiveTransaction.run("UNWIND range(1, $limit) AS x RETURN x", Map.of("limit", Long.valueOf(fetchSize)))).block();
        ReactiveResult reactiveResult2 = (ReactiveResult) Mono.fromDirect(reactiveTransaction.run("UNWIND range(1, $limit) AS x RETURN x", Map.of("limit", Long.valueOf(fetchSize)))).block();
        ClientException assertThrows = Assertions.assertThrows(ClientException.class, () -> {
            Mono.fromDirect(reactiveTransaction.run("invalid")).block();
        });
        Assertions.assertEquals(assertThrows.code(), "Neo.ClientError.Statement.SyntaxError");
        Assertions.assertNotNull(reactiveResult);
        Assertions.assertNotNull(reactiveResult2);
        for (ReactiveResult reactiveResult3 : List.of(reactiveResult, reactiveResult2)) {
            Assertions.assertEquals(assertThrows, Assertions.assertThrows(ClientException.class, () -> {
                Flux.from(reactiveResult3.records()).blockFirst();
            }).getCause());
        }
        Mono.fromDirect(reactiveTransaction.close()).block();
    }

    @Test
    void shouldPreventDiscardAfterTransactionTermination() {
        ReactiveTransaction reactiveTransaction = (ReactiveTransaction) Mono.fromDirect(neo4j.driver().session(ReactiveSession.class).beginTransaction()).block();
        Assertions.assertNotNull(reactiveTransaction);
        long fetchSize = Config.defaultConfig().fetchSize() + 1;
        ReactiveResult reactiveResult = (ReactiveResult) Mono.fromDirect(reactiveTransaction.run("UNWIND range(1, $limit) AS x RETURN x", Map.of("limit", Long.valueOf(fetchSize)))).block();
        ReactiveResult reactiveResult2 = (ReactiveResult) Mono.fromDirect(reactiveTransaction.run("UNWIND range(1, $limit) AS x RETURN x", Map.of("limit", Long.valueOf(fetchSize)))).block();
        ClientException assertThrows = Assertions.assertThrows(ClientException.class, () -> {
            Mono.fromDirect(reactiveTransaction.run("invalid")).block();
        });
        Assertions.assertEquals(assertThrows.code(), "Neo.ClientError.Statement.SyntaxError");
        Assertions.assertNotNull(reactiveResult);
        Assertions.assertNotNull(reactiveResult2);
        for (ReactiveResult reactiveResult3 : List.of(reactiveResult, reactiveResult2)) {
            Assertions.assertEquals(assertThrows, Assertions.assertThrows(ClientException.class, () -> {
                Mono.fromDirect(reactiveResult3.consume()).block();
            }).getCause());
        }
        Mono.fromDirect(reactiveTransaction.close()).block();
    }

    @Test
    void shouldPreventRunAfterTransactionTermination() {
        ReactiveTransaction reactiveTransaction = (ReactiveTransaction) Mono.fromDirect(neo4j.driver().session(ReactiveSession.class).beginTransaction()).block();
        Assertions.assertNotNull(reactiveTransaction);
        ClientException assertThrows = Assertions.assertThrows(ClientException.class, () -> {
            Mono.fromDirect(reactiveTransaction.run("invalid")).block();
        });
        Assertions.assertEquals(assertThrows.code(), "Neo.ClientError.Statement.SyntaxError");
        Assertions.assertEquals(assertThrows, Assertions.assertThrows(TransactionTerminatedException.class, () -> {
            Mono.fromDirect(reactiveTransaction.run("RETURN 1")).block();
        }).getCause());
        Mono.fromDirect(reactiveTransaction.close()).block();
    }

    @Test
    void shouldPreventPullAfterDriverTransactionTermination() {
        InternalReactiveTransaction internalReactiveTransaction = (InternalReactiveTransaction) Mono.fromDirect(neo4j.driver().session(ReactiveSession.class).beginTransaction()).block();
        Assertions.assertNotNull(internalReactiveTransaction);
        long fetchSize = Config.defaultConfig().fetchSize() + 1;
        ReactiveResult reactiveResult = (ReactiveResult) Mono.fromDirect(internalReactiveTransaction.run("UNWIND range(1, $limit) AS x RETURN x", Map.of("limit", Long.valueOf(fetchSize)))).block();
        ReactiveResult reactiveResult2 = (ReactiveResult) Mono.fromDirect(internalReactiveTransaction.run("UNWIND range(1, $limit) AS x RETURN x", Map.of("limit", Long.valueOf(fetchSize)))).block();
        Mono.fromDirect(internalReactiveTransaction.terminate()).block();
        Assertions.assertNotNull(reactiveResult);
        Assertions.assertNotNull(reactiveResult2);
        for (ReactiveResult reactiveResult3 : List.of(reactiveResult, reactiveResult2)) {
            Assertions.assertThrows(TransactionTerminatedException.class, () -> {
                Flux.from(reactiveResult3.records()).blockFirst();
            });
        }
        Mono.fromDirect(internalReactiveTransaction.close()).block();
    }

    @Test
    void shouldPreventDiscardAfterDriverTransactionTermination() {
        InternalReactiveTransaction internalReactiveTransaction = (InternalReactiveTransaction) Mono.fromDirect(neo4j.driver().session(ReactiveSession.class).beginTransaction()).block();
        Assertions.assertNotNull(internalReactiveTransaction);
        long fetchSize = Config.defaultConfig().fetchSize() + 1;
        ReactiveResult reactiveResult = (ReactiveResult) Mono.fromDirect(internalReactiveTransaction.run("UNWIND range(1, $limit) AS x RETURN x", Map.of("limit", Long.valueOf(fetchSize)))).block();
        ReactiveResult reactiveResult2 = (ReactiveResult) Mono.fromDirect(internalReactiveTransaction.run("UNWIND range(1, $limit) AS x RETURN x", Map.of("limit", Long.valueOf(fetchSize)))).block();
        Mono.fromDirect(internalReactiveTransaction.terminate()).block();
        Assertions.assertNotNull(reactiveResult);
        Assertions.assertNotNull(reactiveResult2);
        for (ReactiveResult reactiveResult3 : List.of(reactiveResult, reactiveResult2)) {
            Assertions.assertThrows(TransactionTerminatedException.class, () -> {
                Mono.fromDirect(reactiveResult3.consume()).block();
            });
        }
        Mono.fromDirect(internalReactiveTransaction.close()).block();
    }

    @Test
    void shouldPreventRunAfterDriverTransactionTermination() {
        InternalReactiveTransaction internalReactiveTransaction = (InternalReactiveTransaction) Mono.fromDirect(neo4j.driver().session(ReactiveSession.class).beginTransaction()).block();
        Assertions.assertNotNull(internalReactiveTransaction);
        Mono.fromDirect(internalReactiveTransaction.run("UNWIND range(1, $limit) AS x RETURN x", Map.of("limit", Long.valueOf(Config.defaultConfig().fetchSize() + 1)))).block();
        Mono.fromDirect(internalReactiveTransaction.terminate()).block();
        Assertions.assertThrows(TransactionTerminatedException.class, () -> {
            Mono.fromDirect(internalReactiveTransaction.run("UNWIND range(0, 5) AS x RETURN x")).block();
        });
        Mono.fromDirect(internalReactiveTransaction.close()).block();
    }

    @Test
    void shouldTerminateTransactionAndHandleFailureResponseOrPreventFurtherPulls() {
        InternalReactiveTransaction internalReactiveTransaction = (InternalReactiveTransaction) Mono.fromDirect(neo4j.driver().session(ReactiveSession.class).beginTransaction()).block();
        Assertions.assertNotNull(internalReactiveTransaction);
        ReactiveResult reactiveResult = (ReactiveResult) Mono.fromDirect(internalReactiveTransaction.run("UNWIND range(1, $limit) AS x RETURN x", Map.of("limit", Long.valueOf(Config.defaultConfig().fetchSize() + 1)))).block();
        Mono.fromDirect(internalReactiveTransaction.terminate()).block();
        Assertions.assertNotNull(reactiveResult);
        Assertions.assertThrows(TransactionTerminatedException.class, () -> {
            Flux.from(reactiveResult.records()).blockLast();
        });
        Mono.fromDirect(internalReactiveTransaction.close()).block();
    }
}
