package org.opendaylight.openflowplugin.openflow.md.core;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
private QueueProcessor<OfHeader, DataObject> queueProcessor;
private QueueKeeper<OfHeader> queue;
- private ThreadPoolExecutor hsPool;
private HandshakeManager handshakeManager;
private boolean firstHelloProcessed;
@Override
public void init() {
- int handshakeThreadLimit = 1;
- hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit,
- handshakeThreadLimit, 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(), "OFHandshake-"
- + conductorId);
-
connectionAdapter.setMessageListener(this);
connectionAdapter.setSystemListener(this);
connectionAdapter.setConnectionReadyListener(this);
checkState(CONDUCTOR_STATE.HANDSHAKING);
HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
hello, handshakeManager, connectionAdapter);
- hsPool.submit(handshakeStepWrapper);
+ Thread t = new Thread(handshakeStepWrapper, "OFHandshake-" + conductorId);
+ t.setDaemon(true);
+ t.start();
}
/**
checkState(CONDUCTOR_STATE.HANDSHAKING);
HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
null, handshakeManager, connectionAdapter);
- hsPool.execute(handshakeStepWrapper);
+ Thread t = new Thread(handshakeStepWrapper, "OFHandshake-" + conductorId);
+ t.setDaemon(true);
+ t.start();
firstHelloProcessed = true;
} else {
LOG.debug("already touched by hello message");
}
SessionContext sessionContext = OFSessionUtil.registerSession(this, featureOutput, negotiatedVersion);
- hsPool.shutdown();
- hsPool.purge();
conductorState = CONDUCTOR_STATE.WORKING;
QueueKeeperFactory.plugQueue(queueProcessor, queue);
}
LOG.warn("Closing handshake context failed: {}", e.getMessage());
LOG.debug("Detail in hanshake context close:", e);
}
- } else {
- //This condition will occure when Old Helium openflowplugin implementation will be used.
- shutdownPoolPolitely();
- }
- }
-
- private void shutdownPoolPolitely() {
- LOG.debug("Terminating handshake pool for node {}", connectionAdapter.getRemoteAddress());
- hsPool.shutdown();
- try {
- hsPool.awaitTermination(1, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- LOG.debug("Error while awaiting termination of pool. Will force shutdown now.");
- } finally {
- hsPool.purge();
- if (!hsPool.isTerminated()) {
- hsPool.shutdownNow();
- }
- LOG.debug("is handshake pool for node {} is terminated : {}",
- connectionAdapter.getRemoteAddress(), hsPool.isTerminated());
}
}
public void setHandshakeContext(HandshakeContext handshakeContext) {
this.handshakeContext = handshakeContext;
}
-
- @VisibleForTesting
- ThreadPoolExecutor getHsPool() {
- return hsPool;
- }
}