package com.neo4j.gds.core.write;

import com.neo4j.gds.retries.TruncatedExponentialBackoff;
import com.neo4j.kernel.enterprise.api.security.EnterpriseLoginContext;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongUnaryOperator;
import java.util.stream.Stream;
import org.neo4j.bolt.dbapi.BoltGraphDatabaseServiceSPI;
import org.neo4j.gds.api.ExportedRelationship;
import org.neo4j.gds.api.nodeproperties.ValueType;
import org.neo4j.gds.core.concurrency.DefaultPool;
import org.neo4j.gds.core.utils.progress.tasks.ProgressTracker;
import org.neo4j.gds.core.write.RelationshipStreamExporter;
import org.neo4j.gds.termination.TerminationFlag;
import org.neo4j.values.storable.Value;

/* loaded from: input_file:com/neo4j/gds/core/write/CypherRelationshipStreamExporter.class */
public final class CypherRelationshipStreamExporter implements RelationshipStreamExporter {
    private static final int QUEUE_CAPACITY = 2;
    private final BoltGraphDatabaseServiceSPI fabricDb;
    private final EnterpriseLoginContext loginContext;
    private final LongUnaryOperator toOriginalId;
    private final Stream<ExportedRelationship> relationships;
    private final int batchSize;
    private final TerminationFlag terminationFlag;
    private final ProgressTracker progressTracker;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/neo4j/gds/core/write/CypherRelationshipStreamExporter$Buffer.class */
    public static class Buffer {
        private final long capacity;
        private final ExportedRelationship[] relationships;
        private int size;

        Buffer(int i) {
            this.relationships = new ExportedRelationship[i];
            this.capacity = i;
        }

        void add(ExportedRelationship exportedRelationship) {
            this.relationships[this.size] = exportedRelationship;
            this.size++;
        }

        boolean isFull() {
            return ((long) this.size) == this.capacity;
        }

        void reset() {
            this.size = 0;
        }
    }

    /* loaded from: input_file:com/neo4j/gds/core/write/CypherRelationshipStreamExporter$Writer.class */
    static class Writer implements Runnable {
        private final TerminationFlag terminationFlag;
        private final ProgressTracker progressTracker;
        private final LongUnaryOperator toOriginalId;
        private final BlockingQueue<Buffer> writeQueue;
        private final BlockingQueue<Buffer> bufferPool;
        private final String relationshipType;
        private final String[] propertyKeys;
        private final QueryRunner queryRunner;
        private long written;

        Writer(LongUnaryOperator longUnaryOperator, BlockingQueue<Buffer> blockingQueue, BlockingQueue<Buffer> blockingQueue2, String str, String[] strArr, TerminationFlag terminationFlag, ProgressTracker progressTracker, QueryRunner queryRunner) {
            this.toOriginalId = longUnaryOperator;
            this.writeQueue = blockingQueue;
            this.bufferPool = blockingQueue2;
            this.relationshipType = str;
            this.propertyKeys = strArr == null ? new String[0] : strArr;
            this.terminationFlag = terminationFlag;
            this.progressTracker = progressTracker;
            this.queryRunner = queryRunner;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    Buffer take = this.writeQueue.take();
                    if (take.size == 0) {
                        return;
                    }
                    this.written += write(take, this.relationshipType, this.propertyKeys);
                    this.progressTracker.logProgress(this.written, "has written %d relationships");
                    take.reset();
                    this.bufferPool.put(take);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }

        private int write(Buffer buffer, String str, String... strArr) {
            int i = buffer.size;
            int length = strArr.length;
            ExportedRelationship[] exportedRelationshipArr = buffer.relationships;
            this.terminationFlag.assertRunning();
            ParameterizedRelationshipsQuery parameterizedRelationshipsQuery = new ParameterizedRelationshipsQuery(str);
            for (int i2 = 0; i2 < i; i2++) {
                ExportedRelationship exportedRelationship = exportedRelationshipArr[i2];
                Value[] values = exportedRelationship.values();
                RelationshipKeyValue[] relationshipKeyValueArr = new RelationshipKeyValue[length];
                for (int i3 = 0; i3 < length; i3++) {
                    relationshipKeyValueArr[i3] = RelationshipKeyValue.of(strArr[i3], values[i3]);
                }
                parameterizedRelationshipsQuery.addParameters(this.toOriginalId.applyAsLong(exportedRelationship.sourceNode()), this.toOriginalId.applyAsLong(exportedRelationship.targetNode()), relationshipKeyValueArr);
            }
            return this.queryRunner.runQuery(parameterizedRelationshipsQuery.query(), parameterizedRelationshipsQuery.queryParameters());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CypherRelationshipStreamExporter(BoltGraphDatabaseServiceSPI boltGraphDatabaseServiceSPI, EnterpriseLoginContext enterpriseLoginContext, LongUnaryOperator longUnaryOperator, Stream<ExportedRelationship> stream, int i, TerminationFlag terminationFlag, ProgressTracker progressTracker) {
        this.fabricDb = boltGraphDatabaseServiceSPI;
        this.loginContext = enterpriseLoginContext;
        this.toOriginalId = longUnaryOperator;
        this.relationships = (Stream) stream.sequential();
        this.batchSize = i;
        this.terminationFlag = terminationFlag;
        this.progressTracker = progressTracker;
    }

    @Override // org.neo4j.gds.core.write.RelationshipStreamExporter
    public long write(String str, List<String> list, List<ValueType> list2) {
        this.progressTracker.beginSubTask();
        try {
            LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(2);
            LinkedBlockingQueue linkedBlockingQueue2 = new LinkedBlockingQueue(2);
            for (int i = 0; i < 2; i++) {
                linkedBlockingQueue2.add(new Buffer(this.batchSize));
            }
            Writer writer = new Writer(this.toOriginalId, linkedBlockingQueue, linkedBlockingQueue2, str, (String[]) list.toArray(new String[0]), this.terminationFlag, this.progressTracker, new QueryRunnerRetryDecorator(TruncatedExponentialBackoff::create, new DefaultQueryRunner(this.fabricDb, this.loginContext, (v0) -> {
                return v0.getRelationshipsCreated();
            })));
            Future<?> submit = DefaultPool.INSTANCE.submit(writer);
            AtomicReference atomicReference = new AtomicReference((Buffer) linkedBlockingQueue2.poll());
            this.relationships.forEach(exportedRelationship -> {
                Buffer buffer = (Buffer) atomicReference.get();
                buffer.add(exportedRelationship);
                if (buffer.isFull()) {
                    try {
                        linkedBlockingQueue.put(buffer);
                        atomicReference.set((Buffer) linkedBlockingQueue2.take());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                }
            });
            try {
                linkedBlockingQueue.put((Buffer) atomicReference.get());
                linkedBlockingQueue.put(new Buffer(0));
                submit.get();
                long j = writer.written;
                this.progressTracker.endSubTask();
                return j;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e2) {
                throw new RuntimeException(e2);
            }
        } catch (Throwable th) {
            this.progressTracker.endSubTask();
            throw th;
        }
    }
}
