/*
 * Decompiled with CFR 0.152.
 */
package haveno.network.p2p.network;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.inject.Inject;
import com.google.protobuf.InvalidProtocolBufferException;
import haveno.common.Proto;
import haveno.common.ThreadUtils;
import haveno.common.app.Capabilities;
import haveno.common.app.Capability;
import haveno.common.app.HasCapabilities;
import haveno.common.app.Version;
import haveno.common.config.Config;
import haveno.common.proto.ProtobufferException;
import haveno.common.proto.network.NetworkProtoResolver;
import haveno.common.util.SingleThreadExecutorUtils;
import haveno.common.util.Utilities;
import haveno.network.p2p.BundleOfEnvelopes;
import haveno.network.p2p.CloseConnectionMessage;
import haveno.network.p2p.ExtendedDataSizePermission;
import haveno.network.p2p.NodeAddress;
import haveno.network.p2p.SendersNodeAddressMessage;
import haveno.network.p2p.SupportedCapabilitiesMessage;
import haveno.network.p2p.network.BanFilter;
import haveno.network.p2p.network.CloseConnectionReason;
import haveno.network.p2p.network.ConnectionListener;
import haveno.network.p2p.network.ConnectionState;
import haveno.network.p2p.network.ConnectionStatistics;
import haveno.network.p2p.network.InboundConnection;
import haveno.network.p2p.network.MessageListener;
import haveno.network.p2p.network.ProtoOutputStream;
import haveno.network.p2p.network.RuleViolation;
import haveno.network.p2p.network.Statistic;
import haveno.network.p2p.network.SupportedCapabilitiesListener;
import haveno.network.p2p.peers.keepalive.messages.KeepAliveMessage;
import haveno.network.p2p.storage.P2PDataStorage;
import haveno.network.p2p.storage.messages.AddDataMessage;
import haveno.network.p2p.storage.messages.AddPersistableNetworkPayloadMessage;
import haveno.network.p2p.storage.messages.RemoveDataMessage;
import haveno.network.p2p.storage.payload.CapabilityRequiringPayload;
import haveno.network.p2p.storage.payload.PersistableNetworkPayload;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidClassException;
import java.io.OptionalDataException;
import java.io.StreamCorruptedException;
import java.lang.ref.WeakReference;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javafx.beans.property.ObjectProperty;
import javafx.beans.property.SimpleObjectProperty;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import protobuf.NetworkEnvelope;

public class Connection
implements HasCapabilities,
Runnable,
MessageListener {
    private static final Logger log = LoggerFactory.getLogger(Connection.class);
    @Inject
    @Nullable
    private static Config config;
    private static final int PERMITTED_MESSAGE_SIZE = 204800;
    private static final int MAX_PERMITTED_MESSAGE_SIZE = 0xA00000;
    private static final int SOCKET_TIMEOUT;
    private static final int SHUTDOWN_TIMEOUT = 100;
    private static final String THREAD_ID;
    private final Socket socket;
    private final ConnectionListener connectionListener;
    @Nullable
    private final BanFilter banFilter;
    private final String uid;
    private final ExecutorService executorService;
    private final Statistic statistic;
    private final ConnectionState connectionState;
    private final ConnectionStatistics connectionStatistics;
    private ProtoOutputStream protoOutputStream;
    private Optional<NodeAddress> peersNodeAddressOptional = Optional.empty();
    private volatile boolean stopped;
    private final ObjectProperty<NodeAddress> peersNodeAddressProperty = new SimpleObjectProperty<NodeAddress>();
    private final List<Long> messageTimeStamps = new ArrayList<Long>();
    private final CopyOnWriteArraySet<MessageListener> messageListeners = new CopyOnWriteArraySet();
    private volatile long lastSendTimeStamp = 0L;
    private final CopyOnWriteArraySet<WeakReference<SupportedCapabilitiesListener>> capabilitiesListeners = new CopyOnWriteArraySet();
    private RuleViolation ruleViolation;
    private final ConcurrentHashMap<RuleViolation, Integer> ruleViolations = new ConcurrentHashMap();
    private final Capabilities capabilities = new Capabilities(new Capability[0]);
    private static final long LOG_THROTTLE_INTERVAL_MS = 30000L;
    private static long lastLoggedInvalidRequestReportTs;
    private static int numThrottledInvalidRequestReports;
    private static long lastLoggedWarningTs;
    private static int numThrottledWarnings;
    private static long lastLoggedInfoTs;
    private static int numThrottledInfos;
    private InputStream protoInputStream;
    private final NetworkProtoResolver networkProtoResolver;
    private long lastReadTimeStamp;
    private boolean threadNameSet;

    public static int getPermittedMessageSize() {
        return 204800;
    }

    public static int getMaxPermittedMessageSize() {
        return 0xA00000;
    }

    public static int getShutdownTimeout() {
        return 100;
    }

    Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener, @Nullable NodeAddress peersNodeAddress, NetworkProtoResolver networkProtoResolver, @Nullable BanFilter banFilter) {
        this.socket = socket;
        this.connectionListener = connectionListener;
        this.banFilter = banFilter;
        this.uid = UUID.randomUUID().toString();
        this.executorService = SingleThreadExecutorUtils.getSingleThreadExecutor("Executor service for connection with uid " + this.uid);
        this.statistic = new Statistic();
        this.addMessageListener(messageListener);
        this.networkProtoResolver = networkProtoResolver;
        this.connectionState = new ConnectionState(this);
        this.connectionStatistics = new ConnectionStatistics(this, this.connectionState);
        this.init(peersNodeAddress);
    }

    private void init(@Nullable NodeAddress peersNodeAddress) {
        try {
            this.socket.setSoTimeout(SOCKET_TIMEOUT);
            this.protoOutputStream = new ProtoOutputStream(this.socket.getOutputStream(), this.statistic);
            this.protoInputStream = this.socket.getInputStream();
            this.executorService.submit(this);
            if (peersNodeAddress != null) {
                this.setPeersNodeAddress(peersNodeAddress);
                if (this.banFilter != null && this.banFilter.isPeerBanned(peersNodeAddress)) {
                    this.reportInvalidRequest(RuleViolation.PEER_BANNED, "We created an outbound connection with a banned peer");
                }
            }
            ThreadUtils.execute(() -> this.connectionListener.onConnection(this), THREAD_ID);
        }
        catch (Throwable e) {
            this.handleException(e);
        }
    }

    @Override
    public Capabilities getCapabilities() {
        return this.capabilities;
    }

    void sendMessage(haveno.common.proto.network.NetworkEnvelope networkEnvelope) {
        long ts = System.currentTimeMillis();
        log.debug(">> Send networkEnvelope of type: {}", (Object)networkEnvelope.getClass().getSimpleName());
        if (this.stopped) {
            log.debug("called sendMessage but was already stopped");
            return;
        }
        if (this.banFilter != null && this.peersNodeAddressOptional.isPresent() && this.banFilter.isPeerBanned(this.peersNodeAddressOptional.get())) {
            String errorMessage = "We tried to send a message to a banned peer. message=" + networkEnvelope.getClass().getSimpleName();
            this.reportInvalidRequest(RuleViolation.PEER_BANNED, errorMessage);
            return;
        }
        if (!this.testCapability(networkEnvelope)) {
            log.debug("Capability for networkEnvelope is required but not supported");
            return;
        }
        int networkEnvelopeSize = networkEnvelope.toProtoNetworkEnvelope().getSerializedSize();
        try {
            long now = System.currentTimeMillis();
            long elapsed = now - this.lastSendTimeStamp;
            if (elapsed < (long)this.getSendMsgThrottleTrigger()) {
                log.debug("We got 2 sendMessage requests in less than {} ms. We set the thread to sleep for {} ms to avoid flooding our peer. lastSendTimeStamp={}, now={}, elapsed={}, networkEnvelope={}", this.getSendMsgThrottleTrigger(), this.getSendMsgThrottleSleep(), this.lastSendTimeStamp, now, elapsed, networkEnvelope.getClass().getSimpleName());
                Thread.sleep(this.getSendMsgThrottleSleep());
            }
            this.lastSendTimeStamp = now;
            if (!this.stopped) {
                this.protoOutputStream.writeEnvelope(networkEnvelope);
                ThreadUtils.execute(() -> this.messageListeners.forEach(e -> e.onMessageSent(networkEnvelope, this)), THREAD_ID);
                ThreadUtils.execute(() -> this.connectionStatistics.addSendMsgMetrics(System.currentTimeMillis() - ts, networkEnvelopeSize), THREAD_ID);
            }
        }
        catch (Throwable t2) {
            this.handleException(t2);
            throw new RuntimeException(t2);
        }
    }

    public boolean testCapability(haveno.common.proto.network.NetworkEnvelope networkEnvelope) {
        if (networkEnvelope instanceof BundleOfEnvelopes) {
            BundleOfEnvelopes bundleOfEnvelopes = (BundleOfEnvelopes)networkEnvelope;
            this.updateBundleOfEnvelopes(bundleOfEnvelopes);
            return !bundleOfEnvelopes.getEnvelopes().isEmpty();
        }
        return this.extractCapabilityRequiringPayload(networkEnvelope).map(this::testCapability).orElse(true);
    }

    private boolean testCapability(CapabilityRequiringPayload capabilityRequiringPayload) {
        boolean result = this.capabilities.containsAll(capabilityRequiringPayload.getRequiredCapabilities());
        if (!result) {
            log.debug("We did not send {} because capabilities are not supported.", (Object)capabilityRequiringPayload.getClass().getSimpleName());
        }
        return result;
    }

    private void updateBundleOfEnvelopes(BundleOfEnvelopes bundleOfEnvelopes) {
        List toRemove = bundleOfEnvelopes.getEnvelopes().stream().filter(networkEnvelope -> !this.testCapability((haveno.common.proto.network.NetworkEnvelope)networkEnvelope)).collect(Collectors.toList());
        bundleOfEnvelopes.getEnvelopes().removeAll(toRemove);
    }

    private Optional<CapabilityRequiringPayload> extractCapabilityRequiringPayload(Proto proto) {
        Proto candidate = proto;
        if (proto instanceof AddDataMessage) {
            candidate = ((AddDataMessage)proto).getProtectedStorageEntry().getProtectedStoragePayload();
        } else if (proto instanceof RemoveDataMessage) {
            candidate = ((RemoveDataMessage)proto).getProtectedStorageEntry().getProtectedStoragePayload();
        } else if (proto instanceof AddPersistableNetworkPayloadMessage) {
            candidate = ((AddPersistableNetworkPayloadMessage)proto).getPersistableNetworkPayload();
        }
        if (candidate instanceof CapabilityRequiringPayload) {
            return Optional.of((CapabilityRequiringPayload)candidate);
        }
        return Optional.empty();
    }

    public void addMessageListener(MessageListener messageListener) {
        boolean isNewEntry = this.messageListeners.add(messageListener);
        if (!isNewEntry) {
            log.warn("Try to add a messageListener which was already added.");
        }
    }

    public void removeMessageListener(MessageListener messageListener) {
        boolean contained = this.messageListeners.remove(messageListener);
        if (!contained) {
            log.debug("Try to remove a messageListener which was never added.\n\tThat might happen because of async behaviour of CopyOnWriteArraySet");
        }
    }

    public void addWeakCapabilitiesListener(SupportedCapabilitiesListener listener) {
        this.capabilitiesListeners.add(new WeakReference<SupportedCapabilitiesListener>(listener));
    }

    private boolean violatesThrottleLimit() {
        long now = System.currentTimeMillis();
        this.messageTimeStamps.add(now);
        while (this.messageTimeStamps.size() > this.getMsgThrottlePer10Sec()) {
            this.messageTimeStamps.remove(0);
        }
        return this.violatesThrottleLimit(now, 1, this.getMsgThrottlePerSec()) || this.violatesThrottleLimit(now, 10, this.getMsgThrottlePer10Sec());
    }

    private int getMsgThrottlePerSec() {
        return config != null ? Connection.config.msgThrottlePerSec : 200;
    }

    private int getMsgThrottlePer10Sec() {
        return config != null ? Connection.config.msgThrottlePer10Sec : 1000;
    }

    private int getSendMsgThrottleSleep() {
        return config != null ? Connection.config.sendMsgThrottleSleep : 50;
    }

    private int getSendMsgThrottleTrigger() {
        return config != null ? Connection.config.sendMsgThrottleTrigger : 20;
    }

    private boolean violatesThrottleLimit(long now, int seconds, int messageCountLimit) {
        long compareValue;
        if (this.messageTimeStamps.size() >= messageCountLimit && now - (compareValue = this.messageTimeStamps.get(this.messageTimeStamps.size() - messageCountLimit).longValue()) < TimeUnit.SECONDS.toMillis(seconds)) {
            log.error("violatesThrottleLimit {}/{} second(s)", (Object)messageCountLimit, (Object)seconds);
            return true;
        }
        return false;
    }

    @Override
    public void onMessage(haveno.common.proto.network.NetworkEnvelope networkEnvelope, Connection connection) {
        Preconditions.checkArgument(connection.equals(this));
        if (networkEnvelope instanceof BundleOfEnvelopes) {
            this.onBundleOfEnvelopes((BundleOfEnvelopes)networkEnvelope, connection);
        } else {
            ThreadUtils.execute(() -> this.messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection)), THREAD_ID);
        }
    }

    private void onBundleOfEnvelopes(BundleOfEnvelopes bundleOfEnvelopes, Connection connection) {
        HashMap itemsByHash = new HashMap();
        HashSet<haveno.common.proto.network.NetworkEnvelope> envelopesToProcess = new HashSet<haveno.common.proto.network.NetworkEnvelope>();
        List<haveno.common.proto.network.NetworkEnvelope> networkEnvelopes = bundleOfEnvelopes.getEnvelopes();
        for (haveno.common.proto.network.NetworkEnvelope networkEnvelope : networkEnvelopes) {
            boolean isValid;
            if (networkEnvelope instanceof SendersNodeAddressMessage && !(isValid = this.processSendersNodeAddressMessage((SendersNodeAddressMessage)((Object)networkEnvelope)))) {
                this.throttleWarn("Received an invalid " + networkEnvelope.getClass().getSimpleName() + " at processing BundleOfEnvelopes");
                continue;
            }
            if (networkEnvelope instanceof AddPersistableNetworkPayloadMessage) {
                PersistableNetworkPayload persistableNetworkPayload = ((AddPersistableNetworkPayloadMessage)networkEnvelope).getPersistableNetworkPayload();
                byte[] hash = persistableNetworkPayload.getHash();
                String itemName = persistableNetworkPayload.getClass().getSimpleName();
                P2PDataStorage.ByteArray byteArray = new P2PDataStorage.ByteArray(hash);
                itemsByHash.putIfAbsent(byteArray, new HashSet());
                Set envelopesByHash = (Set)itemsByHash.get(byteArray);
                if (!envelopesByHash.contains(networkEnvelope)) {
                    envelopesByHash.add(networkEnvelope);
                    envelopesToProcess.add(networkEnvelope);
                    continue;
                }
                log.debug("We got duplicated items for {}. We ignore the duplicates. Hash: {}", (Object)itemName, (Object)Utilities.encodeToHex(hash));
                continue;
            }
            envelopesToProcess.add(networkEnvelope);
        }
        envelopesToProcess.forEach(envelope -> ThreadUtils.execute(() -> this.messageListeners.forEach(listener -> listener.onMessage((haveno.common.proto.network.NetworkEnvelope)envelope, connection)), THREAD_ID));
    }

    private void setPeersNodeAddress(NodeAddress peerNodeAddress) {
        Preconditions.checkNotNull(peerNodeAddress, "peerAddress must not be null");
        this.peersNodeAddressOptional = Optional.of(peerNodeAddress);
        if (this instanceof InboundConnection) {
            log.debug("\n\n############################################################\nWe got the peers node address set.\npeersNodeAddress= " + peerNodeAddress.getFullAddress() + "\nconnection.uid=" + this.getUid() + "\n############################################################\n");
        }
        this.peersNodeAddressProperty.set(peerNodeAddress);
    }

    public boolean hasPeersNodeAddress() {
        return this.peersNodeAddressOptional.isPresent();
    }

    public void shutDown(CloseConnectionReason closeConnectionReason) {
        this.shutDown(closeConnectionReason, null);
    }

    public void shutDown(CloseConnectionReason closeConnectionReason, @Nullable Runnable shutDownCompleteHandler) {
        log.debug("shutDown: peersNodeAddressOptional={}, closeConnectionReason={}", (Object)this.peersNodeAddressOptional, (Object)closeConnectionReason);
        this.connectionState.shutDown();
        if (!this.stopped) {
            String peersNodeAddress = this.peersNodeAddressOptional.map(NodeAddress::toString).orElse("null");
            log.debug("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\nShutDown connection:\npeersNodeAddress=" + peersNodeAddress + "\ncloseConnectionReason=" + String.valueOf((Object)closeConnectionReason) + "\nuid=" + this.uid + "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");
            if (closeConnectionReason.sendCloseMessage) {
                new Thread(() -> {
                    try {
                        String reason = closeConnectionReason == CloseConnectionReason.RULE_VIOLATION ? this.getRuleViolation().name() : closeConnectionReason.name();
                        this.sendMessage(new CloseConnectionMessage(reason));
                        this.stopped = true;
                        Uninterruptibles.sleepUninterruptibly(200L, TimeUnit.MILLISECONDS);
                    }
                    catch (Throwable t2) {
                        log.error(ExceptionUtils.getStackTrace(t2));
                    }
                    finally {
                        this.stopped = true;
                        ThreadUtils.execute(() -> this.doShutDown(closeConnectionReason, shutDownCompleteHandler), THREAD_ID);
                    }
                }, "Connection:SendCloseConnectionMessage-" + this.uid).start();
            } else {
                this.stopped = true;
                this.doShutDown(closeConnectionReason, shutDownCompleteHandler);
            }
        } else {
            log.debug("stopped was already at shutDown call");
            ThreadUtils.execute(() -> this.doShutDown(closeConnectionReason, shutDownCompleteHandler), THREAD_ID);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doShutDown(CloseConnectionReason closeConnectionReason, @Nullable Runnable shutDownCompleteHandler) {
        ThreadUtils.execute(() -> this.connectionListener.onDisconnect(closeConnectionReason, this), THREAD_ID);
        try {
            this.protoOutputStream.onConnectionShutdown();
            this.socket.close();
        }
        catch (SocketException e) {
            log.trace("SocketException at shutdown might be expected {}", (Object)e.getMessage());
        }
        catch (IOException e) {
            log.error("Exception at shutdown. {}\n", (Object)e.getMessage(), (Object)e);
        }
        finally {
            this.capabilitiesListeners.clear();
            try {
                this.protoInputStream.close();
            }
            catch (IOException e) {
                log.error(ExceptionUtils.getStackTrace(e));
            }
            Utilities.shutdownAndAwaitTermination(this.executorService, 100L, TimeUnit.MILLISECONDS);
            log.debug("Connection shutdown complete {}", (Object)this);
            if (shutDownCompleteHandler != null) {
                ThreadUtils.execute(shutDownCompleteHandler, THREAD_ID);
            }
        }
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Connection)) {
            return false;
        }
        Connection that = (Connection)o;
        return this.uid.equals(that.uid);
    }

    public int hashCode() {
        return this.uid.hashCode();
    }

    public String toString() {
        return "Connection{peerAddress=" + String.valueOf(this.peersNodeAddressOptional) + ", connectionState=" + String.valueOf(this.connectionState) + ", connectionType=" + (this instanceof InboundConnection ? "InboundConnection" : "OutboundConnection") + ", uid='" + this.uid + "'}";
    }

    public String printDetails() {
        String portInfo = this.socket.getLocalPort() == 0 ? "port=" + this.socket.getPort() : "localPort=" + this.socket.getLocalPort() + "/port=" + this.socket.getPort();
        return "Connection{peerAddress=" + String.valueOf(this.peersNodeAddressOptional) + ", connectionState=" + String.valueOf(this.connectionState) + ", portInfo=" + portInfo + ", uid='" + this.uid + "', ruleViolation=" + String.valueOf((Object)this.ruleViolation) + ", ruleViolations=" + String.valueOf(this.ruleViolations) + ", supportedCapabilities=" + String.valueOf(this.capabilities) + ", stopped=" + this.stopped + "}";
    }

    public boolean reportInvalidRequest(RuleViolation ruleViolation, String errorMessage) {
        return Connection.reportInvalidRequest(this, ruleViolation, errorMessage);
    }

    private static synchronized boolean reportInvalidRequest(Connection connection, RuleViolation ruleViolation, String errorMessage) {
        boolean logReport;
        boolean bl = logReport = System.currentTimeMillis() - lastLoggedInvalidRequestReportTs > 30000L;
        if (!logReport) {
            ++numThrottledInvalidRequestReports;
        }
        if (logReport) {
            log.warn("We got reported the ruleViolation {} at connection with address={}, uid={}, errorMessage={}", new Object[]{ruleViolation, connection.getPeersNodeAddressProperty(), connection.getUid(), errorMessage});
        }
        int numRuleViolations = connection.ruleViolations.getOrDefault((Object)ruleViolation, 0);
        connection.ruleViolations.put(ruleViolation, ++numRuleViolations);
        if (numRuleViolations >= ruleViolation.maxTolerance) {
            if (logReport) {
                log.warn("We close connection as we received too many corrupt requests. ruleViolations={} connection with address {} and uid {}", connection.ruleViolations, connection.peersNodeAddressProperty, connection.uid);
            }
            connection.ruleViolation = ruleViolation;
            if (ruleViolation == RuleViolation.PEER_BANNED) {
                if (logReport) {
                    log.debug("We close connection due RuleViolation.PEER_BANNED. peersNodeAddress={}", (Object)connection.getPeersNodeAddressOptional());
                }
                connection.shutDown(CloseConnectionReason.PEER_BANNED);
            } else if (ruleViolation == RuleViolation.INVALID_CLASS) {
                if (logReport) {
                    log.warn("We close connection due RuleViolation.INVALID_CLASS");
                }
                connection.shutDown(CloseConnectionReason.INVALID_CLASS_RECEIVED);
            } else {
                if (logReport) {
                    log.warn("We close connection due RuleViolation.RULE_VIOLATION");
                }
                connection.shutDown(CloseConnectionReason.RULE_VIOLATION);
            }
            Connection.resetReportedInvalidRequestsThrottle(logReport);
            return true;
        }
        Connection.resetReportedInvalidRequestsThrottle(logReport);
        return false;
    }

    private static synchronized void resetReportedInvalidRequestsThrottle(boolean logReport) {
        if (logReport) {
            if (numThrottledInvalidRequestReports > 0) {
                log.warn("We received {} other reports of invalid requests since the last log entry", (Object)numThrottledInvalidRequestReports);
            }
            numThrottledInvalidRequestReports = 0;
            lastLoggedInvalidRequestReportTs = System.currentTimeMillis();
        }
    }

    private void handleException(Throwable e) {
        CloseConnectionReason closeConnectionReason;
        if (this.stopped) {
            return;
        }
        if (e instanceof SocketException) {
            closeConnectionReason = this.socket.isClosed() ? CloseConnectionReason.SOCKET_CLOSED : CloseConnectionReason.RESET;
            this.throttleWarn("SocketException (expected if connection lost). closeConnectionReason=" + String.valueOf((Object)closeConnectionReason) + "; connection=" + String.valueOf(this));
        } else if (e instanceof SocketTimeoutException || e instanceof TimeoutException) {
            closeConnectionReason = CloseConnectionReason.SOCKET_TIMEOUT;
            this.throttleInfo("Shut down caused by exception " + e.getMessage() + " on connection=" + String.valueOf(this));
        } else if (e instanceof EOFException) {
            closeConnectionReason = CloseConnectionReason.TERMINATED;
            this.throttleWarn("Shut down caused by exception " + e.getMessage() + " on connection=" + String.valueOf(this));
        } else if (e instanceof OptionalDataException || e instanceof StreamCorruptedException) {
            closeConnectionReason = CloseConnectionReason.CORRUPTED_DATA;
            this.throttleWarn("Shut down caused by exception " + e.getMessage() + " on connection=" + String.valueOf(this));
        } else {
            closeConnectionReason = CloseConnectionReason.UNKNOWN_EXCEPTION;
            this.throttleWarn("Unknown reason for exception at socket: " + this.socket.toString() + "\n\tpeer=" + String.valueOf(this.peersNodeAddressOptional) + "\n\tException=" + e.toString());
        }
        this.shutDown(closeConnectionReason);
    }

    private boolean processSendersNodeAddressMessage(SendersNodeAddressMessage sendersNodeAddressMessage) {
        NodeAddress senderNodeAddress = sendersNodeAddressMessage.getSenderNodeAddress();
        Preconditions.checkNotNull(senderNodeAddress, "senderNodeAddress must not be null at SendersNodeAddressMessage " + sendersNodeAddressMessage.getClass().getSimpleName());
        Optional<NodeAddress> existingAddressOptional = this.getPeersNodeAddressOptional();
        if (existingAddressOptional.isPresent()) {
            Preconditions.checkArgument(existingAddressOptional.get().equals(senderNodeAddress), "senderNodeAddress not matching connections peer address.\n\tmessage=" + String.valueOf(sendersNodeAddressMessage));
        } else {
            this.setPeersNodeAddress(senderNodeAddress);
        }
        if (this.banFilter != null && this.banFilter.isPeerBanned(senderNodeAddress)) {
            String errorMessage = "We got a message from a banned peer. message=" + sendersNodeAddressMessage.getClass().getSimpleName();
            this.reportInvalidRequest(RuleViolation.PEER_BANNED, errorMessage);
            return false;
        }
        return true;
    }

    @Override
    public void run() {
        try {
            Thread.currentThread().setName("InputHandler-" + Utilities.toTruncatedString(this.uid, 15));
            while (!this.stopped && !Thread.currentThread().isInterrupted()) {
                if (!this.threadNameSet && this.getPeersNodeAddressOptional().isPresent()) {
                    Thread.currentThread().setName("InputHandler-" + Utilities.toTruncatedString(this.getPeersNodeAddressOptional().get().getFullAddress(), 15));
                    this.threadNameSet = true;
                }
                try {
                    boolean isValid;
                    String errorMessage;
                    boolean exceeds;
                    if (this.socket != null && this.socket.isClosed()) {
                        this.throttleWarn("Socket is null or closed socket=" + String.valueOf(this.socket));
                        this.shutDown(CloseConnectionReason.SOCKET_CLOSED);
                        return;
                    }
                    NetworkEnvelope proto = NetworkEnvelope.parseDelimitedFrom(this.protoInputStream);
                    long ts = System.currentTimeMillis();
                    if (this.socket != null && this.socket.isClosed()) {
                        this.throttleWarn("Socket is null or closed socket=" + String.valueOf(this.socket));
                        this.shutDown(CloseConnectionReason.SOCKET_CLOSED);
                        return;
                    }
                    if (proto == null) {
                        if (this.stopped) {
                            return;
                        }
                        if (this.protoInputStream.read() == -1) {
                            this.throttleWarn("proto is null because protoInputStream.read()=-1 (EOF). That is expected if client got stopped without proper shutdown.");
                        } else {
                            this.throttleWarn("proto is null. protoInputStream.read()=" + this.protoInputStream.read());
                        }
                        this.shutDown(CloseConnectionReason.NO_PROTO_BUFFER_ENV);
                        return;
                    }
                    if (this.banFilter != null && this.peersNodeAddressOptional.isPresent() && this.banFilter.isPeerBanned(this.peersNodeAddressOptional.get())) {
                        String errorMessage2 = "We got a message from a banned peer. proto=" + Utilities.toTruncatedString(proto);
                        this.reportInvalidRequest(RuleViolation.PEER_BANNED, errorMessage2);
                        return;
                    }
                    long now = System.currentTimeMillis();
                    long elapsed = now - this.lastReadTimeStamp;
                    if (elapsed < 10L) {
                        log.debug("We got 2 network messages received in less than 10 ms. We set the thread to sleep for 20 ms to avoid getting flooded by our peer. lastReadTimeStamp={}, now={}, elapsed={}", this.lastReadTimeStamp, now, elapsed);
                        Thread.sleep(20L);
                    }
                    haveno.common.proto.network.NetworkEnvelope networkEnvelope = this.networkProtoResolver.fromProto(proto);
                    this.lastReadTimeStamp = now;
                    log.debug("<< Received networkEnvelope of type: {}", (Object)networkEnvelope.getClass().getSimpleName());
                    int size = proto.getSerializedSize();
                    this.statistic.addReceivedBytes(size);
                    this.statistic.addReceivedMessage(networkEnvelope);
                    if (networkEnvelope instanceof ExtendedDataSizePermission) {
                        exceeds = size > 0xA00000;
                    } else {
                        boolean bl = exceeds = size > 204800;
                    }
                    if (networkEnvelope instanceof AddPersistableNetworkPayloadMessage && !((AddPersistableNetworkPayloadMessage)networkEnvelope).getPersistableNetworkPayload().verifyHashSize() && this.reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED, errorMessage = "PersistableNetworkPayload.verifyHashSize failed. hashSize=" + ((AddPersistableNetworkPayloadMessage)networkEnvelope).getPersistableNetworkPayload().getHash().length + "; object=" + Utilities.toTruncatedString(proto))) {
                        return;
                    }
                    if (exceeds && this.reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED, errorMessage = "size > MAX_MSG_SIZE. size=" + size + "; object=" + Utilities.toTruncatedString(proto))) {
                        return;
                    }
                    if (this.violatesThrottleLimit() && this.reportInvalidRequest(RuleViolation.THROTTLE_LIMIT_EXCEEDED, "Violates throttle limit")) {
                        return;
                    }
                    errorMessage = "RuleViolation.WRONG_NETWORK_ID. version of message=" + proto.getMessageVersion() + ", app version=" + Version.getP2PMessageVersion() + ", proto.toTruncatedString=" + Utilities.toTruncatedString(proto.toString());
                    if (!proto.getMessageVersion().equals(Version.getP2PMessageVersion()) && this.reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID, errorMessage)) {
                        return;
                    }
                    boolean causedShutDown = this.maybeHandleSupportedCapabilitiesMessage(networkEnvelope);
                    if (causedShutDown) {
                        return;
                    }
                    if (networkEnvelope instanceof CloseConnectionMessage) {
                        log.debug("CloseConnectionMessage received. Reason={}\n\tconnection={}", (Object)proto.getCloseConnectionMessage().getReason(), (Object)this);
                        if (CloseConnectionReason.PEER_BANNED.name().equals(proto.getCloseConnectionMessage().getReason())) {
                            log.warn("We got shut down because we are banned by the other peer. (InputHandler.run CloseConnectionMessage). Peer: {}", (Object)this.getPeersNodeAddressOptional());
                        }
                        this.shutDown(CloseConnectionReason.CLOSE_REQUESTED_BY_PEER);
                        return;
                    }
                    if (this.stopped) continue;
                    if (!(networkEnvelope instanceof KeepAliveMessage)) {
                        this.statistic.updateLastActivityTimestamp();
                    }
                    if (networkEnvelope instanceof SendersNodeAddressMessage && !(isValid = this.processSendersNodeAddressMessage((SendersNodeAddressMessage)((Object)networkEnvelope)))) {
                        return;
                    }
                    if (!(networkEnvelope instanceof SendersNodeAddressMessage) && this.peersNodeAddressOptional.isEmpty()) {
                        log.info("We got a {} from a peer with yet unknown address on connection with uid={}", (Object)networkEnvelope.getClass().getSimpleName(), (Object)this.uid);
                    }
                    ThreadUtils.execute(() -> this.onMessage(networkEnvelope, this), THREAD_ID);
                    ThreadUtils.execute(() -> this.connectionStatistics.addReceivedMsgMetrics(System.currentTimeMillis() - ts, size), THREAD_ID);
                }
                catch (InvalidClassException e) {
                    this.reportInvalidRequest(RuleViolation.INVALID_CLASS, e.getMessage());
                }
                catch (InvalidProtocolBufferException | ProtobufferException | NoClassDefFoundError e) {
                    this.reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE, e.getMessage());
                }
                catch (Throwable t2) {
                    this.handleException(t2);
                }
            }
        }
        catch (Throwable t3) {
            this.handleException(t3);
        }
    }

    public boolean maybeHandleSupportedCapabilitiesMessage(haveno.common.proto.network.NetworkEnvelope networkEnvelope) {
        if (!(networkEnvelope instanceof SupportedCapabilitiesMessage)) {
            return false;
        }
        Capabilities supportedCapabilities = ((SupportedCapabilitiesMessage)((Object)networkEnvelope)).getSupportedCapabilities();
        if (supportedCapabilities == null || supportedCapabilities.isEmpty()) {
            return false;
        }
        if (this.capabilities.equals(supportedCapabilities)) {
            return false;
        }
        if (!Capabilities.hasMandatoryCapability(supportedCapabilities)) {
            log.info("We close a connection because of CloseConnectionReason.MANDATORY_CAPABILITIES_NOT_SUPPORTED to node {}. Capabilities of old node: {}, networkEnvelope class name={}", this.getSenderNodeAddressAsString(networkEnvelope), supportedCapabilities.prettyPrint(), networkEnvelope.getClass().getSimpleName());
            this.shutDown(CloseConnectionReason.MANDATORY_CAPABILITIES_NOT_SUPPORTED);
            return true;
        }
        this.capabilities.set(supportedCapabilities);
        this.capabilitiesListeners.forEach((Consumer<WeakReference<SupportedCapabilitiesListener>>)((Consumer<WeakReference>)weakListener -> {
            SupportedCapabilitiesListener supportedCapabilitiesListener = (SupportedCapabilitiesListener)weakListener.get();
            if (supportedCapabilitiesListener != null) {
                ThreadUtils.execute(() -> supportedCapabilitiesListener.onChanged(supportedCapabilities), THREAD_ID);
            }
        }));
        return false;
    }

    @Nullable
    private NodeAddress getSenderNodeAddress(haveno.common.proto.network.NetworkEnvelope networkEnvelope) {
        return this.getPeersNodeAddressOptional().orElse(networkEnvelope instanceof SendersNodeAddressMessage ? ((SendersNodeAddressMessage)((Object)networkEnvelope)).getSenderNodeAddress() : null);
    }

    private String getSenderNodeAddressAsString(haveno.common.proto.network.NetworkEnvelope networkEnvelope) {
        NodeAddress nodeAddress = this.getSenderNodeAddress(networkEnvelope);
        return nodeAddress == null ? "null" : nodeAddress.getFullAddress();
    }

    private synchronized void throttleWarn(String msg) {
        boolean doLog;
        boolean bl = doLog = System.currentTimeMillis() - lastLoggedWarningTs > 30000L;
        if (doLog) {
            log.warn(msg);
            if (numThrottledWarnings > 0) {
                log.warn("{} warnings were throttled since the last log entry", (Object)numThrottledWarnings);
            }
            numThrottledWarnings = 0;
            lastLoggedWarningTs = System.currentTimeMillis();
        } else {
            ++numThrottledWarnings;
        }
    }

    private synchronized void throttleInfo(String msg) {
        boolean doLog;
        boolean bl = doLog = System.currentTimeMillis() - lastLoggedInfoTs > 30000L;
        if (doLog) {
            log.info(msg);
            if (numThrottledInfos > 0) {
                log.info("{} info logs were throttled since the last log entry", (Object)numThrottledInfos);
            }
            numThrottledInfos = 0;
            lastLoggedInfoTs = System.currentTimeMillis();
        } else {
            ++numThrottledInfos;
        }
    }

    public String getUid() {
        return this.uid;
    }

    public Statistic getStatistic() {
        return this.statistic;
    }

    public ConnectionState getConnectionState() {
        return this.connectionState;
    }

    public ConnectionStatistics getConnectionStatistics() {
        return this.connectionStatistics;
    }

    public Optional<NodeAddress> getPeersNodeAddressOptional() {
        return this.peersNodeAddressOptional;
    }

    public boolean isStopped() {
        return this.stopped;
    }

    public ObjectProperty<NodeAddress> getPeersNodeAddressProperty() {
        return this.peersNodeAddressProperty;
    }

    public RuleViolation getRuleViolation() {
        return this.ruleViolation;
    }

    static {
        SOCKET_TIMEOUT = (int)TimeUnit.SECONDS.toMillis(240L);
        THREAD_ID = Connection.class.getSimpleName();
        lastLoggedInvalidRequestReportTs = 0L;
        numThrottledInvalidRequestReports = 0;
        lastLoggedWarningTs = 0L;
        numThrottledWarnings = 0;
        lastLoggedInfoTs = 0L;
        numThrottledInfos = 0;
    }
}

