package org.neo4j.driver.internal.cursor;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.locks.Lock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.neo4j.bolt.connection.BoltProtocolVersion;
import org.neo4j.bolt.connection.summary.RunSummary;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.Value;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.exceptions.TransactionNestingException;
import org.neo4j.driver.internal.DatabaseBookmark;
import org.neo4j.driver.internal.GqlStatusError;
import org.neo4j.driver.internal.InternalRecord;
import org.neo4j.driver.internal.adaptedbolt.DriverBoltConnection;
import org.neo4j.driver.internal.adaptedbolt.DriverResponseHandler;
import org.neo4j.driver.internal.adaptedbolt.summary.DiscardSummary;
import org.neo4j.driver.internal.adaptedbolt.summary.PullSummary;
import org.neo4j.driver.internal.cursor.AbstractRecordStateResponseHandler;
import org.neo4j.driver.internal.types.InternalTypeSystem;
import org.neo4j.driver.internal.util.ErrorUtil;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.internal.util.MetadataExtractor;
import org.neo4j.driver.summary.GqlStatusObject;
import org.neo4j.driver.summary.ResultSummary;

/* loaded from: input_file:org/neo4j/driver/internal/cursor/RxResultCursorImpl.class */
public class RxResultCursorImpl extends AbstractRecordStateResponseHandler implements RxResultCursor, DriverResponseHandler {
    private static final MetadataExtractor METADATA_EXTRACTOR = new MetadataExtractor("t_last");
    private static final ClientException IGNORED_ERROR = new ClientException(GqlStatusError.UNKNOWN.getStatus(), GqlStatusError.UNKNOWN.getStatusDescription("A message has been ignored during result streaming."), "N/A", "A message has been ignored during result streaming.", GqlStatusError.DIAGNOSTIC_RECORD, null);
    private static final Runnable NOOP_RUNNABLE = () -> {
    };
    private static final BiConsumer<Record, Throwable> NOOP_CONSUMER = (record, th) -> {
    };
    private static final RunSummary EMPTY_RUN_SUMMARY = new RunSummary() { // from class: org.neo4j.driver.internal.cursor.RxResultCursorImpl.1
        public long queryId() {
            return -1L;
        }

        public List<String> keys() {
            return List.of();
        }

        public long resultAvailableAfter() {
            return -1L;
        }

        public Optional<String> databaseName() {
            return Optional.empty();
        }
    };
    private final Logger log;
    private final DriverBoltConnection boltConnection;
    private final Lock boltConnectionLock;
    private final Query query;
    private final RunSummary runSummary;
    private final Throwable runError;
    private final Consumer<DatabaseBookmark> bookmarkConsumer;
    private final boolean closeOnSummary;
    private final boolean legacyNotifications;
    private boolean discardPending;
    private boolean runErrorExposed;
    private boolean summaryExposed;
    private BiConsumer<Record, Throwable> recordConsumer;
    private long outstandingDemand;
    private PullSummary pullSummary;
    private DiscardSummary discardSummary;
    private Throwable error;
    private final CompletableFuture<ResultSummary> summaryFuture = new CompletableFuture<>();
    private final CompletableFuture<Void> consumedFuture = new CompletableFuture<>();
    private State state = State.READY;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/driver/internal/cursor/RxResultCursorImpl$State.class */
    public enum State {
        READY,
        STREAMING,
        DISCARDING,
        FAILED,
        SUCCEEDED
    }

    public RxResultCursorImpl(DriverBoltConnection driverBoltConnection, Lock lock, Query query, RunSummary runSummary, Throwable th, Consumer<DatabaseBookmark> consumer, boolean z, Logging logging) {
        this.boltConnection = (DriverBoltConnection) Objects.requireNonNull(driverBoltConnection);
        this.boltConnectionLock = (Lock) Objects.requireNonNull(lock);
        this.legacyNotifications = new BoltProtocolVersion(5, 5).compareTo(driverBoltConnection.protocolVersion()) > 0;
        this.query = query;
        this.runSummary = th == null ? runSummary : EMPTY_RUN_SUMMARY;
        this.runError = th;
        this.bookmarkConsumer = consumer;
        this.closeOnSummary = z;
        this.log = logging.getLog(getClass());
        this.log.trace("[%d] New instance (runError=%s)", Integer.valueOf(hashCode()), throwableName(th));
    }

    @Override // org.neo4j.driver.internal.cursor.RxResultCursor
    public synchronized Throwable getRunError() {
        this.log.trace("[%d] Run error explicitly retrieved (value=%s)", Integer.valueOf(hashCode()), throwableName(this.runError));
        this.runErrorExposed = true;
        return this.runError;
    }

    @Override // org.neo4j.driver.internal.cursor.RxResultCursor
    public List<String> keys() {
        return this.runSummary.keys();
    }

    @Override // org.neo4j.driver.internal.FailableCursor
    public CompletionStage<Void> consumed() {
        return this.consumedFuture;
    }

    @Override // org.neo4j.driver.internal.cursor.RxResultCursor
    public boolean isDone() {
        return this.summaryFuture.isDone();
    }

    @Override // org.neo4j.driver.internal.cursor.RxResultCursor
    public void installRecordConsumer(BiConsumer<Record, Throwable> biConsumer) {
        Objects.requireNonNull(biConsumer);
        if (this.summaryExposed) {
            throw ErrorUtil.newResultConsumedError();
        }
        Runnable runnable = NOOP_RUNNABLE;
        synchronized (this) {
            if (this.recordConsumer == null) {
                this.recordConsumer = safeRecordConsumer(biConsumer);
                this.log.trace("[%d] Record consumer installed", Integer.valueOf(hashCode()));
                if (this.runError != null) {
                    handleError(this.runError);
                    runnable = this::onComplete;
                }
            } else {
                this.log.warn("[%d] Only one record consumer is supported, this request will be ignored", Integer.valueOf(hashCode()));
            }
        }
        runnable.run();
    }

    public void request(long j) {
        if (j <= 0) {
            this.log.warn("[%d] %d records requested, negative amounts are ignored", Integer.valueOf(hashCode()), Long.valueOf(j));
            return;
        }
        Runnable runnable = NOOP_RUNNABLE;
        synchronized (this) {
            updateRecordState(AbstractRecordStateResponseHandler.RecordState.NO_RECORD);
            this.log.trace("[%d] %d records requested in %s state", Integer.valueOf(hashCode()), Long.valueOf(j), this.state);
            switch (this.state) {
                case READY:
                    long appendDemand = appendDemand(j);
                    this.state = State.STREAMING;
                    runnable = () -> {
                        this.boltConnection.onLoop(() -> {
                            this.boltConnectionLock.lock();
                            return this.boltConnection.pull(this.runSummary.queryId(), appendDemand).thenCompose(driverBoltConnection -> {
                                return driverBoltConnection.flush(this);
                            }).whenComplete((r4, th) -> {
                                this.boltConnectionLock.unlock();
                                Throwable completionExceptionCause = Futures.completionExceptionCause(th);
                                if (completionExceptionCause != null) {
                                    handleError(completionExceptionCause);
                                    onComplete();
                                }
                            });
                        });
                    };
                    break;
                case STREAMING:
                    appendDemand(j);
                    break;
            }
        }
        runnable.run();
    }

    public void cancel() {
        Runnable runnable = NOOP_RUNNABLE;
        synchronized (this) {
            this.log.trace("[%d] Cancellation requested in %s state", Integer.valueOf(hashCode()), this.state);
            switch (this.state) {
                case READY:
                    runnable = setupDiscardRunnable();
                    break;
                case STREAMING:
                    this.discardPending = true;
                    break;
            }
        }
        runnable.run();
    }

    @Override // org.neo4j.driver.internal.cursor.RxResultCursor
    public CompletionStage<ResultSummary> summaryAsync() {
        Runnable runnable = NOOP_RUNNABLE;
        synchronized (this) {
            this.log.trace("[%d] Summary requested in %s state", Integer.valueOf(hashCode()), this.state);
            if (this.summaryExposed) {
                return this.summaryFuture;
            }
            this.summaryExposed = true;
            switch (this.state) {
                case READY:
                    if (this.runError != null && this.recordConsumer == null) {
                        handleError(this.runError);
                        runnable = this::onComplete;
                        break;
                    } else {
                        runnable = setupDiscardRunnable();
                        break;
                    }
                case STREAMING:
                    this.discardPending = true;
                    break;
            }
            runnable.run();
            return this.summaryFuture;
        }
    }

    @Override // org.neo4j.driver.internal.cursor.RxResultCursor
    public CompletionStage<Void> rollback() {
        synchronized (this) {
            this.log.trace("[%d] Rolling back unpublished result %s state", Integer.valueOf(hashCode()), this.state);
            switch (this.state) {
                case READY:
                    this.state = State.SUCCEEDED;
                    break;
                case STREAMING:
                case DISCARDING:
                    return this.summaryFuture.thenApply(resultSummary -> {
                        return null;
                    });
                case FAILED:
                case SUCCEEDED:
                    return CompletableFuture.completedFuture(null);
            }
            CompletableFuture completableFuture = new CompletableFuture();
            this.boltConnection.onLoop(() -> {
                this.boltConnectionLock.lock();
                return this.boltConnection.reset().thenCompose(driverBoltConnection -> {
                    return driverBoltConnection.flush(new DriverResponseHandler() { // from class: org.neo4j.driver.internal.cursor.RxResultCursorImpl.2
                        Throwable throwable = null;

                        @Override // org.neo4j.driver.internal.adaptedbolt.DriverResponseHandler
                        public void onError(Throwable th) {
                            this.throwable = Futures.completionExceptionCause(th);
                        }

                        @Override // org.neo4j.driver.internal.adaptedbolt.DriverResponseHandler
                        public void onComplete() {
                            if (this.throwable != null) {
                                completableFuture.completeExceptionally(this.throwable);
                            } else {
                                completableFuture.complete(null);
                            }
                        }
                    });
                }).whenComplete((r5, th) -> {
                    this.boltConnectionLock.unlock();
                    Throwable completionExceptionCause = Futures.completionExceptionCause(th);
                    if (completionExceptionCause != null) {
                        completableFuture.completeExceptionally(completionExceptionCause);
                    }
                });
            });
            return completableFuture.thenCompose(r4 -> {
                return this.boltConnection.onLoop(() -> {
                    this.boltConnectionLock.lock();
                    return this.boltConnection.close().whenComplete((r3, th) -> {
                        this.boltConnectionLock.unlock();
                    });
                });
            }).thenCompose(Function.identity()).whenComplete((r5, th) -> {
                completeSummaryFuture(null, null);
            }).exceptionally(th2 -> {
                return null;
            });
        }
    }

    @Override // org.neo4j.driver.internal.adaptedbolt.DriverResponseHandler
    public void onComplete() {
        Runnable runnable;
        synchronized (this) {
            this.log.trace("[%d] onComplete", Integer.valueOf(hashCode()));
            runnable = this.error != null ? setupCompletionRunnableWithError(this.error) : this.pullSummary != null ? setupCompletionRunnableWithPullSummary() : this.discardSummary != null ? setupCompletionRunnableWithSummaryMetadata(this.discardSummary.metadata()) : () -> {
                this.log.trace("[%d] onComplete resulted in no action", Integer.valueOf(hashCode()));
            };
        }
        runnable.run();
    }

    @Override // org.neo4j.driver.internal.adaptedbolt.DriverResponseHandler
    public synchronized void onError(Throwable th) {
        this.log.trace("[%d] onError", Integer.valueOf(hashCode()));
        handleError(th);
    }

    @Override // org.neo4j.driver.internal.adaptedbolt.DriverResponseHandler
    public synchronized void onIgnored() {
        this.log.trace("[%d] onIgnored", Integer.valueOf(hashCode()));
        handleError(IGNORED_ERROR);
    }

    @Override // org.neo4j.driver.internal.adaptedbolt.DriverResponseHandler
    public void onRecord(Value[] valueArr) {
        this.log.trace("[%d] onRecord", Integer.valueOf(hashCode()));
        synchronized (this) {
            updateRecordState(AbstractRecordStateResponseHandler.RecordState.HAD_RECORD);
            decrementDemand();
        }
        this.recordConsumer.accept(new InternalRecord((List<String>) this.runSummary.keys(), valueArr), null);
    }

    @Override // org.neo4j.driver.internal.adaptedbolt.DriverResponseHandler
    public synchronized void onPullSummary(PullSummary pullSummary) {
        this.log.trace("[%d] onPullSummary", Integer.valueOf(hashCode()));
        this.pullSummary = pullSummary;
    }

    @Override // org.neo4j.driver.internal.adaptedbolt.DriverResponseHandler
    public synchronized void onDiscardSummary(DiscardSummary discardSummary) {
        this.log.trace("[%d] onDiscardSummary", Integer.valueOf(hashCode()));
        this.discardSummary = discardSummary;
    }

    @Override // org.neo4j.driver.internal.FailableCursor
    public synchronized CompletionStage<Throwable> discardAllFailureAsync() {
        this.log.trace("[%d] Discard all requested", Integer.valueOf(hashCode()));
        boolean z = this.summaryExposed;
        boolean z2 = this.runErrorExposed;
        return summaryAsync().thenApply(resultSummary -> {
            return (Throwable) null;
        }).exceptionally(th -> {
            if (z2 || z) {
                return null;
            }
            return th;
        });
    }

    @Override // org.neo4j.driver.internal.FailableCursor
    public synchronized CompletionStage<Throwable> pullAllFailureAsync() {
        boolean z;
        this.log.trace("[%d] Pull all failure requested", Integer.valueOf(hashCode()));
        switch (this.state) {
            case READY:
            case STREAMING:
            case DISCARDING:
                z = true;
                break;
            case FAILED:
            case SUCCEEDED:
                z = false;
                break;
            default:
                throw new IncompatibleClassChangeError();
        }
        return (this.recordConsumer == null || !z) ? discardAllFailureAsync() : CompletableFuture.completedFuture(new TransactionNestingException("You cannot run another query or begin a new transaction in the same session before you've fully consumed the previous run result."));
    }

    private synchronized long appendDemand(long j) {
        if (j == Long.MAX_VALUE) {
            this.outstandingDemand = -1L;
        } else {
            try {
                this.outstandingDemand = Math.addExact(this.outstandingDemand, j);
            } catch (ArithmeticException e) {
                this.outstandingDemand = -1L;
            }
        }
        this.log.trace("[%d] Appended demand, outstanding is %d", Integer.valueOf(hashCode()), Long.valueOf(this.outstandingDemand));
        return this.outstandingDemand;
    }

    private synchronized long getDemand() {
        this.log.trace("[%d] Get demand, outstanding is %d", Integer.valueOf(hashCode()), Long.valueOf(this.outstandingDemand));
        return this.outstandingDemand;
    }

    private synchronized void decrementDemand() {
        if (this.outstandingDemand > 0) {
            this.outstandingDemand--;
        }
        this.log.trace("[%d] Decremented demand, outstanding is %d", Integer.valueOf(hashCode()), Long.valueOf(this.outstandingDemand));
    }

    private synchronized Runnable setupDiscardRunnable() {
        this.state = State.DISCARDING;
        return () -> {
            this.boltConnection.onLoop(() -> {
                this.boltConnectionLock.lock();
                return this.boltConnection.discard(this.runSummary.queryId(), -1L).thenCompose(driverBoltConnection -> {
                    return driverBoltConnection.flush(this);
                }).whenComplete((r4, th) -> {
                    this.boltConnectionLock.unlock();
                    Throwable completionExceptionCause = Futures.completionExceptionCause(th);
                    if (completionExceptionCause != null) {
                        handleError(completionExceptionCause);
                        onComplete();
                    }
                });
            });
        };
    }

    private synchronized Runnable setupCompletionRunnableWithPullSummary() {
        this.log.trace("[%d] Setting up completion with pull summary (hasMore=%b)", Integer.valueOf(hashCode()), Boolean.valueOf(this.pullSummary.hasMore()));
        Runnable runnable = NOOP_RUNNABLE;
        if (this.pullSummary.hasMore()) {
            this.pullSummary = null;
            if (this.discardPending) {
                this.discardPending = false;
                this.state = State.DISCARDING;
                runnable = () -> {
                    this.boltConnection.onLoop(() -> {
                        this.boltConnectionLock.lock();
                        return this.boltConnection.discard(this.runSummary.queryId(), -1L).thenCompose(driverBoltConnection -> {
                            return driverBoltConnection.flush(this);
                        }).whenComplete((r4, th) -> {
                            this.boltConnectionLock.unlock();
                            Throwable completionExceptionCause = Futures.completionExceptionCause(th);
                            if (completionExceptionCause != null) {
                                handleError(completionExceptionCause);
                                onComplete();
                            }
                        });
                    });
                };
            } else {
                long demand = getDemand();
                if (demand != 0) {
                    this.state = State.STREAMING;
                    runnable = () -> {
                        this.boltConnection.onLoop(() -> {
                            this.boltConnectionLock.lock();
                            return this.boltConnection.pull(this.runSummary.queryId(), demand > 0 ? demand : -1L).thenCompose(driverBoltConnection -> {
                                return driverBoltConnection.flush(this);
                            }).whenComplete((r4, th) -> {
                                this.boltConnectionLock.unlock();
                                Throwable completionExceptionCause = Futures.completionExceptionCause(th);
                                if (completionExceptionCause != null) {
                                    handleError(completionExceptionCause);
                                    onComplete();
                                }
                            });
                        });
                    };
                } else {
                    this.state = State.READY;
                }
            }
        } else {
            runnable = setupCompletionRunnableWithSummaryMetadata(this.pullSummary.metadata());
        }
        return runnable;
    }

    private synchronized Runnable setupCompletionRunnableWithSummaryMetadata(Map<String, Value> map) {
        Runnable runnable;
        this.log.trace("[%d] Setting up completion with summary metadata", Integer.valueOf(hashCode()));
        Runnable runnable2 = NOOP_RUNNABLE;
        ResultSummary resultSummary = null;
        try {
            resultSummary = resultSummary(map, generateGqlStatusObject(this.runSummary.keys()));
            this.state = State.SUCCEEDED;
        } catch (Throwable th) {
            handleError(th);
        }
        if (resultSummary != null) {
            databaseBookmark(map).ifPresent(this.bookmarkConsumer);
            Runnable runnable3 = setupSummaryAndRecordCompletionRunnable(resultSummary, null);
            runnable = () -> {
                closeBoltConnection(runnable3);
            };
        } else {
            runnable = this::onComplete;
        }
        return runnable;
    }

    private ResultSummary resultSummary(Map<String, Value> map, GqlStatusObject gqlStatusObject) {
        return METADATA_EXTRACTOR.extractSummary(this.query, this.boltConnection, this.runSummary.resultAvailableAfter(), map, this.legacyNotifications, gqlStatusObject);
    }

    private static Optional<DatabaseBookmark> databaseBookmark(Map<String, Value> map) {
        DatabaseBookmark databaseBookmark = null;
        Value value = map.get("bookmark");
        if (value != null && !value.isNull() && value.hasType(InternalTypeSystem.TYPE_SYSTEM.STRING())) {
            String asString = value.asString();
            if (!asString.isEmpty()) {
                databaseBookmark = new DatabaseBookmark(null, Bookmark.from(asString));
            }
        }
        return Optional.ofNullable(databaseBookmark);
    }

    private synchronized Runnable setupCompletionRunnableWithError(Throwable th) {
        this.log.trace("[%d] Setting up completion with error %s", Integer.valueOf(hashCode()), throwableName(th));
        ResultSummary resultSummary = null;
        try {
            resultSummary = resultSummary(Collections.emptyMap(), null);
        } catch (Throwable th2) {
            this.log.error(String.format("[%d] Failed to parse summary", Integer.valueOf(hashCode())), th2);
        }
        Runnable runnable = setupSummaryAndRecordCompletionRunnable(resultSummary, th);
        return () -> {
            closeBoltConnection(runnable);
        };
    }

    private void closeBoltConnection(Runnable runnable) {
        CompletionStage completedStage = CompletableFuture.completedStage(null);
        if (this.closeOnSummary) {
            completedStage = completedStage.thenCompose(r4 -> {
                return this.boltConnection.onLoop(() -> {
                    this.boltConnectionLock.lock();
                    return this.boltConnection.close().whenComplete((r3, th) -> {
                        this.boltConnectionLock.unlock();
                    });
                });
            }).thenCompose(Function.identity());
        }
        completedStage.whenComplete((r9, th) -> {
            if (this.log.isTraceEnabled() && th != null) {
                this.log.error(String.format("[%d] Failed to close connection", Integer.valueOf(hashCode())), Futures.completionExceptionCause(th));
            }
            runnable.run();
        });
    }

    private synchronized void handleError(Throwable th) {
        if (this.log.isTraceEnabled()) {
            this.log.error(String.format("[%d] handleError", Integer.valueOf(hashCode())), th);
        }
        this.state = State.FAILED;
        Throwable completionExceptionCause = Futures.completionExceptionCause(th);
        if (this.error == null) {
            this.error = completionExceptionCause;
            return;
        }
        if (completionExceptionCause == IGNORED_ERROR) {
            return;
        }
        if (this.error == IGNORED_ERROR || ((this.error instanceof Neo4jException) && !(completionExceptionCause instanceof Neo4jException))) {
            this.error = completionExceptionCause;
        }
    }

    private synchronized Runnable setupSummaryAndRecordCompletionRunnable(ResultSummary resultSummary, Throwable th) {
        BiConsumer<Record, Throwable> biConsumer = this.recordConsumer;
        this.recordConsumer = NOOP_CONSUMER;
        return () -> {
            if (th == null) {
                completeSummaryFuture(resultSummary, null);
                if (biConsumer != null) {
                    biConsumer.accept(null, null);
                    return;
                }
                return;
            }
            if (biConsumer == null || biConsumer == NOOP_CONSUMER) {
                completeSummaryFuture(null, th);
            } else {
                completeSummaryFuture(resultSummary, null);
                biConsumer.accept(null, th);
            }
        };
    }

    private void completeSummaryFuture(ResultSummary resultSummary, Throwable th) {
        Throwable completionExceptionCause = Futures.completionExceptionCause(th);
        this.log.trace("[%d] Completing summary future (summary=%s, throwable=%s)", Integer.valueOf(hashCode()), hash(resultSummary), throwableName(completionExceptionCause));
        if (completionExceptionCause != null) {
            this.consumedFuture.completeExceptionally(completionExceptionCause);
            this.summaryFuture.completeExceptionally(completionExceptionCause);
        } else {
            this.consumedFuture.complete(null);
            this.summaryFuture.complete(resultSummary);
        }
    }

    private BiConsumer<Record, Throwable> safeRecordConsumer(BiConsumer<Record, Throwable> biConsumer) {
        return (record, th) -> {
            try {
                biConsumer.accept(record, th);
                this.log.trace("[%d] Record consumer notified with (record=%s, throwable=%s)", Integer.valueOf(hashCode()), hash(record), throwableName(th));
            } catch (Throwable th) {
                this.log.error(String.format("[%d] Record consumer threw an error when notified with (record=%s, throwable=%s), this will be ignored", Integer.valueOf(hashCode()), hash(record), throwableName(th)), th);
            }
        };
    }

    private String hash(Object obj) {
        return obj == null ? "null" : String.valueOf(obj.hashCode());
    }

    private String throwableName(Throwable th) {
        return th == null ? "null" : th.getClass().getCanonicalName();
    }
}
