package com.neo4j.gds.arrow.server.handlers.v1;

import com.neo4j.gds.arrow.core.api.Command;
import com.neo4j.gds.arrow.core.metrics.FlightMetrics;
import com.neo4j.gds.arrow.core.process.ProcessIdentifier;
import com.neo4j.gds.arrow.core.process.ProcessRegistry;
import com.neo4j.gds.arrow.server.actions.v1.GdsFlightServerCommands;
import com.neo4j.gds.arrow.server.api.GetStreamHandler;
import com.neo4j.gds.arrow.server.export.ExportProcess;
import com.neo4j.gds.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import com.neo4j.gds.shaded.org.apache.arrow.flight.FlightProducer;
import com.neo4j.gds.shaded.org.apache.arrow.memory.BufferAllocator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
import org.neo4j.dbms.api.DatabaseNotFoundException;
import org.neo4j.gds.api.DatabaseId;
import org.neo4j.gds.core.utils.progress.TaskStore;
import org.neo4j.gds.core.utils.progress.TaskStoreService;
import org.neo4j.gds.logging.Log;
import org.neo4j.gds.utils.StringFormatting;

/* loaded from: input_file:com/neo4j/gds/arrow/server/handlers/v1/GetStream.class */
public class GetStream implements GetStreamHandler {
    private final IntSupplier batchSizeSupplier;
    private final ProcessRegistry processRegistry;
    private final BufferAllocator bufferAllocator;
    private final ExecutorService executorService;
    private final TaskStoreService taskStoreService;
    private final Log log;
    private final Supplier<List<String>> existingDatabaseNames;

    public GetStream(IntSupplier intSupplier, ProcessRegistry processRegistry, BufferAllocator bufferAllocator, ExecutorService executorService, TaskStoreService taskStoreService, Log log, Supplier<List<String>> supplier) {
        this.batchSizeSupplier = intSupplier;
        this.processRegistry = processRegistry;
        this.bufferAllocator = bufferAllocator;
        this.executorService = executorService;
        this.taskStoreService = taskStoreService;
        this.log = log;
        this.existingDatabaseNames = supplier;
    }

    @Override // com.neo4j.gds.arrow.server.api.GetStreamHandler
    public void getStream(FlightProducer.CallContext callContext, Command command, FlightProducer.ServerStreamListener serverStreamListener, ObjectMapper objectMapper, FlightMetrics flightMetrics) throws Exception {
        String peerIdentity = callContext.peerIdentity();
        GdsFlightServerCommands.Commands fromCommandHeader = GdsFlightServerCommands.Commands.fromCommandHeader(command.header());
        GdsFlightServerCommands.BaseGetCommand fromCommand = GdsFlightServerCommands.BaseGetCommand.fromCommand(objectMapper, fromCommandHeader, command.body());
        validateDatabaseExists(fromCommand.databaseName());
        ExportProcess<?> orCreateExportProcess = getOrCreateExportProcess(new ProcessIdentifier.Export(fromCommand.jobId().asString()), peerIdentity, fromCommand, fromCommandHeader, this.taskStoreService.getTaskStore(DatabaseId.of(fromCommand.databaseName())), this.batchSizeSupplier, this.bufferAllocator, this.executorService, this.log, flightMetrics);
        serverStreamListener.setOnCancelHandler(() -> {
            orCreateExportProcess.abort(new RuntimeException("Stream was cancelled by the client"));
            this.log.warn("Export stream was cancelled by client side", new Object[0]);
        });
        orCreateExportProcess.export(serverStreamListener);
    }

    private ExportProcess<?> getOrCreateExportProcess(ProcessIdentifier processIdentifier, String str, GdsFlightServerCommands.BaseGetCommand baseGetCommand, GdsFlightServerCommands.Commands commands, TaskStore taskStore, IntSupplier intSupplier, BufferAllocator bufferAllocator, ExecutorService executorService, Log log, FlightMetrics flightMetrics) {
        if (this.processRegistry.exists(processIdentifier)) {
            ExportProcess<?> exportProcess = (ExportProcess) this.processRegistry.get(processIdentifier);
            if (!exportProcess.aborted() && !exportProcess.done()) {
                return exportProcess;
            }
        }
        ExportProcess<?> exportProcess2 = new ExportProcess<>(str, baseGetCommand, commands, taskStore, intSupplier, bufferAllocator, executorService, log, flightMetrics);
        this.processRegistry.add(processIdentifier, exportProcess2);
        return exportProcess2;
    }

    private void validateDatabaseExists(String str) {
        if (!this.existingDatabaseNames.get().contains(str)) {
            throw new DatabaseNotFoundException(StringFormatting.formatWithLocale("No database with name `%s` found", str));
        }
    }
}
