package org.apache.cassandra.streaming;

import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.cassandra.streaming.StreamEvent;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/cassandra-all-2.0.9.jar:org/apache/cassandra/streaming/StreamResultFuture.class */
public final class StreamResultFuture extends AbstractFuture<StreamState> {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) StreamResultFuture.class);
    public final UUID planId;
    public final String description;
    private final Map<InetAddress, StreamSession> ongoingSessions;
    private final Collection<StreamEventHandler> eventListeners = new ConcurrentLinkedQueue();
    private final Map<InetAddress, SessionInfo> sessionStates = new NonBlockingHashMap();

    private StreamResultFuture(UUID uuid, String str, Collection<StreamSession> collection) {
        this.planId = uuid;
        this.description = str;
        this.ongoingSessions = new HashMap(collection.size());
        for (StreamSession streamSession : collection) {
            this.ongoingSessions.put(streamSession.peer, streamSession);
        }
        if (collection.isEmpty()) {
            set(getCurrentState());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static StreamResultFuture init(UUID uuid, String str, Collection<StreamSession> collection, Collection<StreamEventHandler> collection2) {
        StreamResultFuture createAndRegister = createAndRegister(uuid, str, collection);
        if (collection2 != null) {
            Iterator<StreamEventHandler> it = collection2.iterator();
            while (it.hasNext()) {
                createAndRegister.addEventListener(it.next());
            }
        }
        logger.info("[Stream #{}] Executing streaming plan for {}", uuid, str);
        for (StreamSession streamSession : collection) {
            logger.info("[Stream #{}] Beginning stream session with {}", uuid, streamSession.peer);
            streamSession.init(createAndRegister);
            streamSession.start();
        }
        return createAndRegister;
    }

    public static synchronized StreamResultFuture initReceivingSide(UUID uuid, String str, InetAddress inetAddress, Socket socket, boolean z, int i) throws IOException {
        StreamResultFuture receivingStream = StreamManager.instance.getReceivingStream(uuid);
        if (receivingStream == null) {
            StreamSession streamSession = new StreamSession(inetAddress);
            receivingStream = new StreamResultFuture(uuid, str, Collections.singleton(streamSession));
            StreamManager.instance.registerReceiving(receivingStream);
            streamSession.init(receivingStream);
            streamSession.handler.initiateOnReceivingSide(socket, z, i);
        } else {
            receivingStream.attachSocket(inetAddress, socket, z, i);
            logger.info("[Stream #{}] Received streaming plan for {}", uuid, str);
        }
        return receivingStream;
    }

    private static StreamResultFuture createAndRegister(UUID uuid, String str, Collection<StreamSession> collection) {
        StreamResultFuture streamResultFuture = new StreamResultFuture(uuid, str, collection);
        StreamManager.instance.register(streamResultFuture);
        return streamResultFuture;
    }

    public void attachSocket(InetAddress inetAddress, Socket socket, boolean z, int i) throws IOException {
        StreamSession streamSession = this.ongoingSessions.get(inetAddress);
        if (streamSession == null) {
            throw new RuntimeException(String.format("Got connection from %s for stream session %s but no such session locally", inetAddress, this.planId));
        }
        streamSession.handler.initiateOnReceivingSide(socket, z, i);
    }

    public void addEventListener(StreamEventHandler streamEventHandler) {
        Futures.addCallback(this, streamEventHandler);
        this.eventListeners.add(streamEventHandler);
    }

    public StreamState getCurrentState() {
        return new StreamState(this.planId, this.description, ImmutableSet.copyOf((Collection) this.sessionStates.values()));
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        return this.planId.equals(((StreamResultFuture) obj).planId);
    }

    public int hashCode() {
        return this.planId.hashCode();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleSessionPrepared(StreamSession streamSession) {
        SessionInfo sessionInfo = streamSession.getSessionInfo();
        logger.info("[Stream #{}] Prepare completed. Receiving {} files({} bytes), sending {} files({} bytes)", streamSession.planId(), Long.valueOf(sessionInfo.getTotalFilesToReceive()), Long.valueOf(sessionInfo.getTotalSizeToReceive()), Long.valueOf(sessionInfo.getTotalFilesToSend()), Long.valueOf(sessionInfo.getTotalSizeToSend()));
        StreamEvent.SessionPreparedEvent sessionPreparedEvent = new StreamEvent.SessionPreparedEvent(this.planId, sessionInfo);
        this.sessionStates.put(sessionInfo.peer, sessionInfo);
        fireStreamEvent(sessionPreparedEvent);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleSessionComplete(StreamSession streamSession) {
        logger.info("[Stream #{}] Session with {} is complete", streamSession.planId(), streamSession.peer);
        SessionInfo sessionInfo = streamSession.getSessionInfo();
        this.sessionStates.put(sessionInfo.peer, sessionInfo);
        fireStreamEvent(new StreamEvent.SessionCompleteEvent(streamSession));
        maybeComplete(streamSession);
    }

    public void handleProgress(ProgressInfo progressInfo) {
        this.sessionStates.get(progressInfo.peer).updateProgress(progressInfo);
        fireStreamEvent(new StreamEvent.ProgressEvent(this.planId, progressInfo));
    }

    void fireStreamEvent(StreamEvent streamEvent) {
        Iterator<StreamEventHandler> it = this.eventListeners.iterator();
        while (it.hasNext()) {
            it.next().handleStreamEvent(streamEvent);
        }
    }

    private synchronized void maybeComplete(StreamSession streamSession) {
        this.ongoingSessions.remove(streamSession.peer);
        if (this.ongoingSessions.isEmpty()) {
            StreamState currentState = getCurrentState();
            if (currentState.hasFailedSession()) {
                logger.warn("[Stream #{}] Stream failed", this.planId);
                setException(new StreamException(currentState, "Stream failed"));
            } else {
                logger.info("[Stream #{}] All sessions completed", this.planId);
                set(currentState);
            }
        }
    }
}
