The high priority queue containing control messages is processed first(before data... 90/14290/1
authorary <jatoth@cisco.com>
Tue, 20 Jan 2015 13:06:11 +0000 (14:06 +0100)
committerary <jatoth@cisco.com>
Tue, 20 Jan 2015 15:10:08 +0000 (16:10 +0100)
Signed-off-by: ary <jatoth@cisco.com>
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/util/PollableQueuesPriorityZipper.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFairImpl.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesPriorityZipperTest.java [new file with mode: 0644]

diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/util/PollableQueuesPriorityZipper.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/util/PollableQueuesPriorityZipper.java
new file mode 100644 (file)
index 0000000..f28876e
--- /dev/null
@@ -0,0 +1,63 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.api.openflow.md.util;
+
+import java.util.Queue;
+
+/**
+ * Zipper groups together a list of queues and exposes one poll method. Polling
+ * iterates through all groups and returns first not-null result of poll method
+ * on each queue. If after polling each grouped queue for one time there is
+ * still null result, poll will return null. <br/>
+ * Iterating keeps last position so this polling is supposed to be fairly
+ * distributed.
+ * 
+ * @param <T>
+ *            common item type of zipped queues
+ */
+public class PollableQueuesPriorityZipper<T> {
+
+    private Queue<T> prioritizedSource;
+    private PollableQueuesZipper<T> zipper;
+
+    /**
+     * default ctor
+     */
+    public PollableQueuesPriorityZipper() {
+        zipper = new PollableQueuesZipper<>();
+    }
+
+    /**
+     * Add all member queues before first invocation of
+     * {@link PollableQueuesPriorityZipper#poll()}
+     * 
+     * @param queue
+     *            to be added to group
+     */
+    public void addSource(Queue<T> queue) {
+        zipper.addSource(queue);
+    }
+
+    /**
+     * @return next common product of polling member groups
+     */
+    public T poll() {
+        T item = null;
+
+        item = prioritizedSource.poll();
+        if (item == null) {
+            item = zipper.poll();
+        }
+
+        return item;
+    }
+
+    public void setPrioritizedSource(Queue<T> prioritizedSource) {
+        this.prioritizedSource = prioritizedSource;
+    }
+}
index 04b708416c0911ec82b8f70520b7b6f3f5106928..dbf7d4d2d330ef9c44092cfbe66d09b9230d144f 100644 (file)
@@ -15,7 +15,7 @@ import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle;
 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
-import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesZipper;
+import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesPriorityZipper;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -23,34 +23,34 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 
 /**
- * QueueKeeper implementation based on {@link OfHeader} 
+ * QueueKeeper implementation based on {@link OfHeader}
  */
 public class QueueKeeperFairImpl implements QueueKeeper<OfHeader> {
 
     private static Logger LOG = LoggerFactory
             .getLogger(QueueKeeperFairImpl.class);
-    
+
     private Queue<QueueItem<OfHeader>> queueDefault;
     private BlockingQueue<QueueItem<OfHeader>> queueUnordered;
     private AutoCloseable pollRegistration;
     private int capacity = 5000;
     private HarvesterHandle harvesterHandle;
-    private PollableQueuesZipper<QueueItem<OfHeader>> queueZipper;
+    private PollableQueuesPriorityZipper<QueueItem<OfHeader>> queueZipper;
 
     @Override
     public void close() throws Exception {
-        Preconditions.checkNotNull(pollRegistration, "pollRegistration not available");
+        Preconditions.checkNotNull(pollRegistration,
+                "pollRegistration not available");
         pollRegistration.close();
     }
 
     @Override
-    public void push(
-            OfHeader message,
-            ConnectionConductor conductor,
+    public void push(OfHeader message, ConnectionConductor conductor,
             QueueKeeper.QueueType queueType) {
-        QueueItemOFImpl qItem = new QueueItemOFImpl(message, conductor, queueType);
+        QueueItemOFImpl qItem = new QueueItemOFImpl(message, conductor,
+                queueType);
         boolean enqueued = false;
-        
+
         switch (queueType) {
         case DEFAULT:
             enqueued = queueDefault.offer(qItem);
@@ -59,18 +59,19 @@ public class QueueKeeperFairImpl implements QueueKeeper<OfHeader> {
             enqueued = queueUnordered.offer(qItem);
             break;
         default:
-            LOG.warn("unsupported queue type: [{}] -> dropping message [{}]", queueType, message.getImplementedInterface());
+            LOG.warn("unsupported queue type: [{}] -> dropping message [{}]",
+                    queueType, message.getImplementedInterface());
         }
-        
+
         if (enqueued) {
             harvesterHandle.ping();
         } else {
             LOG.debug("ingress throttling is use -> {}", queueType);
         }
-        
-        // if enqueueing fails -> message will be dropped 
+
+        // if enqueueing fails -> message will be dropped
     }
-    
+
     /**
      * @return the ingressQueue
      */
@@ -81,29 +82,31 @@ public class QueueKeeperFairImpl implements QueueKeeper<OfHeader> {
     }
 
     /**
-     * @param processingRegistration the processingRegistration to set
+     * @param processingRegistration
+     *            the processingRegistration to set
      */
     @Override
     public void setPollRegistration(AutoCloseable processingRegistration) {
         this.pollRegistration = processingRegistration;
     }
-    
+
     /**
-     * @param capacity the capacity of internal blocking queue
+     * @param capacity
+     *            the capacity of internal blocking queue
      */
     public void setCapacity(int capacity) {
         this.capacity = capacity;
     }
-    
+
     /**
      * init blocking queue
      */
     public void init() {
         queueUnordered = new ArrayBlockingQueue<>(capacity);
         queueDefault = new ArrayBlockingQueue<>(capacity);
-        queueZipper = new PollableQueuesZipper<>();
-        queueZipper.addSource(queueDefault);
+        queueZipper = new PollableQueuesPriorityZipper<>();
         queueZipper.addSource(queueUnordered);
+        queueZipper.setPrioritizedSource(queueDefault);
     }
 
     /**
diff --git a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesPriorityZipperTest.java b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesPriorityZipperTest.java
new file mode 100644 (file)
index 0000000..a6a8da2
--- /dev/null
@@ -0,0 +1,90 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesPriorityZipper;
+
+import com.google.common.collect.Lists;
+
+/**
+ * test for {@link PollableQueuesPriorityZipper}
+ */
+public class PollableQueuesPriorityZipperTest {
+
+    /**
+     * Test method for
+     * {@link org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesPriorityZipper#poll()}
+     * .
+     */
+    @Test
+    public void testPoll3() {
+        Queue<String> l1 = new LinkedBlockingQueue<String>(Lists.newArrayList(
+                "1", "2", "3"));
+        Queue<String> l2 = new LinkedBlockingQueue<String>(Lists.newArrayList(
+                "a", "b", "c", "d"));
+        Queue<String> l3 = new LinkedBlockingQueue<String>(Lists.newArrayList(
+                "A", "B"));
+
+        PollableQueuesPriorityZipper<String> zipper = new PollableQueuesPriorityZipper<>();
+        zipper.setPrioritizedSource(l1);
+        zipper.addSource(l2);
+        zipper.addSource(l3);
+
+        String[] expected = new String[] { "1", "2", "3", "a", "A", "b", "B",
+                "c", "d", null, "XXX" };
+        List<String> result = new ArrayList<>();
+        while (true) {
+            String data = zipper.poll();
+            result.add(data);
+            if (data == null) {
+                break;
+            }
+        }
+        l1.offer("XXX");
+        result.add(zipper.poll());
+        Assert.assertArrayEquals(expected, result.toArray());
+    }
+
+    /**
+     * Test method for
+     * {@link org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesPriorityZipper#poll()}
+     * .
+     */
+    @Test
+    public void testPoll2() {
+        Queue<String> l1 = new LinkedBlockingQueue<String>(Lists.newArrayList(
+                "1", "2", "3"));
+        Queue<String> l3 = new LinkedBlockingQueue<String>(Lists.newArrayList(
+                "A", "B"));
+
+        PollableQueuesPriorityZipper<String> zipper = new PollableQueuesPriorityZipper<>();
+        zipper.setPrioritizedSource(l1);
+        zipper.addSource(l3);
+
+        String[] expected = new String[] { "1", "2", "3", "A", "B", null, "XXX" };
+        List<String> result = new ArrayList<>();
+        while (true) {
+            String data = zipper.poll();
+            result.add(data);
+            if (data == null) {
+                break;
+            }
+        }
+        l1.offer("XXX");
+        result.add(zipper.poll());
+        Assert.assertArrayEquals(expected, result.toArray());
+    }
+
+}