bug 2446 - High priority (control) queue stop reading from channel if is full 95/15195/4
authorary <jatoth@cisco.com>
Thu, 12 Feb 2015 12:54:24 +0000 (13:54 +0100)
committermichal rehak <mirehak@cisco.com>
Tue, 17 Feb 2015 10:53:27 +0000 (10:53 +0000)
this patch depends on https://git.opendaylight.org/gerrit/#/c/14438/

Signed-off-by: ary <jatoth@cisco.com>
Change-Id: Ie990b1987d2210ec520648e081bb250588f4f342

openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/queue/WaterMarkListener.java [new file with mode: 0644]
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/queue/WaterMarkListenerImpl.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFactory.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFairImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/WrapperQueueImpl.java [new file with mode: 0644]
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/plan/ConnectionAdapterStackImpl.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImplTest.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/WrapperQueueImplTest.java [new file with mode: 0644]

diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/queue/WaterMarkListener.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/queue/WaterMarkListener.java
new file mode 100644 (file)
index 0000000..e2609c3
--- /dev/null
@@ -0,0 +1,14 @@
+package org.opendaylight.openflowplugin.api.openflow.md.queue;
+
+public interface WaterMarkListener {
+
+    /**
+     * When HighWaterMark reached and currently not flooded
+     */
+    void onHighWaterMark();
+
+    /**
+     * When LowWaterMark reached and currently flooded
+     */
+    void onLowWaterMark();
+}
diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/queue/WaterMarkListenerImpl.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/queue/WaterMarkListenerImpl.java
new file mode 100644 (file)
index 0000000..e30e1f4
--- /dev/null
@@ -0,0 +1,45 @@
+package org.opendaylight.openflowplugin.api.openflow.md.queue;
+
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class WaterMarkListenerImpl implements WaterMarkListener {
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(WaterMarkListenerImpl.class);
+
+    private ConnectionAdapter connectionAdapter;
+
+    public WaterMarkListenerImpl(ConnectionAdapter connectionAdapter) {
+        this.connectionAdapter = Preconditions.checkNotNull(connectionAdapter);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.opendaylight.openflowplugin.api.openflow.md.queue.QueueListener#
+     * onHighWaterMark
+     * (org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter)
+     */
+    @Override
+    public void onHighWaterMark() {
+        connectionAdapter.setAutoRead(false);
+        LOG.debug("AutoRead is set on false.");
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.opendaylight.openflowplugin.api.openflow.md.queue.QueueListener#
+     * onLowWaterMark
+     * (org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter)
+     */
+    @Override
+    public void onLowWaterMark() {
+        connectionAdapter.setAutoRead(true);
+        LOG.debug("AutoRead is set on true.");
+    }
+}
index e892f575db90a57bc8be9895d7e17832f1cc351c..210c88adb768683df90039b55bbeb7476c2f6a4f 100644 (file)
@@ -8,11 +8,11 @@
 
 package org.opendaylight.openflowplugin.openflow.md.core;
 
-import com.google.common.util.concurrent.Futures;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
 import org.opendaylight.openflowplugin.api.OFConstants;
@@ -27,6 +27,8 @@ import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionManag
 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper.QueueType;
 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueProcessor;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListenerImpl;
 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
 import org.opendaylight.openflowplugin.openflow.md.core.session.PortFeaturesUtil;
 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperFactory;
@@ -62,11 +64,15 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.Futures;
+
 /**
  * @author mirehak
  */
 public class ConnectionConductorImpl implements OpenflowProtocolListener,
-        SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener, HandshakeListener, NotificationEnqueuer, AutoCloseable {
+        SystemNotificationsListener, ConnectionConductor,
+        ConnectionReadyListener, HandshakeListener, NotificationEnqueuer,
+        AutoCloseable {
 
     /**
      * ingress queue limit
@@ -76,9 +82,10 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     protected static final Logger LOG = LoggerFactory
             .getLogger(ConnectionConductorImpl.class);
 
-    /* variable to make BitMap-based negotiation enabled / disabled.
-     * it will help while testing and isolating issues related to processing of
-     * BitMaps from switches.
+    /*
+     * variable to make BitMap-based negotiation enabled / disabled. it will
+     * help while testing and isolating issues related to processing of BitMaps
+     * from switches.
      */
     private boolean isBitmapNegotiationEnable = true;
     protected ErrorHandler errorHandler;
@@ -104,7 +111,6 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
     private int ingressMaxQueueSize;
 
-
     /**
      * @param connectionAdapter
      */
@@ -114,15 +120,18 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
     /**
      * @param connectionAdapter
-     * @param ingressMaxQueueSize ingress queue limit (blocking)
+     * @param ingressMaxQueueSize
+     *            ingress queue limit (blocking)
      */
-    public ConnectionConductorImpl(ConnectionAdapter connectionAdapter, int ingressMaxQueueSize) {
+    public ConnectionConductorImpl(ConnectionAdapter connectionAdapter,
+            int ingressMaxQueueSize) {
         this.connectionAdapter = connectionAdapter;
         this.ingressMaxQueueSize = ingressMaxQueueSize;
         conductorState = CONDUCTOR_STATE.HANDSHAKING;
         firstHelloProcessed = false;
         handshakeManager = new HandshakeManagerImpl(connectionAdapter,
-                ConnectionConductor.versionOrder.get(0), ConnectionConductor.versionOrder);
+                ConnectionConductor.versionOrder.get(0),
+                ConnectionConductor.versionOrder);
         handshakeManager.setUseVersionBitmap(isBitmapNegotiationEnable);
         handshakeManager.setHandshakeListener(this);
         portFeaturesUtils = PortFeaturesUtil.getInstance();
@@ -131,23 +140,29 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     @Override
     public void init() {
         int handshakeThreadLimit = 1;
-        hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit, handshakeThreadLimit, 0L,
-                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
-                "OFHandshake-" + conductorId);
+        hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit,
+                handshakeThreadLimit, 0L, TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<Runnable>(), "OFHandshake-"
+                        + conductorId);
 
         connectionAdapter.setMessageListener(this);
         connectionAdapter.setSystemListener(this);
         connectionAdapter.setConnectionReadyListener(this);
-        queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor, ingressMaxQueueSize);
+        WaterMarkListener waterMarkListener = new WaterMarkListenerImpl(
+                connectionAdapter);
+        queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor,
+                ingressMaxQueueSize, waterMarkListener);
     }
 
     @Override
-    public void setQueueProcessor(QueueProcessor<OfHeader, DataObject> queueProcessor) {
+    public void setQueueProcessor(
+            QueueProcessor<OfHeader, DataObject> queueProcessor) {
         this.queueProcessor = queueProcessor;
     }
 
     /**
-     * @param errorHandler the errorHandler to set
+     * @param errorHandler
+     *            the errorHandler to set
      */
     @Override
     public void setErrorHandler(ErrorHandler errorHandler) {
@@ -160,7 +175,8 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         new Thread(new Runnable() {
             @Override
             public void run() {
-                LOG.debug("echo request received: " + echoRequestMessage.getXid());
+                LOG.debug("echo request received: "
+                        + echoRequestMessage.getXid());
                 EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
                 builder.setVersion(echoRequestMessage.getVersion());
                 builder.setXid(echoRequestMessage.getXid());
@@ -176,7 +192,6 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         enqueueMessage(errorMessage);
     }
 
-
     /**
      * @param message
      */
@@ -191,7 +206,8 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
     /**
      * @param message
-     * @param queueType enqueue type
+     * @param queueType
+     *            enqueue type
      */
     private void enqueueMessage(OfHeader message, QueueType queueType) {
         queue.push(message, this, queueType);
@@ -207,15 +223,15 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         enqueueMessage(message);
     }
 
-
     /**
-     * version negotiation happened as per following steps:
-     * 1. If HelloMessage version field has same version, continue connection processing.
-     * If HelloMessage version is lower than supported versions, just disconnect.
+     * version negotiation happened as per following steps: 1. If HelloMessage
+     * version field has same version, continue connection processing. If
+     * HelloMessage version is lower than supported versions, just disconnect.
      * 2. If HelloMessage contains bitmap and common version found in bitmap
-     * then continue connection processing. if no common version found, just disconnect.
-     * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version.
-     * 4. If Hello message received again with not supported version, just disconnect.
+     * then continue connection processing. if no common version found, just
+     * disconnect. 3. If HelloMessage version is not supported, send
+     * HelloMessage with lower supported version. 4. If Hello message received
+     * again with not supported version, just disconnect.
      */
     @Override
     public void onHelloMessage(final HelloMessage hello) {
@@ -274,10 +290,13 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         Boolean portBandwidth = portFeaturesUtils.getPortBandwidth(msg);
 
         if (portBandwidth == null) {
-            LOG.debug("can't get bandwidth info from port: {}, aborting port update", msg.toString());
+            LOG.debug(
+                    "can't get bandwidth info from port: {}, aborting port update",
+                    msg.toString());
         } else {
             this.getSessionContext().getPhysicalPorts().put(portNumber, msg);
-            this.getSessionContext().getPortsBandwidth().put(portNumber, portBandwidth);
+            this.getSessionContext().getPortsBandwidth()
+                    .put(portNumber, portBandwidth);
         }
     }
 
@@ -294,12 +313,17 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
             @Override
             public void run() {
                 if (!CONDUCTOR_STATE.WORKING.equals(getConductorState())) {
-                    // idle state in any other conductorState than WORKING means real
-                    // problem and wont be handled by echoReply, but disconnection
+                    // idle state in any other conductorState than WORKING means
+                    // real
+                    // problem and wont be handled by echoReply, but
+                    // disconnection
                     disconnect();
-                    OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this);
+                    OFSessionUtil.getSessionManager().invalidateOnDisconnect(
+                            ConnectionConductorImpl.this);
                 } else {
-                    LOG.debug("first idle state occured, sessionCtx={}|auxId={}", sessionContext, auxiliaryKey);
+                    LOG.debug(
+                            "first idle state occured, sessionCtx={}|auxId={}",
+                            sessionContext, auxiliaryKey);
                     EchoInputBuilder builder = new EchoInputBuilder();
                     builder.setVersion(getVersion());
                     builder.setXid(getSessionContext().getNextXid());
@@ -308,27 +332,30 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
                             .echo(builder.build());
 
                     try {
-                        RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
-                                getMaxTimeoutUnit());
+                        RpcResult<EchoOutput> echoReplyValue = echoReplyFuture
+                                .get(getMaxTimeout(), getMaxTimeoutUnit());
                         if (echoReplyValue.isSuccessful()) {
                             setConductorState(CONDUCTOR_STATE.WORKING);
                         } else {
-                            for (RpcError replyError : echoReplyValue.getErrors()) {
+                            for (RpcError replyError : echoReplyValue
+                                    .getErrors()) {
                                 Throwable cause = replyError.getCause();
                                 LOG.error(
                                         "while receiving echoReply in TIMEOUTING state: "
                                                 + cause.getMessage(), cause);
                             }
-                            //switch issue occurred
+                            // switch issue occurred
                             throw new Exception("switch issue occurred");
                         }
                     } catch (Exception e) {
                         LOG.error("while waiting for echoReply in TIMEOUTING state: "
                                 + e.getMessage());
                         errorHandler.handleException(e, sessionContext);
-                        //switch is not responding
+                        // switch is not responding
                         disconnect();
-                        OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this);
+                        OFSessionUtil.getSessionManager()
+                                .invalidateOnDisconnect(
+                                        ConnectionConductorImpl.this);
                     }
                 }
             }
@@ -337,7 +364,8 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     }
 
     /**
-     * @param conductorState the connectionState to set
+     * @param conductorState
+     *            the connectionState to set
      */
     @Override
     public void setConductorState(CONDUCTOR_STATE conductorState) {
@@ -373,7 +401,8 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
     @Override
     public Future<Boolean> disconnect() {
-        LOG.trace("disconnecting: sessionCtx={}|auxId={}", sessionContext, auxiliaryKey);
+        LOG.trace("disconnecting: sessionCtx={}|auxId={}", sessionContext,
+                auxiliaryKey);
 
         Future<Boolean> result = null;
         if (connectionAdapter.isAlive()) {
@@ -426,7 +455,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
     @Override
     public void onHandshakeSuccessfull(GetFeaturesOutput featureOutput,
-                                       Short negotiatedVersion) {
+            Short negotiatedVersion) {
         postHandshakeBasic(featureOutput, negotiatedVersion);
 
         // post-handshake actions
@@ -447,19 +476,21 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
     /**
      * used by tests
-     *
+     * 
      * @param featureOutput
      * @param negotiatedVersion
      */
     protected void postHandshakeBasic(GetFeaturesOutput featureOutput,
-                                      Short negotiatedVersion) {
+            Short negotiatedVersion) {
         version = negotiatedVersion;
         if (version == OFConstants.OFP_VERSION_1_0) {
-            //  Because the GetFeaturesOutput contains information about the port
-            //  in OF1.0 (that we would otherwise get from the PortDesc) we have to pass
-            //  it up for parsing to convert into a NodeConnectorUpdate
+            // Because the GetFeaturesOutput contains information about the port
+            // in OF1.0 (that we would otherwise get from the PortDesc) we have
+            // to pass
+            // it up for parsing to convert into a NodeConnectorUpdate
             //
-            // BUG-1988 - this must be the first item in queue in order not to get behind link-up message
+            // BUG-1988 - this must be the first item in queue in order not to
+            // get behind link-up message
             enqueueMessage(featureOutput);
         }
 
@@ -471,14 +502,15 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     }
 
     /*
-     *  Send an OFPMP_DESC request message to the switch
+     * Send an OFPMP_DESC request message to the switch
      */
     private void requestDesc() {
         MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder();
         builder.setType(MultipartType.OFPMPDESC);
         builder.setVersion(getVersion());
         builder.setFlags(new MultipartRequestFlags(false));
-        builder.setMultipartRequestBody(new MultipartRequestDescCaseBuilder().build());
+        builder.setMultipartRequestBody(new MultipartRequestDescCaseBuilder()
+                .build());
         builder.setXid(getSessionContext().getNextXid());
         getConnectionAdapter().multipartRequest(builder.build());
     }
@@ -488,7 +520,8 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         builder.setType(MultipartType.OFPMPPORTDESC);
         builder.setVersion(getVersion());
         builder.setFlags(new MultipartRequestFlags(false));
-        builder.setMultipartRequestBody(new MultipartRequestPortDescCaseBuilder().build());
+        builder.setMultipartRequestBody(new MultipartRequestPortDescCaseBuilder()
+                .build());
         builder.setXid(getSessionContext().getNextXid());
         getConnectionAdapter().multipartRequest(builder.build());
     }
@@ -500,11 +533,11 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         mprInput.setFlags(new MultipartRequestFlags(false));
         mprInput.setXid(getSessionContext().getNextXid());
 
-        MultipartRequestGroupFeaturesCaseBuilder mprGroupFeaturesBuild =
-                new MultipartRequestGroupFeaturesCaseBuilder();
+        MultipartRequestGroupFeaturesCaseBuilder mprGroupFeaturesBuild = new MultipartRequestGroupFeaturesCaseBuilder();
         mprInput.setMultipartRequestBody(mprGroupFeaturesBuild.build());
 
-        LOG.debug("Send group features statistics request :{}", mprGroupFeaturesBuild);
+        LOG.debug("Send group features statistics request :{}",
+                mprGroupFeaturesBuild);
         getConnectionAdapter().multipartRequest(mprInput.build());
 
     }
@@ -516,20 +549,20 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         mprInput.setFlags(new MultipartRequestFlags(false));
         mprInput.setXid(getSessionContext().getNextXid());
 
-        MultipartRequestMeterFeaturesCaseBuilder mprMeterFeaturesBuild =
-                new MultipartRequestMeterFeaturesCaseBuilder();
+        MultipartRequestMeterFeaturesCaseBuilder mprMeterFeaturesBuild = new MultipartRequestMeterFeaturesCaseBuilder();
         mprInput.setMultipartRequestBody(mprMeterFeaturesBuild.build());
 
-        LOG.debug("Send meter features statistics request :{}", mprMeterFeaturesBuild);
+        LOG.debug("Send meter features statistics request :{}",
+                mprMeterFeaturesBuild);
         getConnectionAdapter().multipartRequest(mprInput.build());
 
     }
 
     /**
-     * @param isBitmapNegotiationEnable the isBitmapNegotiationEnable to set
+     * @param isBitmapNegotiationEnable
+     *            the isBitmapNegotiationEnable to set
      */
-    public void setBitmapNegotiationEnable(
-            boolean isBitmapNegotiationEnable) {
+    public void setBitmapNegotiationEnable(boolean isBitmapNegotiationEnable) {
         this.isBitmapNegotiationEnable = isBitmapNegotiationEnable;
     }
 
index 0c856cff298e2f502090d6a4815f0f9529dbea9a..1143afa1def797c68a85f1f44f02b69eba2c5222 100644 (file)
@@ -9,37 +9,50 @@ package org.opendaylight.openflowplugin.openflow.md.queue;
 
 import org.opendaylight.openflowplugin.api.openflow.md.queue.MessageSourcePollRegistrator;
 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 
 /**
- * factory for {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper} implementations
+ * factory for
+ * {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper}
+ * implementations
  */
 public abstract class QueueKeeperFactory {
-    
+
     /**
-     * @param sourceRegistrator 
-     * @param capacity blocking queue capacity
-     * @return fair reading implementation of {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper} (not registered = not started yet)
+     * @param sourceRegistrator
+     * @param capacity
+     *            blocking queue capacity
+     * @param waterMarkListener
+     * @return fair reading implementation of
+     *         {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper}
+     *         (not registered = not started yet)
      */
     public static QueueKeeper<OfHeader> createFairQueueKeeper(
-            MessageSourcePollRegistrator<QueueKeeper<OfHeader>> sourceRegistrator, int capacity) {
+            MessageSourcePollRegistrator<QueueKeeper<OfHeader>> sourceRegistrator,
+            int capacity, WaterMarkListener waterMarkListener) {
         QueueKeeperFairImpl queueKeeper = new QueueKeeperFairImpl();
         queueKeeper.setCapacity(capacity);
         queueKeeper.setHarvesterHandle(sourceRegistrator.getHarvesterHandle());
+        queueKeeper.setWaterMarkListener(waterMarkListener);
         queueKeeper.init();
-        
+
         return queueKeeper;
     }
 
     /**
-     * register queue by harvester, start processing it. Use {@link QueueKeeperFairImpl#close()} to kill the queue and stop processing. 
+     * register queue by harvester, start processing it. Use
+     * {@link QueueKeeperFairImpl#close()} to kill the queue and stop
+     * processing.
+     * 
      * @param sourceRegistrator
      * @param queueKeeper
      */
     public static <V> void plugQueue(
             MessageSourcePollRegistrator<QueueKeeper<V>> sourceRegistrator,
             QueueKeeper<V> queueKeeper) {
-        AutoCloseable registration = sourceRegistrator.registerMessageSource(queueKeeper);
+        AutoCloseable registration = sourceRegistrator
+                .registerMessageSource(queueKeeper);
         queueKeeper.setPollRegistration(registration);
         sourceRegistrator.getHarvesterHandle().ping();
     }
index dbf7d4d2d330ef9c44092cfbe66d09b9230d144f..b24ba9f74969bd33b7c6e84cea017955ced42655 100644 (file)
@@ -15,6 +15,7 @@ import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle;
 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener;
 import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesPriorityZipper;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.slf4j.Logger;
@@ -37,6 +38,8 @@ public class QueueKeeperFairImpl implements QueueKeeper<OfHeader> {
     private HarvesterHandle harvesterHandle;
     private PollableQueuesPriorityZipper<QueueItem<OfHeader>> queueZipper;
 
+    private WaterMarkListener waterMarkListener;
+
     @Override
     public void close() throws Exception {
         Preconditions.checkNotNull(pollRegistration,
@@ -77,8 +80,7 @@ public class QueueKeeperFairImpl implements QueueKeeper<OfHeader> {
      */
     @Override
     public QueueItem<OfHeader> poll() {
-        QueueItem<OfHeader> nextQueueItem = queueZipper.poll();
-        return nextQueueItem;
+        return queueZipper.poll();
     }
 
     /**
@@ -102,11 +104,18 @@ public class QueueKeeperFairImpl implements QueueKeeper<OfHeader> {
      * init blocking queue
      */
     public void init() {
+        Preconditions.checkNotNull(waterMarkListener);
         queueUnordered = new ArrayBlockingQueue<>(capacity);
         queueDefault = new ArrayBlockingQueue<>(capacity);
+        WrapperQueueImpl<QueueItem<OfHeader>> wrapperQueue = new WrapperQueueImpl<>(
+                capacity, queueDefault, waterMarkListener);
         queueZipper = new PollableQueuesPriorityZipper<>();
         queueZipper.addSource(queueUnordered);
-        queueZipper.setPrioritizedSource(queueDefault);
+        queueZipper.setPrioritizedSource(wrapperQueue);
+    }
+
+    public void setWaterMarkListener(WaterMarkListener waterMarkListener) {
+        this.waterMarkListener = waterMarkListener;
     }
 
     /**
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/WrapperQueueImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/WrapperQueueImpl.java
new file mode 100644 (file)
index 0000000..8a3404f
--- /dev/null
@@ -0,0 +1,152 @@
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Queue;
+
+import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class WrapperQueueImpl<E> implements Queue<E> {
+
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(WrapperQueueImpl.class);
+
+    private int lowWaterMark;
+    private int highWaterMark;
+
+    private WaterMarkListener queueListenerMark;
+
+    private Queue<E> queueDefault;
+
+    private boolean flooded;
+
+    /**
+     * @param capacity
+     * @param queueZipper
+     */
+    public WrapperQueueImpl(int capacity, Queue<E> queueDefault,
+            WaterMarkListener queueListenerMark) {
+        this.queueListenerMark = queueListenerMark;
+        this.queueDefault = Preconditions.checkNotNull(queueDefault);
+
+        this.highWaterMark = (int) (capacity * 0.8);
+        this.lowWaterMark = (int) (capacity * 0.65);
+    }
+
+    /**
+     * Marking checks size of {@link #queueDefault} and on the basis of this is
+     * set autoRead
+     */
+    private void marking() {
+        if (queueDefault.size() >= highWaterMark && !flooded) {
+            queueListenerMark.onHighWaterMark();
+            flooded = true;
+        } else if (queueDefault.size() <= lowWaterMark && flooded) {
+            queueListenerMark.onLowWaterMark();
+            flooded = false;
+        }
+    }
+
+    /**
+     * @return true if flooded
+     */
+    public boolean isFlooded() {
+        return flooded;
+    }
+
+    /**
+     * poll {@link QueueItem} and call {@link #marking()} for check marks and
+     * set autoRead if it need it
+     * 
+     * @return polled item
+     */
+    public E poll() {
+        E nextQueueItem = queueDefault.poll();
+        marking();
+        return nextQueueItem;
+    }
+
+    public boolean add(E e) {
+        return queueDefault.add(e);
+    }
+
+    public int size() {
+        return queueDefault.size();
+    }
+
+    public boolean isEmpty() {
+        return queueDefault.isEmpty();
+    }
+
+    public boolean contains(Object o) {
+        return queueDefault.contains(o);
+    }
+
+    public boolean offer(E e) {
+        boolean enqueueResult = queueDefault.offer(e);
+        marking();
+        return enqueueResult;
+    }
+
+    public Iterator<E> iterator() {
+        return queueDefault.iterator();
+    }
+
+    public E remove() {
+        return queueDefault.remove();
+    }
+
+    public Object[] toArray() {
+        return queueDefault.toArray();
+    }
+
+    public E element() {
+        return queueDefault.element();
+    }
+
+    public E peek() {
+        return queueDefault.peek();
+    }
+
+    public <T> T[] toArray(T[] a) {
+        return queueDefault.toArray(a);
+    }
+
+    public boolean remove(Object o) {
+        return queueDefault.remove(o);
+    }
+
+    public boolean containsAll(Collection<?> c) {
+        return queueDefault.containsAll(c);
+    }
+
+    public boolean addAll(Collection<? extends E> c) {
+        return queueDefault.addAll(c);
+    }
+
+    public boolean removeAll(Collection<?> c) {
+        return queueDefault.removeAll(c);
+    }
+
+    public boolean retainAll(Collection<?> c) {
+        return queueDefault.retainAll(c);
+    }
+
+    public void clear() {
+        queueDefault.clear();
+    }
+
+    public boolean equals(Object o) {
+        return queueDefault.equals(o);
+    }
+
+    public int hashCode() {
+        return queueDefault.hashCode();
+    }
+
+}
index 296bdeccaa9daf073788c85c3c97def83b712b51..9af46b4379ddceea8ec1d33fd8b6b23144b560bf 100644 (file)
@@ -100,6 +100,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
 
     private int planItemCounter;
 
+    private boolean autoRead = true;
+
     /**
      * default ctor
      */
@@ -108,7 +110,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     }
 
     @Override
-    public synchronized Future<RpcResult<BarrierOutput>> barrier(BarrierInput arg0) {
+    public synchronized Future<RpcResult<BarrierOutput>> barrier(
+            BarrierInput arg0) {
         checkRpcAndNext(arg0, "barrier");
         SettableFuture<RpcResult<BarrierOutput>> result = createAndRegisterRpcResult(arg0);
         return result;
@@ -143,14 +146,16 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     }
 
     @Override
-    public synchronized Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput arg0) {
+    public synchronized Future<RpcResult<GetAsyncOutput>> getAsync(
+            GetAsyncInput arg0) {
         checkRpcAndNext(arg0, "echo");
         Future<RpcResult<GetAsyncOutput>> result = createAndRegisterRpcResult(arg0);
         return result;
     }
 
     @Override
-    public synchronized Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput arg0) {
+    public synchronized Future<RpcResult<GetConfigOutput>> getConfig(
+            GetConfigInput arg0) {
         checkRpcAndNext(arg0, "echo");
         Future<RpcResult<GetConfigOutput>> result = createAndRegisterRpcResult(arg0);
         return result;
@@ -258,7 +263,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
 
     @Override
     public void checkListeners() {
-        if (ofListener == null || systemListener == null || connectionReadyListener == null) {
+        if (ofListener == null || systemListener == null
+                || connectionReadyListener == null) {
             occuredExceptions
                     .add(new IllegalStateException("missing listeners"));
         }
@@ -283,23 +289,29 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
             throw new IllegalStateException("eventPlan already depleted");
         }
 
-        LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName, rpcInput.getVersion(), rpcInput.getXid());
+        LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName,
+                rpcInput.getVersion(), rpcInput.getXid());
         if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)
                 && !(eventPlan.peek() instanceof SwitchTestWaitForAllEvent)) {
             if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
-                SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan.peek());
-                msg = "expected [notification: " +notifEvent.getPlannedNotification()+ "], got [" + rpcInput.getClass().getSimpleName()
-                        + "]";
+                SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan
+                        .peek());
+                msg = "expected [notification: "
+                        + notifEvent.getPlannedNotification() + "], got ["
+                        + rpcInput.getClass().getSimpleName() + "]";
             } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
-                SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan.peek());
-                msg = "expected [rpc: " +rpcEvent.getPlannedRpcResponse()+ "], got [" + rpcInput.getClass().getSimpleName()
+                SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan
+                        .peek());
+                msg = "expected [rpc: " + rpcEvent.getPlannedRpcResponse()
+                        + "], got [" + rpcInput.getClass().getSimpleName()
                         + "]";
             }
         } else {
             if (eventPlan.peek() instanceof SwitchTestWaitForAllEvent) {
                 SwitchTestWaitForAllEvent switchTestWaitForAll = (SwitchTestWaitForAllEvent) eventPlan
                         .peek();
-                Set<SwitchTestWaitForRpcEvent> eventBag = switchTestWaitForAll.getWaitEventBag();
+                Set<SwitchTestWaitForRpcEvent> eventBag = switchTestWaitForAll
+                        .getWaitEventBag();
                 List<String> msgLot = new ArrayList<>();
 
                 if (eventBag == null || eventBag.isEmpty()) {
@@ -307,7 +319,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
                 } else {
                     finished = false;
                     for (SwitchTestWaitForRpcEvent switchTestWaitForRpc : eventBag) {
-                        String msgPart = checkSingleRpcContent(rpcInput, rpcName, switchTestWaitForRpc);
+                        String msgPart = checkSingleRpcContent(rpcInput,
+                                rpcName, switchTestWaitForRpc);
 
                         if (msgPart != null) {
                             msgLot.add(msgPart);
@@ -329,13 +342,15 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
             } else if (eventPlan.peek() instanceof SwitchTestWaitForRpcEvent) {
                 SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
                         .peek();
-                msg = checkSingleRpcContent(rpcInput, rpcName, switchTestRpcEvent);
+                msg = checkSingleRpcContent(rpcInput, rpcName,
+                        switchTestRpcEvent);
             }
         }
 
         if (msg != null) {
             LOG.debug("rpc check .. FAILED: " + msg);
-            occuredExceptions.add(new IllegalArgumentException("step:"+planItemCounter+" | "+msg));
+            occuredExceptions.add(new IllegalArgumentException("step:"
+                    + planItemCounter + " | " + msg));
         } else {
             LOG.debug("rpc check .. OK");
         }
@@ -349,15 +364,17 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
      * @param switchTestWaitForRpc
      * @return
      */
-    private static String checkSingleRpcContent(OfHeader rpcInput, String rpcName,
-            SwitchTestWaitForRpcEvent switchTestWaitForRpc) {
+    private static String checkSingleRpcContent(OfHeader rpcInput,
+            String rpcName, SwitchTestWaitForRpcEvent switchTestWaitForRpc) {
         String failureMsg = null;
         if (!rpcName.equals(switchTestWaitForRpc.getRpcName())) {
-            failureMsg = "expected rpc name [" + switchTestWaitForRpc.getRpcName()
-                    + "], got [" + rpcName + "]";
+            failureMsg = "expected rpc name ["
+                    + switchTestWaitForRpc.getRpcName() + "], got [" + rpcName
+                    + "]";
         } else if (!rpcInput.getXid().equals(switchTestWaitForRpc.getXid())) {
-            failureMsg = "expected "+rpcName+".xid [" + switchTestWaitForRpc.getXid()
-                    + "], got [" + rpcInput.getXid() + "]";
+            failureMsg = "expected " + rpcName + ".xid ["
+                    + switchTestWaitForRpc.getXid() + "], got ["
+                    + rpcInput.getXid() + "]";
         }
 
         return failureMsg;
@@ -380,9 +397,10 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
      * discard current event, execute next, if possible
      */
     private void next() {
-        LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})", planItemCounter, eventPlan.peek());
+        LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})",
+                planItemCounter, eventPlan.peek());
         eventPlan.pop();
-        planItemCounter ++;
+        planItemCounter++;
         planTouched = true;
         try {
             Thread.sleep(JOB_DELAY);
@@ -397,7 +415,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
      */
     private synchronized void proceed() {
         boolean processed = false;
-        LOG.debug("proceeding plan item[{}]: {}", planItemCounter, eventPlan.peek());
+        LOG.debug("proceeding plan item[{}]: {}", planItemCounter,
+                eventPlan.peek());
         if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
             SwitchTestNotificationEvent notification = (SwitchTestNotificationEvent) eventPlan
                     .peek();
@@ -409,12 +428,13 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
             processRpcResponse(rpcResponse);
             processed = true;
         } else if (eventPlan.peek() instanceof SwitchTestCallbackEvent) {
-            SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan.peek();
+            SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan
+                    .peek();
             try {
                 callbackEvent.getCallback().call();
             } catch (Exception e) {
                 LOG.error(e.getMessage(), e);
-               occuredExceptions.add(e);
+                occuredExceptions.add(e);
             }
             processed = true;
         }
@@ -439,8 +459,10 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
             planTouched = false;
             proceed();
             if (!planTouched) {
-                occuredExceptions.add(new IllegalStateException(
-                        "eventPlan STALLED, planItemCounter="+planItemCounter));
+                occuredExceptions
+                        .add(new IllegalStateException(
+                                "eventPlan STALLED, planItemCounter="
+                                        + planItemCounter));
                 break;
             }
         }
@@ -459,61 +481,57 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     private synchronized void processNotification(
             final SwitchTestNotificationEvent notificationEvent) {
 
-        Notification notification = notificationEvent
-                .getPlannedNotification();
+        Notification notification = notificationEvent.getPlannedNotification();
         LOG.debug("notificating OF_LISTENER: "
                 + notification.getClass().getSimpleName());
 
         // system events
         if (notification instanceof DisconnectEvent) {
-            systemListener
-            .onDisconnectEvent((DisconnectEvent) notification);
+            systemListener.onDisconnectEvent((DisconnectEvent) notification);
         }
         // of notifications
         else if (notification instanceof EchoRequestMessage) {
-            ofListener
-            .onEchoRequestMessage((EchoRequestMessage) notification);
+            ofListener.onEchoRequestMessage((EchoRequestMessage) notification);
         } else if (notification instanceof ErrorMessage) {
             ofListener.onErrorMessage((ErrorMessage) notification);
         } else if (notification instanceof ExperimenterMessage) {
             ofListener
-            .onExperimenterMessage((ExperimenterMessage) notification);
+                    .onExperimenterMessage((ExperimenterMessage) notification);
         } else if (notification instanceof FlowRemovedMessage) {
-            ofListener
-            .onFlowRemovedMessage((FlowRemovedMessage) notification);
+            ofListener.onFlowRemovedMessage((FlowRemovedMessage) notification);
         } else if (notification instanceof HelloMessage) {
             ofListener.onHelloMessage((HelloMessage) notification);
         } else if (notification instanceof MultipartReplyMessage) {
             ofListener
-            .onMultipartReplyMessage((MultipartReplyMessage) notification);
+                    .onMultipartReplyMessage((MultipartReplyMessage) notification);
         } else if (notification instanceof PacketInMessage) {
-            ofListener
-            .onPacketInMessage((PacketInMessage) notification);
+            ofListener.onPacketInMessage((PacketInMessage) notification);
         } else if (notification instanceof PortStatusMessage) {
-            ofListener
-            .onPortStatusMessage((PortStatusMessage) notification);
+            ofListener.onPortStatusMessage((PortStatusMessage) notification);
         }
         // default
         else {
-            occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " +
-                    "message listening not supported for type: "
-                            + notification.getClass()));
+            occuredExceptions.add(new IllegalStateException("step:"
+                    + planItemCounter + " | "
+                    + "message listening not supported for type: "
+                    + notification.getClass()));
         }
 
-        LOG.debug("notification ["+notification.getClass().getSimpleName()+"] .. done");
+        LOG.debug("notification [" + notification.getClass().getSimpleName()
+                + "] .. done");
     }
 
     /**
      * @param rpcResponse
      */
-    private synchronized void processRpcResponse(final SwitchTestRcpResponseEvent rpcResponse) {
-        OfHeader plannedRpcResponseValue = rpcResponse
-                .getPlannedRpcResponse();
+    private synchronized void processRpcResponse(
+            final SwitchTestRcpResponseEvent rpcResponse) {
+        OfHeader plannedRpcResponseValue = rpcResponse.getPlannedRpcResponse();
         LOG.debug("rpc-responding to OF_LISTENER: " + rpcResponse.getXid());
 
         @SuppressWarnings("unchecked")
         SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
-        .get(rpcResponse.getXid());
+                .get(rpcResponse.getXid());
 
         if (response != null) {
             boolean successful = plannedRpcResponseValue != null;
@@ -521,33 +539,25 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
             if (successful) {
                 errors = Collections.emptyList();
             } else {
-                errors = Lists
-                        .newArrayList(RpcErrors
-                                .getRpcError(
-                                        "unit",
-                                        "unit",
-                                        "not requested",
-                                        ErrorSeverity.ERROR,
-                                        "planned response to RPC.id = "
-                                                + rpcResponse.getXid(),
-                                                ErrorType.RPC,
-                                                new Exception(
-                                                        "rpc response failed (planned behavior)")));
+                errors = Lists.newArrayList(RpcErrors.getRpcError("unit",
+                        "unit", "not requested", ErrorSeverity.ERROR,
+                        "planned response to RPC.id = " + rpcResponse.getXid(),
+                        ErrorType.RPC, new Exception(
+                                "rpc response failed (planned behavior)")));
             }
             RpcResult<?> result = Rpcs.getRpcResult(successful,
                     plannedRpcResponseValue, errors);
             response.set(result);
         } else {
             String msg = "RpcResponse not expected: xid="
-                    + rpcResponse.getXid()
-                    + ", "
-                    + plannedRpcResponseValue.getClass()
-                    .getSimpleName();
+                    + rpcResponse.getXid() + ", "
+                    + plannedRpcResponseValue.getClass().getSimpleName();
             LOG.error(msg);
-            occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " + msg));
+            occuredExceptions.add(new IllegalStateException("step:"
+                    + planItemCounter + " | " + msg));
         }
 
-        LOG.debug("rpc ["+rpcResponse.getXid()+"] .. done");
+        LOG.debug("rpc [" + rpcResponse.getXid() + "] .. done");
     }
 
     /**
@@ -597,7 +607,7 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
 
     @Override
     public void fireConnectionReadyNotification() {
-            connectionReadyListener.onConnectionReady();
+        connectionReadyListener.onConnectionReady();
     }
 
     @Override
@@ -607,8 +617,7 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     }
 
     @Override
-    public Future<RpcResult<Void>> multipartRequest(
-            MultipartRequestInput arg0) {
+    public Future<RpcResult<Void>> multipartRequest(MultipartRequestInput arg0) {
         checkRpcAndNext(arg0, "multipartRequestInput");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
         return result;
@@ -620,4 +629,14 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
         return null;
     }
 
+    @Override
+    public boolean isAutoRead() {
+        return autoRead;
+    }
+
+    @Override
+    public void setAutoRead(boolean autoRead) {
+        this.autoRead = autoRead;
+    }
+
 }
index 7ce4730333ab3fbfb7de08f7232a5d19550ebcdf..09a532ababb8337b5ee5e259acd6782594255c1d 100644 (file)
@@ -23,16 +23,16 @@ import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
-import org.opendaylight.openflowplugin.api.openflow.md.core.session.IMessageDispatchService;
-import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
-import org.opendaylight.openflowplugin.api.openflow.md.core.session.SwitchSessionKeyOF;
-import org.opendaylight.openflowplugin.api.openflow.md.ModelDrivenSwitch;
 import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.md.ModelDrivenSwitch;
 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
 import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationEnqueuer;
 import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationQueueWrapper;
 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
+import org.opendaylight.openflowplugin.api.openflow.md.core.session.IMessageDispatchService;
+import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
+import org.opendaylight.openflowplugin.api.openflow.md.core.session.SwitchSessionKeyOF;
 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueProcessor;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder;
@@ -97,7 +97,7 @@ public class MessageDispatchServiceImplTest {
 
     /**
      * Test barrier message for null cookie
-     *
+     * 
      * @throws Exception
      */
     @Test
@@ -106,7 +106,8 @@ public class MessageDispatchServiceImplTest {
         SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
         BarrierInputBuilder barrierMsg = new BarrierInputBuilder();
         session.getMessageDispatchService().barrier(barrierMsg.build(), cookie);
-        Assert.assertEquals(MessageType.BARRIER, session.getPrimaryConductor().getMessageType());
+        Assert.assertEquals(MessageType.BARRIER, session.getPrimaryConductor()
+                .getMessageType());
     }
 
     /**
@@ -117,8 +118,10 @@ public class MessageDispatchServiceImplTest {
         MockConnectionConductor conductor = new MockConnectionConductor(1);
         SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
         ExperimenterInputBuilder experimenterInputBuilder = new ExperimenterInputBuilder();
-        session.getMessageDispatchService().experimenter(experimenterInputBuilder.build(), cookie);
-        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
+        session.getMessageDispatchService().experimenter(
+                experimenterInputBuilder.build(), cookie);
+        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor()
+                .getMessageType());
     }
 
     /**
@@ -129,8 +132,10 @@ public class MessageDispatchServiceImplTest {
         MockConnectionConductor conductor = new MockConnectionConductor(1);
         SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
         GetAsyncInputBuilder getAsyncInputBuilder = new GetAsyncInputBuilder();
-        session.getMessageDispatchService().getAsync(getAsyncInputBuilder.build(), cookie);
-        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
+        session.getMessageDispatchService().getAsync(
+                getAsyncInputBuilder.build(), cookie);
+        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor()
+                .getMessageType());
     }
 
     /**
@@ -141,8 +146,10 @@ public class MessageDispatchServiceImplTest {
         MockConnectionConductor conductor = new MockConnectionConductor(1);
         SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
         GetConfigInputBuilder getConfigInputBuilder = new GetConfigInputBuilder();
-        session.getMessageDispatchService().getConfig(getConfigInputBuilder.build(), cookie);
-        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
+        session.getMessageDispatchService().getConfig(
+                getConfigInputBuilder.build(), cookie);
+        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor()
+                .getMessageType());
     }
 
     /**
@@ -153,8 +160,10 @@ public class MessageDispatchServiceImplTest {
         MockConnectionConductor conductor = new MockConnectionConductor(1);
         SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
         GetFeaturesInputBuilder getFeaturesInputBuilder = new GetFeaturesInputBuilder();
-        session.getMessageDispatchService().getFeatures(getFeaturesInputBuilder.build(), cookie);
-        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
+        session.getMessageDispatchService().getFeatures(
+                getFeaturesInputBuilder.build(), cookie);
+        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor()
+                .getMessageType());
     }
 
     /**
@@ -165,8 +174,10 @@ public class MessageDispatchServiceImplTest {
         MockConnectionConductor conductor = new MockConnectionConductor(1);
         SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
         GetQueueConfigInputBuilder getQueueConfigInputBuilder = new GetQueueConfigInputBuilder();
-        session.getMessageDispatchService().getQueueConfig(getQueueConfigInputBuilder.build(), cookie);
-        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
+        session.getMessageDispatchService().getQueueConfig(
+                getQueueConfigInputBuilder.build(), cookie);
+        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor()
+                .getMessageType());
     }
 
     /**
@@ -177,8 +188,10 @@ public class MessageDispatchServiceImplTest {
         MockConnectionConductor conductor = new MockConnectionConductor(1);
         SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
         MultipartRequestInputBuilder multipartRequestInputBuilder = new MultipartRequestInputBuilder();
-        session.getMessageDispatchService().multipartRequest(multipartRequestInputBuilder.build(), cookie);
-        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
+        session.getMessageDispatchService().multipartRequest(
+                multipartRequestInputBuilder.build(), cookie);
+        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor()
+                .getMessageType());
     }
 
     /**
@@ -189,8 +202,10 @@ public class MessageDispatchServiceImplTest {
         MockConnectionConductor conductor = new MockConnectionConductor(1);
         SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
         RoleRequestInputBuilder roleRequestInputBuilder = new RoleRequestInputBuilder();
-        session.getMessageDispatchService().roleRequest(roleRequestInputBuilder.build(), cookie);
-        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
+        session.getMessageDispatchService().roleRequest(
+                roleRequestInputBuilder.build(), cookie);
+        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor()
+                .getMessageType());
     }
 
     /**
@@ -201,25 +216,27 @@ public class MessageDispatchServiceImplTest {
         MockConnectionConductor conductor = new MockConnectionConductor(1);
         SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
         TableModInputBuilder tableModInputBuilder = new TableModInputBuilder();
-        session.getMessageDispatchService().tableMod(tableModInputBuilder.build(), cookie);
-        Assert.assertEquals(MessageType.TABLEMOD, session.getPrimaryConductor().getMessageType());
+        session.getMessageDispatchService().tableMod(
+                tableModInputBuilder.build(), cookie);
+        Assert.assertEquals(MessageType.TABLEMOD, session.getPrimaryConductor()
+                .getMessageType());
     }
 
-
     /**
      * Test packet out message for primary connection
-     *
+     * 
      * @throws Exception
      */
     @Test
     public void testPacketOutMessageForPrimary() throws Exception {
         session.getMessageDispatchService().packetOut(null, null);
-        Assert.assertEquals(MessageType.PACKETOUT, session.getPrimaryConductor().getMessageType());
+        Assert.assertEquals(MessageType.PACKETOUT, session
+                .getPrimaryConductor().getMessageType());
     }
 
     /**
      * Test packet out message for auxiliary connection
-     *
+     * 
      * @throws Exception
      */
     @Test
@@ -228,14 +245,16 @@ public class MessageDispatchServiceImplTest {
         SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
         session.addAuxiliaryConductor(cookie, conductor);
         session.getMessageDispatchService().packetOut(null, cookie);
-        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
-        conductor = (MockConnectionConductor) session.getAuxiliaryConductor(cookie);
+        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor()
+                .getMessageType());
+        conductor = (MockConnectionConductor) session
+                .getAuxiliaryConductor(cookie);
         Assert.assertEquals(MessageType.PACKETOUT, conductor.getMessageType());
     }
 
     /**
      * Test packet out message when multiple auxiliary connection exist
-     *
+     * 
      * @throws Exception
      */
     @Test
@@ -253,28 +272,33 @@ public class MessageDispatchServiceImplTest {
         // send message
         session.getMessageDispatchService().packetOut(builder.build(), cookie2);
 
-        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
+        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor()
+                .getMessageType());
 
-        conductor3 = (MockConnectionConductor) session.getAuxiliaryConductor(cookie3);
+        conductor3 = (MockConnectionConductor) session
+                .getAuxiliaryConductor(cookie3);
         Assert.assertEquals(MessageType.NONE, conductor3.getMessageType());
 
-        conductor2 = (MockConnectionConductor) session.getAuxiliaryConductor(cookie2);
+        conductor2 = (MockConnectionConductor) session
+                .getAuxiliaryConductor(cookie2);
         Assert.assertEquals(MessageType.PACKETOUT, conductor2.getMessageType());
 
-        conductor1 = (MockConnectionConductor) session.getAuxiliaryConductor(cookie1);
+        conductor1 = (MockConnectionConductor) session
+                .getAuxiliaryConductor(cookie1);
         Assert.assertEquals(MessageType.NONE, conductor1.getMessageType());
 
     }
 
     /**
      * Test for invalid session
-     *
+     * 
      * @throws Exception
      */
     @Test
     public void testInvalidSession() throws Exception {
         session.setValid(false);
-        Future<RpcResult<Void>> resultFuture = session.getMessageDispatchService().packetOut(null, null);
+        Future<RpcResult<Void>> resultFuture = session
+                .getMessageDispatchService().packetOut(null, null);
         if (resultFuture.isDone()) {
             RpcResult<Void> rpcResult = resultFuture.get();
             Assert.assertTrue(!rpcResult.getErrors().isEmpty());
@@ -282,9 +306,12 @@ public class MessageDispatchServiceImplTest {
             Iterator<RpcError> it = rpcResult.getErrors().iterator();
             RpcError rpcError = it.next();
 
-            Assert.assertTrue(rpcError.getApplicationTag().equals(OFConstants.APPLICATION_TAG));
-            Assert.assertTrue(rpcError.getTag().equals(OFConstants.ERROR_TAG_TIMEOUT));
-            Assert.assertTrue(rpcError.getErrorType().equals(RpcError.ErrorType.TRANSPORT));
+            Assert.assertTrue(rpcError.getApplicationTag().equals(
+                    OFConstants.APPLICATION_TAG));
+            Assert.assertTrue(rpcError.getTag().equals(
+                    OFConstants.ERROR_TAG_TIMEOUT));
+            Assert.assertTrue(rpcError.getErrorType().equals(
+                    RpcError.ErrorType.TRANSPORT));
         }
     }
 
@@ -320,7 +347,8 @@ class MockSessionContext implements SessionContext {
     }
 
     @Override
-    public ConnectionConductor getAuxiliaryConductor(SwitchConnectionDistinguisher auxiliaryKey) {
+    public ConnectionConductor getAuxiliaryConductor(
+            SwitchConnectionDistinguisher auxiliaryKey) {
 
         return map.get(auxiliaryKey);
     }
@@ -332,12 +360,15 @@ class MockSessionContext implements SessionContext {
     }
 
     @Override
-    public void addAuxiliaryConductor(SwitchConnectionDistinguisher auxiliaryKey, ConnectionConductor conductorArg) {
+    public void addAuxiliaryConductor(
+            SwitchConnectionDistinguisher auxiliaryKey,
+            ConnectionConductor conductorArg) {
         map.put(auxiliaryKey, conductorArg);
     }
 
     @Override
-    public ConnectionConductor removeAuxiliaryConductor(SwitchConnectionDistinguisher connectionCookie) {
+    public ConnectionConductor removeAuxiliaryConductor(
+            SwitchConnectionDistinguisher connectionCookie) {
         return map.remove(connectionCookie);
     }
 
@@ -434,19 +465,21 @@ class MockSessionContext implements SessionContext {
     }
 
     /**
-     * @param seed the seed to set
+     * @param seed
+     *            the seed to set
      */
     public void setSeed(int seed) {
         this.seed = seed;
     }
-    
+
     @Override
     public NotificationEnqueuer getNotificationEnqueuer() {
         return conductor;
     }
 }
 
-class MockConnectionConductor implements ConnectionConductor, NotificationEnqueuer {
+class MockConnectionConductor implements ConnectionConductor,
+        NotificationEnqueuer {
 
     private int conductorNum;
     private MockConnectionAdapter adapter;
@@ -526,7 +559,8 @@ class MockConnectionConductor implements ConnectionConductor, NotificationEnqueu
     }
 
     @Override
-    public void setQueueProcessor(QueueProcessor<OfHeader, DataObject> queueKeeper) {
+    public void setQueueProcessor(
+            QueueProcessor<OfHeader, DataObject> queueKeeper) {
         // NOOP
     }
 
@@ -539,7 +573,7 @@ class MockConnectionConductor implements ConnectionConductor, NotificationEnqueu
     public void setId(int conductorId) {
         // NOOP
     }
-    
+
     @Override
     public void enqueueNotification(NotificationQueueWrapper notification) {
         // NOOP
@@ -602,13 +636,15 @@ class MockConnectionAdapter implements ConnectionAdapter {
     }
 
     @Override
-    public Future<RpcResult<GetFeaturesOutput>> getFeatures(GetFeaturesInput input) {
+    public Future<RpcResult<GetFeaturesOutput>> getFeatures(
+            GetFeaturesInput input) {
         // TODO Auto-generated method stub
         return null;
     }
 
     @Override
-    public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(GetQueueConfigInput input) {
+    public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
+            GetQueueConfigInput input) {
         // TODO Auto-generated method stub
         return null;
     }
@@ -644,7 +680,8 @@ class MockConnectionAdapter implements ConnectionAdapter {
     }
 
     @Override
-    public Future<RpcResult<RoleRequestOutput>> roleRequest(RoleRequestInput input) {
+    public Future<RpcResult<RoleRequestOutput>> roleRequest(
+            RoleRequestInput input) {
         // TODO Auto-generated method stub
         return null;
     }
@@ -705,7 +742,8 @@ class MockConnectionAdapter implements ConnectionAdapter {
     }
 
     /**
-     * @param messageType the messageType to set
+     * @param messageType
+     *            the messageType to set
      */
     public void setMessageType(MessageType messageType) {
         this.messageType = messageType;
@@ -723,18 +761,33 @@ class MockConnectionAdapter implements ConnectionAdapter {
     }
 
     @Override
-    public Future<RpcResult<Void>> multipartRequest(
-            MultipartRequestInput input) {
+    public Future<RpcResult<Void>> multipartRequest(MultipartRequestInput input) {
         // TODO Auto-generated method stub
         return null;
     }
 
-    /* (non-Javadoc)
-     * @see org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter#getRemoteAddress()
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter
+     * #getRemoteAddress()
      */
     @Override
     public InetSocketAddress getRemoteAddress() {
         // TODO Auto-generated method stub
         return null;
     }
+
+    @Override
+    public boolean isAutoRead() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public void setAutoRead(boolean arg0) {
+        // TODO Auto-generated method stub
+
+    }
 }
diff --git a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/WrapperQueueImplTest.java b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/WrapperQueueImplTest.java
new file mode 100644 (file)
index 0000000..62add1e
--- /dev/null
@@ -0,0 +1,163 @@
+package org.opendaylight.openflowplugin.openflow.md.util;
+
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener;
+import org.opendaylight.openflowplugin.openflow.md.queue.WrapperQueueImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(MockitoJUnitRunner.class)
+public class WrapperQueueImplTest {
+
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(WrapperQueueImplTest.class);
+
+    @Mock
+    private ConnectionConductor connectionConductor;
+
+    @Mock
+    private WaterMarkListener waterMarkListener;
+
+    private WrapperQueueImpl<Integer> wrapperQueueImpl;
+    private final int capacity = 100;
+    private Queue<Integer> queueDefault;
+    private int highWaterMark = 80;
+    private int lowWaterMark = 65;
+
+    /**
+     * Setup before tests
+     */
+    @Before
+    public void setUp() {
+        queueDefault = new ArrayBlockingQueue<>(capacity);
+
+        wrapperQueueImpl = new WrapperQueueImpl<>(capacity, queueDefault,
+                waterMarkListener);
+    }
+
+    /**
+     * Test for check if wrapper is not null
+     */
+    @Test
+    public void testWrapperQueueImpl() {
+        Assert.assertNotNull("Wrapper can not be null.", wrapperQueueImpl);
+    }
+
+    /**
+     * Test for set setAutoRead on false on high water mark
+     */
+    @Test
+    public void testReadOnHighWaterMark() {
+
+        Assert.assertFalse("Wrapper must be not flooded at the start.",
+                wrapperQueueImpl.isFlooded());
+
+        push(79);
+        Assert.assertFalse("Wrapper should not be flooded.",
+                wrapperQueueImpl.isFlooded());
+        Mockito.verify(waterMarkListener, Mockito.times(0)).onHighWaterMark();
+
+        push(1);
+        Assert.assertTrue("Wrapper should be flooded.",
+                wrapperQueueImpl.isFlooded());
+        Mockito.verify(waterMarkListener, Mockito.times(1)).onHighWaterMark();
+
+        Assert.assertEquals(
+                "Size of queue has to be equals to 80% of capacity of queue",
+                highWaterMark, queueDefault.size());
+    }
+
+    /**
+     * 
+     */
+    private void push(int size) {
+        for (int i = 0; i < size; i++) {
+            try {
+                wrapperQueueImpl.offer(i);
+            } catch (Exception e) {
+                LOG.error("Failed to offer item to queue.", e);
+            }
+        }
+    }
+
+    /**
+     * Test for setAutoRead on true on low water mark
+     */
+    @Test
+    public void testReadOnLowWaterMark() {
+        Mockito.verify(waterMarkListener, Mockito.times(0)).onHighWaterMark();
+        push(80);
+        Assert.assertTrue("Wrapper should be flooded.",
+                wrapperQueueImpl.isFlooded());
+        Mockito.verify(waterMarkListener, Mockito.times(1)).onHighWaterMark();
+
+        Assert.assertEquals(
+                "Size of queue has to be equals to 80% of capacity of queue",
+                highWaterMark, queueDefault.size());
+
+        poll(14);
+        Mockito.verify(waterMarkListener, Mockito.times(0)).onLowWaterMark();
+        Assert.assertTrue("Wrapper should be still flooded.",
+                wrapperQueueImpl.isFlooded());
+
+        poll(1);
+        Mockito.verify(waterMarkListener, Mockito.times(1)).onLowWaterMark();
+
+        Assert.assertEquals(
+                "Size of queue has to be equals to 65% on lowWaterMark.",
+                lowWaterMark, queueDefault.size());
+        Assert.assertFalse("Wrapped should be not flooded.",
+                wrapperQueueImpl.isFlooded());
+    }
+
+    /**
+     * Polling messages
+     */
+    private void poll(int size) {
+
+        for (int i = 0; i < size; i++) {
+            wrapperQueueImpl.poll();
+        }
+    }
+
+    /**
+     * Test for one cycle.
+     */
+    @Test
+    public void testEndReadOnHWMStartOnLWM() {
+
+        Assert.assertFalse("Wrapper should not be flooded",
+                wrapperQueueImpl.isFlooded());
+        Mockito.verify(waterMarkListener, Mockito.times(0)).onLowWaterMark();
+        Mockito.verify(waterMarkListener, Mockito.times(0)).onHighWaterMark();
+
+        push(81);
+        Assert.assertTrue("Wrapper should be flooded",
+                wrapperQueueImpl.isFlooded());
+        Mockito.verify(waterMarkListener, Mockito.times(0)).onLowWaterMark();
+        Mockito.verify(waterMarkListener, Mockito.times(1)).onHighWaterMark();
+
+        poll(17);
+        Assert.assertFalse("Wrapper should not be flooded",
+                wrapperQueueImpl.isFlooded());
+        Mockito.verify(waterMarkListener, Mockito.times(1)).onLowWaterMark();
+        Mockito.verify(waterMarkListener, Mockito.times(1)).onHighWaterMark();
+
+        push(18);
+        Assert.assertTrue("Wrapper should be flooded",
+                wrapperQueueImpl.isFlooded());
+
+        Mockito.verify(waterMarkListener, Mockito.times(1)).onLowWaterMark();
+        Mockito.verify(waterMarkListener, Mockito.times(2)).onHighWaterMark();
+    }
+}