X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fopenflow%2Fmd%2Fcore%2FConnectionConductorImpl.java;h=81b22925afa13134ef99291bc2f2236d17dfddd2;hb=b0e8d777920a430776efe07c625637fe75204505;hp=31e4c581405a2fb5af34e8ed571aadabb420056f;hpb=3a48f57d58c9c55ccbf290af2e087f9cabb3c9bd;p=openflowplugin.git diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java index 31e4c58140..81b22925af 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java @@ -21,6 +21,9 @@ import org.opendaylight.openflowplugin.openflow.md.core.session.PortFeaturesUtil import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager; import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper; +import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType; +import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperFactory; +import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessor; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder; @@ -61,6 +64,9 @@ import com.google.common.util.concurrent.Futures; public class ConnectionConductorImpl implements OpenflowProtocolListener, SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener, HandshakeListener { + /** ingress queue limit */ + private static final int INGRESS_QUEUE_MAX_SIZE = 200; + protected static final Logger LOG = LoggerFactory .getLogger(ConnectionConductorImpl.class); @@ -79,7 +85,8 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, protected SessionContext sessionContext; - private QueueKeeper queueKeeper; + private QueueProcessor queueProcessor; + private QueueKeeper queue; private ThreadPoolExecutor hsPool; private HandshakeManager handshakeManager; @@ -87,15 +94,26 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, private PortFeaturesUtil portFeaturesUtils; + private int conductorId; + + private int ingressMaxQueueSize; + + /** * @param connectionAdapter */ public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) { + this(connectionAdapter, INGRESS_QUEUE_MAX_SIZE); + } + + /** + * @param connectionAdapter + * @param ingressMaxQueueSize ingress queue limit (blocking) + */ + public ConnectionConductorImpl(ConnectionAdapter connectionAdapter, int ingressMaxQueueSize) { this.connectionAdapter = connectionAdapter; + this.ingressMaxQueueSize = ingressMaxQueueSize; conductorState = CONDUCTOR_STATE.HANDSHAKING; - int handshakeThreadLimit = 1; - hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit , handshakeThreadLimit, 0L, - TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); firstHelloProcessed = false; handshakeManager = new HandshakeManagerImpl(connectionAdapter, ConnectionConductor.versionOrder.get(0), ConnectionConductor.versionOrder); @@ -106,14 +124,20 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void init() { + int handshakeThreadLimit = 1; + hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit , handshakeThreadLimit, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), + "OFHandshake-"+conductorId); + connectionAdapter.setMessageListener(this); connectionAdapter.setSystemListener(this); connectionAdapter.setConnectionReadyListener(this); + queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor, ingressMaxQueueSize); } @Override - public void setQueueKeeper(QueueKeeper queueKeeper) { - this.queueKeeper = queueKeeper; + public void setQueueProcessor(QueueProcessor queueProcessor) { + this.queueProcessor = queueProcessor; } /** @@ -143,17 +167,33 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void onErrorMessage(ErrorMessage errorMessage) { - queueKeeper.push(errorMessage, this); + enqueueMessage(errorMessage); + } + + + /** + * @param message + */ + private void enqueueMessage(OfHeader message) { + enqueueMessage(message, QueueType.DEFAULT); + } + + /** + * @param message + * @param queueType enqueue type + */ + private void enqueueMessage(OfHeader message, QueueType queueType) { + queue.push(message, this, queueType); } @Override public void onExperimenterMessage(ExperimenterMessage experimenterMessage) { - queueKeeper.push(experimenterMessage, this); + enqueueMessage(experimenterMessage); } @Override public void onFlowRemovedMessage(FlowRemovedMessage message) { - queueKeeper.push(message, this); + enqueueMessage(message); } @@ -167,7 +207,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, * 4. If Hello message received again with not supported version, just disconnect. */ @Override - public synchronized void onHelloMessage(final HelloMessage hello) { + public void onHelloMessage(final HelloMessage hello) { LOG.debug("processing HELLO.xid: {}", hello.getXid()); firstHelloProcessed = true; checkState(CONDUCTOR_STATE.HANDSHAKING); @@ -194,18 +234,18 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void onMultipartReplyMessage(MultipartReplyMessage message) { - queueKeeper.push(message, this); + enqueueMessage(message); } @Override public void onPacketInMessage(PacketInMessage message) { - queueKeeper.push(message, this, QueueKeeper.QueueType.UNORDERED); + enqueueMessage(message, QueueKeeper.QueueType.UNORDERED); } @Override public void onPortStatusMessage(PortStatusMessage message) { processPortStatusMsg(message); - queueKeeper.push(message, this); + enqueueMessage(message); } protected void processPortStatusMsg(PortStatus msg) { @@ -361,7 +401,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, } @Override - public synchronized void onConnectionReady() { + public void onConnectionReady() { LOG.debug("connection is ready-to-use"); if (! firstHelloProcessed) { HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper( @@ -386,7 +426,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, // Because the GetFeaturesOutput contains information about the port // in OF1.0 (that we would otherwise get from the PortDesc) we have to pass // it up for parsing to convert into a NodeConnectorUpdate - queueKeeper.push(featureOutput, this); + enqueueMessage(featureOutput); } requestDesc(); @@ -472,4 +512,9 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, hsPool.shutdownNow(); LOG.debug("pool is terminated: {}", hsPool.isTerminated()); } + + @Override + public void setId(int conductorId) { + this.conductorId = conductorId; + } }