From da38c1d9f1fc59c08fb80fa4389a900e08c69982 Mon Sep 17 00:00:00 2001 From: ary Date: Thu, 12 Feb 2015 13:54:24 +0100 Subject: [PATCH] bug 2446 - High priority (control) queue stop reading from channel if is full this patch depends on https://git.opendaylight.org/gerrit/#/c/14438/ Signed-off-by: ary Change-Id: Ie990b1987d2210ec520648e081bb250588f4f342 --- .../openflow/md/queue/WaterMarkListener.java | 14 ++ .../md/queue/WaterMarkListenerImpl.java | 45 +++++ .../md/core/ConnectionConductorImpl.java | 149 +++++++++------- .../openflow/md/queue/QueueKeeperFactory.java | 31 +++- .../md/queue/QueueKeeperFairImpl.java | 15 +- .../openflow/md/queue/WrapperQueueImpl.java | 152 ++++++++++++++++ .../core/plan/ConnectionAdapterStackImpl.java | 159 +++++++++-------- .../MessageDispatchServiceImplTest.java | 161 +++++++++++------ .../md/util/WrapperQueueImplTest.java | 163 ++++++++++++++++++ 9 files changed, 695 insertions(+), 194 deletions(-) create mode 100644 openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/queue/WaterMarkListener.java create mode 100644 openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/queue/WaterMarkListenerImpl.java create mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/WrapperQueueImpl.java create mode 100644 openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/WrapperQueueImplTest.java diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/queue/WaterMarkListener.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/queue/WaterMarkListener.java new file mode 100644 index 0000000000..e2609c3096 --- /dev/null +++ b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/queue/WaterMarkListener.java @@ -0,0 +1,14 @@ +package org.opendaylight.openflowplugin.api.openflow.md.queue; + +public interface WaterMarkListener { + + /** + * When HighWaterMark reached and currently not flooded + */ + void onHighWaterMark(); + + /** + * When LowWaterMark reached and currently flooded + */ + void onLowWaterMark(); +} diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/queue/WaterMarkListenerImpl.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/queue/WaterMarkListenerImpl.java new file mode 100644 index 0000000000..e30e1f4db3 --- /dev/null +++ b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/queue/WaterMarkListenerImpl.java @@ -0,0 +1,45 @@ +package org.opendaylight.openflowplugin.api.openflow.md.queue; + +import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class WaterMarkListenerImpl implements WaterMarkListener { + + private static final Logger LOG = LoggerFactory + .getLogger(WaterMarkListenerImpl.class); + + private ConnectionAdapter connectionAdapter; + + public WaterMarkListenerImpl(ConnectionAdapter connectionAdapter) { + this.connectionAdapter = Preconditions.checkNotNull(connectionAdapter); + } + + /* + * (non-Javadoc) + * + * @see org.opendaylight.openflowplugin.api.openflow.md.queue.QueueListener# + * onHighWaterMark + * (org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter) + */ + @Override + public void onHighWaterMark() { + connectionAdapter.setAutoRead(false); + LOG.debug("AutoRead is set on false."); + } + + /* + * (non-Javadoc) + * + * @see org.opendaylight.openflowplugin.api.openflow.md.queue.QueueListener# + * onLowWaterMark + * (org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter) + */ + @Override + public void onLowWaterMark() { + connectionAdapter.setAutoRead(true); + LOG.debug("AutoRead is set on true."); + } +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java index e892f575db..210c88adb7 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java @@ -8,11 +8,11 @@ package org.opendaylight.openflowplugin.openflow.md.core; -import com.google.common.util.concurrent.Futures; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; + import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener; import org.opendaylight.openflowplugin.api.OFConstants; @@ -27,6 +27,8 @@ import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionManag import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper; import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper.QueueType; import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueProcessor; +import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener; +import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListenerImpl; import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil; import org.opendaylight.openflowplugin.openflow.md.core.session.PortFeaturesUtil; import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperFactory; @@ -62,11 +64,15 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.util.concurrent.Futures; + /** * @author mirehak */ public class ConnectionConductorImpl implements OpenflowProtocolListener, - SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener, HandshakeListener, NotificationEnqueuer, AutoCloseable { + SystemNotificationsListener, ConnectionConductor, + ConnectionReadyListener, HandshakeListener, NotificationEnqueuer, + AutoCloseable { /** * ingress queue limit @@ -76,9 +82,10 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, protected static final Logger LOG = LoggerFactory .getLogger(ConnectionConductorImpl.class); - /* variable to make BitMap-based negotiation enabled / disabled. - * it will help while testing and isolating issues related to processing of - * BitMaps from switches. + /* + * variable to make BitMap-based negotiation enabled / disabled. it will + * help while testing and isolating issues related to processing of BitMaps + * from switches. */ private boolean isBitmapNegotiationEnable = true; protected ErrorHandler errorHandler; @@ -104,7 +111,6 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, private int ingressMaxQueueSize; - /** * @param connectionAdapter */ @@ -114,15 +120,18 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, /** * @param connectionAdapter - * @param ingressMaxQueueSize ingress queue limit (blocking) + * @param ingressMaxQueueSize + * ingress queue limit (blocking) */ - public ConnectionConductorImpl(ConnectionAdapter connectionAdapter, int ingressMaxQueueSize) { + public ConnectionConductorImpl(ConnectionAdapter connectionAdapter, + int ingressMaxQueueSize) { this.connectionAdapter = connectionAdapter; this.ingressMaxQueueSize = ingressMaxQueueSize; conductorState = CONDUCTOR_STATE.HANDSHAKING; firstHelloProcessed = false; handshakeManager = new HandshakeManagerImpl(connectionAdapter, - ConnectionConductor.versionOrder.get(0), ConnectionConductor.versionOrder); + ConnectionConductor.versionOrder.get(0), + ConnectionConductor.versionOrder); handshakeManager.setUseVersionBitmap(isBitmapNegotiationEnable); handshakeManager.setHandshakeListener(this); portFeaturesUtils = PortFeaturesUtil.getInstance(); @@ -131,23 +140,29 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void init() { int handshakeThreadLimit = 1; - hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit, handshakeThreadLimit, 0L, - TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), - "OFHandshake-" + conductorId); + hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit, + handshakeThreadLimit, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), "OFHandshake-" + + conductorId); connectionAdapter.setMessageListener(this); connectionAdapter.setSystemListener(this); connectionAdapter.setConnectionReadyListener(this); - queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor, ingressMaxQueueSize); + WaterMarkListener waterMarkListener = new WaterMarkListenerImpl( + connectionAdapter); + queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor, + ingressMaxQueueSize, waterMarkListener); } @Override - public void setQueueProcessor(QueueProcessor queueProcessor) { + public void setQueueProcessor( + QueueProcessor queueProcessor) { this.queueProcessor = queueProcessor; } /** - * @param errorHandler the errorHandler to set + * @param errorHandler + * the errorHandler to set */ @Override public void setErrorHandler(ErrorHandler errorHandler) { @@ -160,7 +175,8 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, new Thread(new Runnable() { @Override public void run() { - LOG.debug("echo request received: " + echoRequestMessage.getXid()); + LOG.debug("echo request received: " + + echoRequestMessage.getXid()); EchoReplyInputBuilder builder = new EchoReplyInputBuilder(); builder.setVersion(echoRequestMessage.getVersion()); builder.setXid(echoRequestMessage.getXid()); @@ -176,7 +192,6 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, enqueueMessage(errorMessage); } - /** * @param message */ @@ -191,7 +206,8 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, /** * @param message - * @param queueType enqueue type + * @param queueType + * enqueue type */ private void enqueueMessage(OfHeader message, QueueType queueType) { queue.push(message, this, queueType); @@ -207,15 +223,15 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, enqueueMessage(message); } - /** - * version negotiation happened as per following steps: - * 1. If HelloMessage version field has same version, continue connection processing. - * If HelloMessage version is lower than supported versions, just disconnect. + * version negotiation happened as per following steps: 1. If HelloMessage + * version field has same version, continue connection processing. If + * HelloMessage version is lower than supported versions, just disconnect. * 2. If HelloMessage contains bitmap and common version found in bitmap - * then continue connection processing. if no common version found, just disconnect. - * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version. - * 4. If Hello message received again with not supported version, just disconnect. + * then continue connection processing. if no common version found, just + * disconnect. 3. If HelloMessage version is not supported, send + * HelloMessage with lower supported version. 4. If Hello message received + * again with not supported version, just disconnect. */ @Override public void onHelloMessage(final HelloMessage hello) { @@ -274,10 +290,13 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, Boolean portBandwidth = portFeaturesUtils.getPortBandwidth(msg); if (portBandwidth == null) { - LOG.debug("can't get bandwidth info from port: {}, aborting port update", msg.toString()); + LOG.debug( + "can't get bandwidth info from port: {}, aborting port update", + msg.toString()); } else { this.getSessionContext().getPhysicalPorts().put(portNumber, msg); - this.getSessionContext().getPortsBandwidth().put(portNumber, portBandwidth); + this.getSessionContext().getPortsBandwidth() + .put(portNumber, portBandwidth); } } @@ -294,12 +313,17 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void run() { if (!CONDUCTOR_STATE.WORKING.equals(getConductorState())) { - // idle state in any other conductorState than WORKING means real - // problem and wont be handled by echoReply, but disconnection + // idle state in any other conductorState than WORKING means + // real + // problem and wont be handled by echoReply, but + // disconnection disconnect(); - OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this); + OFSessionUtil.getSessionManager().invalidateOnDisconnect( + ConnectionConductorImpl.this); } else { - LOG.debug("first idle state occured, sessionCtx={}|auxId={}", sessionContext, auxiliaryKey); + LOG.debug( + "first idle state occured, sessionCtx={}|auxId={}", + sessionContext, auxiliaryKey); EchoInputBuilder builder = new EchoInputBuilder(); builder.setVersion(getVersion()); builder.setXid(getSessionContext().getNextXid()); @@ -308,27 +332,30 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, .echo(builder.build()); try { - RpcResult echoReplyValue = echoReplyFuture.get(getMaxTimeout(), - getMaxTimeoutUnit()); + RpcResult echoReplyValue = echoReplyFuture + .get(getMaxTimeout(), getMaxTimeoutUnit()); if (echoReplyValue.isSuccessful()) { setConductorState(CONDUCTOR_STATE.WORKING); } else { - for (RpcError replyError : echoReplyValue.getErrors()) { + for (RpcError replyError : echoReplyValue + .getErrors()) { Throwable cause = replyError.getCause(); LOG.error( "while receiving echoReply in TIMEOUTING state: " + cause.getMessage(), cause); } - //switch issue occurred + // switch issue occurred throw new Exception("switch issue occurred"); } } catch (Exception e) { LOG.error("while waiting for echoReply in TIMEOUTING state: " + e.getMessage()); errorHandler.handleException(e, sessionContext); - //switch is not responding + // switch is not responding disconnect(); - OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this); + OFSessionUtil.getSessionManager() + .invalidateOnDisconnect( + ConnectionConductorImpl.this); } } } @@ -337,7 +364,8 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, } /** - * @param conductorState the connectionState to set + * @param conductorState + * the connectionState to set */ @Override public void setConductorState(CONDUCTOR_STATE conductorState) { @@ -373,7 +401,8 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public Future disconnect() { - LOG.trace("disconnecting: sessionCtx={}|auxId={}", sessionContext, auxiliaryKey); + LOG.trace("disconnecting: sessionCtx={}|auxId={}", sessionContext, + auxiliaryKey); Future result = null; if (connectionAdapter.isAlive()) { @@ -426,7 +455,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void onHandshakeSuccessfull(GetFeaturesOutput featureOutput, - Short negotiatedVersion) { + Short negotiatedVersion) { postHandshakeBasic(featureOutput, negotiatedVersion); // post-handshake actions @@ -447,19 +476,21 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, /** * used by tests - * + * * @param featureOutput * @param negotiatedVersion */ protected void postHandshakeBasic(GetFeaturesOutput featureOutput, - Short negotiatedVersion) { + Short negotiatedVersion) { version = negotiatedVersion; if (version == OFConstants.OFP_VERSION_1_0) { - // Because the GetFeaturesOutput contains information about the port - // in OF1.0 (that we would otherwise get from the PortDesc) we have to pass - // it up for parsing to convert into a NodeConnectorUpdate + // Because the GetFeaturesOutput contains information about the port + // in OF1.0 (that we would otherwise get from the PortDesc) we have + // to pass + // it up for parsing to convert into a NodeConnectorUpdate // - // BUG-1988 - this must be the first item in queue in order not to get behind link-up message + // BUG-1988 - this must be the first item in queue in order not to + // get behind link-up message enqueueMessage(featureOutput); } @@ -471,14 +502,15 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, } /* - * Send an OFPMP_DESC request message to the switch + * Send an OFPMP_DESC request message to the switch */ private void requestDesc() { MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder(); builder.setType(MultipartType.OFPMPDESC); builder.setVersion(getVersion()); builder.setFlags(new MultipartRequestFlags(false)); - builder.setMultipartRequestBody(new MultipartRequestDescCaseBuilder().build()); + builder.setMultipartRequestBody(new MultipartRequestDescCaseBuilder() + .build()); builder.setXid(getSessionContext().getNextXid()); getConnectionAdapter().multipartRequest(builder.build()); } @@ -488,7 +520,8 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, builder.setType(MultipartType.OFPMPPORTDESC); builder.setVersion(getVersion()); builder.setFlags(new MultipartRequestFlags(false)); - builder.setMultipartRequestBody(new MultipartRequestPortDescCaseBuilder().build()); + builder.setMultipartRequestBody(new MultipartRequestPortDescCaseBuilder() + .build()); builder.setXid(getSessionContext().getNextXid()); getConnectionAdapter().multipartRequest(builder.build()); } @@ -500,11 +533,11 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, mprInput.setFlags(new MultipartRequestFlags(false)); mprInput.setXid(getSessionContext().getNextXid()); - MultipartRequestGroupFeaturesCaseBuilder mprGroupFeaturesBuild = - new MultipartRequestGroupFeaturesCaseBuilder(); + MultipartRequestGroupFeaturesCaseBuilder mprGroupFeaturesBuild = new MultipartRequestGroupFeaturesCaseBuilder(); mprInput.setMultipartRequestBody(mprGroupFeaturesBuild.build()); - LOG.debug("Send group features statistics request :{}", mprGroupFeaturesBuild); + LOG.debug("Send group features statistics request :{}", + mprGroupFeaturesBuild); getConnectionAdapter().multipartRequest(mprInput.build()); } @@ -516,20 +549,20 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, mprInput.setFlags(new MultipartRequestFlags(false)); mprInput.setXid(getSessionContext().getNextXid()); - MultipartRequestMeterFeaturesCaseBuilder mprMeterFeaturesBuild = - new MultipartRequestMeterFeaturesCaseBuilder(); + MultipartRequestMeterFeaturesCaseBuilder mprMeterFeaturesBuild = new MultipartRequestMeterFeaturesCaseBuilder(); mprInput.setMultipartRequestBody(mprMeterFeaturesBuild.build()); - LOG.debug("Send meter features statistics request :{}", mprMeterFeaturesBuild); + LOG.debug("Send meter features statistics request :{}", + mprMeterFeaturesBuild); getConnectionAdapter().multipartRequest(mprInput.build()); } /** - * @param isBitmapNegotiationEnable the isBitmapNegotiationEnable to set + * @param isBitmapNegotiationEnable + * the isBitmapNegotiationEnable to set */ - public void setBitmapNegotiationEnable( - boolean isBitmapNegotiationEnable) { + public void setBitmapNegotiationEnable(boolean isBitmapNegotiationEnable) { this.isBitmapNegotiationEnable = isBitmapNegotiationEnable; } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFactory.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFactory.java index 0c856cff29..1143afa1de 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFactory.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFactory.java @@ -9,37 +9,50 @@ package org.opendaylight.openflowplugin.openflow.md.queue; import org.opendaylight.openflowplugin.api.openflow.md.queue.MessageSourcePollRegistrator; import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper; +import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; /** - * factory for {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper} implementations + * factory for + * {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper} + * implementations */ public abstract class QueueKeeperFactory { - + /** - * @param sourceRegistrator - * @param capacity blocking queue capacity - * @return fair reading implementation of {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper} (not registered = not started yet) + * @param sourceRegistrator + * @param capacity + * blocking queue capacity + * @param waterMarkListener + * @return fair reading implementation of + * {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper} + * (not registered = not started yet) */ public static QueueKeeper createFairQueueKeeper( - MessageSourcePollRegistrator> sourceRegistrator, int capacity) { + MessageSourcePollRegistrator> sourceRegistrator, + int capacity, WaterMarkListener waterMarkListener) { QueueKeeperFairImpl queueKeeper = new QueueKeeperFairImpl(); queueKeeper.setCapacity(capacity); queueKeeper.setHarvesterHandle(sourceRegistrator.getHarvesterHandle()); + queueKeeper.setWaterMarkListener(waterMarkListener); queueKeeper.init(); - + return queueKeeper; } /** - * register queue by harvester, start processing it. Use {@link QueueKeeperFairImpl#close()} to kill the queue and stop processing. + * register queue by harvester, start processing it. Use + * {@link QueueKeeperFairImpl#close()} to kill the queue and stop + * processing. + * * @param sourceRegistrator * @param queueKeeper */ public static void plugQueue( MessageSourcePollRegistrator> sourceRegistrator, QueueKeeper queueKeeper) { - AutoCloseable registration = sourceRegistrator.registerMessageSource(queueKeeper); + AutoCloseable registration = sourceRegistrator + .registerMessageSource(queueKeeper); queueKeeper.setPollRegistration(registration); sourceRegistrator.getHarvesterHandle().ping(); } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFairImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFairImpl.java index dbf7d4d2d3..b24ba9f749 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFairImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFairImpl.java @@ -15,6 +15,7 @@ import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor; import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle; import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem; import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper; +import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener; import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesPriorityZipper; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.slf4j.Logger; @@ -37,6 +38,8 @@ public class QueueKeeperFairImpl implements QueueKeeper { private HarvesterHandle harvesterHandle; private PollableQueuesPriorityZipper> queueZipper; + private WaterMarkListener waterMarkListener; + @Override public void close() throws Exception { Preconditions.checkNotNull(pollRegistration, @@ -77,8 +80,7 @@ public class QueueKeeperFairImpl implements QueueKeeper { */ @Override public QueueItem poll() { - QueueItem nextQueueItem = queueZipper.poll(); - return nextQueueItem; + return queueZipper.poll(); } /** @@ -102,11 +104,18 @@ public class QueueKeeperFairImpl implements QueueKeeper { * init blocking queue */ public void init() { + Preconditions.checkNotNull(waterMarkListener); queueUnordered = new ArrayBlockingQueue<>(capacity); queueDefault = new ArrayBlockingQueue<>(capacity); + WrapperQueueImpl> wrapperQueue = new WrapperQueueImpl<>( + capacity, queueDefault, waterMarkListener); queueZipper = new PollableQueuesPriorityZipper<>(); queueZipper.addSource(queueUnordered); - queueZipper.setPrioritizedSource(queueDefault); + queueZipper.setPrioritizedSource(wrapperQueue); + } + + public void setWaterMarkListener(WaterMarkListener waterMarkListener) { + this.waterMarkListener = waterMarkListener; } /** diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/WrapperQueueImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/WrapperQueueImpl.java new file mode 100644 index 0000000000..8a3404f923 --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/WrapperQueueImpl.java @@ -0,0 +1,152 @@ +package org.opendaylight.openflowplugin.openflow.md.queue; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Queue; + +import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem; +import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class WrapperQueueImpl implements Queue { + + protected static final Logger LOG = LoggerFactory + .getLogger(WrapperQueueImpl.class); + + private int lowWaterMark; + private int highWaterMark; + + private WaterMarkListener queueListenerMark; + + private Queue queueDefault; + + private boolean flooded; + + /** + * @param capacity + * @param queueZipper + */ + public WrapperQueueImpl(int capacity, Queue queueDefault, + WaterMarkListener queueListenerMark) { + this.queueListenerMark = queueListenerMark; + this.queueDefault = Preconditions.checkNotNull(queueDefault); + + this.highWaterMark = (int) (capacity * 0.8); + this.lowWaterMark = (int) (capacity * 0.65); + } + + /** + * Marking checks size of {@link #queueDefault} and on the basis of this is + * set autoRead + */ + private void marking() { + if (queueDefault.size() >= highWaterMark && !flooded) { + queueListenerMark.onHighWaterMark(); + flooded = true; + } else if (queueDefault.size() <= lowWaterMark && flooded) { + queueListenerMark.onLowWaterMark(); + flooded = false; + } + } + + /** + * @return true if flooded + */ + public boolean isFlooded() { + return flooded; + } + + /** + * poll {@link QueueItem} and call {@link #marking()} for check marks and + * set autoRead if it need it + * + * @return polled item + */ + public E poll() { + E nextQueueItem = queueDefault.poll(); + marking(); + return nextQueueItem; + } + + public boolean add(E e) { + return queueDefault.add(e); + } + + public int size() { + return queueDefault.size(); + } + + public boolean isEmpty() { + return queueDefault.isEmpty(); + } + + public boolean contains(Object o) { + return queueDefault.contains(o); + } + + public boolean offer(E e) { + boolean enqueueResult = queueDefault.offer(e); + marking(); + return enqueueResult; + } + + public Iterator iterator() { + return queueDefault.iterator(); + } + + public E remove() { + return queueDefault.remove(); + } + + public Object[] toArray() { + return queueDefault.toArray(); + } + + public E element() { + return queueDefault.element(); + } + + public E peek() { + return queueDefault.peek(); + } + + public T[] toArray(T[] a) { + return queueDefault.toArray(a); + } + + public boolean remove(Object o) { + return queueDefault.remove(o); + } + + public boolean containsAll(Collection c) { + return queueDefault.containsAll(c); + } + + public boolean addAll(Collection c) { + return queueDefault.addAll(c); + } + + public boolean removeAll(Collection c) { + return queueDefault.removeAll(c); + } + + public boolean retainAll(Collection c) { + return queueDefault.retainAll(c); + } + + public void clear() { + queueDefault.clear(); + } + + public boolean equals(Object o) { + return queueDefault.equals(o); + } + + public int hashCode() { + return queueDefault.hashCode(); + } + +} diff --git a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/plan/ConnectionAdapterStackImpl.java b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/plan/ConnectionAdapterStackImpl.java index 296bdeccaa..9af46b4379 100644 --- a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/plan/ConnectionAdapterStackImpl.java +++ b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/plan/ConnectionAdapterStackImpl.java @@ -100,6 +100,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable { private int planItemCounter; + private boolean autoRead = true; + /** * default ctor */ @@ -108,7 +110,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable { } @Override - public synchronized Future> barrier(BarrierInput arg0) { + public synchronized Future> barrier( + BarrierInput arg0) { checkRpcAndNext(arg0, "barrier"); SettableFuture> result = createAndRegisterRpcResult(arg0); return result; @@ -143,14 +146,16 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable { } @Override - public synchronized Future> getAsync(GetAsyncInput arg0) { + public synchronized Future> getAsync( + GetAsyncInput arg0) { checkRpcAndNext(arg0, "echo"); Future> result = createAndRegisterRpcResult(arg0); return result; } @Override - public synchronized Future> getConfig(GetConfigInput arg0) { + public synchronized Future> getConfig( + GetConfigInput arg0) { checkRpcAndNext(arg0, "echo"); Future> result = createAndRegisterRpcResult(arg0); return result; @@ -258,7 +263,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable { @Override public void checkListeners() { - if (ofListener == null || systemListener == null || connectionReadyListener == null) { + if (ofListener == null || systemListener == null + || connectionReadyListener == null) { occuredExceptions .add(new IllegalStateException("missing listeners")); } @@ -283,23 +289,29 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable { throw new IllegalStateException("eventPlan already depleted"); } - LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName, rpcInput.getVersion(), rpcInput.getXid()); + LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName, + rpcInput.getVersion(), rpcInput.getXid()); if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent) && !(eventPlan.peek() instanceof SwitchTestWaitForAllEvent)) { if (eventPlan.peek() instanceof SwitchTestNotificationEvent) { - SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan.peek()); - msg = "expected [notification: " +notifEvent.getPlannedNotification()+ "], got [" + rpcInput.getClass().getSimpleName() - + "]"; + SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan + .peek()); + msg = "expected [notification: " + + notifEvent.getPlannedNotification() + "], got [" + + rpcInput.getClass().getSimpleName() + "]"; } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) { - SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan.peek()); - msg = "expected [rpc: " +rpcEvent.getPlannedRpcResponse()+ "], got [" + rpcInput.getClass().getSimpleName() + SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan + .peek()); + msg = "expected [rpc: " + rpcEvent.getPlannedRpcResponse() + + "], got [" + rpcInput.getClass().getSimpleName() + "]"; } } else { if (eventPlan.peek() instanceof SwitchTestWaitForAllEvent) { SwitchTestWaitForAllEvent switchTestWaitForAll = (SwitchTestWaitForAllEvent) eventPlan .peek(); - Set eventBag = switchTestWaitForAll.getWaitEventBag(); + Set eventBag = switchTestWaitForAll + .getWaitEventBag(); List msgLot = new ArrayList<>(); if (eventBag == null || eventBag.isEmpty()) { @@ -307,7 +319,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable { } else { finished = false; for (SwitchTestWaitForRpcEvent switchTestWaitForRpc : eventBag) { - String msgPart = checkSingleRpcContent(rpcInput, rpcName, switchTestWaitForRpc); + String msgPart = checkSingleRpcContent(rpcInput, + rpcName, switchTestWaitForRpc); if (msgPart != null) { msgLot.add(msgPart); @@ -329,13 +342,15 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable { } else if (eventPlan.peek() instanceof SwitchTestWaitForRpcEvent) { SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan .peek(); - msg = checkSingleRpcContent(rpcInput, rpcName, switchTestRpcEvent); + msg = checkSingleRpcContent(rpcInput, rpcName, + switchTestRpcEvent); } } if (msg != null) { LOG.debug("rpc check .. FAILED: " + msg); - occuredExceptions.add(new IllegalArgumentException("step:"+planItemCounter+" | "+msg)); + occuredExceptions.add(new IllegalArgumentException("step:" + + planItemCounter + " | " + msg)); } else { LOG.debug("rpc check .. OK"); } @@ -349,15 +364,17 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable { * @param switchTestWaitForRpc * @return */ - private static String checkSingleRpcContent(OfHeader rpcInput, String rpcName, - SwitchTestWaitForRpcEvent switchTestWaitForRpc) { + private static String checkSingleRpcContent(OfHeader rpcInput, + String rpcName, SwitchTestWaitForRpcEvent switchTestWaitForRpc) { String failureMsg = null; if (!rpcName.equals(switchTestWaitForRpc.getRpcName())) { - failureMsg = "expected rpc name [" + switchTestWaitForRpc.getRpcName() - + "], got [" + rpcName + "]"; + failureMsg = "expected rpc name [" + + switchTestWaitForRpc.getRpcName() + "], got [" + rpcName + + "]"; } else if (!rpcInput.getXid().equals(switchTestWaitForRpc.getXid())) { - failureMsg = "expected "+rpcName+".xid [" + switchTestWaitForRpc.getXid() - + "], got [" + rpcInput.getXid() + "]"; + failureMsg = "expected " + rpcName + ".xid [" + + switchTestWaitForRpc.getXid() + "], got [" + + rpcInput.getXid() + "]"; } return failureMsg; @@ -380,9 +397,10 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable { * discard current event, execute next, if possible */ private void next() { - LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})", planItemCounter, eventPlan.peek()); + LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})", + planItemCounter, eventPlan.peek()); eventPlan.pop(); - planItemCounter ++; + planItemCounter++; planTouched = true; try { Thread.sleep(JOB_DELAY); @@ -397,7 +415,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable { */ private synchronized void proceed() { boolean processed = false; - LOG.debug("proceeding plan item[{}]: {}", planItemCounter, eventPlan.peek()); + LOG.debug("proceeding plan item[{}]: {}", planItemCounter, + eventPlan.peek()); if (eventPlan.peek() instanceof SwitchTestNotificationEvent) { SwitchTestNotificationEvent notification = (SwitchTestNotificationEvent) eventPlan .peek(); @@ -409,12 +428,13 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable { processRpcResponse(rpcResponse); processed = true; } else if (eventPlan.peek() instanceof SwitchTestCallbackEvent) { - SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan.peek(); + SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan + .peek(); try { callbackEvent.getCallback().call(); } catch (Exception e) { LOG.error(e.getMessage(), e); - occuredExceptions.add(e); + occuredExceptions.add(e); } processed = true; } @@ -439,8 +459,10 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable { planTouched = false; proceed(); if (!planTouched) { - occuredExceptions.add(new IllegalStateException( - "eventPlan STALLED, planItemCounter="+planItemCounter)); + occuredExceptions + .add(new IllegalStateException( + "eventPlan STALLED, planItemCounter=" + + planItemCounter)); break; } } @@ -459,61 +481,57 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable { private synchronized void processNotification( final SwitchTestNotificationEvent notificationEvent) { - Notification notification = notificationEvent - .getPlannedNotification(); + Notification notification = notificationEvent.getPlannedNotification(); LOG.debug("notificating OF_LISTENER: " + notification.getClass().getSimpleName()); // system events if (notification instanceof DisconnectEvent) { - systemListener - .onDisconnectEvent((DisconnectEvent) notification); + systemListener.onDisconnectEvent((DisconnectEvent) notification); } // of notifications else if (notification instanceof EchoRequestMessage) { - ofListener - .onEchoRequestMessage((EchoRequestMessage) notification); + ofListener.onEchoRequestMessage((EchoRequestMessage) notification); } else if (notification instanceof ErrorMessage) { ofListener.onErrorMessage((ErrorMessage) notification); } else if (notification instanceof ExperimenterMessage) { ofListener - .onExperimenterMessage((ExperimenterMessage) notification); + .onExperimenterMessage((ExperimenterMessage) notification); } else if (notification instanceof FlowRemovedMessage) { - ofListener - .onFlowRemovedMessage((FlowRemovedMessage) notification); + ofListener.onFlowRemovedMessage((FlowRemovedMessage) notification); } else if (notification instanceof HelloMessage) { ofListener.onHelloMessage((HelloMessage) notification); } else if (notification instanceof MultipartReplyMessage) { ofListener - .onMultipartReplyMessage((MultipartReplyMessage) notification); + .onMultipartReplyMessage((MultipartReplyMessage) notification); } else if (notification instanceof PacketInMessage) { - ofListener - .onPacketInMessage((PacketInMessage) notification); + ofListener.onPacketInMessage((PacketInMessage) notification); } else if (notification instanceof PortStatusMessage) { - ofListener - .onPortStatusMessage((PortStatusMessage) notification); + ofListener.onPortStatusMessage((PortStatusMessage) notification); } // default else { - occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " + - "message listening not supported for type: " - + notification.getClass())); + occuredExceptions.add(new IllegalStateException("step:" + + planItemCounter + " | " + + "message listening not supported for type: " + + notification.getClass())); } - LOG.debug("notification ["+notification.getClass().getSimpleName()+"] .. done"); + LOG.debug("notification [" + notification.getClass().getSimpleName() + + "] .. done"); } /** * @param rpcResponse */ - private synchronized void processRpcResponse(final SwitchTestRcpResponseEvent rpcResponse) { - OfHeader plannedRpcResponseValue = rpcResponse - .getPlannedRpcResponse(); + private synchronized void processRpcResponse( + final SwitchTestRcpResponseEvent rpcResponse) { + OfHeader plannedRpcResponseValue = rpcResponse.getPlannedRpcResponse(); LOG.debug("rpc-responding to OF_LISTENER: " + rpcResponse.getXid()); @SuppressWarnings("unchecked") SettableFuture> response = (SettableFuture>) rpcResults - .get(rpcResponse.getXid()); + .get(rpcResponse.getXid()); if (response != null) { boolean successful = plannedRpcResponseValue != null; @@ -521,33 +539,25 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable { if (successful) { errors = Collections.emptyList(); } else { - errors = Lists - .newArrayList(RpcErrors - .getRpcError( - "unit", - "unit", - "not requested", - ErrorSeverity.ERROR, - "planned response to RPC.id = " - + rpcResponse.getXid(), - ErrorType.RPC, - new Exception( - "rpc response failed (planned behavior)"))); + errors = Lists.newArrayList(RpcErrors.getRpcError("unit", + "unit", "not requested", ErrorSeverity.ERROR, + "planned response to RPC.id = " + rpcResponse.getXid(), + ErrorType.RPC, new Exception( + "rpc response failed (planned behavior)"))); } RpcResult result = Rpcs.getRpcResult(successful, plannedRpcResponseValue, errors); response.set(result); } else { String msg = "RpcResponse not expected: xid=" - + rpcResponse.getXid() - + ", " - + plannedRpcResponseValue.getClass() - .getSimpleName(); + + rpcResponse.getXid() + ", " + + plannedRpcResponseValue.getClass().getSimpleName(); LOG.error(msg); - occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " + msg)); + occuredExceptions.add(new IllegalStateException("step:" + + planItemCounter + " | " + msg)); } - LOG.debug("rpc ["+rpcResponse.getXid()+"] .. done"); + LOG.debug("rpc [" + rpcResponse.getXid() + "] .. done"); } /** @@ -597,7 +607,7 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable { @Override public void fireConnectionReadyNotification() { - connectionReadyListener.onConnectionReady(); + connectionReadyListener.onConnectionReady(); } @Override @@ -607,8 +617,7 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable { } @Override - public Future> multipartRequest( - MultipartRequestInput arg0) { + public Future> multipartRequest(MultipartRequestInput arg0) { checkRpcAndNext(arg0, "multipartRequestInput"); SettableFuture> result = createOneWayRpcResult(); return result; @@ -620,4 +629,14 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable { return null; } + @Override + public boolean isAutoRead() { + return autoRead; + } + + @Override + public void setAutoRead(boolean autoRead) { + this.autoRead = autoRead; + } + } diff --git a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImplTest.java b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImplTest.java index 7ce4730333..09a532abab 100644 --- a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImplTest.java +++ b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImplTest.java @@ -23,16 +23,16 @@ import org.junit.Before; import org.junit.Test; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener; -import org.opendaylight.openflowplugin.api.openflow.md.core.session.IMessageDispatchService; -import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext; -import org.opendaylight.openflowplugin.api.openflow.md.core.session.SwitchSessionKeyOF; -import org.opendaylight.openflowplugin.api.openflow.md.ModelDrivenSwitch; import org.opendaylight.openflowplugin.api.OFConstants; +import org.opendaylight.openflowplugin.api.openflow.md.ModelDrivenSwitch; import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor; import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler; import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationEnqueuer; import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationQueueWrapper; import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher; +import org.opendaylight.openflowplugin.api.openflow.md.core.session.IMessageDispatchService; +import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext; +import org.opendaylight.openflowplugin.api.openflow.md.core.session.SwitchSessionKeyOF; import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueProcessor; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder; @@ -97,7 +97,7 @@ public class MessageDispatchServiceImplTest { /** * Test barrier message for null cookie - * + * * @throws Exception */ @Test @@ -106,7 +106,8 @@ public class MessageDispatchServiceImplTest { SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey(); BarrierInputBuilder barrierMsg = new BarrierInputBuilder(); session.getMessageDispatchService().barrier(barrierMsg.build(), cookie); - Assert.assertEquals(MessageType.BARRIER, session.getPrimaryConductor().getMessageType()); + Assert.assertEquals(MessageType.BARRIER, session.getPrimaryConductor() + .getMessageType()); } /** @@ -117,8 +118,10 @@ public class MessageDispatchServiceImplTest { MockConnectionConductor conductor = new MockConnectionConductor(1); SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey(); ExperimenterInputBuilder experimenterInputBuilder = new ExperimenterInputBuilder(); - session.getMessageDispatchService().experimenter(experimenterInputBuilder.build(), cookie); - Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType()); + session.getMessageDispatchService().experimenter( + experimenterInputBuilder.build(), cookie); + Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor() + .getMessageType()); } /** @@ -129,8 +132,10 @@ public class MessageDispatchServiceImplTest { MockConnectionConductor conductor = new MockConnectionConductor(1); SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey(); GetAsyncInputBuilder getAsyncInputBuilder = new GetAsyncInputBuilder(); - session.getMessageDispatchService().getAsync(getAsyncInputBuilder.build(), cookie); - Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType()); + session.getMessageDispatchService().getAsync( + getAsyncInputBuilder.build(), cookie); + Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor() + .getMessageType()); } /** @@ -141,8 +146,10 @@ public class MessageDispatchServiceImplTest { MockConnectionConductor conductor = new MockConnectionConductor(1); SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey(); GetConfigInputBuilder getConfigInputBuilder = new GetConfigInputBuilder(); - session.getMessageDispatchService().getConfig(getConfigInputBuilder.build(), cookie); - Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType()); + session.getMessageDispatchService().getConfig( + getConfigInputBuilder.build(), cookie); + Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor() + .getMessageType()); } /** @@ -153,8 +160,10 @@ public class MessageDispatchServiceImplTest { MockConnectionConductor conductor = new MockConnectionConductor(1); SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey(); GetFeaturesInputBuilder getFeaturesInputBuilder = new GetFeaturesInputBuilder(); - session.getMessageDispatchService().getFeatures(getFeaturesInputBuilder.build(), cookie); - Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType()); + session.getMessageDispatchService().getFeatures( + getFeaturesInputBuilder.build(), cookie); + Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor() + .getMessageType()); } /** @@ -165,8 +174,10 @@ public class MessageDispatchServiceImplTest { MockConnectionConductor conductor = new MockConnectionConductor(1); SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey(); GetQueueConfigInputBuilder getQueueConfigInputBuilder = new GetQueueConfigInputBuilder(); - session.getMessageDispatchService().getQueueConfig(getQueueConfigInputBuilder.build(), cookie); - Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType()); + session.getMessageDispatchService().getQueueConfig( + getQueueConfigInputBuilder.build(), cookie); + Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor() + .getMessageType()); } /** @@ -177,8 +188,10 @@ public class MessageDispatchServiceImplTest { MockConnectionConductor conductor = new MockConnectionConductor(1); SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey(); MultipartRequestInputBuilder multipartRequestInputBuilder = new MultipartRequestInputBuilder(); - session.getMessageDispatchService().multipartRequest(multipartRequestInputBuilder.build(), cookie); - Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType()); + session.getMessageDispatchService().multipartRequest( + multipartRequestInputBuilder.build(), cookie); + Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor() + .getMessageType()); } /** @@ -189,8 +202,10 @@ public class MessageDispatchServiceImplTest { MockConnectionConductor conductor = new MockConnectionConductor(1); SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey(); RoleRequestInputBuilder roleRequestInputBuilder = new RoleRequestInputBuilder(); - session.getMessageDispatchService().roleRequest(roleRequestInputBuilder.build(), cookie); - Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType()); + session.getMessageDispatchService().roleRequest( + roleRequestInputBuilder.build(), cookie); + Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor() + .getMessageType()); } /** @@ -201,25 +216,27 @@ public class MessageDispatchServiceImplTest { MockConnectionConductor conductor = new MockConnectionConductor(1); SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey(); TableModInputBuilder tableModInputBuilder = new TableModInputBuilder(); - session.getMessageDispatchService().tableMod(tableModInputBuilder.build(), cookie); - Assert.assertEquals(MessageType.TABLEMOD, session.getPrimaryConductor().getMessageType()); + session.getMessageDispatchService().tableMod( + tableModInputBuilder.build(), cookie); + Assert.assertEquals(MessageType.TABLEMOD, session.getPrimaryConductor() + .getMessageType()); } - /** * Test packet out message for primary connection - * + * * @throws Exception */ @Test public void testPacketOutMessageForPrimary() throws Exception { session.getMessageDispatchService().packetOut(null, null); - Assert.assertEquals(MessageType.PACKETOUT, session.getPrimaryConductor().getMessageType()); + Assert.assertEquals(MessageType.PACKETOUT, session + .getPrimaryConductor().getMessageType()); } /** * Test packet out message for auxiliary connection - * + * * @throws Exception */ @Test @@ -228,14 +245,16 @@ public class MessageDispatchServiceImplTest { SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey(); session.addAuxiliaryConductor(cookie, conductor); session.getMessageDispatchService().packetOut(null, cookie); - Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType()); - conductor = (MockConnectionConductor) session.getAuxiliaryConductor(cookie); + Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor() + .getMessageType()); + conductor = (MockConnectionConductor) session + .getAuxiliaryConductor(cookie); Assert.assertEquals(MessageType.PACKETOUT, conductor.getMessageType()); } /** * Test packet out message when multiple auxiliary connection exist - * + * * @throws Exception */ @Test @@ -253,28 +272,33 @@ public class MessageDispatchServiceImplTest { // send message session.getMessageDispatchService().packetOut(builder.build(), cookie2); - Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType()); + Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor() + .getMessageType()); - conductor3 = (MockConnectionConductor) session.getAuxiliaryConductor(cookie3); + conductor3 = (MockConnectionConductor) session + .getAuxiliaryConductor(cookie3); Assert.assertEquals(MessageType.NONE, conductor3.getMessageType()); - conductor2 = (MockConnectionConductor) session.getAuxiliaryConductor(cookie2); + conductor2 = (MockConnectionConductor) session + .getAuxiliaryConductor(cookie2); Assert.assertEquals(MessageType.PACKETOUT, conductor2.getMessageType()); - conductor1 = (MockConnectionConductor) session.getAuxiliaryConductor(cookie1); + conductor1 = (MockConnectionConductor) session + .getAuxiliaryConductor(cookie1); Assert.assertEquals(MessageType.NONE, conductor1.getMessageType()); } /** * Test for invalid session - * + * * @throws Exception */ @Test public void testInvalidSession() throws Exception { session.setValid(false); - Future> resultFuture = session.getMessageDispatchService().packetOut(null, null); + Future> resultFuture = session + .getMessageDispatchService().packetOut(null, null); if (resultFuture.isDone()) { RpcResult rpcResult = resultFuture.get(); Assert.assertTrue(!rpcResult.getErrors().isEmpty()); @@ -282,9 +306,12 @@ public class MessageDispatchServiceImplTest { Iterator it = rpcResult.getErrors().iterator(); RpcError rpcError = it.next(); - Assert.assertTrue(rpcError.getApplicationTag().equals(OFConstants.APPLICATION_TAG)); - Assert.assertTrue(rpcError.getTag().equals(OFConstants.ERROR_TAG_TIMEOUT)); - Assert.assertTrue(rpcError.getErrorType().equals(RpcError.ErrorType.TRANSPORT)); + Assert.assertTrue(rpcError.getApplicationTag().equals( + OFConstants.APPLICATION_TAG)); + Assert.assertTrue(rpcError.getTag().equals( + OFConstants.ERROR_TAG_TIMEOUT)); + Assert.assertTrue(rpcError.getErrorType().equals( + RpcError.ErrorType.TRANSPORT)); } } @@ -320,7 +347,8 @@ class MockSessionContext implements SessionContext { } @Override - public ConnectionConductor getAuxiliaryConductor(SwitchConnectionDistinguisher auxiliaryKey) { + public ConnectionConductor getAuxiliaryConductor( + SwitchConnectionDistinguisher auxiliaryKey) { return map.get(auxiliaryKey); } @@ -332,12 +360,15 @@ class MockSessionContext implements SessionContext { } @Override - public void addAuxiliaryConductor(SwitchConnectionDistinguisher auxiliaryKey, ConnectionConductor conductorArg) { + public void addAuxiliaryConductor( + SwitchConnectionDistinguisher auxiliaryKey, + ConnectionConductor conductorArg) { map.put(auxiliaryKey, conductorArg); } @Override - public ConnectionConductor removeAuxiliaryConductor(SwitchConnectionDistinguisher connectionCookie) { + public ConnectionConductor removeAuxiliaryConductor( + SwitchConnectionDistinguisher connectionCookie) { return map.remove(connectionCookie); } @@ -434,19 +465,21 @@ class MockSessionContext implements SessionContext { } /** - * @param seed the seed to set + * @param seed + * the seed to set */ public void setSeed(int seed) { this.seed = seed; } - + @Override public NotificationEnqueuer getNotificationEnqueuer() { return conductor; } } -class MockConnectionConductor implements ConnectionConductor, NotificationEnqueuer { +class MockConnectionConductor implements ConnectionConductor, + NotificationEnqueuer { private int conductorNum; private MockConnectionAdapter adapter; @@ -526,7 +559,8 @@ class MockConnectionConductor implements ConnectionConductor, NotificationEnqueu } @Override - public void setQueueProcessor(QueueProcessor queueKeeper) { + public void setQueueProcessor( + QueueProcessor queueKeeper) { // NOOP } @@ -539,7 +573,7 @@ class MockConnectionConductor implements ConnectionConductor, NotificationEnqueu public void setId(int conductorId) { // NOOP } - + @Override public void enqueueNotification(NotificationQueueWrapper notification) { // NOOP @@ -602,13 +636,15 @@ class MockConnectionAdapter implements ConnectionAdapter { } @Override - public Future> getFeatures(GetFeaturesInput input) { + public Future> getFeatures( + GetFeaturesInput input) { // TODO Auto-generated method stub return null; } @Override - public Future> getQueueConfig(GetQueueConfigInput input) { + public Future> getQueueConfig( + GetQueueConfigInput input) { // TODO Auto-generated method stub return null; } @@ -644,7 +680,8 @@ class MockConnectionAdapter implements ConnectionAdapter { } @Override - public Future> roleRequest(RoleRequestInput input) { + public Future> roleRequest( + RoleRequestInput input) { // TODO Auto-generated method stub return null; } @@ -705,7 +742,8 @@ class MockConnectionAdapter implements ConnectionAdapter { } /** - * @param messageType the messageType to set + * @param messageType + * the messageType to set */ public void setMessageType(MessageType messageType) { this.messageType = messageType; @@ -723,18 +761,33 @@ class MockConnectionAdapter implements ConnectionAdapter { } @Override - public Future> multipartRequest( - MultipartRequestInput input) { + public Future> multipartRequest(MultipartRequestInput input) { // TODO Auto-generated method stub return null; } - /* (non-Javadoc) - * @see org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter#getRemoteAddress() + /* + * (non-Javadoc) + * + * @see + * org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter + * #getRemoteAddress() */ @Override public InetSocketAddress getRemoteAddress() { // TODO Auto-generated method stub return null; } + + @Override + public boolean isAutoRead() { + // TODO Auto-generated method stub + return false; + } + + @Override + public void setAutoRead(boolean arg0) { + // TODO Auto-generated method stub + + } } diff --git a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/WrapperQueueImplTest.java b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/WrapperQueueImplTest.java new file mode 100644 index 0000000000..62add1e9f1 --- /dev/null +++ b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/WrapperQueueImplTest.java @@ -0,0 +1,163 @@ +package org.opendaylight.openflowplugin.openflow.md.util; + +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; +import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor; +import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener; +import org.opendaylight.openflowplugin.openflow.md.queue.WrapperQueueImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(MockitoJUnitRunner.class) +public class WrapperQueueImplTest { + + protected static final Logger LOG = LoggerFactory + .getLogger(WrapperQueueImplTest.class); + + @Mock + private ConnectionConductor connectionConductor; + + @Mock + private WaterMarkListener waterMarkListener; + + private WrapperQueueImpl wrapperQueueImpl; + private final int capacity = 100; + private Queue queueDefault; + private int highWaterMark = 80; + private int lowWaterMark = 65; + + /** + * Setup before tests + */ + @Before + public void setUp() { + queueDefault = new ArrayBlockingQueue<>(capacity); + + wrapperQueueImpl = new WrapperQueueImpl<>(capacity, queueDefault, + waterMarkListener); + } + + /** + * Test for check if wrapper is not null + */ + @Test + public void testWrapperQueueImpl() { + Assert.assertNotNull("Wrapper can not be null.", wrapperQueueImpl); + } + + /** + * Test for set setAutoRead on false on high water mark + */ + @Test + public void testReadOnHighWaterMark() { + + Assert.assertFalse("Wrapper must be not flooded at the start.", + wrapperQueueImpl.isFlooded()); + + push(79); + Assert.assertFalse("Wrapper should not be flooded.", + wrapperQueueImpl.isFlooded()); + Mockito.verify(waterMarkListener, Mockito.times(0)).onHighWaterMark(); + + push(1); + Assert.assertTrue("Wrapper should be flooded.", + wrapperQueueImpl.isFlooded()); + Mockito.verify(waterMarkListener, Mockito.times(1)).onHighWaterMark(); + + Assert.assertEquals( + "Size of queue has to be equals to 80% of capacity of queue", + highWaterMark, queueDefault.size()); + } + + /** + * + */ + private void push(int size) { + for (int i = 0; i < size; i++) { + try { + wrapperQueueImpl.offer(i); + } catch (Exception e) { + LOG.error("Failed to offer item to queue.", e); + } + } + } + + /** + * Test for setAutoRead on true on low water mark + */ + @Test + public void testReadOnLowWaterMark() { + Mockito.verify(waterMarkListener, Mockito.times(0)).onHighWaterMark(); + push(80); + Assert.assertTrue("Wrapper should be flooded.", + wrapperQueueImpl.isFlooded()); + Mockito.verify(waterMarkListener, Mockito.times(1)).onHighWaterMark(); + + Assert.assertEquals( + "Size of queue has to be equals to 80% of capacity of queue", + highWaterMark, queueDefault.size()); + + poll(14); + Mockito.verify(waterMarkListener, Mockito.times(0)).onLowWaterMark(); + Assert.assertTrue("Wrapper should be still flooded.", + wrapperQueueImpl.isFlooded()); + + poll(1); + Mockito.verify(waterMarkListener, Mockito.times(1)).onLowWaterMark(); + + Assert.assertEquals( + "Size of queue has to be equals to 65% on lowWaterMark.", + lowWaterMark, queueDefault.size()); + Assert.assertFalse("Wrapped should be not flooded.", + wrapperQueueImpl.isFlooded()); + } + + /** + * Polling messages + */ + private void poll(int size) { + + for (int i = 0; i < size; i++) { + wrapperQueueImpl.poll(); + } + } + + /** + * Test for one cycle. + */ + @Test + public void testEndReadOnHWMStartOnLWM() { + + Assert.assertFalse("Wrapper should not be flooded", + wrapperQueueImpl.isFlooded()); + Mockito.verify(waterMarkListener, Mockito.times(0)).onLowWaterMark(); + Mockito.verify(waterMarkListener, Mockito.times(0)).onHighWaterMark(); + + push(81); + Assert.assertTrue("Wrapper should be flooded", + wrapperQueueImpl.isFlooded()); + Mockito.verify(waterMarkListener, Mockito.times(0)).onLowWaterMark(); + Mockito.verify(waterMarkListener, Mockito.times(1)).onHighWaterMark(); + + poll(17); + Assert.assertFalse("Wrapper should not be flooded", + wrapperQueueImpl.isFlooded()); + Mockito.verify(waterMarkListener, Mockito.times(1)).onLowWaterMark(); + Mockito.verify(waterMarkListener, Mockito.times(1)).onHighWaterMark(); + + push(18); + Assert.assertTrue("Wrapper should be flooded", + wrapperQueueImpl.isFlooded()); + + Mockito.verify(waterMarkListener, Mockito.times(1)).onLowWaterMark(); + Mockito.verify(waterMarkListener, Mockito.times(2)).onHighWaterMark(); + } +} -- 2.36.6