package apoc.kafka.consumer.kafka;

import apoc.kafka.extensions.CommonExtensionsKt;
import apoc.kafka.service.StreamsSinkEntity;
import apoc.kafka.service.errors.ErrorData;
import apoc.kafka.service.errors.ErrorService;
import apoc.kafka.service.errors.KafkaErrorService;
import com.unboundid.ldap.sdk.Version;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.collections.MapsKt;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.SourceDebugExtension;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.neo4j.logging.Log;

/* compiled from: KafkaAutoCommitEventConsumer.kt */
@Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��~\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\"\n\u0002\u0010\u000e\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u001c\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0010$\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\t\n\u0002\b\u0004\b\u0016\u0018��2\u00020\u0001B+\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\f\u0010\u0006\u001a\b\u0012\u0004\u0012\u00020\b0\u0007\u0012\u0006\u0010\t\u001a\u00020\b¢\u0006\u0002\u0010\nJL\u0010\u0015\u001a\u00020\u00162\u001e\u0010\u0017\u001a\u001a\u0012\u0004\u0012\u00020\b\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u001a0\u0019\u0012\u0004\u0012\u00020\u00160\u00182\u0006\u0010\u001b\u001a\u00020\b2\u001c\u0010\u001c\u001a\u0018\u0012\u0014\u0012\u0012\u0012\u0006\b\u0001\u0012\u00020\u001f\u0012\u0006\b\u0001\u0012\u00020\u001f0\u001e0\u001dJ(\u0010 \u001a\u00020\u00162\u001e\u0010\u0017\u001a\u001a\u0012\u0004\u0012\u00020\b\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u001a0\u0019\u0012\u0004\u0012\u00020\u00160\u0018H\u0016J<\u0010 
\u001a\u00020\u00162\u0012\u0010!\u001a\u000e\u0012\u0004\u0012\u00020\b\u0012\u0004\u0012\u00020\u001f0\"2\u001e\u0010\u0017\u001a\u001a\u0012\u0004\u0012\u00020\b\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u001a0\u0019\u0012\u0004\u0012\u00020\u00160\u0018H\u0016J:\u0010#\u001a\u000e\u0012\u0004\u0012\u00020$\u0012\u0004\u0012\u00020%0\"2\u0006\u0010&\u001a\u00020'2\u001e\u0010\u0017\u001a\u001a\u0012\u0004\u0012\u00020\b\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u001a0\u0019\u0012\u0004\u0012\u00020\u00160\u0018J(\u0010(\u001a\u00020\u00162\u001e\u0010\u0017\u001a\u001a\u0012\u0004\u0012\u00020\b\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u001a0\u0019\u0012\u0004\u0012\u00020\u00160\u0018H\u0002J\u001c\u0010)\u001a\u00020\u00162\u0012\u0010*\u001a\u000e\u0012\u0004\u0012\u00020$\u0012\u0004\u0012\u00020+0\"H\u0002J\b\u0010,\u001a\u00020\u0016H\u0016J\b\u0010-\u001a\u00020\u0016H\u0016J\b\u0010.\u001a\u00020\u0016H\u0016R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u0019\u0010\u000b\u001a\n\u0012\u0002\b\u0003\u0012\u0002\b\u00030\f¢\u0006\b\n��\u001a\u0004\b\r\u0010\u000eR\u000e\u0010\t\u001a\u00020\bX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000f\u001a\u00020\u0010X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0011\u001a\u00020\u0012X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u0017\u0010\u0006\u001a\b\u0012\u0004\u0012\u00020\b0\u0007¢\u0006\b\n��\u001a\u0004\b\u0013\u0010\u0014¨\u0006/"}, d2 = {"Lapoc/kafka/consumer/kafka/KafkaAutoCommitEventConsumer;", "Lapoc/kafka/consumer/kafka/KafkaEventConsumer;", "config", "Lapoc/kafka/consumer/kafka/KafkaSinkConfiguration;", "log", "Lorg/neo4j/logging/Log;", "topics", Version.VERSION_QUALIFIER, Version.VERSION_QUALIFIER, "dbName", "(Lapoc/kafka/consumer/kafka/KafkaSinkConfiguration;Lorg/neo4j/logging/Log;Ljava/util/Set;Ljava/lang/String;)V", "consumer", "Lorg/apache/kafka/clients/consumer/KafkaConsumer;", "getConsumer", 
"()Lorg/apache/kafka/clients/consumer/KafkaConsumer;", "errorService", "Lapoc/kafka/service/errors/ErrorService;", "isSeekSet", "Ljava/util/concurrent/atomic/AtomicBoolean;", "getTopics", "()Ljava/util/Set;", "executeAction", Version.VERSION_QUALIFIER, "action", "Lkotlin/Function2;", Version.VERSION_QUALIFIER, "Lapoc/kafka/service/StreamsSinkEntity;", "topic", "topicRecords", Version.VERSION_QUALIFIER, "Lorg/apache/kafka/clients/consumer/ConsumerRecord;", Version.VERSION_QUALIFIER, "read", "topicConfig", Version.VERSION_QUALIFIER, "readFromPartition", "Lorg/apache/kafka/common/TopicPartition;", "Lorg/apache/kafka/clients/consumer/OffsetAndMetadata;", "kafkaTopicConfig", "Lapoc/kafka/consumer/kafka/KafkaTopicConfig;", "readSimple", "setSeek", "topicPartitionsMap", Version.VERSION_QUALIFIER, "start", "stop", "wakeup", "apoc"})
@SourceDebugExtension({"SMAP\nKafkaAutoCommitEventConsumer.kt\nKotlin\n*S Kotlin\n*F\n+ 1 KafkaAutoCommitEventConsumer.kt\napoc/kafka/consumer/kafka/KafkaAutoCommitEventConsumer\n+ 2 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n+ 3 Maps.kt\nkotlin/collections/MapsKt__MapsKt\n+ 4 _Maps.kt\nkotlin/collections/MapsKt___MapsKt\n*L\n1#1,139:1\n1855#2,2:140\n1549#2:142\n1620#2,3:143\n1549#2:146\n1620#2,3:147\n1238#2,4:152\n1238#2,4:165\n442#3:150\n392#3:151\n483#3,7:156\n442#3:163\n392#3:164\n215#4,2:169\n*S KotlinDebug\n*F\n+ 1 KafkaAutoCommitEventConsumer.kt\napoc/kafka/consumer/kafka/KafkaAutoCommitEventConsumer\n*L\n77#1:140,2\n85#1:142\n85#1:143,3\n87#1:146\n87#1:147,3\n98#1:152,4\n100#1:165,4\n98#1:150\n98#1:151\n99#1:156,7\n100#1:163\n100#1:164\n125#1:169,2\n*E\n"})
/* loaded from: input_file:apoc/kafka/consumer/kafka/KafkaAutoCommitEventConsumer.class */
/*
 * Event consumer that polls a KafkaConsumer and hands the polled records, converted to
 * StreamsSinkEntity instances, to a caller-supplied action.
 *
 * Two read modes are implemented:
 *  - readSimple: poll once and dispatch the polled records topic by topic;
 *  - readFromPartition: seek to caller-supplied offsets (first call only), poll once and
 *    dispatch the polled records partition by partition.
 *
 * Any exception thrown while converting or dispatching a batch is reported to the
 * ErrorService instead of being propagated. No code in this class commits offsets.
 */
public class KafkaAutoCommitEventConsumer extends KafkaEventConsumer {

    /** Sink configuration; supplies the consumer properties and the configured deserializer names. */
    @NotNull
    private final KafkaSinkConfiguration config;

    /** Logger used by {@link #start()} and by the error-service logging callback. */
    @NotNull
    private final Log log;

    /** Topics subscribed to in {@link #start()} and iterated by the simple read mode. */
    @NotNull
    private final Set<String> topics;

    /** Database name attached to every {@link ErrorData} reported by {@link #executeAction}. */
    @NotNull
    private final String dbName;

    /** Receives the records of any batch whose conversion or action threw. */
    @NotNull
    private final ErrorService errorService;

    /** Flipped to {@code true} by the first {@code setSeek} call so that seeking happens only once. */
    @NotNull
    private final AtomicBoolean isSeekSet;

    /** The underlying Kafka consumer. */
    @NotNull
    private final KafkaConsumer<?, ?> consumer;

    /**
     * Creates the consumer and its error service.
     *
     * @param config sink configuration; both its key and value deserializer must be
     *               {@link ByteArrayDeserializer}
     * @param log    logger
     * @param topics topics to subscribe to when {@link #start()} is called
     * @param dbName database name recorded in reported errors
     * @throws RuntimeException with message {@code "Invalid config"} when either configured
     *                          deserializer is not {@link ByteArrayDeserializer}
     */
    public KafkaAutoCommitEventConsumer(@NotNull KafkaSinkConfiguration config, @NotNull Log log, @NotNull Set<String> topics, @NotNull String dbName) {
        super(config, log, topics);
        Intrinsics.checkNotNullParameter(config, "config");
        Intrinsics.checkNotNullParameter(log, "log");
        Intrinsics.checkNotNullParameter(topics, "topics");
        Intrinsics.checkNotNullParameter(dbName, "dbName");
        this.config = config;
        this.log = log;
        this.topics = topics;
        this.dbName = dbName;

        // Validate before creating anything closeable, so that an invalid configuration
        // does not leave an error service behind that nobody can close.
        String byteArrayDeserializer = ByteArrayDeserializer.class.getName();
        boolean keyIsByteArray = Intrinsics.areEqual(config.getKeyDeserializer(), byteArrayDeserializer);
        boolean valueIsByteArray = Intrinsics.areEqual(config.getValueDeserializer(), byteArrayDeserializer);
        if (!keyIsByteArray || !valueIsByteArray) {
            throw new RuntimeException("Invalid config");
        }

        this.errorService = new KafkaErrorService(config.asProperties(), ErrorService.ErrorConfig.Companion.from(MapsKt.emptyMap()), new Function2<String, Exception, Unit>() {
            @Override
            public Unit invoke(@NotNull String message, @Nullable Exception exception) {
                Intrinsics.checkNotNullParameter(message, "s");
                Log logger = KafkaAutoCommitEventConsumer.this.log;
                if (exception == null) {
                    // The callback is declared with a nullable exception: still log the
                    // message instead of failing inside the error-reporting path.
                    logger.error(message);
                } else {
                    logger.error(message, exception);
                }
                return Unit.INSTANCE;
            }
        });
        this.isSeekSet = new AtomicBoolean();
        this.consumer = new KafkaConsumer<>(config.asProperties());
    }

    /**
     * @return the topics this consumer was created with
     */
    @NotNull
    public final Set<String> getTopics() {
        return this.topics;
    }

    /**
     * @return the underlying Kafka consumer
     */
    @NotNull
    public final KafkaConsumer<?, ?> getConsumer() {
        return this.consumer;
    }

    /**
     * Subscribes the Kafka consumer to the configured topics. When no topics are configured
     * nothing is subscribed and an info message is logged instead.
     */
    @Override // apoc.kafka.consumer.StreamsEventConsumer
    public void start() {
        if (this.topics.isEmpty()) {
            this.log.info("No topics specified Kafka Consumer will not started");
            return;
        }
        this.consumer.subscribe(this.topics);
    }

    /**
     * Closes the Kafka consumer and then the error service. The error service is closed even
     * when closing the consumer throws; the consumer's exception still propagates.
     */
    @Override // apoc.kafka.consumer.StreamsEventConsumer
    public void stop() {
        try {
            this.consumer.close();
        } finally {
            this.errorService.close();
        }
    }

    /**
     * Polls once without waiting and dispatches the polled records topic by topic.
     * When the poll returned something, the action is invoked for every configured topic,
     * including topics that received no records in this poll.
     *
     * @param action callback receiving the topic name and its converted records
     */
    private final void readSimple(Function2<? super String, ? super List<StreamsSinkEntity>, Unit> action) {
        ConsumerRecords<?, ?> polled = this.consumer.poll(Duration.ZERO);
        if (polled.isEmpty()) {
            return;
        }
        for (String topic : this.topics) {
            Iterable<? extends ConsumerRecord<? extends Object, ? extends Object>> topicRecords = polled.records(topic);
            Intrinsics.checkNotNull(topicRecords);
            executeAction(action, topic, topicRecords);
        }
    }

    /**
     * Converts the given records to {@link StreamsSinkEntity} instances and passes them to the
     * action. If conversion or the action throws an {@link Exception}, every record of the batch
     * is wrapped in an {@link ErrorData} and reported to the error service; the exception is not
     * rethrown.
     *
     * @param action       callback receiving the topic name and the converted records
     * @param topic        topic name passed through to the action
     * @param topicRecords records to convert; iterated a second time if a failure is reported
     */
    public final void executeAction(@NotNull Function2<? super String, ? super List<StreamsSinkEntity>, Unit> action, @NotNull String topic, @NotNull Iterable<? extends ConsumerRecord<? extends Object, ? extends Object>> topicRecords) {
        Intrinsics.checkNotNullParameter(action, "action");
        Intrinsics.checkNotNullParameter(topic, "topic");
        Intrinsics.checkNotNullParameter(topicRecords, "topicRecords");
        try {
            List<StreamsSinkEntity> entities = new ArrayList<>(CollectionsKt.collectionSizeOrDefault(topicRecords, 10));
            for (ConsumerRecord<?, ?> consumerRecord : topicRecords) {
                entities.add(CommonExtensionsKt.toStreamsSinkEntity(consumerRecord));
            }
            action.invoke(topic, entities);
        } catch (Exception e) {
            List<ErrorData> failures = new ArrayList<>(CollectionsKt.collectionSizeOrDefault(topicRecords, 10));
            for (ConsumerRecord<?, ?> consumerRecord : topicRecords) {
                failures.add(ErrorData.Companion.from(consumerRecord, e, getClass(), this.dbName));
            }
            this.errorService.report(failures);
        }
    }

    /**
     * Seeks to the offsets configured in {@code kafkaTopicConfig} (first call only), polls once
     * without waiting and dispatches the polled records partition by partition, in the
     * iteration order of the configured partition map.
     *
     * @param kafkaTopicConfig supplies the partition-to-offset map
     * @param action           callback receiving the topic name and the converted records
     * @return for every configured partition that received records, the offset metadata derived
     *         from the last record of that partition; an empty map when the poll returned nothing
     */
    @NotNull
    public final Map<TopicPartition, OffsetAndMetadata> readFromPartition(@NotNull KafkaTopicConfig kafkaTopicConfig, @NotNull Function2<? super String, ? super List<StreamsSinkEntity>, Unit> action) {
        Intrinsics.checkNotNullParameter(kafkaTopicConfig, "kafkaTopicConfig");
        Intrinsics.checkNotNullParameter(action, "action");
        Map<TopicPartition, Long> topicPartitions = kafkaTopicConfig.getTopicPartitionsMap();
        setSeek(topicPartitions);
        ConsumerRecords<?, ?> polled = this.consumer.poll(Duration.ZERO);
        if (polled.isEmpty()) {
            return MapsKt.emptyMap();
        }
        Map<TopicPartition, OffsetAndMetadata> lastOffsets = new LinkedHashMap<>();
        for (TopicPartition partition : topicPartitions.keySet()) {
            List<? extends ConsumerRecord<? extends Object, ? extends Object>> partitionRecords = polled.records(partition);
            Intrinsics.checkNotNull(partitionRecords);
            if (partitionRecords.isEmpty()) {
                // Partitions without records in this poll are left out of the result.
                continue;
            }
            String topic = partition.topic();
            Intrinsics.checkNotNullExpressionValue(topic, "topic(...)");
            executeAction(action, topic, partitionRecords);
            ConsumerRecord<?, ?> lastRecord = CollectionsKt.last(partitionRecords);
            Intrinsics.checkNotNullExpressionValue(lastRecord, "last(...)");
            // Compiler-generated entry point for the Kotlin extension's default argument;
            // kept as decompiled because the non-default overload is not visible here.
            lastOffsets.put(partition, CommonExtensionsKt.offsetAndMetadata$default(lastRecord, null, 1, null));
        }
        return lastOffsets;
    }

    /**
     * Reads using the simple, topic-based mode.
     *
     * @param action callback receiving the topic name and the converted records
     */
    @Override // apoc.kafka.consumer.StreamsEventConsumer
    public void read(@NotNull Function2<? super String, ? super List<StreamsSinkEntity>, Unit> action) {
        Intrinsics.checkNotNullParameter(action, "action");
        readSimple(action);
    }

    /**
     * Reads using the partition-based mode when the topic configuration defines at least one
     * partition offset, otherwise using the simple mode. The offsets returned by the
     * partition-based read are discarded.
     *
     * @param topicConfig raw topic configuration, parsed with {@code KafkaTopicConfig.fromMap}
     * @param action      callback receiving the topic name and the converted records
     */
    @Override // apoc.kafka.consumer.StreamsEventConsumer
    public void read(@NotNull Map<String, ? extends Object> topicConfig, @NotNull Function2<? super String, ? super List<StreamsSinkEntity>, Unit> action) {
        Intrinsics.checkNotNullParameter(topicConfig, "topicConfig");
        Intrinsics.checkNotNullParameter(action, "action");
        KafkaTopicConfig kafkaTopicConfig = KafkaTopicConfig.Companion.fromMap(topicConfig);
        if (kafkaTopicConfig.getTopicPartitionsMap().isEmpty()) {
            readSimple(action);
            return;
        }
        readFromPartition(kafkaTopicConfig, action);
    }

    /**
     * Positions the consumer on the requested offsets. Only the first invocation does anything;
     * later calls return immediately because {@code isSeekSet} has already been flipped.
     * An offset of {@code -1} seeks to the beginning of the partition, {@code -2} seeks to its
     * end, and any other value is used as the absolute offset.
     *
     * @param topicPartitionsMap requested offset per partition
     */
    private final void setSeek(Map<TopicPartition, Long> topicPartitionsMap) {
        if (!this.isSeekSet.compareAndSet(false, true)) {
            return;
        }
        // NOTE(review): this uses the deprecated poll(long) overload, presumably so that the
        // consumer has its partitions assigned before seeking - confirm before replacing it
        // with poll(Duration).
        this.consumer.poll(0L);
        for (Map.Entry<TopicPartition, Long> entry : topicPartitionsMap.entrySet()) {
            TopicPartition partition = entry.getKey();
            long offset = entry.getValue().longValue();
            if (offset == -1) {
                this.consumer.seekToBeginning(CollectionsKt.listOf(partition));
            } else if (offset == -2) {
                this.consumer.seekToEnd(CollectionsKt.listOf(partition));
            } else {
                this.consumer.seek(partition, offset);
            }
        }
    }

    /** Delegates to {@link KafkaConsumer#wakeup()} on the underlying consumer. */
    @Override // apoc.kafka.consumer.kafka.KafkaEventConsumer
    public void wakeup() {
        this.consumer.wakeup();
    }
}
