BUG-1075: ingress back pressure 58/7458/9
authorMichal Rehak <mirehak@cisco.com>
Tue, 27 May 2014 11:43:42 +0000 (13:43 +0200)
committerPrasanna Huddar <prasanna.k.huddar@gmail.com>
Wed, 18 Jun 2014 15:19:32 +0000 (15:19 +0000)
- added fair competition queue keeper implementation
- added queueProcessor (harvesting and processing messages)
- per-connection-queues register for queueProcessor and are blocked if polling queue is full
- removed netty thread blocking by publish
- added dropping of messages
- added meaningful names to threads
- added harvester, queues zipper, messageSource registration stuff
- changed ticketPool to blocking if poolQueue full
- doubleCheckedLocking for harverster starving
- handshake pool switched to logging implementation
- onHello method is no more synchronized
- for messages {hello, features} timeout = 8s
- optimized synchronized singleton methods
- used ThreadFactoryBuilder
- processed comments

Change-Id: Ic0b4ebc2df4e19793fa1d1d125a0d31dd2f9c6c8
Signed-off-by: Michal Rehak <mirehak@cisco.com>
33 files changed:
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductor.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorFactory.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/HandshakeManagerImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/SwitchConnectionHandlerImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ThreadPoolLoggingExecutor.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/Enqueuer.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/HarvesterHandle.java [moved from openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/VersionExtractor.java with 57% similarity]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSourcePollRegistration.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSourcePollRegistrator.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueItem.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueItemOFImpl.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeper.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFactory.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFairImpl.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperHarvester.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperLightImpl.java [deleted file]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessor.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessorLightImpl.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/RegisteredTypeExtractor.java [deleted file]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/Ticket.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketFinisher.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketFinisherImpl.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketProcessorFactory.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketProcessorFactoryImpl.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketResult.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesZipper.java [new file with mode: 0644]
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImplTest.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImplTest.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesZipperTest.java [new file with mode: 0644]

index 668bb57515a6008d2f0abc8fe0793a417f606a7b..e5720cb90d7ec8ed8d2bea0526eaa53b7350a542 100644 (file)
@@ -13,7 +13,7 @@ import java.util.concurrent.Future;
 
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
-import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessor;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 
@@ -98,11 +98,16 @@ public interface ConnectionConductor {
      * assign global queueKeeper
      * @param queueKeeper
      */
-    void setQueueKeeper(QueueKeeper<OfHeader, DataObject> queueKeeper);
+    void setQueueProcessor(QueueProcessor<OfHeader, DataObject> queueKeeper);
 
     /**
      * @param errorHandler for internal exception handling
      */
     void setErrorHandler(ErrorHandler errorHandler);
 
+    /**
+     * @param conductorId
+     */
+    void setId(int conductorId);
+
 }
index 3373761fa30ab7ee8a088e90b7c791a2dad829c1..2bd97db38e6060c7caa6ab691b4a940fea56903c 100644 (file)
@@ -8,8 +8,10 @@
 
 package org.opendaylight.openflowplugin.openflow.md.core;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
-import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessor;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 
@@ -17,17 +19,24 @@ import org.opendaylight.yangtools.yang.binding.DataObject;
  * @author mirehak
  *
  */
-public abstract class ConnectionConductorFactory {
+public final class ConnectionConductorFactory {
+
+    private static AtomicInteger conductorId = new AtomicInteger();
+    
+    private ConnectionConductorFactory() {
+        throw new UnsupportedOperationException();
+    }
 
     /**
      * @param connectionAdapter
-     * @param queueKeepe
+     * @param queueProcesso
      * @return conductor for given connection
      */
     public static ConnectionConductor createConductor(ConnectionAdapter connectionAdapter, 
-            QueueKeeper<OfHeader, DataObject> queueKeeper) {
+            QueueProcessor<OfHeader, DataObject> queueProcessor) {
         ConnectionConductor connectionConductor = new ConnectionConductorImpl(connectionAdapter);
-        connectionConductor.setQueueKeeper(queueKeeper);
+        connectionConductor.setQueueProcessor(queueProcessor);
+        connectionConductor.setId(conductorId.getAndIncrement());
         connectionConductor.init();
         return connectionConductor;
     }
index 31e4c581405a2fb5af34e8ed571aadabb420056f..81b22925afa13134ef99291bc2f2236d17dfddd2 100644 (file)
@@ -21,6 +21,9 @@ import org.opendaylight.openflowplugin.openflow.md.core.session.PortFeaturesUtil
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperFactory;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessor;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
@@ -61,6 +64,9 @@ import com.google.common.util.concurrent.Futures;
 public class ConnectionConductorImpl implements OpenflowProtocolListener,
         SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener, HandshakeListener {
 
+    /** ingress queue limit */
+    private static final int INGRESS_QUEUE_MAX_SIZE = 200;
+
     protected static final Logger LOG = LoggerFactory
             .getLogger(ConnectionConductorImpl.class);
 
@@ -79,7 +85,8 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
     protected SessionContext sessionContext;
 
-    private QueueKeeper<OfHeader, DataObject> queueKeeper;
+    private QueueProcessor<OfHeader, DataObject> queueProcessor;
+    private QueueKeeper<OfHeader> queue;
     private ThreadPoolExecutor hsPool;
     private HandshakeManager handshakeManager;
 
@@ -87,15 +94,26 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     
     private PortFeaturesUtil portFeaturesUtils;
 
+    private int conductorId;
+
+    private int ingressMaxQueueSize;
+
+    
     /**
      * @param connectionAdapter
      */
     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
+        this(connectionAdapter, INGRESS_QUEUE_MAX_SIZE);
+    }
+
+    /**
+     * @param connectionAdapter
+     * @param ingressMaxQueueSize ingress queue limit (blocking)
+     */
+    public ConnectionConductorImpl(ConnectionAdapter connectionAdapter, int ingressMaxQueueSize) {
         this.connectionAdapter = connectionAdapter;
+        this.ingressMaxQueueSize = ingressMaxQueueSize;
         conductorState = CONDUCTOR_STATE.HANDSHAKING;
-        int handshakeThreadLimit = 1;
-        hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit , handshakeThreadLimit, 0L, 
-                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
         firstHelloProcessed = false;
         handshakeManager = new HandshakeManagerImpl(connectionAdapter,
                 ConnectionConductor.versionOrder.get(0), ConnectionConductor.versionOrder);
@@ -106,14 +124,20 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
     @Override
     public void init() {
+        int handshakeThreadLimit = 1;
+        hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit , handshakeThreadLimit, 0L, 
+                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 
+                "OFHandshake-"+conductorId);
+        
         connectionAdapter.setMessageListener(this);
         connectionAdapter.setSystemListener(this);
         connectionAdapter.setConnectionReadyListener(this);
+        queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor, ingressMaxQueueSize);
     }
 
     @Override
-    public void setQueueKeeper(QueueKeeper<OfHeader, DataObject> queueKeeper) {
-        this.queueKeeper = queueKeeper;
+    public void setQueueProcessor(QueueProcessor<OfHeader, DataObject> queueProcessor) {
+        this.queueProcessor = queueProcessor;
     }
 
     /**
@@ -143,17 +167,33 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
     @Override
     public void onErrorMessage(ErrorMessage errorMessage) {
-        queueKeeper.push(errorMessage, this);
+        enqueueMessage(errorMessage);
+    }
+
+    
+    /**
+     * @param message
+     */
+    private void enqueueMessage(OfHeader message) {
+        enqueueMessage(message, QueueType.DEFAULT);
+    }
+
+    /**
+     * @param message
+     * @param queueType enqueue type
+     */
+    private void enqueueMessage(OfHeader message, QueueType queueType) {
+        queue.push(message, this, queueType);
     }
 
     @Override
     public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
-        queueKeeper.push(experimenterMessage, this);
+        enqueueMessage(experimenterMessage);
     }
 
     @Override
     public void onFlowRemovedMessage(FlowRemovedMessage message) {
-        queueKeeper.push(message, this);
+        enqueueMessage(message);
     }
 
 
@@ -167,7 +207,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
      * 4. If Hello message received again with not supported version, just disconnect.
      */
     @Override
-    public synchronized void onHelloMessage(final HelloMessage hello) {
+    public void onHelloMessage(final HelloMessage hello) {
         LOG.debug("processing HELLO.xid: {}", hello.getXid());
         firstHelloProcessed = true;
         checkState(CONDUCTOR_STATE.HANDSHAKING);
@@ -194,18 +234,18 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
     @Override
     public void onMultipartReplyMessage(MultipartReplyMessage message) {
-        queueKeeper.push(message, this);
+        enqueueMessage(message);
     }
 
     @Override
     public void onPacketInMessage(PacketInMessage message) {
-        queueKeeper.push(message, this, QueueKeeper.QueueType.UNORDERED);
+        enqueueMessage(message, QueueKeeper.QueueType.UNORDERED);
     }
 
     @Override
     public void onPortStatusMessage(PortStatusMessage message) {
         processPortStatusMsg(message);
-        queueKeeper.push(message, this);
+        enqueueMessage(message);
     }
     
     protected void processPortStatusMsg(PortStatus msg) {
@@ -361,7 +401,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     }
 
     @Override
-    public synchronized void onConnectionReady() {
+    public void onConnectionReady() {
         LOG.debug("connection is ready-to-use");
         if (! firstHelloProcessed) {
             HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
@@ -386,7 +426,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
             //  Because the GetFeaturesOutput contains information about the port
             //  in OF1.0 (that we would otherwise get from the PortDesc) we have to pass
             //  it up for parsing to convert into a NodeConnectorUpdate
-            queueKeeper.push(featureOutput, this);
+            enqueueMessage(featureOutput);
         }
         
         requestDesc();
@@ -472,4 +512,9 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         hsPool.shutdownNow();
         LOG.debug("pool is terminated: {}", hsPool.isTerminated());
     }
+    
+    @Override
+    public void setId(int conductorId) {
+        this.conductorId = conductorId;
+    }
 }
index acd76d6965204b72b73e49d33b0a1a153e11a4bf..c17b65318860b3fbf65476c49ba07a9a467a4b6d 100644 (file)
@@ -41,7 +41,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
     private Short version;
     private ErrorHandler errorHandler;
     
-    private long maxTimeout = 1000;
+    private long maxTimeout = 8000;
     private TimeUnit maxTimeoutUnit = TimeUnit.MILLISECONDS;
     private Short highestVersion;
 
@@ -74,7 +74,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
     }
 
     @Override
-    public synchronized void shake() {
+    public void shake() {
         LOG.trace("handshake STARTED");
         setActiveXid(20L);
         HelloMessage receivedHelloLoc = receivedHello;
@@ -292,7 +292,6 @@ public class HandshakeManagerImpl implements HandshakeManager {
      */
     protected void postHandshake(Short proposedVersion, Long xid) throws Exception {
         // set version
-        long maxTimeout = 3000;
         version = proposedVersion;
 
         LOG.debug("version set: {}", proposedVersion);
index 945b663fbd1e23a73a2c2eac95fe490447d441bc..49b554e16d03d740f085329c42695efb242dd41c 100644 (file)
@@ -239,7 +239,7 @@ public class MDController implements IMDController, AutoCloseable {
         };
 
         ThreadPoolLoggingExecutor rpcPool = new ThreadPoolLoggingExecutor(rpcThreadLimit, rpcThreadLimit, 0L,
-                TimeUnit.MILLISECONDS, queue);
+                TimeUnit.MILLISECONDS, queue, "OFRpc");
         rpcPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
index 50d3bee271044d4f7c4f0e4e2db329b093460020..ecb9757f85c007e1e5cd092878a2be0d051f3822 100644 (file)
@@ -16,7 +16,7 @@ import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
 import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
-import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessorLightImpl;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
 
 /**
@@ -26,7 +26,7 @@ public class SwitchConnectionHandlerImpl implements SwitchConnectionHandler {
     
     private ScheduledThreadPoolExecutor spyPool; 
 
-    private QueueKeeperLightImpl queueKeeper;
+    private QueueProcessorLightImpl queueProcessor;
     private ErrorHandler errorHandler;
     private MessageSpy<DataContainer> messageSpy;
     private int spyRate = 10;
@@ -35,7 +35,7 @@ public class SwitchConnectionHandlerImpl implements SwitchConnectionHandler {
      *
      */
     public SwitchConnectionHandlerImpl() {
-        queueKeeper = new QueueKeeperLightImpl();
+        queueProcessor = new QueueProcessorLightImpl();
         
         //TODO: implement shutdown invocation upon service stop event
         spyPool = new ScheduledThreadPoolExecutor(1);
@@ -45,11 +45,11 @@ public class SwitchConnectionHandlerImpl implements SwitchConnectionHandler {
      * wire all up
      */
     public void init() {
-        queueKeeper.setTranslatorMapping(OFSessionUtil.getTranslatorMap());
-        queueKeeper.setPopListenersMapping(OFSessionUtil.getPopListenerMapping());
-        queueKeeper.setMessageSpy(messageSpy);
+        queueProcessor.setTranslatorMapping(OFSessionUtil.getTranslatorMap());
+        queueProcessor.setPopListenersMapping(OFSessionUtil.getPopListenerMapping());
+        queueProcessor.setMessageSpy(messageSpy);
         
-        queueKeeper.init();
+        queueProcessor.init();
         
         spyPool.scheduleAtFixedRate(messageSpy, spyRate, spyRate, TimeUnit.SECONDS);
     }
@@ -63,7 +63,7 @@ public class SwitchConnectionHandlerImpl implements SwitchConnectionHandler {
     @Override
     public void onSwitchConnected(ConnectionAdapter connectionAdapter) {
         ConnectionConductor conductor = ConnectionConductorFactory.createConductor(
-                connectionAdapter, queueKeeper);
+                connectionAdapter, queueProcessor);
         conductor.setErrorHandler(errorHandler);
     }
     
index 448f5ece920c3a61e5ab2ecdc7123454192a221c..b08bcc2027fc04fad3f483d14da2942acffa8b24 100644 (file)
@@ -14,8 +14,10 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
- * 
+ * threadPoolExecutor implementation logging exceptions thrown by threads
  */
 public class ThreadPoolLoggingExecutor extends ThreadPoolExecutor {
     
@@ -27,10 +29,13 @@ public class ThreadPoolLoggingExecutor extends ThreadPoolExecutor {
      * @param keepAliveTime
      * @param unit
      * @param workQueue
+     * @param poolName thread name prefix
      */
     public ThreadPoolLoggingExecutor(int corePoolSize, int maximumPoolSize,
-            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, 
+            final String poolName) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
+                new ThreadFactoryBuilder().setNameFormat(poolName+"-%d").build());
     }
 
     @Override
index 24ba19a676d1247cfec53b3a26197dd808f42024..a314018d777939e29bbbeea057bb2a3bce23a5ff 100644 (file)
@@ -54,9 +54,13 @@ public class SessionManagerOFImpl implements SessionManager {
     /**
      * @return singleton instance
      */
-    public static synchronized SessionManager getInstance() {
+    public static SessionManager getInstance() {
         if (instance == null) {
-            instance = new SessionManagerOFImpl();
+            synchronized (SessionContextOFImpl.class) {
+                if (instance == null) {
+                    instance = new SessionManagerOFImpl();
+                }
+            }
         }
         return instance;
     }
@@ -64,9 +68,15 @@ public class SessionManagerOFImpl implements SessionManager {
     /**
      * close and release singleton instace
      */
-    public static synchronized void releaseInstance() {
-        instance.close();
-        instance = null;
+    public static void releaseInstance() {
+        if (instance != null) {
+            synchronized (SessionManagerOFImpl.class) {
+                if (instance != null) {
+                    instance.close();
+                    instance = null;
+                }
+            }
+        }
     }
 
     private SessionManagerOFImpl() {
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/Enqueuer.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/Enqueuer.java
new file mode 100644 (file)
index 0000000..01a69d3
--- /dev/null
@@ -0,0 +1,28 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
+/**
+ * @param <IN> type of queue items 
+ */
+public interface Enqueuer<IN> {
+    
+    /**
+     * @param queueItem item to be enqueued
+     */
+    void enqueueQueueItem(IN queueItem);
+
+    /**
+     * @param queueItem
+     * @deprecated for testing and comparing purposes - this strategy blocks netty threads
+     */
+    @Deprecated
+    void directProcessQueueItem(QueueItem<OfHeader> queueItem);
+}
similarity index 57%
rename from openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/VersionExtractor.java
rename to openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/HarvesterHandle.java
index 0c9f830d843d47bf6d7e1d5aa9c760057c7cf99d..d1cefa70ff911497445630bbca222ed17083f9b5 100644 (file)
@@ -1,6 +1,6 @@
 /**
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
@@ -8,15 +8,13 @@
 package org.opendaylight.openflowplugin.openflow.md.queue;
 
 /**
- * @author mirehak
- * @param <T> message type
- *
+ * message harvester simple control
  */
-public interface VersionExtractor<T> {
+public interface HarvesterHandle {
 
     /**
-     * @param message
-     * @return version of message
+     * wakeup harvester in case it is in phase of starving sleep
      */
-    Short extractVersion(T message);
+    void ping();
+
 }
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSourcePollRegistration.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSourcePollRegistration.java
new file mode 100644 (file)
index 0000000..eff8189
--- /dev/null
@@ -0,0 +1,34 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+
+/**
+ * closable registration for {@link MessageSourcePollRegistrator}
+ * @param <IN> queue input message type
+ */
+public class MessageSourcePollRegistration<IN> implements AutoCloseable {
+    
+    private MessageSourcePollRegistrator<IN> messageSourceRegistry;
+    private IN messageSource;
+    
+    /**
+     * @param messageSourceRegistry
+     * @param messageSource 
+     */
+    public MessageSourcePollRegistration(MessageSourcePollRegistrator<IN> messageSourceRegistry,
+            IN messageSource) {
+        this.messageSourceRegistry = messageSourceRegistry;
+        this.messageSource = messageSource;
+    }
+
+    @Override
+    public void close() throws Exception {
+        messageSourceRegistry.unregisterMessageSource(messageSource);
+    }
+}
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSourcePollRegistrator.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSourcePollRegistrator.java
new file mode 100644 (file)
index 0000000..be43c83
--- /dev/null
@@ -0,0 +1,39 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.Collection;
+
+/**
+ * @param <IN> message wrapping type
+ * 
+ */
+public interface MessageSourcePollRegistrator<IN> {
+
+    /**
+     * @param messageSource to read from during processing
+     * @return closeable registration
+     */
+    AutoCloseable registerMessageSource(IN messageSource);
+
+    /**
+     * @param messageSource to be unregistered
+     * @return true if successfully unregistered
+     */
+    boolean unregisterMessageSource(IN messageSource);
+    
+    /**
+     * @return collection of registered message sources
+     */
+    Collection<IN> getMessageSources();
+
+    /**
+     * @return the harvest handle
+     */
+    HarvesterHandle getHarvesterHandle();
+}
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueItem.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueItem.java
new file mode 100644 (file)
index 0000000..a612d90
--- /dev/null
@@ -0,0 +1,32 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType;
+
+/**
+ * @param <IN> input message type 
+ */
+public interface QueueItem<IN> {
+    
+    /**
+     * @return wrapped message
+     */
+    IN getMessage();
+    
+    /**
+     * @return conductor the message arrived to
+     */
+    ConnectionConductor getConnectionConductor();
+    
+    /**
+     * @return queue type associated to this item 
+     */
+    QueueType getQueueType();
+}
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueItemOFImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueItemOFImpl.java
new file mode 100644 (file)
index 0000000..57fab66
--- /dev/null
@@ -0,0 +1,51 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
+/**
+ * QueueItem implementation based on {@link OfHeader}
+ */
+public class QueueItemOFImpl implements QueueItem<OfHeader> {
+
+    private OfHeader message;
+    private ConnectionConductor connectionConductor;
+    private QueueType queueType;
+
+    
+    
+    /**
+     * @param message
+     * @param connectionConductor
+     * @param queueType
+     */
+    public QueueItemOFImpl(OfHeader message,
+            ConnectionConductor connectionConductor, QueueType queueType) {
+        this.message = message;
+        this.connectionConductor = connectionConductor;
+        this.queueType = queueType;
+    }
+
+    @Override
+    public OfHeader getMessage() {
+        return message;
+    }
+
+    @Override
+    public ConnectionConductor getConnectionConductor() {
+        return connectionConductor;
+    }
+
+    @Override
+    public QueueType getQueueType() {
+        return queueType;
+    }
+}
index c22a81a2203249651294556b449075acbc148b4c..35eaacf1560a45cbc8bf6827cd937ee36e69ab2e 100644 (file)
@@ -7,13 +7,7 @@
  */
 package org.opendaylight.openflowplugin.openflow.md.queue;
 
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
-import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
-import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
 
 /**
  * This processing mechanism based on queue. Processing consists of 2 steps: translate and publish. 
@@ -28,7 +22,7 @@ import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
  * @param <IN> source type
  * @param <OUT> result type
  */
-public interface QueueKeeper<IN, OUT> {
+public interface QueueKeeper<IN> extends AutoCloseable {
     
     /** type of message enqueue */
     public enum QueueType {
@@ -37,18 +31,6 @@ public interface QueueKeeper<IN, OUT> {
         /** unordered processing - bypass fair processing */
         UNORDERED}
 
-    /**
-     * @param translatorMapping translators for message processing
-     */
-    void setTranslatorMapping(Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping);
-
-    /**
-     * enqueue message for processing using {@link QueueType#DEFAULT}
-     * @param message
-     * @param conductor source of message
-     */
-    void push(IN message, ConnectionConductor conductor);
-    
     /**
      * enqueue message for processing
      * @param message
@@ -58,7 +40,7 @@ public interface QueueKeeper<IN, OUT> {
     void push(IN message, ConnectionConductor conductor, QueueType queueType);
 
     /**
-     * @param popListenersMapping listeners invoked when processing done
+     * @return oldest item from queue - if available and remove it from queue
      */
-    void setPopListenersMapping(Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping);
+    QueueItem<IN> poll();
 }
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFactory.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFactory.java
new file mode 100644 (file)
index 0000000..e023fd4
--- /dev/null
@@ -0,0 +1,34 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
+/**
+ * factory for {@link QueueKeeper} implementations
+ */
+public abstract class QueueKeeperFactory {
+    
+    /**
+     * @param sourceRegistrator 
+     * @param capacity blocking queue capacity
+     * @return fair reading implementation of {@link QueueKeeper}
+     */
+    @SuppressWarnings("resource")
+    public static QueueKeeper<OfHeader> createFairQueueKeeper(
+            MessageSourcePollRegistrator<QueueKeeper<OfHeader>> sourceRegistrator, int capacity) {
+        QueueKeeperFairImpl queueKeeper = new QueueKeeperFairImpl();
+        queueKeeper.setCapacity(capacity);
+        queueKeeper.setHarvesterHandle(sourceRegistrator.getHarvesterHandle());
+        queueKeeper.init();
+        
+        AutoCloseable registration = sourceRegistrator.registerMessageSource(queueKeeper);
+        queueKeeper.setPollRegistration(registration);
+        return queueKeeper;
+    }
+}
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFairImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFairImpl.java
new file mode 100644 (file)
index 0000000..30b4172
--- /dev/null
@@ -0,0 +1,111 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.util.PollableQueuesZipper;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * QueueKeeper implementation based on {@link OfHeader} 
+ */
+public class QueueKeeperFairImpl implements QueueKeeper<OfHeader> {
+
+    private static Logger LOG = LoggerFactory
+            .getLogger(QueueKeeperFairImpl.class);
+    
+    private Queue<QueueItem<OfHeader>> queueDefault;
+    private BlockingQueue<QueueItem<OfHeader>> queueUnordered;
+    private AutoCloseable pollRegistration;
+    private int capacity = 5000;
+    private HarvesterHandle harvesterHandle;
+    private PollableQueuesZipper<QueueItem<OfHeader>> queueZipper;
+
+    @Override
+    public void close() throws Exception {
+        Preconditions.checkNotNull(pollRegistration, "pollRegistration not available");
+        pollRegistration.close();
+    }
+
+    @Override
+    public void push(
+            OfHeader message,
+            ConnectionConductor conductor,
+            QueueKeeper.QueueType queueType) {
+        QueueItemOFImpl qItem = new QueueItemOFImpl(message, conductor, queueType);
+        boolean enqueued = false;
+        
+        switch (queueType) {
+        case DEFAULT:
+            enqueued = queueDefault.offer(qItem);
+            break;
+        case UNORDERED:
+            enqueued = queueUnordered.offer(qItem);
+            break;
+        default:
+            LOG.warn("unsupported queue type: [{}] -> dropping message [{}]", queueType, message.getImplementedInterface());
+        }
+        
+        if (enqueued) {
+            harvesterHandle.ping();
+        } else {
+            LOG.debug("ingress throttling is use -> {}", queueType);
+        }
+        
+        // if enqueueing fails -> message will be dropped 
+    }
+    
+    /**
+     * @return the ingressQueue
+     */
+    @Override
+    public QueueItem<OfHeader> poll() {
+        QueueItem<OfHeader> nextQueueItem = queueZipper.poll();
+        return nextQueueItem;
+    }
+
+    /**
+     * @param processingRegistration the processingRegistration to set
+     */
+    public void setPollRegistration(AutoCloseable processingRegistration) {
+        this.pollRegistration = processingRegistration;
+    }
+    
+    /**
+     * @param capacity the capacity of internal blocking queue
+     */
+    public void setCapacity(int capacity) {
+        this.capacity = capacity;
+    }
+    
+    /**
+     * init blocking queue
+     */
+    public void init() {
+        queueUnordered = new ArrayBlockingQueue<>(capacity);
+        queueDefault = new ArrayBlockingQueue<>(capacity);
+        queueZipper = new PollableQueuesZipper<>();
+        queueZipper.addSource(queueDefault);
+        queueZipper.addSource(queueUnordered);
+    }
+
+    /**
+     * @param harvesterHandle
+     */
+    public void setHarvesterHandle(HarvesterHandle harvesterHandle) {
+        this.harvesterHandle = harvesterHandle;
+    }
+}
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperHarvester.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperHarvester.java
new file mode 100644 (file)
index 0000000..e362aea
--- /dev/null
@@ -0,0 +1,90 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @param <IN> 
+ * 
+ */
+public class QueueKeeperHarvester<IN> implements Runnable, HarvesterHandle {
+    private static Logger LOG = LoggerFactory.getLogger(QueueKeeperHarvester.class);
+
+    private Enqueuer<QueueItem<IN>> enqueuer;
+    private Collection<QueueKeeper<IN>> messageSources;
+
+    private boolean finishing = false;
+    private boolean starving;
+    
+    private Object harvestLock;
+
+    
+    /**
+     * @param enqueuer
+     * @param messageSources
+     * @param harvestLock 
+     */
+    public QueueKeeperHarvester(Enqueuer<QueueItem<IN>> enqueuer,
+            Collection<QueueKeeper<IN>> messageSources) {
+        this.enqueuer = enqueuer;
+        this.messageSources = messageSources;
+        harvestLock = new Object();
+    }
+
+    @Override
+    public void run() {
+        while (! finishing ) {
+            starving = true;
+            for (QueueKeeper<IN> source : messageSources) {
+                QueueItem<IN> qItem = source.poll();
+                if (qItem != null) {
+                    starving = false;
+                    enqueuer.enqueueQueueItem(qItem);
+                }
+            }
+            
+            if (starving) {
+                synchronized (harvestLock) {
+                    try {
+                        if (starving) {
+                            LOG.trace("messageHarvester is about to make a starve sleep");
+                            harvestLock.wait();
+                            LOG.trace("messageHarvester is waking up from a starve sleep");
+                        }
+                    } catch (InterruptedException e) {
+                        LOG.warn("message harvester has been interrupted during starve sleep", e);
+                    }
+                }
+            }
+        }
+    }
+    
+    /**
+     * finish harvester
+     */
+    public void shutdown() {
+        this.finishing = true;
+    }
+    
+    @Override
+    public void ping() {
+        if (starving) {
+            LOG.debug("pinging message harvester in starve status");
+            synchronized (harvestLock) {
+                if (starving) {
+                    starving = false;
+                    harvestLock.notify();
+                }
+            }
+        }
+    }
+}
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperLightImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperLightImpl.java
deleted file mode 100644 (file)
index 04a7ad7..0000000
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowplugin.openflow.md.queue;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
-import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
-import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
-import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
-import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
-import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
-import org.opendaylight.yangtools.yang.binding.DataContainer;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * {@link QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase.
- * <br/>
- * There is internal thread pool of limited size ({@link QueueKeeperLightImpl#setPoolSize(int)}) 
- * dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners)
- * <br/>
- * Workflow:
- * <ol>
- * <li>upon message push ticket is created and enqueued</li>
- * <li>available threads from internal pool translate the massage wrapped in ticket</li>
- * <li>when translation of particular message is finished, result is set in future result of wrapping ticket</br>
- *     (order of tickets in queue is not touched during translate)
- * </li>
- * <li>at the end of queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does:
- *    <ol>
- *      <li>invoke blocking {@link BlockingQueue#take()} method in order to get the oldest ticket</li>
- *      <li>invoke blocking {@link Future#get()} on the dequeued ticket</li>
- *      <li>as soon as the result of translation is available, appropriate popListener is invoked</li>
- *    </ol>
- *    and this way the order of messages is preserved and also multiple threads are used by translating 
- * </li>
- * </ol>
- * 
- * 
- */
-public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
-
-    private static final Logger LOG = LoggerFactory
-            .getLogger(QueueKeeperLightImpl.class);
-
-    private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
-    private BlockingQueue<TicketResult<DataObject>> processQueue;
-    private ScheduledThreadPoolExecutor pool;
-    private int poolSize = 10;
-    private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
-    private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
-    private MessageSpy<DataContainer> messageSpy;
-
-    private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
-        @Override
-        public Short extractVersion(OfHeader message) {
-            return message.getVersion();
-        }
-    };
-
-    private RegisteredTypeExtractor<OfHeader> registeredSrcTypeExtractor =
-            new RegisteredTypeExtractor<OfHeader>() {
-        @SuppressWarnings("unchecked")
-        @Override
-        public Class<? extends OfHeader> extractRegisteredType(
-                OfHeader message) {
-            return (Class<? extends OfHeader>) message.getImplementedInterface();
-        }
-    };
-
-    private RegisteredTypeExtractor<DataObject> registeredOutTypeExtractor =
-            new RegisteredTypeExtractor<DataObject>() {
-        @SuppressWarnings("unchecked")
-        @Override
-        public Class<? extends DataObject> extractRegisteredType(
-                DataObject message) {
-            return (Class<? extends DataObject>) message.getImplementedInterface();
-        }
-    };
-
-    /**
-     * prepare queue
-     */
-    public void init() {
-        processQueue = new LinkedBlockingQueue<>(1000);
-        pool = new ScheduledThreadPoolExecutor(poolSize);
-
-        ticketProcessorFactory = new TicketProcessorFactory<>();
-        ticketProcessorFactory.setRegisteredTypeExtractor(registeredSrcTypeExtractor);
-        ticketProcessorFactory.setTranslatorMapping(translatorMapping);
-        ticketProcessorFactory.setVersionExtractor(versionExtractor);
-        ticketProcessorFactory.setSpy(messageSpy);
-
-        TicketFinisher<DataObject> finisher = new TicketFinisher<>(
-                processQueue, popListenersMapping, registeredOutTypeExtractor);
-        new Thread(finisher).start();
-    }
-
-    /**
-     * stop processing queue
-     */
-    public void shutdown() {
-        pool.shutdown();
-    }
-
-    @Override
-    public void push(OfHeader message, ConnectionConductor conductor) {
-        push(message,conductor,QueueKeeper.QueueType.DEFAULT);
-    }
-
-    @Override
-    public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
-        messageSpy.spyMessage(message, STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
-        if(queueType == QueueKeeper.QueueType.DEFAULT) {
-            TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
-            ticket.setConductor(conductor);
-            ticket.setMessage(message);
-            LOG.debug("ticket scheduling: {}, ticket: {}",
-                    message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
-            try {
-                processQueue.put(ticket);
-                scheduleTicket(ticket);
-            } catch (InterruptedException e) {
-                LOG.warn("message enqueing interrupted", e);
-            }
-        } else if (queueType == QueueKeeper.QueueType.UNORDERED){
-            List<DataObject> processedMessages = translate(message,conductor);
-            pop(processedMessages,conductor);
-        }
-    }
-
-    /**
-     * @param ticket
-     */
-    private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
-        Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
-        pool.execute(ticketProcessor);
-    }
-
-    /**
-     * @param poolSize the poolSize to set
-     */
-    public void setPoolSize(int poolSize) {
-        this.poolSize = poolSize;
-    }
-
-    @Override
-    public void setTranslatorMapping(
-            Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
-        this.translatorMapping = translatorMapping;
-    }
-
-    @Override
-    public void setPopListenersMapping(
-            Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
-        this.popListenersMapping = popListenersMapping;
-    }
-
-    /**
-     * @param messageSpy the messageSpy to set
-     */
-    public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
-        this.messageSpy = messageSpy;
-    }
-
-    private List<DataObject> translate(OfHeader message, ConnectionConductor conductor) {
-        List<DataObject> result = new ArrayList<>();
-        Class<? extends OfHeader> messageType = registeredSrcTypeExtractor.extractRegisteredType(message);
-        Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> translators = null;
-        LOG.debug("translating message: {}", messageType.getSimpleName());
-
-        Short version = versionExtractor.extractVersion(message);
-        if (version == null) {
-           throw new IllegalArgumentException("version is NULL");
-        }
-        TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
-        translators = translatorMapping.get(tKey);
-
-        LOG.debug("translatorKey: {} + {}", version, messageType.getName());
-
-        if (translators != null) {
-            for (IMDMessageTranslator<OfHeader, List<DataObject>> translator : translators) {
-                SwitchConnectionDistinguisher cookie = null;
-                // Pass cookie only for PACKT_IN
-                if (messageType.equals("PacketInMessage.class")) {
-                    cookie = conductor.getAuxiliaryKey();
-                }
-                List<DataObject> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
-                if(translatorOutput != null) {
-                    result.addAll(translatorOutput);
-                }
-            }
-            if (messageSpy != null) {
-                messageSpy.spyIn(message);
-                for (DataObject outMsg : result) {
-                    messageSpy.spyOut(outMsg);
-                }
-            }
-        } else {
-            LOG.warn("No translators for this message Type: {}", messageType);
-            messageSpy.spyMessage(message, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
-        }
-        return result;
-    }
-
-    /**
-     * @param processedMessages
-     * @param conductor
-     */
-    private void pop(List<DataObject> processedMessages, ConnectionConductor conductor) {
-        for (DataObject msg : processedMessages) {
-            Class<? extends Object> registeredType =
-                    registeredOutTypeExtractor.extractRegisteredType(msg);
-            Collection<PopListener<DataObject>> popListeners = popListenersMapping.get(registeredType);
-            if (popListeners == null) {
-                LOG.warn("no popListener registered for type {}"+registeredType);
-            } else {
-                for (PopListener<DataObject> consumer : popListeners) {
-                    consumer.onPop(msg);
-                }
-            }
-        }
-    }
-}
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessor.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessor.java
new file mode 100644 (file)
index 0000000..bd74c33
--- /dev/null
@@ -0,0 +1,42 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+
+/**
+ * This processing mechanism based on queue. Processing consists of 2 steps: translate and publish. 
+ * Proposed workflow (might slightly deviate in implementations):
+ * <ol>
+ * <li>messages of input type are pushed in (via {@link QueueProcessor#push(Object, ConnectionConductor)} and similar)</li>
+ * <li>ticket (executable task) is build upon each pushed message and enqueued</li>
+ * <li>ticket is translated using appropriate translator</li>
+ * <li>ticket is dequeued and result is published by appropriate popListener</li>
+ * </ol>
+ * Message order might be not important, e.g. when speed is of the essence
+ * @param <IN> source type
+ * @param <OUT> result type
+ */
+public interface QueueProcessor<IN, OUT> extends MessageSourcePollRegistrator<QueueKeeper<IN>>, Enqueuer<QueueItem<IN>> {
+    
+    /**
+     * @param translatorMapping translators for message processing
+     */
+    void setTranslatorMapping(Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping);
+
+    /**
+     * @param popListenersMapping listeners invoked when processing done
+     */
+    void setPopListenersMapping(Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping);
+}
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessorLightImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessorLightImpl.java
new file mode 100644 (file)
index 0000000..ad62f01
--- /dev/null
@@ -0,0 +1,241 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
+import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase.
+ * <br/>
+ * There is internal thread pool of limited size ({@link QueueProcessorLightImpl#setProcessingPoolSize(int)}) 
+ * dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners)
+ * <br/>
+ * Workflow:
+ * <ol>
+ * <li>upon message push ticket is created and enqueued</li>
+ * <li>available threads from internal pool translate the massage wrapped in ticket</li>
+ * <li>when translation of particular message is finished, result is set in future result of wrapping ticket</br>
+ *     (order of tickets in queue is not touched during translate)
+ * </li>
+ * <li>at the end of queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does:
+ *    <ol>
+ *      <li>invoke blocking {@link BlockingQueue#take()} method in order to get the oldest ticket</li>
+ *      <li>invoke blocking {@link Future#get()} on the dequeued ticket</li>
+ *      <li>as soon as the result of translation is available, appropriate popListener is invoked</li>
+ *    </ol>
+ *    and this way the order of messages is preserved and also multiple threads are used by translating 
+ * </li>
+ * </ol>
+ * 
+ * 
+ */
+public class QueueProcessorLightImpl implements QueueProcessor<OfHeader, DataObject> {
+
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(QueueProcessorLightImpl.class);
+
+    private BlockingQueue<TicketResult<DataObject>> ticketQueue;
+    private ThreadPoolExecutor processorPool;
+    private int processingPoolSize = 4;
+    private ExecutorService harvesterPool;
+    private ExecutorService finisherPool;
+    
+    protected Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
+    private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
+    private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
+    private MessageSpy<DataContainer> messageSpy;
+    protected Collection<QueueKeeper<OfHeader>> messageSources;
+    private QueueKeeperHarvester<OfHeader> harvester;
+
+    protected TicketFinisher<DataObject> finisher;
+
+    /**
+     * prepare queue
+     */
+    public void init() {
+        int ticketQueueCapacity = 1500;
+        ticketQueue = new ArrayBlockingQueue<>(ticketQueueCapacity);
+        messageSources = new ConcurrentSkipListSet<>(
+                new Comparator<QueueKeeper<OfHeader>>() {
+                    @Override
+                    public int compare(QueueKeeper<OfHeader> o1,
+                            QueueKeeper<OfHeader> o2) {
+                        return Integer.valueOf(o1.hashCode()).compareTo(o2.hashCode());
+                    }
+                });
+        
+        processorPool = new ThreadPoolLoggingExecutor(processingPoolSize, processingPoolSize, 0, 
+                TimeUnit.MILLISECONDS, 
+                new ArrayBlockingQueue<Runnable>(ticketQueueCapacity), 
+                "OFmsgProcessor");
+        // force blocking when pool queue is full
+        processorPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+            @Override
+            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                try {
+                    executor.getQueue().put(r);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new IllegalStateException(e);
+                }
+            }
+        });
+        
+        harvesterPool = new ThreadPoolLoggingExecutor(1, 1, 0, 
+                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "OFmsgHarvester");
+        finisherPool = new ThreadPoolLoggingExecutor(1, 1, 0, 
+                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "OFmsgFinisher");
+        finisher = new TicketFinisherImpl(
+                ticketQueue, popListenersMapping);
+        finisherPool.execute(finisher);
+        
+        harvester = new QueueKeeperHarvester<OfHeader>(this, messageSources);
+        harvesterPool.execute(harvester);
+
+        ticketProcessorFactory = new TicketProcessorFactoryImpl();
+        ticketProcessorFactory.setTranslatorMapping(translatorMapping);
+        ticketProcessorFactory.setSpy(messageSpy);
+        ticketProcessorFactory.setTicketFinisher(finisher);
+    }
+
+    /**
+     * stop processing queue
+     */
+    public void shutdown() {
+        processorPool.shutdown();
+    }
+
+    @Override
+    public void enqueueQueueItem(QueueItem<OfHeader> queueItem) {
+        messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
+        TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
+        ticket.setConductor(queueItem.getConnectionConductor());
+        ticket.setMessage(queueItem.getMessage());
+        ticket.setQueueType(queueItem.getQueueType());
+        
+        LOG.trace("ticket scheduling: {}, ticket: {}",
+                queueItem.getMessage().getImplementedInterface().getSimpleName(), 
+                System.identityHashCode(queueItem));
+        scheduleTicket(ticket);
+    }
+    
+    
+    @Override
+    public void directProcessQueueItem(QueueItem<OfHeader> queueItem) {
+        messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
+        TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
+        ticket.setConductor(queueItem.getConnectionConductor());
+        ticket.setMessage(queueItem.getMessage());
+        
+        LOG.debug("ticket scheduling: {}, ticket: {}",
+                queueItem.getMessage().getImplementedInterface().getSimpleName(), 
+                System.identityHashCode(queueItem));
+        
+        ticketProcessorFactory.createProcessor(ticket).run();
+        
+        // publish notification
+        finisher.firePopNotification(ticket.getDirectResult());
+    }
+
+    /**
+     * @param ticket
+     */
+    private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
+        switch (ticket.getQueueType()) {
+        case DEFAULT:
+            Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
+            processorPool.execute(ticketProcessor);
+            try {
+                ticketQueue.put(ticket);
+            } catch (InterruptedException e) {
+                LOG.warn("enqeueue of unordered message ticket failed", e);
+            }
+            break;
+        case UNORDERED:
+            Runnable ticketProcessorSync = ticketProcessorFactory.createSyncProcessor(ticket);
+            processorPool.execute(ticketProcessorSync);
+            break;
+        default:
+            LOG.warn("unsupported enqueue type: {}", ticket.getQueueType());
+        }
+    }
+
+    /**
+     * @param poolSize the poolSize to set
+     */
+    public void setProcessingPoolSize(int poolSize) {
+        this.processingPoolSize = poolSize;
+    }
+
+    @Override
+    public void setTranslatorMapping(
+            Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
+        this.translatorMapping = translatorMapping;
+    }
+
+    @Override
+    public void setPopListenersMapping(
+            Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
+        this.popListenersMapping = popListenersMapping;
+    }
+
+    /**
+     * @param messageSpy the messageSpy to set
+     */
+    public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
+        this.messageSpy = messageSpy;
+    }
+
+    @Override
+    public AutoCloseable registerMessageSource(QueueKeeper<OfHeader> queue) {
+        boolean added = messageSources.add(queue);
+        if (! added) {
+            LOG.debug("registration of message source queue failed - already registered");
+        }
+        MessageSourcePollRegistration<QueueKeeper<OfHeader>> queuePollRegistration = 
+                new MessageSourcePollRegistration<>(this, queue);
+        return queuePollRegistration;
+    }
+    
+    @Override
+    public boolean unregisterMessageSource(QueueKeeper<OfHeader> queue) {
+        return messageSources.remove(queue);
+    }
+    
+    @Override
+    public Collection<QueueKeeper<OfHeader>> getMessageSources() {
+        return messageSources;
+    }
+    
+    @Override
+    public HarvesterHandle getHarvesterHandle() {
+        return harvester;
+    }
+}
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/RegisteredTypeExtractor.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/RegisteredTypeExtractor.java
deleted file mode 100644 (file)
index 4d805d7..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowplugin.openflow.md.queue;
-
-/**
- * @author mirehak
- * @param <T> parent type of message 
- */
-public interface RegisteredTypeExtractor<T> {
-    
-    /**
-     * @param message
-     * @return registered message type
-     */
-    public Class<? extends T> extractRegisteredType(T message);
-
-}
index 195284ccdeb97891e3d81e375f4b20ff596db571..e778d612af2c45d858dfa9098c64b14005b24372 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.openflowplugin.openflow.md.queue;
 
 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType;
 
 /**
  * @author mirehak
@@ -26,4 +27,9 @@ public interface Ticket<IN, OUT> extends TicketResult<OUT> {
      * @return processed message
      */
     IN getMessage();
+    
+    /**
+     * @return queue type associated with ticket
+     */
+    QueueType getQueueType();
 }
index deeb4d938215527cbe9d30c9ccee40946d562e0d..0018b2da99e5086e30217836b758cdc5ae124f5d 100644 (file)
@@ -7,67 +7,22 @@
  */
 package org.opendaylight.openflowplugin.openflow.md.queue;
 
-import java.util.Collection;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * @param <OUT> result type
  *
  */
-public class TicketFinisher<OUT> implements Runnable {
-
-    private static final Logger LOG = LoggerFactory
-            .getLogger(TicketFinisher.class);
-
-    private final BlockingQueue<TicketResult<OUT>> queue;
-    private final Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping;
-    private final RegisteredTypeExtractor<OUT> registeredOutTypeExtractor;
+public interface TicketFinisher<OUT> extends Runnable {
 
     /**
-     * @param queue
-     * @param popListenersMapping
-     * @param registeredOutTypeExtractor
+     * initiate shutdown of this worker
      */
-    public TicketFinisher(BlockingQueue<TicketResult<OUT>> queue,
-            Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping,
-            RegisteredTypeExtractor<OUT> registeredOutTypeExtractor) {
-        this.queue = queue;
-        this.popListenersMapping = popListenersMapping;
-        this.registeredOutTypeExtractor = registeredOutTypeExtractor;
-    }
-
-
-    @Override
-    public void run() {
-        while (true) {
-            try {
-                //TODO:: handle shutdown of queue
-                TicketResult<OUT> result = queue.take();
-                long before = System.nanoTime();
-                LOG.debug("finishing ticket(before): {}, {} remain in queue, {} capacity remaining", System.identityHashCode(result),queue.size(), queue.remainingCapacity());
-                List<OUT> processedMessages = result.getResult().get();
-                long after = System.nanoTime();
-                LOG.debug("finishing ticket(after): {}, {} remain in queue, {} capacity remaining, processingTime {} ns", System.identityHashCode(result),queue.size(), queue.remainingCapacity(),after-before);
-                for (OUT msg : processedMessages) {
-                    Class<? extends Object> registeredType =
-                            registeredOutTypeExtractor.extractRegisteredType(msg);
-                    Collection<PopListener<OUT>> popListeners = popListenersMapping.get(registeredType);
-                    if (popListeners == null) {
-                        LOG.warn("no popListener registered for type {}"+registeredType);
-                    } else {
-                        for (PopListener<OUT> consumer : popListeners) {
-                            consumer.onPop(msg);
-                        }
-                    }
-                }
-            } catch (Exception e) {
-                LOG.error(e.getMessage(), e);
-            }
-        }
-    }
+    void finish();
+    
+    /**
+     * notify popListeners
+     * @param processedMessages
+     */
+    void firePopNotification(List<OUT> processedMessages);
 }
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketFinisherImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketFinisherImpl.java
new file mode 100644 (file)
index 0000000..aae4cad
--- /dev/null
@@ -0,0 +1,80 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class TicketFinisherImpl implements TicketFinisher<DataObject> {
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(TicketFinisherImpl.class);
+
+    private final Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
+
+    private boolean finished;
+
+    private BlockingQueue<TicketResult<DataObject>> queue;
+
+    /**
+     * @param queue
+     * @param popListenersMapping
+     */
+    public TicketFinisherImpl(BlockingQueue<TicketResult<DataObject>> queue,
+            Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
+        this.queue = queue;
+        this.popListenersMapping = popListenersMapping;
+    }
+    
+    @Override
+    public void run() {
+        while (! finished ) {
+            try {
+                //TODO:: handle shutdown of queue
+                TicketResult<DataObject> result = queue.take();
+                List<DataObject> processedMessages = result.getResult().get();
+                firePopNotification(processedMessages);
+            } catch (Exception e) {
+                LOG.warn("processing (translate, publish) of ticket failed", e);
+            }
+        }
+    }
+    
+    @Override
+    public void firePopNotification(List<DataObject> processedMessages) {
+        for (DataObject msg : processedMessages) {
+            Class<? extends Object> registeredType =
+                    msg.getImplementedInterface();
+            Collection<PopListener<DataObject>> popListeners = popListenersMapping.get(registeredType);
+            if (popListeners == null) {
+                LOG.warn("no popListener registered for type {}", registeredType);
+            } else {
+                for (PopListener<DataObject> consumer : popListeners) {
+                    consumer.onPop(msg);
+                }
+            }
+        }
+    }
+    
+    /**
+     * initiate shutdown of this worker
+     */
+    @Override
+    public void finish() {
+        finished = true;
+    }
+}
index 81c72ef6455768f72732763858b3fb5bc1dd7217..7a11f3f00c523be0eaa668fa42e184ecb022e7bc 100644 (file)
@@ -10,20 +10,21 @@ package org.opendaylight.openflowplugin.openflow.md.queue;
 import java.util.List;
 
 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType;
 
 import com.google.common.util.concurrent.SettableFuture;
 
 /**
- * @author mirehak
  * @param <IN> source type
  * @param <OUT> result type
- *
  */
 public class TicketImpl<IN, OUT> implements Ticket<IN, OUT> {
     
     private IN message;
     private ConnectionConductor conductor;
     private SettableFuture<List<OUT>> future;
+    private QueueType queueType;
+    private List<OUT> directResult;
     
     /**
      * default ctor
@@ -66,4 +67,32 @@ public class TicketImpl<IN, OUT> implements Ticket<IN, OUT> {
     public void setConductor(ConnectionConductor conductor) {
         this.conductor = conductor;
     }
+    
+    /**
+     * @param queueType the queueType to set
+     */
+    public void setQueueType(QueueType queueType) {
+        this.queueType = queueType;
+    }
+    
+    @Override
+    public QueueType getQueueType() {
+        return queueType;
+    }
+
+    /**
+     * @return the directResult
+     */
+    @Override
+    public List<OUT> getDirectResult() {
+        return directResult;
+    }
+
+    /**
+     * @param directResult the directResult to set
+     */
+    @Override
+    public void setDirectResult(List<OUT> directResult) {
+        this.directResult = directResult;
+    }
 }
index ace82888294444b66b399db8f9a5c01bd7dbfbc7..17d753934308eb17e324ec78804030866d52703c 100644 (file)
  */
 package org.opendaylight.openflowplugin.openflow.md.queue;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
-import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * @param <IN>
  * @param <OUT>
  */
-public class TicketProcessorFactory<IN extends DataObject, OUT extends DataObject> {
-
-    protected static final Logger LOG = LoggerFactory
-            .getLogger(TicketProcessorFactory.class);
-
-    protected VersionExtractor<IN> versionExtractor;
-    protected RegisteredTypeExtractor<IN> registeredTypeExtractor;
-    protected Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping;
-    protected MessageSpy<DataContainer> spy;
+public interface TicketProcessorFactory<IN extends DataObject, OUT extends DataObject> {
 
     /**
-     * @param versionExtractor the versionExtractor to set
+     * @param ticket
+     * @return runnable ticket processor
      */
-    public void setVersionExtractor(VersionExtractor<IN> versionExtractor) {
-        this.versionExtractor = versionExtractor;
-    }
+    Runnable createProcessor(final Ticket<IN, OUT> ticket);
 
     /**
-     * @param registeredTypeExtractor the registeredTypeExtractor to set
+     * @param ticket
+     * @return runnable ticket processor
      */
-    public void setRegisteredTypeExtractor(
-            RegisteredTypeExtractor<IN> registeredTypeExtractor) {
-        this.registeredTypeExtractor = registeredTypeExtractor;
-    }
+    Runnable createSyncProcessor(final Ticket<IN, OUT> ticket);
 
     /**
-     * @param translatorMapping the translatorMapping to set
+     * @param ticket
+     * @return translated messages
+     * 
      */
-    public void setTranslatorMapping(
-            Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping) {
-        this.translatorMapping = translatorMapping;
-    }
+    List<OUT> translate(Ticket<IN, OUT> ticket);
 
     /**
-     * @param spy the spy to set
+     * @param ticketFinisher setter
      */
-    public void setSpy(MessageSpy<DataContainer> spy) {
-        this.spy = spy;
-    }
-
+    void setTicketFinisher(TicketFinisher<OUT> ticketFinisher);
 
     /**
-     * @param ticket
-     * @return runnable ticket processor
+     * @param spy setter
      */
-    public Runnable createProcessor(final Ticket<IN, OUT> ticket) {
-
-        Runnable ticketProcessor = new Runnable() {
-            @Override
-            public void run() {
-                LOG.debug("message received, type: {}", registeredTypeExtractor.extractRegisteredType(
-                        ticket.getMessage()).getSimpleName());
-                List<OUT> translate;
-                try {
-                    translate = translate();
-                    ticket.getResult().set(translate);
-                    // spying on result
-                    if (spy != null) {
-                        spy.spyIn(ticket.getMessage());
-                        for (OUT outMessage : ticket.getResult().get()) {
-                            spy.spyOut(outMessage);
-                        }
-                    }
-                } catch (Exception e) {
-                    LOG.error("translation problem: {}", e.getMessage());
-                    ticket.getResult().setException(e);
-                }
-                LOG.debug("message processing done (type: {}, ticket: {})",
-                        registeredTypeExtractor.extractRegisteredType(ticket.getMessage()).getSimpleName(),
-                        System.identityHashCode(ticket));
-            }
-
-            /**
-             *
-             */
-            private List<OUT> translate() {
-                List<OUT> result = new ArrayList<>();
-
-                IN message = ticket.getMessage();
-                Class<? extends IN> messageType = registeredTypeExtractor.extractRegisteredType(ticket.getMessage());
-                ConnectionConductor conductor = ticket.getConductor();
-                Collection<IMDMessageTranslator<IN, List<OUT>>> translators = null;
-                LOG.debug("translating ticket: {}, ticket: {}", messageType.getSimpleName(), System.identityHashCode(ticket));
-
-                Short version = versionExtractor.extractVersion(message);
-                if (version == null) {
-                    throw new IllegalArgumentException("version is NULL");
-                }
-                TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
-                translators = translatorMapping.get(tKey);
+    void setSpy(MessageSpy<DataContainer> spy);
 
-                LOG.debug("translatorKey: {} + {}", version, messageType.getName());
-
-                if (translators != null) {
-                    for (IMDMessageTranslator<IN, List<OUT>> translator : translators) {
-                        SwitchConnectionDistinguisher cookie = null;
-                        // Pass cookie only for PACKT_IN
-                        if (messageType.equals("PacketInMessage.class")) {
-                            cookie = conductor.getAuxiliaryKey();
-                        }
-                        long start = System.nanoTime();
-                        List<OUT> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
-                        long end = System.nanoTime();
-                        LOG.debug("translator: {} elapsed time {} ns",translator,end-start);
-                        if(translatorOutput != null && !translatorOutput.isEmpty()) {
-                            result.addAll(translatorOutput);
-                        }
-                    }
-                } else {
-                    LOG.warn("No translators for this message Type: {}", messageType);
-                }
-                return result;
-            }
-        };
-
-        return ticketProcessor;
-    }
+    /**
+     * @param translatorMapping setter
+     */
+    void setTranslatorMapping(Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping);
 }
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketProcessorFactoryImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketProcessorFactoryImpl.java
new file mode 100644 (file)
index 0000000..720c016
--- /dev/null
@@ -0,0 +1,178 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
+import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * OfHeader to DataObject implementation
+ */
+public class TicketProcessorFactoryImpl implements TicketProcessorFactory<OfHeader, DataObject> {
+
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(TicketProcessorFactoryImpl.class);
+
+    protected Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
+    protected MessageSpy<DataContainer> spy;
+    protected TicketFinisher<DataObject> ticketFinisher;
+
+    /**
+     * @param translatorMapping the translatorMapping to set
+     */
+    @Override
+    public void setTranslatorMapping(
+            Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
+        this.translatorMapping = ImmutableMap.copyOf(translatorMapping);
+    }
+
+    /**
+     * @param spy the spy to set
+     */
+    @Override
+    public void setSpy(MessageSpy<DataContainer> spy) {
+        this.spy = spy;
+    }
+
+    /**
+     * @param ticketFinisher the finisher to set
+     */
+    @Override
+    public void setTicketFinisher(TicketFinisher<DataObject> ticketFinisher) {
+        this.ticketFinisher = ticketFinisher;
+    }
+    
+    /**
+     * @param ticket
+     * @return runnable ticket processor
+     */
+    @Override
+    public Runnable createProcessor(final Ticket<OfHeader, DataObject> ticket) {
+
+        Runnable ticketProcessor = new Runnable() {
+            @Override
+            public void run() {
+                LOG.debug("message received, type: {}", ticket.getMessage().getImplementedInterface().getSimpleName());
+                List<DataObject> translate;
+                try {
+                    translate = translate(ticket);
+                    ticket.getResult().set(translate);
+                    ticket.setDirectResult(translate);
+                    // spying on result
+                    if (spy != null) {
+                        spy.spyIn(ticket.getMessage());
+                        for (DataObject outMessage : translate) {
+                            spy.spyOut(outMessage);
+                        }
+                    }
+                } catch (Exception e) {
+                    LOG.error("translation problem: {}", e.getMessage());
+                    ticket.getResult().setException(e);
+                }
+                LOG.debug("message processing done (type: {}, ticket: {})",
+                        ticket.getMessage().getImplementedInterface().getSimpleName(),
+                        System.identityHashCode(ticket));
+            }
+        };
+
+
+        return ticketProcessor;
+    }
+    
+    /**
+     * @param ticket
+     * @return runnable ticket processor
+     */
+    @Override
+    public Runnable createSyncProcessor(final Ticket<OfHeader, DataObject> ticket) {
+
+        Runnable ticketProcessor = new Runnable() {
+            @Override
+            public void run() {
+                List<DataObject> translate;
+                try {
+                    translate = translate(ticket);
+                    // spying on result
+                    if (spy != null) {
+                        spy.spyIn(ticket.getMessage());
+                        for (DataObject outMessage : translate) {
+                            spy.spyOut(outMessage);
+                        }
+                    }
+                    ticketFinisher.firePopNotification(translate);
+                } catch (Exception e) {
+                    LOG.error("translation problem: {}", e.getMessage());
+                    ticket.getResult().setException(e);
+                }
+            }
+        };
+
+
+        return ticketProcessor;
+    }
+    
+    
+    /**
+     * @param ticket 
+     *
+     */
+    @Override
+    public List<DataObject> translate(Ticket<OfHeader, DataObject> ticket) {
+        List<DataObject> result = new ArrayList<>();
+
+        OfHeader message = ticket.getMessage();
+        Class<? extends DataContainer> messageType = ticket.getMessage().getImplementedInterface();
+        ConnectionConductor conductor = ticket.getConductor();
+        Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> translators = null;
+        LOG.trace("translating ticket: {}, ticket: {}", messageType.getSimpleName(), System.identityHashCode(ticket));
+
+        Short version = message.getVersion();
+        if (version == null) {
+            throw new IllegalArgumentException("version is NULL");
+        }
+        TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
+        translators = translatorMapping.get(tKey);
+
+        LOG.debug("translatorKey: {} + {}", version, messageType.getName());
+
+        if (translators != null) {
+            for (IMDMessageTranslator<OfHeader, List<DataObject>> translator : translators) {
+                SwitchConnectionDistinguisher cookie = null;
+                // Pass cookie only for PACKT_OfHeader
+                if (messageType.equals("PacketInMessage.class")) {
+                    cookie = conductor.getAuxiliaryKey();
+                }
+                long start = System.nanoTime();
+                List<DataObject> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
+                long end = System.nanoTime();
+                LOG.trace("translator: {} elapsed time {} ns",translator,end-start);
+                if(translatorOutput != null && !translatorOutput.isEmpty()) {
+                    result.addAll(translatorOutput);
+                }
+            }
+        } else {
+            LOG.warn("No translators for this message Type: {}", messageType);
+        }
+        
+        return result;
+    }
+}
index c8dfd84c070d53ee1d29053ffb815b250adac345..031d26ae2c58d52a5aca113d5a287072e1717c51 100644 (file)
@@ -22,4 +22,14 @@ public interface TicketResult<T> {
      */
     SettableFuture<List<T>> getResult();
 
+    /**
+     * @return direct access to result
+     */
+    List<T> getDirectResult();
+
+    /**
+     * @param directResult setter for direct result
+     */
+    void setDirectResult(List<T> directResult);
+
 }
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesZipper.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesZipper.java
new file mode 100644 (file)
index 0000000..eb16172
--- /dev/null
@@ -0,0 +1,66 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.util;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * Zipper groups together a list of queues and exposes one poll method. Polling iterates through
+ * all groups and returns first not-null result of poll method on each queue. If after polling each 
+ * grouped queue for one time there is still null result, poll will return null. 
+ * <br/>
+ * Iterating keeps last position so this polling is supposed to be fairly distributed.
+ * 
+ * @param <T> common item type of zipped queues
+ */
+public class PollableQueuesZipper<T> {
+    
+    private List<Queue<T>> sources;
+    private Iterator<Queue<T>> cursor;
+    
+    /**
+     * default ctor
+     */
+    public PollableQueuesZipper() {
+        sources = new ArrayList<>();
+    }
+    
+    /**
+     * Add all member queues before first invocation of {@link PollableQueuesZipper#poll()}
+     * @param queue to be added to group
+     */
+    public void addSource(Queue<T> queue) {
+        sources.add(queue);
+    }
+
+    /**
+     * @return next common product of polling member groups
+     */
+    public T poll() {
+        T item = null;
+        if (cursor == null) {
+            cursor = Iterators.cycle(sources);
+        }
+        
+        Queue<T> queue;
+        for (int i = 0; i < sources.size(); i++) {
+            queue = cursor.next();
+            item = queue.poll();
+            if (item != null) {
+                break;
+            }
+        }
+        
+        return item;
+    }
+}
index 6f2e3a6b507d937b37230f3dca6bb3efacc3be51..2ded7ad0e898527fafe704b686bf394c9f19b80b 100644 (file)
@@ -35,7 +35,7 @@ import org.opendaylight.openflowplugin.openflow.md.core.plan.SwitchTestEvent;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
 import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
 import org.opendaylight.openflowplugin.openflow.md.queue.PopListener;
-import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessorLightImpl;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.ErrorType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortFeatures;
@@ -79,7 +79,7 @@ public class ConnectionConductorImplTest {
     private ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
             8);
 
-    private QueueKeeperLightImpl queueKeeper;
+    private QueueProcessorLightImpl queueProcessor;
 
     private PopListener<DataObject> popListener;
 
@@ -136,24 +136,24 @@ public class ConnectionConductorImplTest {
 
         popListener = new PopListenerCountingImpl<>();
 
-        queueKeeper = new QueueKeeperLightImpl();
-        queueKeeper.setMessageSpy(messageSpy);
+        controller = new MDController();
+        controller.init();
+        controller.getMessageTranslators().putAll(assembleTranslatorMapping());
+        
+        queueProcessor = new QueueProcessorLightImpl();
+        queueProcessor.setMessageSpy(messageSpy);
+        queueProcessor.setPopListenersMapping(assemblePopListenerMapping());
+        queueProcessor.setTranslatorMapping(controller.getMessageTranslators());
+        queueProcessor.init();
 
         connectionConductor = new ConnectionConductorImpl(adapter);
-        connectionConductor.setQueueKeeper(queueKeeper);
-        connectionConductor.init();
+        connectionConductor.setQueueProcessor(queueProcessor);
         connectionConductor.setErrorHandler(errorHandler);
-        controller = new MDController();
-        controller.init();
-        queueKeeper.setTranslatorMapping(controller.getMessageTranslators());
+        connectionConductor.init();
         eventPlan = new Stack<>();
         adapter.setEventPlan(eventPlan);
         adapter.setProceedTimeout(5000L);
         adapter.checkListeners();
-
-        controller.getMessageTranslators().putAll(assembleTranslatorMapping());
-        queueKeeper.setPopListenersMapping(assemblePopListenerMapping());
-        queueKeeper.init();
     }
 
     /**
@@ -176,7 +176,7 @@ public class ConnectionConductorImplTest {
         if (libSimulation != null) {
             libSimulation.join();
         }
-        queueKeeper.shutdown();
+        queueProcessor.shutdown();
         connectionConductor.shutdownPool();
 
         for (Exception problem : adapter.getOccuredExceptions()) {
index 8e45eef1c25021358a05a9f23efbe427eb6c1bd3..d01b8f8e4bddc1f25f6dd7274ba7ae2fae698760 100644 (file)
@@ -23,7 +23,7 @@ import org.opendaylight.openflowplugin.openflow.md.ModelDrivenSwitch;
 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.openflow.md.core.ErrorHandler;
 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
-import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessor;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
@@ -397,7 +397,7 @@ class MockConnectionConductor implements ConnectionConductor {
     }
 
     @Override
-    public void setQueueKeeper(QueueKeeper<OfHeader, DataObject> queueKeeper) {
+    public void setQueueProcessor(QueueProcessor<OfHeader, DataObject> queueKeeper) {
         // NOOP
     }
 
@@ -405,6 +405,11 @@ class MockConnectionConductor implements ConnectionConductor {
     public void setErrorHandler(ErrorHandler errorHandler) {
         // NOOP
     }
+
+    @Override
+    public void setId(int conductorId) {
+        // NOOP
+    }
 }
 
 enum MessageType {
@@ -590,5 +595,4 @@ class MockConnectionAdapter implements ConnectionAdapter {
         // TODO Auto-generated method stub
         return null;
     }
-
 }
diff --git a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesZipperTest.java b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesZipperTest.java
new file mode 100644 (file)
index 0000000..2b6ec93
--- /dev/null
@@ -0,0 +1,56 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.openflowplugin.openflow.md.util.PollableQueuesZipper;
+
+import com.google.common.collect.Lists;
+
+/**
+ * test for {@link PollableQueuesZipper}
+ */
+public class PollableQueuesZipperTest {
+
+    /**
+     * Test method for {@link org.opendaylight.openflowplugin.openflow.md.util.PollableQueuesZipper#poll()}.
+     */
+    @Test
+    public void testPoll() {
+        Queue<String> l1 = new LinkedBlockingQueue<String>(Lists.newArrayList("1", "2", "3"));
+        Queue<String> l2 = new LinkedBlockingQueue<String>(Lists.newArrayList("a", "b", "c", "d"));
+        Queue<String> l3 = new LinkedBlockingQueue<String>(Lists.newArrayList("A", "B"));
+
+        PollableQueuesZipper<String> zipper = new PollableQueuesZipper<>();
+        zipper.addSource(l1);
+        zipper.addSource(l2);
+        zipper.addSource(l3);
+
+        String[] expected = new String[] {
+                "1", "a", "A", "2", "b", "B", "3", "c", "d", null, "XXX"
+        };
+        List<String> result = new ArrayList<>();
+        while (true) {
+            String data = zipper.poll();
+            result.add(data);
+            if (data == null) {
+                break;
+            }
+        }
+        l1.offer("XXX");
+        result.add(zipper.poll());
+        Assert.assertArrayEquals(expected, result.toArray());
+    }
+
+}