package com.couchbase.client.core.transaction.util;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.error.transaction.AttemptExpiredException;
import com.couchbase.client.core.error.transaction.TransactionOperationFailedException;
import com.couchbase.client.core.transaction.AccessorUtil;
import com.couchbase.client.core.transaction.CoreTransactionAttemptContext;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

@Stability.Internal
/* loaded from: input_file:com/couchbase/client/core/transaction/util/ReactiveWaitGroup.class */
public class ReactiveWaitGroup {
    private final CoreTransactionAttemptContext ctx;
    private final ArrayList<Waiter> waiting = new ArrayList<>();
    private final boolean debugMode;

    /* loaded from: input_file:com/couchbase/client/core/transaction/util/ReactiveWaitGroup$Waiter.class */
    public static class Waiter {
        public final Sinks.One<Void> notifier = Sinks.one();
        public final String dbg;

        public Waiter(String str) {
            this.dbg = (String) Objects.requireNonNull(str);
        }
    }

    public ReactiveWaitGroup(CoreTransactionAttemptContext coreTransactionAttemptContext, boolean z) {
        this.debugMode = z;
        this.ctx = (CoreTransactionAttemptContext) Objects.requireNonNull(coreTransactionAttemptContext);
    }

    public synchronized int waitingCount() {
        return this.waiting.size();
    }

    public Mono<Waiter> add(String str) {
        return Mono.defer(() -> {
            Waiter waiter = new Waiter(str);
            synchronized (this) {
                this.waiting.add(waiter);
                if (this.debugMode) {
                    this.ctx.logger().info(this.ctx.attemptId(), String.format("WG: adding [%s], %d now in waiting", str, Integer.valueOf(this.waiting.size())));
                }
            }
            return Mono.just(waiter);
        });
    }

    public Mono<Void> done(Waiter waiter) {
        return Mono.defer(() -> {
            Sinks.One<Void> one = null;
            synchronized (this) {
                if (this.waiting.remove(waiter)) {
                    if (this.debugMode) {
                        this.ctx.logger().info(this.ctx.attemptId(), String.format("WG: [%s] is done, %d now in waiting", waiter.dbg, Integer.valueOf(this.waiting.size())));
                    }
                    one = waiter.notifier;
                } else if (this.debugMode) {
                    this.ctx.logger().info(this.ctx.attemptId(), String.format("WG: wanted to remove [%s] from waiters but it's not in there", waiter.dbg));
                }
            }
            if (one != null) {
                one.tryEmitValue((Object) null).orThrow();
            }
            return Mono.empty();
        });
    }

    public Mono<Void> await(Duration duration) {
        return Mono.defer(() -> {
            ArrayList arrayList;
            synchronized (this) {
                arrayList = new ArrayList(this.waiting);
            }
            return Flux.merge((Iterable) arrayList.stream().map(waiter -> {
                return waiter.notifier.asMono();
            }).collect(Collectors.toList())).timeout(duration).publishOn(this.ctx.scheduler()).onErrorResume(th -> {
                if (!(th instanceof TimeoutException)) {
                    return Mono.error(th);
                }
                String format = String.format("Attempt expired while waiting for %d - %s", Integer.valueOf(arrayList.size()), arrayList.stream().map(waiter2 -> {
                    return waiter2.dbg;
                }).collect(Collectors.joining(",")));
                if (this.debugMode) {
                    this.ctx.logger().info(this.ctx.attemptId(), format);
                }
                return Mono.error(AccessorUtil.operationFailed(this.ctx, TransactionOperationFailedException.Builder.createError().raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_EXPIRED).doNotRollbackAttempt().cause(new AttemptExpiredException(format, th)).build()));
            }).then();
        });
    }
}
