package apoc.kafka.service.errors;

import apoc.kafka.service.errors.ErrorService;
import apoc.kafka.utils.KafkaUtil;
import com.unboundid.ldap.sdk.Version;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.TuplesKt;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.collections.MapsKt;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.SourceDebugExtension;
import kotlin.text.Charsets;
import kotlin.text.StringsKt;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.neo4j.util.VisibleForTesting;

/* compiled from: KafkaErrorService.kt */
@Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��P\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u0012\n\u0002\b\u0003\n\u0002\u0010$\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010 \n\u0002\b\u0002\u0018�� \u00192\u00020\u0001:\u0001\u0019B9\b\u0016\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012 \u0010\u0006\u001a\u001c\u0012\u0004\u0012\u00020\b\u0012\f\u0012\n\u0018\u00010\tj\u0004\u0018\u0001`\n\u0012\u0004\u0012\u00020\u000b0\u0007¢\u0006\u0002\u0010\fBE\u0012\u0014\u0010\r\u001a\u0010\u0012\u0004\u0012\u00020\u000f\u0012\u0004\u0012\u00020\u000f\u0018\u00010\u000e\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012 \u0010\u0006\u001a\u001c\u0012\u0004\u0012\u00020\b\u0012\f\u0012\n\u0018\u00010\tj\u0004\u0018\u0001`\n\u0012\u0004\u0012\u00020\u000b0\u0007¢\u0006\u0002\u0010\u0010J\b\u0010\u0011\u001a\u00020\u000bH\u0016J\u001c\u0010\u0012\u001a\u000e\u0012\u0004\u0012\u00020\b\u0012\u0004\u0012\u00020\u000f0\u00132\u0006\u0010\u0014\u001a\u00020\u0015H\u0007J\u0016\u0010\u0016\u001a\u00020\u000b2\f\u0010\u0017\u001a\b\u0012\u0004\u0012\u00020\u00150\u0018H\u0016R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R(\u0010\u0006\u001a\u001c\u0012\u0004\u0012\u00020\b\u0012\f\u0012\n\u0018\u00010\tj\u0004\u0018\u0001`\n\u0012\u0004\u0012\u00020\u000b0\u0007X\u0082\u0004¢\u0006\u0002\n��R\u001c\u0010\r\u001a\u0010\u0012\u0004\u0012\u00020\u000f\u0012\u0004\u0012\u00020\u000f\u0018\u00010\u000eX\u0082\u0004¢\u0006\u0002\n��¨\u0006\u001a"}, d2 = {"Lapoc/kafka/service/errors/KafkaErrorService;", "Lapoc/kafka/service/errors/ErrorService;", "config", "Ljava/util/Properties;", "errorConfig", "Lapoc/kafka/service/errors/ErrorService$ErrorConfig;", "log", "Lkotlin/Function2;", Version.VERSION_QUALIFIER, 
"Ljava/lang/Exception;", "Lkotlin/Exception;", Version.VERSION_QUALIFIER, "(Ljava/util/Properties;Lapoc/kafka/service/errors/ErrorService$ErrorConfig;Lkotlin/jvm/functions/Function2;)V", "producer", "Lorg/apache/kafka/clients/producer/Producer;", Version.VERSION_QUALIFIER, "(Lorg/apache/kafka/clients/producer/Producer;Lapoc/kafka/service/errors/ErrorService$ErrorConfig;Lkotlin/jvm/functions/Function2;)V", "close", "populateContextHeaders", Version.VERSION_QUALIFIER, "errorData", "Lapoc/kafka/service/errors/ErrorData;", "report", "errorDatas", Version.VERSION_QUALIFIER, "Companion", "apoc"})
@SourceDebugExtension({"SMAP\nKafkaErrorService.kt\nKotlin\n*S Kotlin\n*F\n+ 1 KafkaErrorService.kt\napoc/kafka/service/errors/KafkaErrorService\n+ 2 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n+ 3 _Maps.kt\nkotlin/collections/MapsKt___MapsKt\n*L\n1#1,97:1\n1855#2,2:98\n1549#2:100\n1620#2,3:101\n1855#2,2:104\n1855#2:106\n1856#2:109\n215#3,2:107\n*S KotlinDebug\n*F\n+ 1 KafkaErrorService.kt\napoc/kafka/service/errors/KafkaErrorService\n*L\n40#1:98,2\n42#1:100\n42#1:101,3\n42#1:104,2\n46#1:106\n46#1:109\n56#1:107,2\n*E\n"})
/* loaded from: input_file:apoc/kafka/service/errors/KafkaErrorService.class */
public final class KafkaErrorService extends ErrorService {

    /** Shared instance of the nested {@link Companion}; the Properties-based constructor calls its {@code producer} factory. */
    @NotNull
    public static final Companion Companion = new Companion(null);

    /** Producer that {@code report} sends failed records to; when {@code null}, nothing is sent and {@code close} is a no-op. */
    @Nullable
    private final Producer<byte[], byte[]> producer;

    /** Error-handling settings consulted by this service (fail, log, logMessages, dlqTopic, dlqHeaders, dlqHeaderPrefix). */
    @NotNull
    private final ErrorService.ErrorConfig errorConfig;

    /** Logging callback, invoked with a message and the exception it relates to. */
    @NotNull
    private final Function2<String, Exception, Unit> log;

    /* compiled from: KafkaErrorService.kt */
    @Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��8\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u0012\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u0002\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002JH\u0010\u0003\u001a\u0010\u0012\u0004\u0012\u00020\u0005\u0012\u0004\u0012\u00020\u0005\u0018\u00010\u00042\u0006\u0010\u0006\u001a\u00020\u00072\u0006\u0010\b\u001a\u00020\t2 \u0010\n\u001a\u001c\u0012\u0004\u0012\u00020\f\u0012\f\u0012\n\u0018\u00010\rj\u0004\u0018\u0001`\u000e\u0012\u0004\u0012\u00020\u000f0\u000bH\u0002¨\u0006\u0010"}, d2 = {"Lapoc/kafka/service/errors/KafkaErrorService$Companion;", Version.VERSION_QUALIFIER, "()V", "producer", "Lorg/apache/kafka/clients/producer/KafkaProducer;", Version.VERSION_QUALIFIER, "errorConfig", "Lapoc/kafka/service/errors/ErrorService$ErrorConfig;", "config", "Ljava/util/Properties;", "log", "Lkotlin/Function2;", Version.VERSION_QUALIFIER, "Ljava/lang/Exception;", "Lkotlin/Exception;", Version.VERSION_QUALIFIER, "apoc"})
    /* loaded from: input_file:apoc/kafka/service/errors/KafkaErrorService$Companion.class */
    public static final class Companion {
        private Companion() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Builds the byte-array producer used to publish failed records to the dead letter queue.
         *
         * <p>Side effect: {@code properties} is modified in place — its {@code key.serializer} and
         * {@code value.serializer} entries are overwritten with {@link ByteArraySerializer}.
         *
         * @param errorConfig error settings; a {@code null} DLQ topic disables the producer
         * @param properties  producer configuration, read for {@code bootstrap.servers}
         * @param function2   logging callback, invoked if validation or construction throws
         * @return the producer, or {@code null} when no DLQ topic is configured or when connection
         *         validation / producer construction threw (the exception is logged, not rethrown)
         */
        public final KafkaProducer<byte[], byte[]> producer(ErrorService.ErrorConfig errorConfig, Properties properties, Function2<? super String, ? super Exception, Unit> function2) {
            if (errorConfig.getDlqTopic() == null) {
                return null;
            }
            try {
                // A missing bootstrap.servers entry defaults to the empty string. The decompiled code
                // referenced an unrelated LDAP SDK constant here, which merely happened to equal "".
                String bootstrapServers = properties.getOrDefault("bootstrap.servers", "").toString();
                KafkaUtil.INSTANCE.validateConnection(bootstrapServers, "bootstrap.servers", false);
                String serializerName = ByteArraySerializer.class.getName();
                properties.put("key.serializer", serializerName);
                properties.put("value.serializer", serializerName);
                return new KafkaProducer<>(properties);
            } catch (Exception e) {
                // Best effort: the service keeps working without a DLQ when the producer cannot be built.
                function2.invoke("Cannot initialize the custom DLQ because of the following exception: ", e);
                return null;
            }
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Creates the service around an already-built producer.
     *
     * @param producer    producer for failed records, or {@code null} to send nothing
     * @param errorConfig error-handling settings; must not be {@code null}
     * @param function2   logging callback ("log"); must not be {@code null}
     */
    public KafkaErrorService(@Nullable Producer<byte[], byte[]> producer, @NotNull ErrorService.ErrorConfig errorConfig, @NotNull Function2<? super String, ? super Exception, Unit> function2) {
        // NOTE(review): presumably Kotlin's synthetic default-argument constructor of ErrorService
        // (value, default mask, marker) — confirm against ErrorService.
        super(null, 1, null);
        Intrinsics.checkNotNullParameter(errorConfig, "errorConfig");
        Intrinsics.checkNotNullParameter(function2, "log");
        this.producer = producer;
        this.errorConfig = errorConfig;
        this.log = function2;
    }

    /* JADX WARN: 'this' call moved to the top of the method (can break code semantics) */
    /**
     * Creates the service from producer configuration, building the DLQ producer through
     * {@code Companion.producer}.
     *
     * @param properties  producer configuration ("config"); must not be {@code null}
     * @param errorConfig error-handling settings; must not be {@code null}
     * @param function2   logging callback ("log"); must not be {@code null}
     */
    public KafkaErrorService(@NotNull Properties properties, @NotNull ErrorService.ErrorConfig errorConfig, @NotNull Function2<? super String, ? super Exception, Unit> function2) {
        this((Producer<byte[], byte[]>) Companion.producer(errorConfig, properties, function2), errorConfig, function2);
        // As decompiled, these null checks run only after Companion.producer(...) has already
        // used the arguments (see the JADX warning above).
        Intrinsics.checkNotNullParameter(properties, "config");
        Intrinsics.checkNotNullParameter(errorConfig, "errorConfig");
        Intrinsics.checkNotNullParameter(function2, "log");
    }

    /**
     * Reports a batch of failed records according to the error configuration.
     *
     * <ol>
     *   <li>If {@code fail} is enabled, throws immediately and nothing else happens.</li>
     *   <li>If {@code log} is enabled, logs either every record ({@code logMessages}) or one
     *       summary line per distinct exception.</li>
     *   <li>If a producer is available, sends each record to the DLQ topic, adding context
     *       headers when {@code dlqHeaders} is enabled.</li>
     * </ol>
     *
     * <p>An exception thrown while building or sending one record is logged and does not stop the
     * remaining records from being processed.
     *
     * @param list the failed records ("errorDatas"); must not be {@code null}
     * @throws ProcessingError when the configuration asks to fail on error
     */
    @Override // apoc.kafka.service.errors.ErrorService
    public void report(@NotNull List<ErrorData> list) {
        Intrinsics.checkNotNullParameter(list, "errorDatas");
        if (this.errorConfig.getFail()) {
            throw new ProcessingError(list);
        }
        if (this.errorConfig.getLog()) {
            if (this.errorConfig.getLogMessages()) {
                for (ErrorData errorData : list) {
                    this.log.invoke(errorData.toLogString(), errorData.getException());
                }
            } else {
                // Collapse the batch to its distinct exceptions so each one is logged only once.
                List<Exception> exceptions = new ArrayList<>(list.size());
                for (ErrorData errorData : list) {
                    exceptions.add(errorData.getException());
                }
                String summary = "Error processing " + list.size() + " messages";
                for (Exception exception : CollectionsKt.distinct(exceptions)) {
                    this.log.invoke(summary, exception);
                }
            }
        }

        Producer<byte[], byte[]> dlqProducer = this.producer;
        if (dlqProducer == null) {
            // No DLQ producer: nothing to send.
            return;
        }
        String dlqTopic = this.errorConfig.getDlqTopic();
        boolean withHeaders = this.errorConfig.getDlqHeaders();
        for (ErrorData errorData : list) {
            try {
                // A timestamp of -1 means "no timestamp": let the producer assign one.
                ProducerRecord<byte[], byte[]> record = errorData.getTimestamp() == -1
                        ? new ProducerRecord<>(dlqTopic, (Integer) null, errorData.getKey(), errorData.getValue())
                        : new ProducerRecord<>(dlqTopic, (Integer) null, Long.valueOf(errorData.getTimestamp()), errorData.getKey(), errorData.getValue());
                if (withHeaders) {
                    Headers headers = record.headers();
                    for (Map.Entry<String, byte[]> header : populateContextHeaders(errorData).entrySet()) {
                        headers.add(header.getKey(), header.getValue());
                    }
                }
                dlqProducer.send(record);
            } catch (Exception e) {
                this.log.invoke("Error writing to DLQ " + e + ": " + errorData.toLogString(), e);
            }
        }
    }

    /**
     * Builds the context headers attached to a record sent to the DLQ.
     *
     * <p>Every key is the configured DLQ header prefix followed by a fixed suffix. The topic,
     * partition and offset headers are always present; the database name, executing class and
     * exception headers (class name, message, stack trace) are added only when the corresponding
     * data is available. All values are UTF-8 encoded.
     *
     * @param errorData the failed record; must not be {@code null}
     * @return a mutable map from header name to header value
     */
    @VisibleForTesting
    @NotNull
    public final Map<String, byte[]> populateContextHeaders(@NotNull ErrorData errorData) {
        Intrinsics.checkNotNullParameter(errorData, "errorData");

        // Headers that are always present.
        Map<String, byte[]> headers = MapsKt.mutableMapOf(new Pair[]{
            TuplesKt.to(populateContextHeaders$prefix(this, "topic"), errorData.getOriginalTopic().getBytes(Charsets.UTF_8)),
            TuplesKt.to(populateContextHeaders$prefix(this, "partition"), errorData.getPartition().getBytes(Charsets.UTF_8)),
            TuplesKt.to(populateContextHeaders$prefix(this, "offset"), errorData.getOffset().getBytes(Charsets.UTF_8))
        });

        String database = errorData.getDatabaseName();
        if (database != null && !StringsKt.isBlank(database)) {
            headers.put(populateContextHeaders$prefix(this, "databaseName"), database.getBytes(Charsets.UTF_8));
        }

        if (errorData.getExecutingClass() != null) {
            headers.put(populateContextHeaders$prefix(this, "class.name"), errorData.getExecutingClass().getName().getBytes(Charsets.UTF_8));
        }

        Exception failure = errorData.getException();
        if (failure == null) {
            return headers;
        }

        headers.put(populateContextHeaders$prefix(this, "exception.class.name"), failure.getClass().getName().getBytes(Charsets.UTF_8));
        String failureMessage = failure.getMessage();
        if (failureMessage != null) {
            headers.put(populateContextHeaders$prefix(this, "exception.message"), failureMessage.getBytes(Charsets.UTF_8));
        }
        headers.put(populateContextHeaders$prefix(this, "exception.stacktrace"), ExceptionUtils.getStackTrace(failure).getBytes(Charsets.UTF_8));
        return headers;
    }

    /** Closes the DLQ producer if one exists; otherwise does nothing. */
    @Override // apoc.kafka.service.errors.ErrorService
    public void close() {
        if (this.producer == null) {
            return;
        }
        this.producer.close();
    }

    /**
     * Prepends the configured DLQ header prefix to a header name suffix.
     *
     * @param kafkaErrorService service whose error configuration supplies the prefix
     * @param str               header name suffix, e.g. {@code "topic"}
     * @return the prefix followed by {@code str}
     */
    private static final String populateContextHeaders$prefix(KafkaErrorService kafkaErrorService, String str) {
        final String headerPrefix = kafkaErrorService.errorConfig.getDlqHeaderPrefix();
        return headerPrefix + str;
    }
}
