package apoc.cypher;

import apoc.Extended;
import apoc.Pools;
import apoc.result.MapResult;
import apoc.util.FileUtils;
import apoc.util.MapUtil;
import apoc.util.QueueBasedSpliterator;
import apoc.util.Util;
import apoc.util.collection.Iterators;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.QueryStatistics;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.logging.Log;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Mode;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;

@Extended
/* loaded from: input_file:apoc/cypher/CypherExtended.class */
public class CypherExtended {
    public static final String COMPILED_PREFIX = "CYPHER runtime=interpreted";
    public static final int MAX_BATCH = 10000;

    @Context
    public Transaction tx;

    @Context
    public GraphDatabaseService db;

    @Context
    public Log log;

    @Context
    public TerminationGuard terminationGuard;

    @Context
    public Pools pools;
    public static final int PARTITIONS = 100 * Runtime.getRuntime().availableProcessors();
    private static final Pattern shellControl = Pattern.compile("^:?\\b(begin|commit|rollback)\\b", 2);

    /* loaded from: input_file:apoc/cypher/CypherExtended$RowResult.class */
    public static class RowResult {
        public static final RowResult TOMBSTONE = new RowResult(-1, null);
        public long row;
        public Map<String, Object> result;

        public RowResult(long j, Map<String, Object> map) {
            this.row = j;
            this.result = map;
        }
    }

    @Procedure(mode = Mode.WRITE)
    @Description("apoc.cypher.runFile(file or url,[{statistics:true,timeout:10,parameters:{}}]) - runs each statement in the file, all semicolon separated - currently no schema operations")
    public Stream<RowResult> runFile(@Name("file") String str, @Name(value = "config", defaultValue = "{}") Map<String, Object> map) {
        return runFiles(Collections.singletonList(str), map);
    }

    @Procedure(mode = Mode.WRITE)
    @Description("apoc.cypher.runFiles([files or urls],[{statistics:true,timeout:10,parameters:{}}])) - runs each statement in the files, all semicolon separated")
    public Stream<RowResult> runFiles(@Name("file") List<String> list, @Name(value = "config", defaultValue = "{}") Map<String, Object> map) {
        return runFiles(list, map, (Map) map.getOrDefault("parameters", Collections.emptyMap()), false);
    }

    private Stream<RowResult> runFiles(List<String> list, Map<String, Object> map, Map<String, Object> map2, boolean z) {
        boolean z2 = Util.toBoolean(map.getOrDefault("statistics", true));
        int intValue = Util.toInteger(map.getOrDefault("timeout", 10)).intValue();
        int intValue2 = Util.toInteger(map.getOrDefault("queueCapacity", 100)).intValue();
        return list.stream().flatMap(str -> {
            Scanner createScannerFor = createScannerFor(readerForFile(str));
            return (Stream) runManyStatements(createScannerFor, map2, z, z2, intValue, intValue2).onClose(() -> {
                Util.close(createScannerFor, exc -> {
                    this.log.info("Cannot close the scanner for file " + str + " because the following exception", exc);
                });
            });
        });
    }

    @Procedure(mode = Mode.SCHEMA)
    @Description("apoc.cypher.runSchemaFile(file or url,[{statistics:true,timeout:10}]) - allows only schema operations, runs each schema statement in the file, all semicolon separated")
    public Stream<RowResult> runSchemaFile(@Name("file") String str, @Name(value = "config", defaultValue = "{}") Map<String, Object> map) {
        return runSchemaFiles(Collections.singletonList(str), map);
    }

    @Procedure(mode = Mode.SCHEMA)
    @Description("apoc.cypher.runSchemaFiles([files or urls],{statistics:true,timeout:10}) - allows only schema operations, runs each schema statement in the files, all semicolon separated")
    public Stream<RowResult> runSchemaFiles(@Name("file") List<String> list, @Name(value = "config", defaultValue = "{}") Map<String, Object> map) {
        return runFiles(list, map, Collections.emptyMap(), true);
    }

    private Stream<RowResult> runManyStatements(Scanner scanner, Map<String, Object> map, boolean z, boolean z2, int i, int i2) {
        return StreamSupport.stream(new QueueBasedSpliterator(runInSeparateThreadAndSendTombstone(i2, blockingQueue -> {
            if (z) {
                runSchemaStatementsInTx(scanner, blockingQueue, map, z2, i);
            } else {
                runDataStatementsInTx(scanner, blockingQueue, map, z2, i);
            }
        }, RowResult.TOMBSTONE), RowResult.TOMBSTONE, this.terminationGuard, Integer.MAX_VALUE), false);
    }

    private <T> BlockingQueue<T> runInSeparateThreadAndSendTombstone(int i, Consumer<BlockingQueue<T>> consumer, T t) {
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(i);
        Util.newDaemonThread(() -> {
            try {
                consumer.accept(arrayBlockingQueue);
                while (true) {
                    try {
                        arrayBlockingQueue.put(t);
                        return;
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (Throwable th) {
                while (true) {
                    try {
                        arrayBlockingQueue.put(t);
                        return;
                    } catch (InterruptedException e2) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }).start();
        return arrayBlockingQueue;
    }

    private void runDataStatementsInTx(Scanner scanner, BlockingQueue<RowResult> blockingQueue, Map<String, Object> map, boolean z, long j) {
        while (scanner.hasNext()) {
            String removeShellControlCommands = removeShellControlCommands(scanner.next());
            if (!removeShellControlCommands.trim().isEmpty() && !isSchemaOperation(removeShellControlCommands)) {
                if (isPeriodicOperation(removeShellControlCommands)) {
                    Util.inThread(this.pools, () -> {
                        return this.db.executeTransactionally(removeShellControlCommands, map, result -> {
                            return consumeResult(result, blockingQueue, z, j);
                        });
                    });
                } else {
                    Util.inTx(this.db, this.pools, transaction -> {
                        Result execute = transaction.execute(removeShellControlCommands, map);
                        try {
                            Object consumeResult = consumeResult(execute, blockingQueue, z, j);
                            if (execute != null) {
                                execute.close();
                            }
                            return consumeResult;
                        } catch (Throwable th) {
                            if (execute != null) {
                                try {
                                    execute.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            }
                            throw th;
                        }
                    });
                }
            }
        }
    }

    private Scanner createScannerFor(Reader reader) {
        Scanner scanner = new Scanner(reader);
        scanner.useDelimiter(";\r?\n");
        return scanner;
    }

    private void runSchemaStatementsInTx(Scanner scanner, BlockingQueue<RowResult> blockingQueue, Map<String, Object> map, boolean z, long j) {
        while (scanner.hasNext()) {
            String removeShellControlCommands = removeShellControlCommands(scanner.next());
            if (!removeShellControlCommands.trim().isEmpty() && isSchemaOperation(removeShellControlCommands)) {
                Util.inTx(this.db, this.pools, transaction -> {
                    Result execute = transaction.execute(removeShellControlCommands, map);
                    try {
                        Object consumeResult = consumeResult(execute, blockingQueue, z, j);
                        if (execute != null) {
                            execute.close();
                        }
                        return consumeResult;
                    } catch (Throwable th) {
                        if (execute != null) {
                            try {
                                execute.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                });
            }
        }
    }

    private Object consumeResult(Result result, BlockingQueue<RowResult> blockingQueue, boolean z, long j) {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            int i = 0;
            while (result.hasNext()) {
                this.terminationGuard.check();
                int i2 = i;
                i++;
                blockingQueue.put(new RowResult(i2, result.next()));
            }
            if (z) {
                blockingQueue.put(new RowResult(-1L, toMap(result.getQueryStatistics(), System.currentTimeMillis() - currentTimeMillis, i)));
            }
            return Integer.valueOf(i);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private String removeShellControlCommands(String str) {
        Matcher matcher = shellControl.matcher(str.trim());
        return matcher.find() ? removeShellControlCommands(matcher.replaceAll("")) : str;
    }

    private boolean isSchemaOperation(String str) {
        return str.matches("(?is).*(create|drop)\\s+(index|constraint).*");
    }

    private boolean isPeriodicOperation(String str) {
        return str.matches("(?is).*using\\s+periodic.*") || str.matches("(?is).*in\\s+transactions.*");
    }

    private Map<String, Object> toMap(QueryStatistics queryStatistics, long j, long j2) {
        return MapUtil.map("rows", Long.valueOf(j2), "time", Long.valueOf(j), "nodesCreated", Integer.valueOf(queryStatistics.getNodesCreated()), "nodesDeleted", Integer.valueOf(queryStatistics.getNodesDeleted()), "labelsAdded", Integer.valueOf(queryStatistics.getLabelsAdded()), "labelsRemoved", Integer.valueOf(queryStatistics.getLabelsRemoved()), "relationshipsCreated", Integer.valueOf(queryStatistics.getRelationshipsCreated()), "relationshipsDeleted", Integer.valueOf(queryStatistics.getRelationshipsDeleted()), "propertiesSet", Integer.valueOf(queryStatistics.getPropertiesSet()), "constraintsAdded", Integer.valueOf(queryStatistics.getConstraintsAdded()), "constraintsRemoved", Integer.valueOf(queryStatistics.getConstraintsRemoved()), "indexesAdded", Integer.valueOf(queryStatistics.getIndexesAdded()), "indexesRemoved", Integer.valueOf(queryStatistics.getIndexesRemoved()));
    }

    private Reader readerForFile(@Name("file") String str) {
        try {
            return FileUtils.readerFor(str);
        } catch (IOException e) {
            throw new RuntimeException("Error accessing file " + str, e);
        }
    }

    public static String withParamMapping(String str, Collection<String> collection) {
        return collection.isEmpty() ? str : (" WITH " + String.join(", ", (Iterable<? extends CharSequence>) collection.stream().map(str2 -> {
            return String.format(" $`%s` as `%s` ", str2, str2);
        }).collect(Collectors.toList()))) + str;
    }

    public static String compiled(String str) {
        return str.substring(0, 6).equalsIgnoreCase("cypher") ? str : "CYPHER runtime=interpreted" + str;
    }

    @Procedure
    @Description("apoc.cypher.parallel(fragment, `paramMap`, `keyList`) yield value - executes fragments in parallel through a list defined in `paramMap` with a key `keyList`")
    public Stream<MapResult> parallel(@Name("fragment") String str, @Name("params") Map<String, Object> map, @Name("parallelizeOn") String str2) {
        if (map == null) {
            return CypherUtils.runCypherQuery(this.tx, str, map);
        }
        if (str2 == null || !map.containsKey(str2)) {
            throw new RuntimeException("Can't parallelize on key " + str2 + " available keys " + map.keySet());
        }
        Object obj = map.get(str2);
        if (!(obj instanceof Collection)) {
            throw new RuntimeException("Can't parallelize a non collection " + str2 + " : " + obj);
        }
        String withParamMapping = withParamMapping(str, map.keySet());
        return ((Collection) obj).parallelStream().flatMap(obj2 -> {
            this.terminationGuard.check();
            HashMap hashMap = new HashMap(map);
            hashMap.replace(str2, obj2);
            return this.tx.execute(withParamMapping, hashMap).stream().map(MapResult::new);
        });
    }

    @Procedure
    @Description("apoc.cypher.mapParallel(fragment, params, list-to-parallelize) yield value - executes fragment in parallel batches with the list segments being assigned to _")
    public Stream<MapResult> mapParallel(@Name("fragment") String str, @Name("params") Map<String, Object> map, @Name("list") List<Object> list) {
        String withParamsAndIterator = withParamsAndIterator(str, map.keySet(), "_");
        this.tx.execute("EXPLAIN " + withParamsAndIterator).close();
        return Util.partitionSubList(list, PARTITIONS, null).flatMap(list2 -> {
            return Iterators.asList(this.tx.execute(withParamsAndIterator, parallelParams(map, "_", list2))).stream();
        }).map(MapResult::new);
    }

    @Procedure
    @Description("apoc.cypher.mapParallel2(fragment, params, list-to-parallelize) yield value - executes fragment in parallel batches with the list segments being assigned to _")
    public Stream<MapResult> mapParallel2(@Name("fragment") String str, @Name("params") Map<String, Object> map, @Name("list") List<Object> list, @Name("partitions") long j, @Name(value = "timeout", defaultValue = "10") long j2) {
        String withParamsAndIterator = withParamsAndIterator(str, map.keySet(), "_");
        this.tx.execute("EXPLAIN " + withParamsAndIterator).close();
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(100000);
        Stream<List<Object>> partitionSubList = Util.partitionSubList(list, (int) (j <= 0 ? PARTITIONS : j), null);
        Util.inFuture(this.pools, () -> {
            long count = partitionSubList.map(list2 -> {
                try {
                    Transaction beginTx = this.db.beginTx();
                    try {
                        Result execute = beginTx.execute(withParamsAndIterator, parallelParams(map, "_", list2));
                        try {
                            Object consumeResult = consumeResult(execute, arrayBlockingQueue, false, j2);
                            if (execute != null) {
                                execute.close();
                            }
                            if (beginTx != null) {
                                beginTx.close();
                            }
                            return consumeResult;
                        } catch (Throwable th) {
                            if (execute != null) {
                                try {
                                    execute.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            }
                            throw th;
                        }
                    } finally {
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }).count();
            arrayBlockingQueue.put(RowResult.TOMBSTONE);
            return Long.valueOf(count);
        });
        return StreamSupport.stream(new QueueBasedSpliterator(arrayBlockingQueue, RowResult.TOMBSTONE, this.terminationGuard, (int) j2), true).map(rowResult -> {
            return new MapResult(rowResult.result);
        });
    }

    public Map<String, Object> parallelParams(@Name("params") Map<String, Object> map, String str, List<Object> list) {
        if (map.isEmpty()) {
            return Collections.singletonMap(str, list);
        }
        HashMap hashMap = new HashMap(map);
        hashMap.put(str, list);
        return hashMap;
    }

    @Procedure
    @Description("apoc.cypher.parallel2(fragment, `paramMap`, `keyList`) yield value - executes fragments in parallel batches through a list defined in `paramMap` with a key `keyList`")
    public Stream<MapResult> parallel2(@Name("fragment") String str, @Name("params") Map<String, Object> map, @Name("parallelizeOn") String str2) {
        if (map == null) {
            return CypherUtils.runCypherQuery(this.tx, str, map);
        }
        if (StringUtils.isEmpty(str2) || !map.containsKey(str2)) {
            throw new RuntimeException("Can't parallelize on key " + str2 + " available keys " + map.keySet() + ". Note that parallelizeOn parameter must be not empty");
        }
        Object obj = map.get(str2);
        if (!(obj instanceof Collection)) {
            throw new RuntimeException("Can't parallelize a non collection " + str2 + " : " + obj);
        }
        String withParamsAndIterator = withParamsAndIterator(str, map.keySet(), str2);
        this.tx.execute("EXPLAIN " + withParamsAndIterator).close();
        Collection collection = (Collection) obj;
        int size = collection.size();
        int i = PARTITIONS;
        int max = Math.max(size / i, 1);
        if (max > 10000) {
            max = 10000;
            i = (size / 10000) + 1;
        }
        ArrayList arrayList = new ArrayList(i);
        ArrayList arrayList2 = new ArrayList(max);
        Iterator it = collection.iterator();
        while (it.hasNext()) {
            arrayList2.add(it.next());
            if (arrayList2.size() == max) {
                arrayList.add(submit(this.db, withParamsAndIterator, map, str2, arrayList2, this.terminationGuard));
                arrayList2 = new ArrayList(max);
            }
        }
        if (!arrayList2.isEmpty()) {
            arrayList.add(submit(this.db, withParamsAndIterator, map, str2, arrayList2, this.terminationGuard));
        }
        return arrayList.stream().flatMap(future -> {
            try {
                return ((List) future.get()).stream().map(MapResult::new);
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException("Error executing in parallel " + withParamsAndIterator, e);
            }
        });
    }

    public static String withParamsAndIterator(String str, Collection<String> collection, String str2) {
        return Util.withMapping(collection.stream().filter(str3 -> {
            return !str3.equals(str2);
        }), str4 -> {
            return Util.param(str4) + " AS " + Util.quote(str4);
        }) + " UNWIND " + Util.param(str2) + " AS " + Util.quote(str2) + " " + str;
    }

    private Future<List<Map<String, Object>>> submit(GraphDatabaseService graphDatabaseService, String str, Map<String, Object> map, String str2, List<Object> list, TerminationGuard terminationGuard) {
        return this.pools.getDefaultExecutorService().submit(() -> {
            terminationGuard.check();
            return (List) graphDatabaseService.executeTransactionally(str, parallelParams(map, str2, list), result -> {
                return Iterators.asList(result);
            });
        });
    }
}
