X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fopenflow%2Fmd%2Fcore%2FConnectionConductorImpl.java;h=da09a63acf7835e830a1026d65cceaa2b3871c16;hb=ac9e64befe5a156b10118fb4826b33c5b7841804;hp=912cc6e72c5f223b218da7109a3e6da88b057052;hpb=a73a74356cdfaf8f766179ea52baab25e9e6c5e7;p=openflowplugin.git diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java index 912cc6e72c..da09a63acf 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java @@ -8,9 +8,9 @@ package org.opendaylight.openflowplugin.openflow.md.core; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; @@ -21,6 +21,9 @@ import org.opendaylight.openflowplugin.openflow.md.core.session.PortFeaturesUtil import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager; import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper; +import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType; +import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperFactory; +import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessor; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder; @@ -37,7 +40,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Port; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestDescCaseBuilder; @@ -61,6 +64,9 @@ import com.google.common.util.concurrent.Futures; public class ConnectionConductorImpl implements OpenflowProtocolListener, SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener, HandshakeListener { + /** ingress queue limit */ + private static final int INGRESS_QUEUE_MAX_SIZE = 200; + protected static final Logger LOG = LoggerFactory .getLogger(ConnectionConductorImpl.class); @@ -75,25 +81,39 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, private ConnectionConductor.CONDUCTOR_STATE conductorState; private Short version; - private SwitchConnectionDistinguisher auxiliaryKey; + protected SwitchConnectionDistinguisher auxiliaryKey; - private SessionContext sessionContext; + protected SessionContext sessionContext; - private QueueKeeper queueKeeper; - private ExecutorService hsPool; + private QueueProcessor queueProcessor; + private QueueKeeper queue; + private ThreadPoolExecutor hsPool; private HandshakeManager handshakeManager; private boolean firstHelloProcessed; private PortFeaturesUtil portFeaturesUtils; + private int conductorId; + + private int ingressMaxQueueSize; + + /** * @param connectionAdapter */ public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) { + this(connectionAdapter, INGRESS_QUEUE_MAX_SIZE); + } + + /** + * @param connectionAdapter + * @param ingressMaxQueueSize ingress queue limit (blocking) + */ + public ConnectionConductorImpl(ConnectionAdapter connectionAdapter, int ingressMaxQueueSize) { this.connectionAdapter = connectionAdapter; + this.ingressMaxQueueSize = ingressMaxQueueSize; conductorState = CONDUCTOR_STATE.HANDSHAKING; - hsPool = Executors.newFixedThreadPool(1); firstHelloProcessed = false; handshakeManager = new HandshakeManagerImpl(connectionAdapter, ConnectionConductor.versionOrder.get(0), ConnectionConductor.versionOrder); @@ -104,14 +124,20 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void init() { + int handshakeThreadLimit = 1; + hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit , handshakeThreadLimit, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), + "OFHandshake-"+conductorId); + connectionAdapter.setMessageListener(this); connectionAdapter.setSystemListener(this); connectionAdapter.setConnectionReadyListener(this); + queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor, ingressMaxQueueSize); } @Override - public void setQueueKeeper(QueueKeeper queueKeeper) { - this.queueKeeper = queueKeeper; + public void setQueueProcessor(QueueProcessor queueProcessor) { + this.queueProcessor = queueProcessor; } /** @@ -141,17 +167,33 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void onErrorMessage(ErrorMessage errorMessage) { - queueKeeper.push(errorMessage, this); + enqueueMessage(errorMessage); + } + + + /** + * @param message + */ + private void enqueueMessage(OfHeader message) { + enqueueMessage(message, QueueType.DEFAULT); + } + + /** + * @param message + * @param queueType enqueue type + */ + private void enqueueMessage(OfHeader message, QueueType queueType) { + queue.push(message, this, queueType); } @Override public void onExperimenterMessage(ExperimenterMessage experimenterMessage) { - queueKeeper.push(experimenterMessage, this); + enqueueMessage(experimenterMessage); } @Override public void onFlowRemovedMessage(FlowRemovedMessage message) { - queueKeeper.push(message, this); + enqueueMessage(message); } @@ -163,17 +205,15 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, * then continue connection processing. if no common version found, just disconnect. * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version. * 4. If Hello message received again with not supported version, just disconnect. - * - * TODO: Better to handle handshake into a maintainable innerclass which uses State-Pattern. */ @Override - public synchronized void onHelloMessage(final HelloMessage hello) { + public void onHelloMessage(final HelloMessage hello) { LOG.debug("processing HELLO.xid: {}", hello.getXid()); firstHelloProcessed = true; checkState(CONDUCTOR_STATE.HANDSHAKING); HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper( hello, handshakeManager, connectionAdapter); - hsPool.execute(handshakeStepWrapper); + hsPool.submit(handshakeStepWrapper); } /** @@ -194,18 +234,18 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void onMultipartReplyMessage(MultipartReplyMessage message) { - queueKeeper.push(message, this); + enqueueMessage(message); } @Override public void onPacketInMessage(PacketInMessage message) { - queueKeeper.push(message, this, QueueKeeper.QueueType.UNORDERED); + enqueueMessage(message, QueueKeeper.QueueType.UNORDERED); } @Override public void onPortStatusMessage(PortStatusMessage message) { processPortStatusMsg(message); - queueKeeper.push(message, this); + enqueueMessage(message); } protected void processPortStatusMsg(PortStatus msg) { @@ -223,14 +263,14 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, Boolean portBandwidth = portFeaturesUtils.getPortBandwidth(msg); if(portBandwidth == null) { - LOG.warn("can't get bandwidth info from port: {}, aborting port update", msg.toString()); + LOG.debug("can't get bandwidth info from port: {}, aborting port update", msg.toString()); } else { this.getSessionContext().getPhysicalPorts().put(portNumber, msg); this.getSessionContext().getPortsBandwidth().put(portNumber, portBandwidth); } } - protected void deletePort(Port port) { + protected void deletePort(PortGrouping port) { Long portNumber = port.getPortNo(); this.getSessionContext().getPhysicalPorts().remove(portNumber); @@ -248,7 +288,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, disconnect(); OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this); } else { - LOG.debug("first idle state occured"); + LOG.debug("first idle state occured, sessionCtx={}|auxId={}", sessionContext, auxiliaryKey); EchoInputBuilder builder = new EchoInputBuilder(); builder.setVersion(getVersion()); builder.setXid(getSessionContext().getNextXid()); @@ -322,7 +362,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public Future disconnect() { - LOG.info("disconnecting: sessionCtx="+sessionContext+"|auxId="+auxiliaryKey); + LOG.trace("disconnecting: sessionCtx={}|auxId={}", sessionContext, auxiliaryKey); Future result = null; if (connectionAdapter.isAlive()) { @@ -361,7 +401,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, } @Override - public synchronized void onConnectionReady() { + public void onConnectionReady() { LOG.debug("connection is ready-to-use"); if (! firstHelloProcessed) { HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper( @@ -376,20 +416,35 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void onHandshakeSuccessfull(GetFeaturesOutput featureOutput, Short negotiatedVersion) { - version = negotiatedVersion; - conductorState = CONDUCTOR_STATE.WORKING; - OFSessionUtil.registerSession(this, featureOutput, negotiatedVersion); - requestDesc(); - requestPorts(); + postHandshakeBasic(featureOutput, negotiatedVersion); + + // post-handshake actions if(version == OFConstants.OFP_VERSION_1_3){ + requestPorts(); requestGroupFeatures(); requestMeterFeatures(); } else if (version == OFConstants.OFP_VERSION_1_0) { // Because the GetFeaturesOutput contains information about the port // in OF1.0 (that we would otherwise get from the PortDesc) we have to pass // it up for parsing to convert into a NodeConnectorUpdate - queueKeeper.push(featureOutput, this); + enqueueMessage(featureOutput); } + + requestDesc(); + } + + /** + * used by tests + * @param featureOutput + * @param negotiatedVersion + */ + protected void postHandshakeBasic(GetFeaturesOutput featureOutput, + Short negotiatedVersion) { + version = negotiatedVersion; + conductorState = CONDUCTOR_STATE.WORKING; + OFSessionUtil.registerSession(this, featureOutput, negotiatedVersion); + hsPool.shutdown(); + hsPool.purge(); } /* @@ -457,4 +512,9 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, hsPool.shutdownNow(); LOG.debug("pool is terminated: {}", hsPool.isTerminated()); } + + @Override + public void setId(int conductorId) { + this.conductorId = conductorId; + } }