package org.opendaylight.openflowplugin.impl.connection;
import java.net.InetAddress;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadPoolExecutor;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.impl.connection.listener.SystemNotificationsListenerImpl;
import org.opendaylight.openflowplugin.openflow.md.core.ErrorHandlerSimpleImpl;
import org.opendaylight.openflowplugin.openflow.md.core.HandshakeManagerImpl;
-import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
import org.slf4j.Logger;
private final boolean bitmapNegotiationEnabled = true;
private DeviceConnectedHandler deviceConnectedHandler;
private final long echoReplyTimeout;
+ private final ThreadPoolExecutor threadPool;
- public ConnectionManagerImpl(long echoReplyTimeout) {
+ public ConnectionManagerImpl(long echoReplyTimeout, final ThreadPoolExecutor threadPool) {
this.echoReplyTimeout = echoReplyTimeout;
+ this.threadPool = threadPool;
}
-
@Override
public void onSwitchConnected(final ConnectionAdapter connectionAdapter) {
-
- LOG.trace("preparing handshake: {}", connectionAdapter.getRemoteAddress());
-
- final int handshakeThreadLimit = 1;
- final ThreadPoolLoggingExecutor handshakePool = createHandshakePool(
- connectionAdapter.getRemoteAddress().toString(), handshakeThreadLimit);
-
LOG.trace("prepare connection context");
final ConnectionContext connectionContext = new ConnectionContextImpl(connectionAdapter);
final HandshakeManager handshakeManager = createHandshakeManager(connectionAdapter, handshakeListener);
LOG.trace("prepare handshake context");
- HandshakeContext handshakeContext = new HandshakeContextImpl(handshakePool, handshakeManager);
+ HandshakeContext handshakeContext = new HandshakeContextImpl(threadPool, handshakeManager);
handshakeListener.setHandshakeContext(handshakeContext);
connectionContext.setHandshakeContext(handshakeContext);
new OpenflowProtocolListenerInitialImpl(connectionContext, handshakeContext);
connectionAdapter.setMessageListener(ofMessageListener);
- final SystemNotificationsListener systemListener = new SystemNotificationsListenerImpl(connectionContext, echoReplyTimeout);
+ final SystemNotificationsListener systemListener = new SystemNotificationsListenerImpl(connectionContext, echoReplyTimeout, threadPool);
connectionAdapter.setSystemListener(systemListener);
LOG.trace("connection ballet finished");
}
- /**
- * @param connectionIdentifier
- * @param handshakeThreadLimit
- * @return
- */
- private static ThreadPoolLoggingExecutor createHandshakePool(
- final String connectionIdentifier, final int handshakeThreadLimit) {
- return new ThreadPoolLoggingExecutor(handshakeThreadLimit,
- handshakeThreadLimit, 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(HELLO_LIMIT), "OFHandshake-" + connectionIdentifier);
- }
-
/**
* @param connectionAdapter
* @param handshakeListener
return bitmapNegotiationEnabled;
}
-
@Override
public boolean accept(final InetAddress switchAddress) {
// TODO add connection accept logic based on address