/*
 * Decompiled with CFR 0.152.
 */
package haveno.network.p2p.peers;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.proto.network.NetworkEnvelope;
import haveno.network.p2p.BundleOfEnvelopes;
import haveno.network.p2p.NodeAddress;
import haveno.network.p2p.network.Connection;
import haveno.network.p2p.network.NetworkNode;
import haveno.network.p2p.peers.Broadcaster;
import haveno.network.p2p.peers.PeerManager;
import haveno.network.p2p.storage.messages.BroadcastMessage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BroadcastHandler
implements PeerManager.Listener {
    private static final Logger log = LoggerFactory.getLogger(BroadcastHandler.class);
    private static final long BASE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(120L);
    private final NetworkNode networkNode;
    private final PeerManager peerManager;
    @Nullable
    private final ResultHandler resultHandler;
    private final String uid;
    private final AtomicBoolean stopped = new AtomicBoolean();
    private final AtomicBoolean timeoutTriggered = new AtomicBoolean();
    private final AtomicInteger numOfCompletedBroadcasts = new AtomicInteger();
    private final AtomicInteger numOfFailedBroadcasts = new AtomicInteger();
    private final AtomicInteger numPeersForBroadcast = new AtomicInteger();
    @Nullable
    private Timer timeoutTimer;
    private final Set<SettableFuture<Connection>> sendMessageFutures = new CopyOnWriteArraySet<SettableFuture<Connection>>();

    BroadcastHandler(NetworkNode networkNode, PeerManager peerManager, ResultHandler resultHandler) {
        this.networkNode = networkNode;
        this.peerManager = peerManager;
        this.resultHandler = resultHandler;
        this.uid = UUID.randomUUID().toString();
        peerManager.addListener(this);
    }

    public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests, boolean shutDownRequested, ListeningExecutorService executor) {
        int delay;
        if (broadcastRequests.isEmpty()) {
            return;
        }
        ArrayList<Connection> confirmedConnections = new ArrayList<Connection>(this.networkNode.getConfirmedConnections());
        Collections.shuffle(confirmedConnections);
        if (shutDownRequested) {
            delay = 1;
            this.numPeersForBroadcast.set(confirmedConnections.size());
        } else if (this.requestsContainOwnMessage(broadcastRequests)) {
            this.numPeersForBroadcast.set(confirmedConnections.size());
            delay = 50;
        } else {
            this.numPeersForBroadcast.set(Math.min(7, confirmedConnections.size()));
            delay = 100;
        }
        this.setupTimeoutHandler(broadcastRequests, delay, shutDownRequested);
        int iterations = this.numPeersForBroadcast.get();
        for (int i = 0; i < iterations; ++i) {
            long minDelay = (i + 1) * delay;
            long maxDelay = (i + 2) * delay;
            Connection connection = (Connection)confirmedConnections.get(i);
            UserThread.runAfterRandomDelay(() -> {
                if (this.stopped.get()) {
                    return;
                }
                List<Broadcaster.BroadcastRequest> broadcastRequestsForConnection = this.getBroadcastRequestsForConnection(connection, broadcastRequests);
                if (broadcastRequestsForConnection.isEmpty()) {
                    if (this.numPeersForBroadcast.get() > 0) {
                        this.numPeersForBroadcast.decrementAndGet();
                    }
                    this.checkForCompletion();
                    return;
                }
                if (connection.isStopped()) {
                    if (this.numPeersForBroadcast.get() > 0) {
                        this.numPeersForBroadcast.decrementAndGet();
                    }
                    this.checkForCompletion();
                    return;
                }
                try {
                    this.sendToPeer(connection, broadcastRequestsForConnection, executor);
                }
                catch (RejectedExecutionException e) {
                    log.error("RejectedExecutionException at broadcast ", e);
                    this.cleanup();
                }
            }, minDelay, maxDelay, TimeUnit.MILLISECONDS);
        }
    }

    public void cancel() {
        this.cleanup();
    }

    @Override
    public void onAllConnectionsLost() {
        this.cleanup();
    }

    @Override
    public void onNewConnectionAfterAllConnectionsLost() {
    }

    @Override
    public void onAwakeFromStandby() {
    }

    private boolean requestsContainOwnMessage(List<Broadcaster.BroadcastRequest> broadcastRequests) {
        NodeAddress myAddress = this.networkNode.getNodeAddress();
        if (myAddress == null) {
            return false;
        }
        return broadcastRequests.stream().anyMatch(e -> myAddress.equals(e.getSender()));
    }

    private void setupTimeoutHandler(List<Broadcaster.BroadcastRequest> broadcastRequests, int delay, boolean shutDownRequested) {
        long baseTimeoutMs = shutDownRequested ? TimeUnit.SECONDS.toMillis(1L) : BASE_TIMEOUT_MS;
        long timeoutDelay = baseTimeoutMs + (long)(delay * (this.numPeersForBroadcast.get() + 1));
        this.timeoutTimer = UserThread.runAfter(() -> {
            if (this.stopped.get()) {
                return;
            }
            this.timeoutTriggered.set(true);
            this.numOfFailedBroadcasts.incrementAndGet();
            log.warn("Broadcast did not complete after {} sec.\nnumPeersForBroadcast={}\nnumOfCompletedBroadcasts={}\nnumOfFailedBroadcasts={}", (double)timeoutDelay / 1000.0, this.numPeersForBroadcast, this.numOfCompletedBroadcasts, this.numOfFailedBroadcasts);
            this.maybeNotifyListeners(broadcastRequests);
            this.cleanup();
        }, timeoutDelay, TimeUnit.MILLISECONDS);
    }

    private List<Broadcaster.BroadcastRequest> getBroadcastRequestsForConnection(Connection connection, List<Broadcaster.BroadcastRequest> broadcastRequests) {
        return broadcastRequests.stream().filter(broadcastRequest -> !connection.getPeersNodeAddressOptional().isPresent() || !connection.getPeersNodeAddressOptional().get().equals(broadcastRequest.getSender())).filter(broadcastRequest -> connection.testCapability(broadcastRequest.getMessage())).collect(Collectors.toList());
    }

    private void sendToPeer(final Connection connection, final List<Broadcaster.BroadcastRequest> broadcastRequestsForConnection, ListeningExecutorService executor) {
        BroadcastMessage broadcastMessage = this.getMessage(broadcastRequestsForConnection);
        SettableFuture<Connection> future = this.networkNode.sendMessage(connection, (NetworkEnvelope)broadcastMessage, executor);
        this.sendMessageFutures.add(future);
        Futures.addCallback(future, new FutureCallback<Connection>(){

            @Override
            public void onSuccess(Connection connection2) {
                BroadcastHandler.this.numOfCompletedBroadcasts.incrementAndGet();
                if (BroadcastHandler.this.stopped.get()) {
                    return;
                }
                BroadcastHandler.this.maybeNotifyListeners(broadcastRequestsForConnection);
                BroadcastHandler.this.checkForCompletion();
            }

            @Override
            public void onFailure(@NotNull Throwable throwable) {
                if (BroadcastHandler.this.stopped.get()) {
                    return;
                }
                log.warn("Broadcast to " + String.valueOf(connection.getPeersNodeAddressOptional()) + " failed. ", throwable);
                BroadcastHandler.this.numOfFailedBroadcasts.incrementAndGet();
                BroadcastHandler.this.maybeNotifyListeners(broadcastRequestsForConnection);
                BroadcastHandler.this.checkForCompletion();
            }
        }, MoreExecutors.directExecutor());
    }

    private BroadcastMessage getMessage(List<Broadcaster.BroadcastRequest> broadcastRequests) {
        if (broadcastRequests.size() == 1) {
            return broadcastRequests.get(0).getMessage();
        }
        return new BundleOfEnvelopes(broadcastRequests.stream().map(Broadcaster.BroadcastRequest::getMessage).collect(Collectors.toList()));
    }

    private void maybeNotifyListeners(List<Broadcaster.BroadcastRequest> broadcastRequests) {
        int numOfCompletedBroadcastsTarget = Math.max(1, Math.min(this.numPeersForBroadcast.get(), 3));
        if (this.numOfCompletedBroadcasts.get() == numOfCompletedBroadcastsTarget) {
            broadcastRequests.stream().map(Broadcaster.BroadcastRequest::getListener).filter(Objects::nonNull).forEach(listener -> listener.onSufficientlyBroadcast(broadcastRequests));
        } else {
            boolean timeoutAndNotEnoughSucceeded;
            int maxPossibleSuccessCases = this.numPeersForBroadcast.get() - this.numOfFailedBroadcasts.get();
            boolean notEnoughSucceededOrOpen = maxPossibleSuccessCases == numOfCompletedBroadcastsTarget - 1;
            boolean bl = timeoutAndNotEnoughSucceeded = this.timeoutTriggered.get() && this.numOfCompletedBroadcasts.get() < numOfCompletedBroadcastsTarget;
            if (notEnoughSucceededOrOpen || timeoutAndNotEnoughSucceeded) {
                broadcastRequests.stream().map(Broadcaster.BroadcastRequest::getListener).filter(Objects::nonNull).forEach(listener -> listener.onNotSufficientlyBroadcast(this.numOfCompletedBroadcasts.get(), this.numOfFailedBroadcasts.get()));
            }
        }
    }

    private void checkForCompletion() {
        if (this.numOfCompletedBroadcasts.get() + this.numOfFailedBroadcasts.get() == this.numPeersForBroadcast.get()) {
            this.cleanup();
        }
    }

    private void cleanup() {
        if (this.stopped.get()) {
            return;
        }
        this.stopped.set(true);
        if (this.timeoutTimer != null) {
            this.timeoutTimer.stop();
            this.timeoutTimer = null;
        }
        this.sendMessageFutures.stream().filter(future -> !future.isCancelled() && !future.isDone()).forEach(future -> {
            try {
                future.cancel(true);
            }
            catch (Exception e) {
                if (this.networkNode.isShutDownStarted()) {
                    return;
                }
                throw e;
            }
        });
        this.sendMessageFutures.clear();
        this.peerManager.removeListener(this);
        this.resultHandler.onCompleted(this);
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof BroadcastHandler)) {
            return false;
        }
        BroadcastHandler that = (BroadcastHandler)o;
        return this.uid.equals(that.uid);
    }

    public int hashCode() {
        return this.uid.hashCode();
    }

    static interface ResultHandler {
        public void onCompleted(BroadcastHandler var1);
    }

    public static interface Listener {
        public void onSufficientlyBroadcast(List<Broadcaster.BroadcastRequest> var1);

        public void onNotSufficientlyBroadcast(int var1, int var2);
    }
}

