package apoc.kafka.producer.kafka;

import apoc.kafka.events.KafkaStatus;
import apoc.kafka.events.StreamsEvent;
import apoc.kafka.events.StreamsTransactionEvent;
import apoc.kafka.extensions.GraphDatabaseServerExtensionsKt;
import apoc.kafka.producer.ExtensionsKt;
import apoc.kafka.producer.StreamsEventRouterConfiguration;
import apoc.kafka.producer.kafka.KafkaConfiguration;
import apoc.kafka.utils.JSONUtils;
import com.unboundid.ldap.sdk.Version;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import kotlin.Lazy;
import kotlin.LazyKt;
import kotlin.Metadata;
import kotlin.coroutines.CoroutineContext;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.SourceDebugExtension;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.sync.Mutex;
import kotlinx.coroutines.sync.MutexKt;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.logging.Log;

/* compiled from: KafkaEventRouter.kt */
@Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��z\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0010$\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010\u0012\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000b\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0010\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010 \n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0002\u0018��2\u00020\u0001B)\u0012\u0012\u0010\u0002\u001a\u000e\u0012\u0004\u0012\u00020\u0004\u0012\u0004\u0012\u00020\u00040\u0003\u0012\u0006\u0010\u0005\u001a\u00020\u0006\u0012\u0006\u0010\u0007\u001a\u00020\b¢\u0006\u0002\u0010\tJ8\u0010\u001e\u001a\u0010\u0012\u0004\u0012\u00020\u0004\u0012\u0004\u0012\u00020\u0001\u0018\u00010\u00032\u0016\u0010\u001f\u001a\u0012\u0012\u0006\u0012\u0004\u0018\u00010\u001d\u0012\u0006\u0012\u0004\u0018\u00010\u001d0 2\b\b\u0002\u0010!\u001a\u00020\"H\u0002JF\u0010#\u001a\u0010\u0012\u0004\u0012\u00020\u0004\u0012\u0004\u0012\u00020\u0001\u0018\u00010\u00032\u0006\u0010$\u001a\u00020\u00042\u0006\u0010%\u001a\u00020&2\u0014\u0010\u0002\u001a\u0010\u0012\u0004\u0012\u00020\u0004\u0012\u0006\u0012\u0004\u0018\u00010\u00010\u00032\b\b\u0002\u0010!\u001a\u00020\"H\u0002J.\u0010#\u001a\u00020'2\u0006\u0010$\u001a\u00020\u00042\u0006\u0010%\u001a\u00020(2\u0014\u0010\u0002\u001a\u0010\u0012\u0004\u0012\u00020\u0004\u0012\u0006\u0012\u0004\u0018\u00010\u00010\u0003H\u0002J4\u0010)\u001a\u00020'2\u0006\u0010$\u001a\u00020\u00042\u000e\u0010*\u001a\n\u0012\u0006\b\u0001\u0012\u00020&0+2\u0014\u0010\u0002\u001a\u0010\u0012\u0004\u0012\u00020\u0004\u0012\u0006\u0012\u0004\u0018\u00010\u00010\u0003JF\u0010,\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u0004\u0012\u0004\u0012\u00020\u00010\u00030+2\u0006\u0010$\u001a\u00020\u00042\u000e\u0010*\u001a\n\u0012\u0006\b\u0001\u0012\u00020&0+2\u0014\u0010\u0002\u001a\u0010\u0012\u0004\u0012\u00020\u0004\u0012\u0006\u0012\u0004\u0018\u00010\u00010\u0003J\u0006\u0010-\u001a\u00020'J\u001a\u0010.\u001a\u00020/2\u0010\u0010\u001b\u001a\f\u0012\u0002\b\u0003\u0012\u0002\b\u0003\u0018\u00010\u001cH\u0002J\u0006\u00100\u001a\u00020'R\u001a\u0010\u0002\u001a\u000e\u0012\u0004\u0012\u00020\u0004\u0012\u0004\u0012\u00020\u00040\u0003X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0005\u001a\u00020\u0006X\u0082\u0004¢\u0006\u0002\n��R\u0011\u0010\n\u001a\u00020\u000b¢\u0006\b\n��\u001a\u0004\b\f\u0010\rR\u001b\u0010\u000e\u001a\u00020\u000f8BX\u0082\u0084\u0002¢\u0006\f\n\u0004\b\u0012\u0010\u0013\u001a\u0004\b\u0010\u0010\u0011R\u001b\u0010\u0014\u001a\u00020\u00158BX\u0082\u0084\u0002¢\u0006\f\n\u0004\b\u0018\u0010\u0013\u001a\u0004\b\u0016\u0010\u0017R\u000e\u0010\u0007\u001a\u00020\bX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0019\u001a\u00020\u001aX\u0082\u0004¢\u0006\u0002\n��R\u001c\u0010\u001b\u001a\u0010\u0012\u0004\u0012\u00020\u001d\u0012\u0004\u0012\u00020\u001d\u0018\u00010\u001cX\u0082\u000e¢\u0006\u0002\n��¨\u00061"}, d2 = {"Lapoc/kafka/producer/kafka/KafkaEventRouter;", Version.VERSION_QUALIFIER, "config", Version.VERSION_QUALIFIER, Version.VERSION_QUALIFIER, "db", "Lorg/neo4j/graphdb/GraphDatabaseService;", "log", "Lorg/neo4j/logging/Log;", "(Ljava/util/Map;Lorg/neo4j/graphdb/GraphDatabaseService;Lorg/neo4j/logging/Log;)V", "eventRouterConfiguration", "Lapoc/kafka/producer/StreamsEventRouterConfiguration;", "getEventRouterConfiguration", "()Lapoc/kafka/producer/StreamsEventRouterConfiguration;", "kafkaAdminService", "Lapoc/kafka/producer/kafka/KafkaAdminService;", "getKafkaAdminService", "()Lapoc/kafka/producer/kafka/KafkaAdminService;", "kafkaAdminService$delegate", "Lkotlin/Lazy;", "kafkaConfig", "Lapoc/kafka/producer/kafka/KafkaConfiguration;", "getKafkaConfig", "()Lapoc/kafka/producer/kafka/KafkaConfiguration;", "kafkaConfig$delegate", "mutex", "Lkotlinx/coroutines/sync/Mutex;", "producer", "Lapoc/kafka/producer/kafka/Neo4jKafkaProducer;", Version.VERSION_QUALIFIER, "send", "producerRecord", "Lorg/apache/kafka/clients/producer/ProducerRecord;", "sync", Version.VERSION_QUALIFIER, "sendEvent", "topic", "event", "Lapoc/kafka/events/StreamsEvent;", Version.VERSION_QUALIFIER, "Lapoc/kafka/events/StreamsTransactionEvent;", "sendEvents", "transactionEvents", Version.VERSION_QUALIFIER, "sendEventsSync", "start", "status", "Lapoc/kafka/events/KafkaStatus;", "stop", "apoc"})
@SourceDebugExtension({"SMAP\nKafkaEventRouter.kt\nKotlin\n*S Kotlin\n*F\n+ 1 KafkaEventRouter.kt\napoc/kafka/producer/kafka/KafkaEventRouter\n+ 2 fake.kt\nkotlin/jvm/internal/FakeKt\n+ 3 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n*L\n1#1,194:1\n1#2:195\n1#2:206\n1603#3,9:196\n1855#3:205\n1856#3:207\n1612#3:208\n1855#3,2:209\n*S KotlinDebug\n*F\n+ 1 KafkaEventRouter.kt\napoc/kafka/producer/kafka/KafkaEventRouter\n*L\n128#1:206\n128#1:196,9\n128#1:205\n128#1:207\n128#1:208\n139#1:209,2\n*E\n"})
/* loaded from: input_file:apoc/kafka/producer/kafka/KafkaEventRouter.class */
public final class KafkaEventRouter {

    @NotNull
    private final Map<String, String> config;

    @NotNull
    private final GraphDatabaseService db;

    @NotNull
    private final Log log;

    @NotNull
    private final StreamsEventRouterConfiguration eventRouterConfiguration;

    @NotNull
    private final Mutex mutex;

    @Nullable
    private Neo4jKafkaProducer<byte[], byte[]> producer;

    @NotNull
    private final Lazy kafkaConfig$delegate;

    @NotNull
    private final Lazy kafkaAdminService$delegate;

    public KafkaEventRouter(@NotNull Map<String, String> map, @NotNull GraphDatabaseService graphDatabaseService, @NotNull Log log) {
        Intrinsics.checkNotNullParameter(map, "config");
        Intrinsics.checkNotNullParameter(graphDatabaseService, "db");
        Intrinsics.checkNotNullParameter(log, "log");
        this.config = map;
        this.db = graphDatabaseService;
        this.log = log;
        StreamsEventRouterConfiguration.Companion companion = StreamsEventRouterConfiguration.Companion;
        Map<String, String> map2 = this.config;
        String databaseName = this.db.databaseName();
        Intrinsics.checkNotNullExpressionValue(databaseName, "databaseName(...)");
        this.eventRouterConfiguration = companion.from(map2, databaseName, GraphDatabaseServerExtensionsKt.isDefaultDb(this.db), this.log);
        this.mutex = MutexKt.Mutex$default(false, 1, (Object) null);
        this.kafkaConfig$delegate = LazyKt.lazy(new Function0<KafkaConfiguration>() { // from class: apoc.kafka.producer.kafka.KafkaEventRouter$kafkaConfig$2
            /* JADX INFO: Access modifiers changed from: package-private */
            {
                super(0);
            }

            @NotNull
            /* renamed from: invoke, reason: merged with bridge method [inline-methods] */
            public final KafkaConfiguration m76invoke() {
                Map<String, String> map3;
                Log log2;
                KafkaConfiguration.Companion companion2 = KafkaConfiguration.Companion;
                map3 = KafkaEventRouter.this.config;
                log2 = KafkaEventRouter.this.log;
                return companion2.from(map3, log2);
            }
        });
        this.kafkaAdminService$delegate = LazyKt.lazy(new Function0<KafkaAdminService>() { // from class: apoc.kafka.producer.kafka.KafkaEventRouter$kafkaAdminService$2
            /* JADX INFO: Access modifiers changed from: package-private */
            {
                super(0);
            }

            @NotNull
            /* renamed from: invoke, reason: merged with bridge method [inline-methods] */
            public final KafkaAdminService m75invoke() {
                KafkaConfiguration kafkaConfig;
                Log log2;
                kafkaConfig = KafkaEventRouter.this.getKafkaConfig();
                log2 = KafkaEventRouter.this.log;
                return new KafkaAdminService(kafkaConfig, log2);
            }
        });
    }

    @NotNull
    public final StreamsEventRouterConfiguration getEventRouterConfiguration() {
        return this.eventRouterConfiguration;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final KafkaConfiguration getKafkaConfig() {
        return (KafkaConfiguration) this.kafkaConfig$delegate.getValue();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final KafkaAdminService getKafkaAdminService() {
        return (KafkaAdminService) this.kafkaAdminService$delegate.getValue();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final KafkaStatus status(Neo4jKafkaProducer<?, ?> neo4jKafkaProducer) {
        return neo4jKafkaProducer != null ? KafkaStatus.RUNNING : KafkaStatus.STOPPED;
    }

    public final void start() {
        BuildersKt.runBlocking$default((CoroutineContext) null, new KafkaEventRouter$start$1(this, null), 1, (Object) null);
    }

    public final void stop() {
        BuildersKt.runBlocking$default((CoroutineContext) null, new KafkaEventRouter$stop$1(this, null), 1, (Object) null);
    }

    private final Map<String, Object> send(ProducerRecord<byte[], byte[]> producerRecord, boolean z) {
        KafkaAdminService kafkaAdminService = getKafkaAdminService();
        String str = producerRecord.topic();
        Intrinsics.checkNotNullExpressionValue(str, "topic(...)");
        if (!kafkaAdminService.isValidTopic(str)) {
            if (!this.log.isDebugEnabled()) {
                return null;
            }
            this.log.debug("Error while sending record to " + producerRecord.topic() + ", because it doesn't exists");
            return null;
        }
        if (!z) {
            Neo4jKafkaProducer<byte[], byte[]> neo4jKafkaProducer = this.producer;
            if (neo4jKafkaProducer != null) {
                neo4jKafkaProducer.send(producerRecord, (v2, v3) -> {
                    send$lambda$0(r2, r3, v2, v3);
                });
            }
            return null;
        }
        Neo4jKafkaProducer<byte[], byte[]> neo4jKafkaProducer2 = this.producer;
        if (neo4jKafkaProducer2 != null) {
            Future send = neo4jKafkaProducer2.send(producerRecord);
            if (send != null) {
                RecordMetadata recordMetadata = (RecordMetadata) send.get();
                if (recordMetadata != null) {
                    return ExtensionsKt.toMap(recordMetadata);
                }
            }
        }
        return null;
    }

    static /* synthetic */ Map send$default(KafkaEventRouter kafkaEventRouter, ProducerRecord producerRecord, boolean z, int i, Object obj) {
        if ((i & 2) != 0) {
            z = false;
        }
        return kafkaEventRouter.send(producerRecord, z);
    }

    /* JADX WARN: Removed duplicated region for block: B:11:0x005e  */
    /* JADX WARN: Removed duplicated region for block: B:15:0x007e  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private final java.util.Map<java.lang.String, java.lang.Object> sendEvent(java.lang.String r9, apoc.kafka.events.StreamsEvent r10, java.util.Map<java.lang.String, ? extends java.lang.Object> r11, boolean r12) {
        /*
            r8 = this;
            r0 = r8
            org.neo4j.logging.Log r0 = r0.log
            boolean r0 = r0.isDebugEnabled()
            if (r0 == 0) goto L1e
            r0 = r8
            org.neo4j.logging.Log r0 = r0.log
            r1 = r10
            java.lang.Object r1 = r1.getPayload()
            java.lang.String r1 = "Trying to send a simple event with payload " + r1 + " to kafka"
            r0.debug(r1)
        L1e:
            r0 = r11
            java.lang.String r1 = "key"
            java.util.UUID r2 = java.util.UUID.randomUUID()
            java.lang.String r2 = r2.toString()
            java.lang.Object r0 = r0.getOrDefault(r1, r2)
            r13 = r0
            r0 = r11
            java.lang.String r1 = "partition"
            java.lang.Object r0 = r0.get(r1)
            r1 = r0
            if (r1 == 0) goto L4b
            java.lang.String r0 = r0.toString()
            r1 = r0
            if (r1 == 0) goto L4b
            int r0 = java.lang.Integer.parseInt(r0)
            java.lang.Integer r0 = java.lang.Integer.valueOf(r0)
            goto L4d
        L4b:
            r0 = 0
        L4d:
            r14 = r0
            r0 = r9
            r1 = r14
            long r2 = java.lang.System.currentTimeMillis()
            java.lang.Long r2 = java.lang.Long.valueOf(r2)
            r3 = r13
            r4 = r3
            if (r4 == 0) goto L7e
            r16 = r3
            r20 = r2
            r19 = r1
            r18 = r0
            r0 = 0
            r17 = r0
            apoc.kafka.utils.JSONUtils r0 = apoc.kafka.utils.JSONUtils.INSTANCE
            r1 = r16
            byte[] r0 = r0.writeValueAsBytes(r1)
            r21 = r0
            r0 = r18
            r1 = r19
            r2 = r20
            r3 = r21
            goto L80
        L7e:
            r3 = 0
        L80:
            apoc.kafka.utils.JSONUtils r4 = apoc.kafka.utils.JSONUtils.INSTANCE
            r5 = r10
            byte[] r4 = r4.writeValueAsBytes(r5)
            r22 = r4
            r23 = r3
            r24 = r2
            r25 = r1
            r26 = r0
            org.apache.kafka.clients.producer.ProducerRecord r0 = new org.apache.kafka.clients.producer.ProducerRecord
            r1 = r0
            r2 = r26
            r3 = r25
            r4 = r24
            r5 = r23
            r6 = r22
            r1.<init>(r2, r3, r4, r5, r6)
            r15 = r0
            r0 = r8
            r1 = r15
            r2 = r12
            java.util.Map r0 = r0.send(r1, r2)
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: apoc.kafka.producer.kafka.KafkaEventRouter.sendEvent(java.lang.String, apoc.kafka.events.StreamsEvent, java.util.Map, boolean):java.util.Map");
    }

    static /* synthetic */ Map sendEvent$default(KafkaEventRouter kafkaEventRouter, String str, StreamsEvent streamsEvent, Map map, boolean z, int i, Object obj) {
        if ((i & 8) != 0) {
            z = false;
        }
        return kafkaEventRouter.sendEvent(str, streamsEvent, map, z);
    }

    private final void sendEvent(String str, StreamsTransactionEvent streamsTransactionEvent, Map<String, ? extends Object> map) {
        if (this.log.isDebugEnabled()) {
            Log log = this.log;
            long txId = streamsTransactionEvent.getMeta().getTxId();
            streamsTransactionEvent.getMeta().getTxEventId();
            log.debug("Trying to send a transaction event with txId " + txId + " and txEventId " + log + " to kafka");
        }
        byte[] writeValueAsBytes = JSONUtils.INSTANCE.writeValueAsBytes(ExtensionsKt.asSourceRecordKey(streamsTransactionEvent, getKafkaConfig().getLogCompactionStrategy()));
        StreamsTransactionEvent asSourceRecordValue = ExtensionsKt.asSourceRecordValue(streamsTransactionEvent, getKafkaConfig().getLogCompactionStrategy());
        send$default(this, new ProducerRecord(str, (Integer) null, Long.valueOf(System.currentTimeMillis()), writeValueAsBytes, asSourceRecordValue != null ? JSONUtils.INSTANCE.writeValueAsBytes(asSourceRecordValue) : null), false, 2, null);
    }

    @NotNull
    public final List<Map<String, Object>> sendEventsSync(@NotNull String str, @NotNull List<? extends StreamsEvent> list, @NotNull Map<String, ? extends Object> map) {
        Intrinsics.checkNotNullParameter(str, "topic");
        Intrinsics.checkNotNullParameter(list, "transactionEvents");
        Intrinsics.checkNotNullParameter(map, "config");
        Neo4jKafkaProducer<byte[], byte[]> neo4jKafkaProducer = this.producer;
        if (neo4jKafkaProducer != null) {
            neo4jKafkaProducer.beginTransaction();
        }
        ArrayList arrayList = new ArrayList();
        Iterator<T> it = list.iterator();
        while (it.hasNext()) {
            Map<String, Object> sendEvent = sendEvent(str, (StreamsEvent) it.next(), map, true);
            if (sendEvent != null) {
                arrayList.add(sendEvent);
            }
        }
        ArrayList arrayList2 = arrayList;
        Neo4jKafkaProducer<byte[], byte[]> neo4jKafkaProducer2 = this.producer;
        if (neo4jKafkaProducer2 != null) {
            neo4jKafkaProducer2.commitTransaction();
        }
        return arrayList2;
    }

    public final void sendEvents(@NotNull String str, @NotNull List<? extends StreamsEvent> list, @NotNull Map<String, ? extends Object> map) {
        Intrinsics.checkNotNullParameter(str, "topic");
        Intrinsics.checkNotNullParameter(list, "transactionEvents");
        Intrinsics.checkNotNullParameter(map, "config");
        try {
            Neo4jKafkaProducer<byte[], byte[]> neo4jKafkaProducer = this.producer;
            if (neo4jKafkaProducer != null) {
                neo4jKafkaProducer.beginTransaction();
            }
            for (StreamsEvent streamsEvent : list) {
                if (streamsEvent instanceof StreamsTransactionEvent) {
                    sendEvent(str, (StreamsTransactionEvent) streamsEvent, map);
                } else {
                    sendEvent$default(this, str, streamsEvent, map, false, 8, null);
                }
            }
            Neo4jKafkaProducer<byte[], byte[]> neo4jKafkaProducer2 = this.producer;
            if (neo4jKafkaProducer2 != null) {
                neo4jKafkaProducer2.commitTransaction();
            }
        } catch (ProducerFencedException e) {
            this.log.error("Another producer with the same transactional.id has been started. Stack trace is:", e);
            Neo4jKafkaProducer<byte[], byte[]> neo4jKafkaProducer3 = this.producer;
            if (neo4jKafkaProducer3 != null) {
                neo4jKafkaProducer3.close();
            }
        } catch (KafkaException e2) {
            this.log.error("Generic kafka error. Stack trace is:", e2);
            Neo4jKafkaProducer<byte[], byte[]> neo4jKafkaProducer4 = this.producer;
            if (neo4jKafkaProducer4 != null) {
                neo4jKafkaProducer4.abortTransaction();
            }
        } catch (AuthorizationException e3) {
            this.log.error("Error in authorization. Stack trace is:", e3);
            Neo4jKafkaProducer<byte[], byte[]> neo4jKafkaProducer5 = this.producer;
            if (neo4jKafkaProducer5 != null) {
                neo4jKafkaProducer5.close();
            }
        } catch (OutOfOrderSequenceException e4) {
            this.log.error("The broker received an unexpected sequence number from the producer. Stack trace is:", e4);
            Neo4jKafkaProducer<byte[], byte[]> neo4jKafkaProducer6 = this.producer;
            if (neo4jKafkaProducer6 != null) {
                neo4jKafkaProducer6.close();
            }
        }
    }

    private static final void send$lambda$0(KafkaEventRouter kafkaEventRouter, ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exc) {
        Intrinsics.checkNotNullParameter(kafkaEventRouter, "this$0");
        Intrinsics.checkNotNullParameter(producerRecord, "$producerRecord");
        if (recordMetadata != null && kafkaEventRouter.log.isDebugEnabled()) {
            Log log = kafkaEventRouter.log;
            int partition = recordMetadata.partition();
            long offset = recordMetadata.offset();
            String str = recordMetadata.topic();
            recordMetadata.serializedKeySize();
            log.debug("Successfully sent record in partition " + partition + " offset " + offset + " data " + log + " key size " + str);
        }
        if (exc == null || !kafkaEventRouter.log.isDebugEnabled()) {
            return;
        }
        kafkaEventRouter.log.debug("Error while sending record to " + producerRecord.topic() + ", because of the following exception:", exc);
    }
}
