--- /dev/null
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.api.openflow.md.util;
+
+import java.util.Queue;
+
+/**
+ * Zipper groups together a list of queues and exposes one poll method. Polling
+ * iterates through all groups and returns first not-null result of poll method
+ * on each queue. If after polling each grouped queue for one time there is
+ * still null result, poll will return null. <br/>
+ * Iterating keeps last position so this polling is supposed to be fairly
+ * distributed.
+ *
+ * @param <T>
+ * common item type of zipped queues
+ */
+public class PollableQueuesPriorityZipper<T> {
+
+ private Queue<T> prioritizedSource;
+ private PollableQueuesZipper<T> zipper;
+
+ /**
+ * default ctor
+ */
+ public PollableQueuesPriorityZipper() {
+ zipper = new PollableQueuesZipper<>();
+ }
+
+ /**
+ * Add all member queues before first invocation of
+ * {@link PollableQueuesPriorityZipper#poll()}
+ *
+ * @param queue
+ * to be added to group
+ */
+ public void addSource(Queue<T> queue) {
+ zipper.addSource(queue);
+ }
+
+ /**
+ * @return next common product of polling member groups
+ */
+ public T poll() {
+ T item = null;
+
+ item = prioritizedSource.poll();
+ if (item == null) {
+ item = zipper.poll();
+ }
+
+ return item;
+ }
+
+ public void setPrioritizedSource(Queue<T> prioritizedSource) {
+ this.prioritizedSource = prioritizedSource;
+ }
+}
import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle;
import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
-import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesZipper;
+import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesPriorityZipper;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
/**
- * QueueKeeper implementation based on {@link OfHeader}
+ * QueueKeeper implementation based on {@link OfHeader}
*/
public class QueueKeeperFairImpl implements QueueKeeper<OfHeader> {
private static Logger LOG = LoggerFactory
.getLogger(QueueKeeperFairImpl.class);
-
+
private Queue<QueueItem<OfHeader>> queueDefault;
private BlockingQueue<QueueItem<OfHeader>> queueUnordered;
private AutoCloseable pollRegistration;
private int capacity = 5000;
private HarvesterHandle harvesterHandle;
- private PollableQueuesZipper<QueueItem<OfHeader>> queueZipper;
+ private PollableQueuesPriorityZipper<QueueItem<OfHeader>> queueZipper;
@Override
public void close() throws Exception {
- Preconditions.checkNotNull(pollRegistration, "pollRegistration not available");
+ Preconditions.checkNotNull(pollRegistration,
+ "pollRegistration not available");
pollRegistration.close();
}
@Override
- public void push(
- OfHeader message,
- ConnectionConductor conductor,
+ public void push(OfHeader message, ConnectionConductor conductor,
QueueKeeper.QueueType queueType) {
- QueueItemOFImpl qItem = new QueueItemOFImpl(message, conductor, queueType);
+ QueueItemOFImpl qItem = new QueueItemOFImpl(message, conductor,
+ queueType);
boolean enqueued = false;
-
+
switch (queueType) {
case DEFAULT:
enqueued = queueDefault.offer(qItem);
enqueued = queueUnordered.offer(qItem);
break;
default:
- LOG.warn("unsupported queue type: [{}] -> dropping message [{}]", queueType, message.getImplementedInterface());
+ LOG.warn("unsupported queue type: [{}] -> dropping message [{}]",
+ queueType, message.getImplementedInterface());
}
-
+
if (enqueued) {
harvesterHandle.ping();
} else {
LOG.debug("ingress throttling is use -> {}", queueType);
}
-
- // if enqueueing fails -> message will be dropped
+
+ // if enqueueing fails -> message will be dropped
}
-
+
/**
* @return the ingressQueue
*/
}
/**
- * @param processingRegistration the processingRegistration to set
+ * @param processingRegistration
+ * the processingRegistration to set
*/
@Override
public void setPollRegistration(AutoCloseable processingRegistration) {
this.pollRegistration = processingRegistration;
}
-
+
/**
- * @param capacity the capacity of internal blocking queue
+ * @param capacity
+ * the capacity of internal blocking queue
*/
public void setCapacity(int capacity) {
this.capacity = capacity;
}
-
+
/**
* init blocking queue
*/
public void init() {
queueUnordered = new ArrayBlockingQueue<>(capacity);
queueDefault = new ArrayBlockingQueue<>(capacity);
- queueZipper = new PollableQueuesZipper<>();
- queueZipper.addSource(queueDefault);
+ queueZipper = new PollableQueuesPriorityZipper<>();
queueZipper.addSource(queueUnordered);
+ queueZipper.setPrioritizedSource(queueDefault);
}
/**
--- /dev/null
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesPriorityZipper;
+
+import com.google.common.collect.Lists;
+
+/**
+ * test for {@link PollableQueuesPriorityZipper}
+ */
+public class PollableQueuesPriorityZipperTest {
+
+ /**
+ * Test method for
+ * {@link org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesPriorityZipper#poll()}
+ * .
+ */
+ @Test
+ public void testPoll3() {
+ Queue<String> l1 = new LinkedBlockingQueue<String>(Lists.newArrayList(
+ "1", "2", "3"));
+ Queue<String> l2 = new LinkedBlockingQueue<String>(Lists.newArrayList(
+ "a", "b", "c", "d"));
+ Queue<String> l3 = new LinkedBlockingQueue<String>(Lists.newArrayList(
+ "A", "B"));
+
+ PollableQueuesPriorityZipper<String> zipper = new PollableQueuesPriorityZipper<>();
+ zipper.setPrioritizedSource(l1);
+ zipper.addSource(l2);
+ zipper.addSource(l3);
+
+ String[] expected = new String[] { "1", "2", "3", "a", "A", "b", "B",
+ "c", "d", null, "XXX" };
+ List<String> result = new ArrayList<>();
+ while (true) {
+ String data = zipper.poll();
+ result.add(data);
+ if (data == null) {
+ break;
+ }
+ }
+ l1.offer("XXX");
+ result.add(zipper.poll());
+ Assert.assertArrayEquals(expected, result.toArray());
+ }
+
+ /**
+ * Test method for
+ * {@link org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesPriorityZipper#poll()}
+ * .
+ */
+ @Test
+ public void testPoll2() {
+ Queue<String> l1 = new LinkedBlockingQueue<String>(Lists.newArrayList(
+ "1", "2", "3"));
+ Queue<String> l3 = new LinkedBlockingQueue<String>(Lists.newArrayList(
+ "A", "B"));
+
+ PollableQueuesPriorityZipper<String> zipper = new PollableQueuesPriorityZipper<>();
+ zipper.setPrioritizedSource(l1);
+ zipper.addSource(l3);
+
+ String[] expected = new String[] { "1", "2", "3", "A", "B", null, "XXX" };
+ List<String> result = new ArrayList<>();
+ while (true) {
+ String data = zipper.poll();
+ result.add(data);
+ if (data == null) {
+ break;
+ }
+ }
+ l1.offer("XXX");
+ result.add(zipper.poll());
+ Assert.assertArrayEquals(expected, result.toArray());
+ }
+
+}