package apoc.kafka.consumer.kafka;

import apoc.kafka.consumer.StreamsEventConsumer;
import apoc.kafka.consumer.StreamsEventConsumerFactory;
import apoc.kafka.consumer.kafka.KafkaSinkConfiguration;
import apoc.kafka.events.KafkaStatus;
import apoc.kafka.extensions.GraphDatabaseServerExtensionsKt;
import com.unboundid.ldap.sdk.Version;
import java.util.Map;
import java.util.Set;
import kotlin.Metadata;
import kotlin.coroutines.CoroutineContext;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.Job;
import kotlinx.coroutines.sync.Mutex;
import kotlinx.coroutines.sync.MutexKt;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.Log;

/* compiled from: KafkaEventSink.kt */
@Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��*\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\u0018��2\u00020\u0001B\r\u0012\u0006\u0010\u0002\u001a\u00020\u0003¢\u0006\u0002\u0010\u0004J\u0006\u0010\t\u001a\u00020\nJ\u0006\u0010\u000b\u001a\u00020\fJ\u0012\u0010\u000b\u001a\u00020\f2\b\u0010\u0005\u001a\u0004\u0018\u00010\u0006H\u0002R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u0010\u0010\u0005\u001a\u0004\u0018\u00010\u0006X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u0007\u001a\u00020\bX\u0082\u0004¢\u0006\u0002\n��¨\u0006\r"}, d2 = {"Lapoc/kafka/consumer/kafka/KafkaEventSink;", Version.VERSION_QUALIFIER, "db", "Lorg/neo4j/kernel/internal/GraphDatabaseAPI;", "(Lorg/neo4j/kernel/internal/GraphDatabaseAPI;)V", "job", "Lkotlinx/coroutines/Job;", "mutex", "Lkotlinx/coroutines/sync/Mutex;", "getEventConsumerFactory", "Lapoc/kafka/consumer/StreamsEventConsumerFactory;", "status", "Lapoc/kafka/events/KafkaStatus;", "apoc"})
/* loaded from: input_file:apoc/kafka/consumer/kafka/KafkaEventSink.class */
/**
 * Kafka sink entry point bound to a single database.
 *
 * <p>Hands out a {@link StreamsEventConsumerFactory} that builds Kafka consumers configured
 * for the wrapped database, and reports a {@link KafkaStatus} derived from a coroutine
 * {@link Job}.
 *
 * <p>NOTE(review): nothing in this class assigns {@link #job}; it keeps its initial
 * {@code null} value as far as this file shows. Confirm whether a start/stop path was
 * dropped or lives elsewhere.
 */
public final class KafkaEventSink {

    /** Database this sink is bound to; supplies the database name used for consumer configuration. */
    @NotNull
    private final GraphDatabaseAPI db;

    /**
     * Coroutine mutex created in the constructor. It is not referenced by any code visible in
     * this class — presumably used by the {@code status()} coroutine body; verify.
     */
    @NotNull
    private final Mutex mutex;

    /** Coroutine job whose liveness determines the reported status; may be {@code null}. */
    @Nullable
    private Job job;

    /**
     * Creates a sink for the given database.
     *
     * @param graphDatabaseAPI database to bind to; must not be {@code null}
     */
    public KafkaEventSink(@NotNull GraphDatabaseAPI graphDatabaseAPI) {
        Intrinsics.checkNotNullParameter(graphDatabaseAPI, "db");
        this.db = graphDatabaseAPI;
        // Kotlin default-argument bridge: mask 1 means the first argument is defaulted.
        this.mutex = MutexKt.Mutex$default(false, 1, (Object) null);
    }

    /**
     * Returns a factory that creates Kafka event consumers for this sink's database.
     *
     * <p>The created consumer is a {@code KafkaAutoCommitEventConsumer} when the parsed sink
     * configuration enables auto-commit, otherwise a {@code KafkaManualCommitEventConsumer}.
     *
     * @return a new factory instance on every call
     */
    @NotNull
    public final StreamsEventConsumerFactory getEventConsumerFactory() {
        return new StreamsEventConsumerFactory() { // from class: apoc.kafka.consumer.kafka.KafkaEventSink$getEventConsumerFactory$1
            @Override // apoc.kafka.consumer.StreamsEventConsumerFactory
            @NotNull
            public StreamsEventConsumer createStreamsEventConsumer(@NotNull Map<String, String> map, @NotNull Log log, @NotNull Set<? extends Object> set) {
                Intrinsics.checkNotNullParameter(map, "config");
                Intrinsics.checkNotNullParameter(log, "log");
                Intrinsics.checkNotNullParameter(set, "topics");

                final GraphDatabaseAPI api = KafkaEventSink.this.db;
                final String dbName = api.databaseName();
                final KafkaSinkConfiguration.Companion configFactory = KafkaSinkConfiguration.Companion;
                Intrinsics.checkNotNull(dbName);

                // Keep the static type as GraphDatabaseService so the same isDefaultDb
                // overload is selected as before.
                final GraphDatabaseService service = KafkaEventSink.this.db;
                final boolean defaultDb = GraphDatabaseServerExtensionsKt.isDefaultDb(service);
                final KafkaSinkConfiguration sinkConfig = configFactory.from(map, dbName, defaultDb);

                if (sinkConfig.getEnableAutoCommit()) {
                    return new KafkaAutoCommitEventConsumer(sinkConfig, log, set, dbName);
                }
                return new KafkaManualCommitEventConsumer(sinkConfig, log, set, dbName);
            }
        };
    }

    /**
     * Reports the current status of this sink, blocking the calling thread until the
     * coroutine block {@code KafkaEventSink$status$1} completes.
     *
     * <p>NOTE(review): the coroutine body is compiled into a separate class that is not
     * visible here; presumably it takes {@link #mutex} and delegates to
     * {@link #status(Job)} — confirm.
     *
     * @return the status produced by the coroutine block
     */
    @NotNull
    public final KafkaStatus status() {
        final Object outcome = BuildersKt.runBlocking$default((CoroutineContext) null, new KafkaEventSink$status$1(this, null), 1, (Object) null);
        return (KafkaStatus) outcome;
    }

    /**
     * Maps a job to a status: {@link KafkaStatus#RUNNING} only when the job exists and is
     * active, {@link KafkaStatus#STOPPED} otherwise (including a {@code null} job).
     *
     * @param job the job to inspect; may be {@code null}
     * @return the status corresponding to the job's liveness
     */
    /* JADX INFO: Access modifiers changed from: private */
    public final KafkaStatus status(Job job) {
        if (job == null) {
            return KafkaStatus.STOPPED;
        }
        if (job.isActive()) {
            return KafkaStatus.RUNNING;
        }
        return KafkaStatus.STOPPED;
    }
}
