From: ary Date: Tue, 20 Jan 2015 13:06:11 +0000 (+0100) Subject: The high priority queue containing control messages is processed first(before data... X-Git-Tag: release/lithium~709 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F90%2F14290%2F1;p=openflowplugin.git The high priority queue containing control messages is processed first(before data queue) in PollableQueuesPriorityZipper, if it is not queue with high priority then is used addSource in PollableQueuesPriorityZipper for use the PollableQueuesZipper. Signed-off-by: ary --- diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/util/PollableQueuesPriorityZipper.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/util/PollableQueuesPriorityZipper.java new file mode 100644 index 0000000000..f28876eded --- /dev/null +++ b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/util/PollableQueuesPriorityZipper.java @@ -0,0 +1,63 @@ +/** + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.api.openflow.md.util; + +import java.util.Queue; + +/** + * Zipper groups together a list of queues and exposes one poll method. Polling + * iterates through all groups and returns first not-null result of poll method + * on each queue. If after polling each grouped queue for one time there is + * still null result, poll will return null.
+ * Iterating keeps last position so this polling is supposed to be fairly + * distributed. + * + * @param + * common item type of zipped queues + */ +public class PollableQueuesPriorityZipper { + + private Queue prioritizedSource; + private PollableQueuesZipper zipper; + + /** + * default ctor + */ + public PollableQueuesPriorityZipper() { + zipper = new PollableQueuesZipper<>(); + } + + /** + * Add all member queues before first invocation of + * {@link PollableQueuesPriorityZipper#poll()} + * + * @param queue + * to be added to group + */ + public void addSource(Queue queue) { + zipper.addSource(queue); + } + + /** + * @return next common product of polling member groups + */ + public T poll() { + T item = null; + + item = prioritizedSource.poll(); + if (item == null) { + item = zipper.poll(); + } + + return item; + } + + public void setPrioritizedSource(Queue prioritizedSource) { + this.prioritizedSource = prioritizedSource; + } +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFairImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFairImpl.java index 04b708416c..dbf7d4d2d3 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFairImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFairImpl.java @@ -15,7 +15,7 @@ import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor; import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle; import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem; import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper; -import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesZipper; +import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesPriorityZipper; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,34 +23,34 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; /** - * QueueKeeper implementation based on {@link OfHeader} + * QueueKeeper implementation based on {@link OfHeader} */ public class QueueKeeperFairImpl implements QueueKeeper { private static Logger LOG = LoggerFactory .getLogger(QueueKeeperFairImpl.class); - + private Queue> queueDefault; private BlockingQueue> queueUnordered; private AutoCloseable pollRegistration; private int capacity = 5000; private HarvesterHandle harvesterHandle; - private PollableQueuesZipper> queueZipper; + private PollableQueuesPriorityZipper> queueZipper; @Override public void close() throws Exception { - Preconditions.checkNotNull(pollRegistration, "pollRegistration not available"); + Preconditions.checkNotNull(pollRegistration, + "pollRegistration not available"); pollRegistration.close(); } @Override - public void push( - OfHeader message, - ConnectionConductor conductor, + public void push(OfHeader message, ConnectionConductor conductor, QueueKeeper.QueueType queueType) { - QueueItemOFImpl qItem = new QueueItemOFImpl(message, conductor, queueType); + QueueItemOFImpl qItem = new QueueItemOFImpl(message, conductor, + queueType); boolean enqueued = false; - + switch (queueType) { case DEFAULT: enqueued = queueDefault.offer(qItem); @@ -59,18 +59,19 @@ public class QueueKeeperFairImpl implements QueueKeeper { enqueued = queueUnordered.offer(qItem); break; default: - LOG.warn("unsupported queue type: [{}] -> dropping message [{}]", queueType, message.getImplementedInterface()); + LOG.warn("unsupported queue type: [{}] -> dropping message [{}]", + queueType, message.getImplementedInterface()); } - + if (enqueued) { harvesterHandle.ping(); } else { LOG.debug("ingress throttling is use -> {}", queueType); } - - // if enqueueing fails -> message will be dropped + + // if enqueueing fails -> message will be dropped } - + /** * @return the ingressQueue */ @@ -81,29 +82,31 @@ public class QueueKeeperFairImpl implements QueueKeeper { } /** - * @param processingRegistration the processingRegistration to set + * @param processingRegistration + * the processingRegistration to set */ @Override public void setPollRegistration(AutoCloseable processingRegistration) { this.pollRegistration = processingRegistration; } - + /** - * @param capacity the capacity of internal blocking queue + * @param capacity + * the capacity of internal blocking queue */ public void setCapacity(int capacity) { this.capacity = capacity; } - + /** * init blocking queue */ public void init() { queueUnordered = new ArrayBlockingQueue<>(capacity); queueDefault = new ArrayBlockingQueue<>(capacity); - queueZipper = new PollableQueuesZipper<>(); - queueZipper.addSource(queueDefault); + queueZipper = new PollableQueuesPriorityZipper<>(); queueZipper.addSource(queueUnordered); + queueZipper.setPrioritizedSource(queueDefault); } /** diff --git a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesPriorityZipperTest.java b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesPriorityZipperTest.java new file mode 100644 index 0000000000..a6a8da250e --- /dev/null +++ b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesPriorityZipperTest.java @@ -0,0 +1,90 @@ +/** + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.junit.Assert; +import org.junit.Test; +import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesPriorityZipper; + +import com.google.common.collect.Lists; + +/** + * test for {@link PollableQueuesPriorityZipper} + */ +public class PollableQueuesPriorityZipperTest { + + /** + * Test method for + * {@link org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesPriorityZipper#poll()} + * . + */ + @Test + public void testPoll3() { + Queue l1 = new LinkedBlockingQueue(Lists.newArrayList( + "1", "2", "3")); + Queue l2 = new LinkedBlockingQueue(Lists.newArrayList( + "a", "b", "c", "d")); + Queue l3 = new LinkedBlockingQueue(Lists.newArrayList( + "A", "B")); + + PollableQueuesPriorityZipper zipper = new PollableQueuesPriorityZipper<>(); + zipper.setPrioritizedSource(l1); + zipper.addSource(l2); + zipper.addSource(l3); + + String[] expected = new String[] { "1", "2", "3", "a", "A", "b", "B", + "c", "d", null, "XXX" }; + List result = new ArrayList<>(); + while (true) { + String data = zipper.poll(); + result.add(data); + if (data == null) { + break; + } + } + l1.offer("XXX"); + result.add(zipper.poll()); + Assert.assertArrayEquals(expected, result.toArray()); + } + + /** + * Test method for + * {@link org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesPriorityZipper#poll()} + * . + */ + @Test + public void testPoll2() { + Queue l1 = new LinkedBlockingQueue(Lists.newArrayList( + "1", "2", "3")); + Queue l3 = new LinkedBlockingQueue(Lists.newArrayList( + "A", "B")); + + PollableQueuesPriorityZipper zipper = new PollableQueuesPriorityZipper<>(); + zipper.setPrioritizedSource(l1); + zipper.addSource(l3); + + String[] expected = new String[] { "1", "2", "3", "A", "B", null, "XXX" }; + List result = new ArrayList<>(); + while (true) { + String data = zipper.poll(); + result.add(data); + if (data == null) { + break; + } + } + l1.offer("XXX"); + result.add(zipper.poll()); + Assert.assertArrayEquals(expected, result.toArray()); + } + +}