package org.neo4j.driver.integration.reactive;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.neo4j.driver.Config;
import org.neo4j.driver.Driver;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
import org.neo4j.driver.internal.util.Neo4jFeature;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.neo4j.driver.reactivestreams.ReactiveSession;
import org.neo4j.driver.testutil.DatabaseExtension;
import org.neo4j.driver.testutil.LoggingUtil;
import org.neo4j.driver.testutil.ParallelizableIT;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.NonNull;

@EnabledOnNeo4jWith(Neo4jFeature.BOLT_V4)
@ParallelizableIT
/* loaded from: input_file:org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.class */
public class ReactiveStreamsSessionIT {

    @RegisterExtension
    static final DatabaseExtension neo4j = new DatabaseExtension();

    @MethodSource({"managedTransactionsReturningReactiveResultPublisher"})
    @ParameterizedTest
    void shouldErrorWhenReactiveResultIsReturned(Function<ReactiveSession, Publisher<ReactiveResult>> function) {
        ReactiveSession session = neo4j.driver().session(ReactiveSession.class);
        Assertions.assertEquals("org.neo4j.driver.reactivestreams.ReactiveResult is not a valid return value, it should be consumed before producing a return value", Assertions.assertThrows(ClientException.class, () -> {
            Flux.from((Publisher) function.apply(session)).blockFirst();
        }).getMessage());
        Flux.from(session.close()).blockFirst();
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void shouldReleaseResultsOnSubscriptionCancellation(boolean z) throws InterruptedException {
        List synchronizedList = Collections.synchronizedList(new ArrayList());
        Driver customDriver = neo4j.customDriver(Config.builder().withDriverMetrics().withLogging(LoggingUtil.boltLogging(synchronizedList)).build());
        try {
            customDriver.verifyConnectivity();
            CompletableFuture.allOf((CompletableFuture[]) IntStream.range(0, 100).mapToObj(i -> {
                return CompletableFuture.supplyAsync(() -> {
                    final CompletableFuture completableFuture = new CompletableFuture();
                    customDriver.session(ReactiveSession.class).run("UNWIND range (0,10000) AS x RETURN x").subscribe(new BaseSubscriber<ReactiveResult>() { // from class: org.neo4j.driver.integration.reactive.ReactiveStreamsSessionIT.1
                        protected void hookOnSubscribe(@NonNull Subscription subscription) {
                            completableFuture.complete(subscription);
                        }

                        /* JADX INFO: Access modifiers changed from: protected */
                        public void hookOnNext(@NonNull ReactiveResult reactiveResult) {
                            Mono.fromDirect(reactiveResult.consume()).subscribe();
                        }
                    });
                    return completableFuture.thenApplyAsync(subscription -> {
                        if (z) {
                            subscription.request(1L);
                        }
                        subscription.cancel();
                        return subscription;
                    });
                });
            }).map(completableFuture -> {
                return completableFuture.thenCompose(Function.identity());
            }).toArray(i2 -> {
                return new CompletableFuture[i2];
            })).join();
            Instant plus = Instant.now().plus(5L, (TemporalUnit) ChronoUnit.MINUTES);
            int i3 = -1;
            while (Instant.now().isBefore(plus)) {
                i3 = customDriver.metrics().connectionPoolMetrics().stream().map((v0) -> {
                    return v0.inUse();
                }).mapToInt((v0) -> {
                    return v0.intValue();
                }).sum();
                if (i3 == 0) {
                    if (customDriver != null) {
                        customDriver.close();
                        return;
                    }
                    return;
                }
                Thread.sleep(100L);
            }
            Assertions.fail(String.format("not all connections have been released\n%d are still in use\nlatest metrics: %s\nmessage log: \n%s", Integer.valueOf(i3), customDriver.metrics().connectionPoolMetrics(), String.join("\n", synchronizedList)));
            if (customDriver != null) {
                customDriver.close();
            }
        } catch (Throwable th) {
            if (customDriver != null) {
                try {
                    customDriver.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void shouldRollbackResultOnSubscriptionCancellation() {
        Driver customDriver = neo4j.customDriver(Config.builder().withMaxConnectionPoolSize(1).build());
        try {
            ReactiveSession session = customDriver.session(ReactiveSession.class);
            String uuid = UUID.randomUUID().toString();
            final CompletableFuture completableFuture = new CompletableFuture();
            session.run("CREATE ({id: $id})", Map.of("id", uuid)).subscribe(new BaseSubscriber<ReactiveResult>() { // from class: org.neo4j.driver.integration.reactive.ReactiveStreamsSessionIT.2
                protected void hookOnSubscribe(@NonNull Subscription subscription) {
                    subscription.cancel();
                    completableFuture.complete(null);
                }
            });
            completableFuture.join();
            Assertions.assertEquals(0L, (Long) Mono.fromDirect(session.run("MATCH (n {id: $id}) RETURN n", Map.of("id", uuid))).flatMapMany((v0) -> {
                return v0.records();
            }).count().block());
            if (customDriver != null) {
                customDriver.close();
            }
        } catch (Throwable th) {
            if (customDriver != null) {
                try {
                    customDriver.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void shouldEmitAllSuccessfullyEmittedValues() {
        ReactiveSession session = neo4j.driver().session(ReactiveSession.class);
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        Assertions.assertEquals(List.of(0, 1, 0, 1, 2, 3, 4, 5), (List) Flux.from(session.executeRead(reactiveTransactionContext -> {
            Flux map = Mono.from(reactiveTransactionContext.run("UNWIND range(0, 5) AS x RETURN x")).flatMapMany((v0) -> {
                return v0.records();
            }).map(record -> {
                return Integer.valueOf(record.get("x").asInt());
            });
            return atomicBoolean.getAndSet(true) ? map : map.handle((num, synchronousSink) -> {
                if (num.intValue() == 2) {
                    synchronousSink.error(new ServiceUnavailableException("simulated"));
                } else {
                    synchronousSink.next(num);
                }
            });
        })).collectList().block());
    }

    static List<Function<ReactiveSession, Publisher<ReactiveResult>>> managedTransactionsReturningReactiveResultPublisher() {
        return List.of(reactiveSession -> {
            return reactiveSession.executeWrite(reactiveTransactionContext -> {
                return reactiveTransactionContext.run("RETURN 1");
            });
        }, reactiveSession2 -> {
            return reactiveSession2.executeRead(reactiveTransactionContext -> {
                return reactiveTransactionContext.run("RETURN 1");
            });
        });
    }
}
