From b0e8d777920a430776efe07c625637fe75204505 Mon Sep 17 00:00:00 2001 From: Michal Rehak Date: Tue, 27 May 2014 13:43:42 +0200 Subject: [PATCH] BUG-1075: ingress back pressure - added fair competition queue keeper implementation - added queueProcessor (harvesting and processing messages) - per-connection-queues register for queueProcessor and are blocked if polling queue is full - removed netty thread blocking by publish - added dropping of messages - added meaningful names to threads - added harvester, queues zipper, messageSource registration stuff - changed ticketPool to blocking if poolQueue full - doubleCheckedLocking for harverster starving - handshake pool switched to logging implementation - onHello method is no more synchronized - for messages {hello, features} timeout = 8s - optimized synchronized singleton methods - used ThreadFactoryBuilder - processed comments Change-Id: Ic0b4ebc2df4e19793fa1d1d125a0d31dd2f9c6c8 Signed-off-by: Michal Rehak --- .../openflow/md/core/ConnectionConductor.java | 9 +- .../md/core/ConnectionConductorFactory.java | 19 +- .../md/core/ConnectionConductorImpl.java | 75 ++++-- .../md/core/HandshakeManagerImpl.java | 5 +- .../openflow/md/core/MDController.java | 2 +- .../md/core/SwitchConnectionHandlerImpl.java | 16 +- .../md/core/ThreadPoolLoggingExecutor.java | 11 +- .../md/core/session/SessionManagerOFImpl.java | 20 +- .../openflow/md/queue/Enqueuer.java | 28 ++ ...ionExtractor.java => HarvesterHandle.java} | 16 +- .../queue/MessageSourcePollRegistration.java | 34 +++ .../queue/MessageSourcePollRegistrator.java | 39 +++ .../openflow/md/queue/QueueItem.java | 32 +++ .../openflow/md/queue/QueueItemOFImpl.java | 51 ++++ .../openflow/md/queue/QueueKeeper.java | 24 +- .../openflow/md/queue/QueueKeeperFactory.java | 34 +++ .../md/queue/QueueKeeperFairImpl.java | 111 ++++++++ .../md/queue/QueueKeeperHarvester.java | 90 +++++++ .../md/queue/QueueKeeperLightImpl.java | 239 ----------------- .../openflow/md/queue/QueueProcessor.java | 42 +++ .../md/queue/QueueProcessorLightImpl.java | 241 ++++++++++++++++++ .../md/queue/RegisteredTypeExtractor.java | 22 -- .../openflow/md/queue/Ticket.java | 6 + .../openflow/md/queue/TicketFinisher.java | 63 +---- .../openflow/md/queue/TicketFinisherImpl.java | 80 ++++++ .../openflow/md/queue/TicketImpl.java | 33 ++- .../md/queue/TicketProcessorFactory.java | 125 ++------- .../md/queue/TicketProcessorFactoryImpl.java | 178 +++++++++++++ .../openflow/md/queue/TicketResult.java | 10 + .../md/util/PollableQueuesZipper.java | 66 +++++ .../md/core/ConnectionConductorImplTest.java | 28 +- .../MessageDispatchServiceImplTest.java | 10 +- .../md/util/PollableQueuesZipperTest.java | 56 ++++ 33 files changed, 1303 insertions(+), 512 deletions(-) create mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/Enqueuer.java rename openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/{VersionExtractor.java => HarvesterHandle.java} (57%) create mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSourcePollRegistration.java create mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSourcePollRegistrator.java create mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueItem.java create mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueItemOFImpl.java create mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFactory.java create mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFairImpl.java create mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperHarvester.java delete mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperLightImpl.java create mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessor.java create mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessorLightImpl.java delete mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/RegisteredTypeExtractor.java create mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketFinisherImpl.java create mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketProcessorFactoryImpl.java create mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesZipper.java create mode 100644 openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesZipperTest.java diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductor.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductor.java index 668bb57515..e5720cb90d 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductor.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductor.java @@ -13,7 +13,7 @@ import java.util.concurrent.Future; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; -import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper; +import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessor; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.opendaylight.yangtools.yang.binding.DataObject; @@ -98,11 +98,16 @@ public interface ConnectionConductor { * assign global queueKeeper * @param queueKeeper */ - void setQueueKeeper(QueueKeeper queueKeeper); + void setQueueProcessor(QueueProcessor queueKeeper); /** * @param errorHandler for internal exception handling */ void setErrorHandler(ErrorHandler errorHandler); + /** + * @param conductorId + */ + void setId(int conductorId); + } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorFactory.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorFactory.java index 3373761fa3..2bd97db38e 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorFactory.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorFactory.java @@ -8,8 +8,10 @@ package org.opendaylight.openflowplugin.openflow.md.core; +import java.util.concurrent.atomic.AtomicInteger; + import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; -import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper; +import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessor; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.opendaylight.yangtools.yang.binding.DataObject; @@ -17,17 +19,24 @@ import org.opendaylight.yangtools.yang.binding.DataObject; * @author mirehak * */ -public abstract class ConnectionConductorFactory { +public final class ConnectionConductorFactory { + + private static AtomicInteger conductorId = new AtomicInteger(); + + private ConnectionConductorFactory() { + throw new UnsupportedOperationException(); + } /** * @param connectionAdapter - * @param queueKeeper + * @param queueProcessor * @return conductor for given connection */ public static ConnectionConductor createConductor(ConnectionAdapter connectionAdapter, - QueueKeeper queueKeeper) { + QueueProcessor queueProcessor) { ConnectionConductor connectionConductor = new ConnectionConductorImpl(connectionAdapter); - connectionConductor.setQueueKeeper(queueKeeper); + connectionConductor.setQueueProcessor(queueProcessor); + connectionConductor.setId(conductorId.getAndIncrement()); connectionConductor.init(); return connectionConductor; } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java index 31e4c58140..81b22925af 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java @@ -21,6 +21,9 @@ import org.opendaylight.openflowplugin.openflow.md.core.session.PortFeaturesUtil import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager; import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper; +import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType; +import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperFactory; +import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessor; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder; @@ -61,6 +64,9 @@ import com.google.common.util.concurrent.Futures; public class ConnectionConductorImpl implements OpenflowProtocolListener, SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener, HandshakeListener { + /** ingress queue limit */ + private static final int INGRESS_QUEUE_MAX_SIZE = 200; + protected static final Logger LOG = LoggerFactory .getLogger(ConnectionConductorImpl.class); @@ -79,7 +85,8 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, protected SessionContext sessionContext; - private QueueKeeper queueKeeper; + private QueueProcessor queueProcessor; + private QueueKeeper queue; private ThreadPoolExecutor hsPool; private HandshakeManager handshakeManager; @@ -87,15 +94,26 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, private PortFeaturesUtil portFeaturesUtils; + private int conductorId; + + private int ingressMaxQueueSize; + + /** * @param connectionAdapter */ public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) { + this(connectionAdapter, INGRESS_QUEUE_MAX_SIZE); + } + + /** + * @param connectionAdapter + * @param ingressMaxQueueSize ingress queue limit (blocking) + */ + public ConnectionConductorImpl(ConnectionAdapter connectionAdapter, int ingressMaxQueueSize) { this.connectionAdapter = connectionAdapter; + this.ingressMaxQueueSize = ingressMaxQueueSize; conductorState = CONDUCTOR_STATE.HANDSHAKING; - int handshakeThreadLimit = 1; - hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit , handshakeThreadLimit, 0L, - TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); firstHelloProcessed = false; handshakeManager = new HandshakeManagerImpl(connectionAdapter, ConnectionConductor.versionOrder.get(0), ConnectionConductor.versionOrder); @@ -106,14 +124,20 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void init() { + int handshakeThreadLimit = 1; + hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit , handshakeThreadLimit, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), + "OFHandshake-"+conductorId); + connectionAdapter.setMessageListener(this); connectionAdapter.setSystemListener(this); connectionAdapter.setConnectionReadyListener(this); + queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor, ingressMaxQueueSize); } @Override - public void setQueueKeeper(QueueKeeper queueKeeper) { - this.queueKeeper = queueKeeper; + public void setQueueProcessor(QueueProcessor queueProcessor) { + this.queueProcessor = queueProcessor; } /** @@ -143,17 +167,33 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void onErrorMessage(ErrorMessage errorMessage) { - queueKeeper.push(errorMessage, this); + enqueueMessage(errorMessage); + } + + + /** + * @param message + */ + private void enqueueMessage(OfHeader message) { + enqueueMessage(message, QueueType.DEFAULT); + } + + /** + * @param message + * @param queueType enqueue type + */ + private void enqueueMessage(OfHeader message, QueueType queueType) { + queue.push(message, this, queueType); } @Override public void onExperimenterMessage(ExperimenterMessage experimenterMessage) { - queueKeeper.push(experimenterMessage, this); + enqueueMessage(experimenterMessage); } @Override public void onFlowRemovedMessage(FlowRemovedMessage message) { - queueKeeper.push(message, this); + enqueueMessage(message); } @@ -167,7 +207,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, * 4. If Hello message received again with not supported version, just disconnect. */ @Override - public synchronized void onHelloMessage(final HelloMessage hello) { + public void onHelloMessage(final HelloMessage hello) { LOG.debug("processing HELLO.xid: {}", hello.getXid()); firstHelloProcessed = true; checkState(CONDUCTOR_STATE.HANDSHAKING); @@ -194,18 +234,18 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void onMultipartReplyMessage(MultipartReplyMessage message) { - queueKeeper.push(message, this); + enqueueMessage(message); } @Override public void onPacketInMessage(PacketInMessage message) { - queueKeeper.push(message, this, QueueKeeper.QueueType.UNORDERED); + enqueueMessage(message, QueueKeeper.QueueType.UNORDERED); } @Override public void onPortStatusMessage(PortStatusMessage message) { processPortStatusMsg(message); - queueKeeper.push(message, this); + enqueueMessage(message); } protected void processPortStatusMsg(PortStatus msg) { @@ -361,7 +401,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, } @Override - public synchronized void onConnectionReady() { + public void onConnectionReady() { LOG.debug("connection is ready-to-use"); if (! firstHelloProcessed) { HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper( @@ -386,7 +426,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, // Because the GetFeaturesOutput contains information about the port // in OF1.0 (that we would otherwise get from the PortDesc) we have to pass // it up for parsing to convert into a NodeConnectorUpdate - queueKeeper.push(featureOutput, this); + enqueueMessage(featureOutput); } requestDesc(); @@ -472,4 +512,9 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, hsPool.shutdownNow(); LOG.debug("pool is terminated: {}", hsPool.isTerminated()); } + + @Override + public void setId(int conductorId) { + this.conductorId = conductorId; + } } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/HandshakeManagerImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/HandshakeManagerImpl.java index acd76d6965..c17b653188 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/HandshakeManagerImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/HandshakeManagerImpl.java @@ -41,7 +41,7 @@ public class HandshakeManagerImpl implements HandshakeManager { private Short version; private ErrorHandler errorHandler; - private long maxTimeout = 1000; + private long maxTimeout = 8000; private TimeUnit maxTimeoutUnit = TimeUnit.MILLISECONDS; private Short highestVersion; @@ -74,7 +74,7 @@ public class HandshakeManagerImpl implements HandshakeManager { } @Override - public synchronized void shake() { + public void shake() { LOG.trace("handshake STARTED"); setActiveXid(20L); HelloMessage receivedHelloLoc = receivedHello; @@ -292,7 +292,6 @@ public class HandshakeManagerImpl implements HandshakeManager { */ protected void postHandshake(Short proposedVersion, Long xid) throws Exception { // set version - long maxTimeout = 3000; version = proposedVersion; LOG.debug("version set: {}", proposedVersion); diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java index 945b663fbd..49b554e16d 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java @@ -239,7 +239,7 @@ public class MDController implements IMDController, AutoCloseable { }; ThreadPoolLoggingExecutor rpcPool = new ThreadPoolLoggingExecutor(rpcThreadLimit, rpcThreadLimit, 0L, - TimeUnit.MILLISECONDS, queue); + TimeUnit.MILLISECONDS, queue, "OFRpc"); rpcPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/SwitchConnectionHandlerImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/SwitchConnectionHandlerImpl.java index 50d3bee271..ecb9757f85 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/SwitchConnectionHandlerImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/SwitchConnectionHandlerImpl.java @@ -16,7 +16,7 @@ import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler; import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil; import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy; -import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl; +import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessorLightImpl; import org.opendaylight.yangtools.yang.binding.DataContainer; /** @@ -26,7 +26,7 @@ public class SwitchConnectionHandlerImpl implements SwitchConnectionHandler { private ScheduledThreadPoolExecutor spyPool; - private QueueKeeperLightImpl queueKeeper; + private QueueProcessorLightImpl queueProcessor; private ErrorHandler errorHandler; private MessageSpy messageSpy; private int spyRate = 10; @@ -35,7 +35,7 @@ public class SwitchConnectionHandlerImpl implements SwitchConnectionHandler { * */ public SwitchConnectionHandlerImpl() { - queueKeeper = new QueueKeeperLightImpl(); + queueProcessor = new QueueProcessorLightImpl(); //TODO: implement shutdown invocation upon service stop event spyPool = new ScheduledThreadPoolExecutor(1); @@ -45,11 +45,11 @@ public class SwitchConnectionHandlerImpl implements SwitchConnectionHandler { * wire all up */ public void init() { - queueKeeper.setTranslatorMapping(OFSessionUtil.getTranslatorMap()); - queueKeeper.setPopListenersMapping(OFSessionUtil.getPopListenerMapping()); - queueKeeper.setMessageSpy(messageSpy); + queueProcessor.setTranslatorMapping(OFSessionUtil.getTranslatorMap()); + queueProcessor.setPopListenersMapping(OFSessionUtil.getPopListenerMapping()); + queueProcessor.setMessageSpy(messageSpy); - queueKeeper.init(); + queueProcessor.init(); spyPool.scheduleAtFixedRate(messageSpy, spyRate, spyRate, TimeUnit.SECONDS); } @@ -63,7 +63,7 @@ public class SwitchConnectionHandlerImpl implements SwitchConnectionHandler { @Override public void onSwitchConnected(ConnectionAdapter connectionAdapter) { ConnectionConductor conductor = ConnectionConductorFactory.createConductor( - connectionAdapter, queueKeeper); + connectionAdapter, queueProcessor); conductor.setErrorHandler(errorHandler); } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ThreadPoolLoggingExecutor.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ThreadPoolLoggingExecutor.java index 448f5ece92..b08bcc2027 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ThreadPoolLoggingExecutor.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ThreadPoolLoggingExecutor.java @@ -14,8 +14,10 @@ import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** - * + * threadPoolExecutor implementation logging exceptions thrown by threads */ public class ThreadPoolLoggingExecutor extends ThreadPoolExecutor { @@ -27,10 +29,13 @@ public class ThreadPoolLoggingExecutor extends ThreadPoolExecutor { * @param keepAliveTime * @param unit * @param workQueue + * @param poolName thread name prefix */ public ThreadPoolLoggingExecutor(int corePoolSize, int maximumPoolSize, - long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, + final String poolName) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + new ThreadFactoryBuilder().setNameFormat(poolName+"-%d").build()); } @Override diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java index 24ba19a676..a314018d77 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java @@ -54,9 +54,13 @@ public class SessionManagerOFImpl implements SessionManager { /** * @return singleton instance */ - public static synchronized SessionManager getInstance() { + public static SessionManager getInstance() { if (instance == null) { - instance = new SessionManagerOFImpl(); + synchronized (SessionContextOFImpl.class) { + if (instance == null) { + instance = new SessionManagerOFImpl(); + } + } } return instance; } @@ -64,9 +68,15 @@ public class SessionManagerOFImpl implements SessionManager { /** * close and release singleton instace */ - public static synchronized void releaseInstance() { - instance.close(); - instance = null; + public static void releaseInstance() { + if (instance != null) { + synchronized (SessionManagerOFImpl.class) { + if (instance != null) { + instance.close(); + instance = null; + } + } + } } private SessionManagerOFImpl() { diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/Enqueuer.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/Enqueuer.java new file mode 100644 index 0000000000..01a69d3d99 --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/Enqueuer.java @@ -0,0 +1,28 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.queue; + +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; + +/** + * @param type of queue items + */ +public interface Enqueuer { + + /** + * @param queueItem item to be enqueued + */ + void enqueueQueueItem(IN queueItem); + + /** + * @param queueItem + * @deprecated for testing and comparing purposes - this strategy blocks netty threads + */ + @Deprecated + void directProcessQueueItem(QueueItem queueItem); +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/VersionExtractor.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/HarvesterHandle.java similarity index 57% rename from openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/VersionExtractor.java rename to openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/HarvesterHandle.java index 0c9f830d84..d1cefa70ff 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/VersionExtractor.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/HarvesterHandle.java @@ -1,6 +1,6 @@ /** - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html @@ -8,15 +8,13 @@ package org.opendaylight.openflowplugin.openflow.md.queue; /** - * @author mirehak - * @param message type - * + * message harvester simple control */ -public interface VersionExtractor { +public interface HarvesterHandle { /** - * @param message - * @return version of message + * wakeup harvester in case it is in phase of starving sleep */ - Short extractVersion(T message); + void ping(); + } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSourcePollRegistration.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSourcePollRegistration.java new file mode 100644 index 0000000000..eff8189686 --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSourcePollRegistration.java @@ -0,0 +1,34 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.queue; + + +/** + * closable registration for {@link MessageSourcePollRegistrator} + * @param queue input message type + */ +public class MessageSourcePollRegistration implements AutoCloseable { + + private MessageSourcePollRegistrator messageSourceRegistry; + private IN messageSource; + + /** + * @param messageSourceRegistry + * @param messageSource + */ + public MessageSourcePollRegistration(MessageSourcePollRegistrator messageSourceRegistry, + IN messageSource) { + this.messageSourceRegistry = messageSourceRegistry; + this.messageSource = messageSource; + } + + @Override + public void close() throws Exception { + messageSourceRegistry.unregisterMessageSource(messageSource); + } +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSourcePollRegistrator.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSourcePollRegistrator.java new file mode 100644 index 0000000000..be43c831ec --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSourcePollRegistrator.java @@ -0,0 +1,39 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.queue; + +import java.util.Collection; + +/** + * @param message wrapping type + * + */ +public interface MessageSourcePollRegistrator { + + /** + * @param messageSource to read from during processing + * @return closeable registration + */ + AutoCloseable registerMessageSource(IN messageSource); + + /** + * @param messageSource to be unregistered + * @return true if successfully unregistered + */ + boolean unregisterMessageSource(IN messageSource); + + /** + * @return collection of registered message sources + */ + Collection getMessageSources(); + + /** + * @return the harvest handle + */ + HarvesterHandle getHarvesterHandle(); +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueItem.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueItem.java new file mode 100644 index 0000000000..a612d901a2 --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueItem.java @@ -0,0 +1,32 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.queue; + +import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; +import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType; + +/** + * @param input message type + */ +public interface QueueItem { + + /** + * @return wrapped message + */ + IN getMessage(); + + /** + * @return conductor the message arrived to + */ + ConnectionConductor getConnectionConductor(); + + /** + * @return queue type associated to this item + */ + QueueType getQueueType(); +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueItemOFImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueItemOFImpl.java new file mode 100644 index 0000000000..57fab66cde --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueItemOFImpl.java @@ -0,0 +1,51 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.queue; + +import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; +import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; + +/** + * QueueItem implementation based on {@link OfHeader} + */ +public class QueueItemOFImpl implements QueueItem { + + private OfHeader message; + private ConnectionConductor connectionConductor; + private QueueType queueType; + + + + /** + * @param message + * @param connectionConductor + * @param queueType + */ + public QueueItemOFImpl(OfHeader message, + ConnectionConductor connectionConductor, QueueType queueType) { + this.message = message; + this.connectionConductor = connectionConductor; + this.queueType = queueType; + } + + @Override + public OfHeader getMessage() { + return message; + } + + @Override + public ConnectionConductor getConnectionConductor() { + return connectionConductor; + } + + @Override + public QueueType getQueueType() { + return queueType; + } +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeper.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeper.java index c22a81a220..35eaacf156 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeper.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeper.java @@ -7,13 +7,7 @@ */ package org.opendaylight.openflowplugin.openflow.md.queue; -import java.util.Collection; -import java.util.List; -import java.util.Map; - import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; -import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator; -import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey; /** * This processing mechanism based on queue. Processing consists of 2 steps: translate and publish. @@ -28,7 +22,7 @@ import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey; * @param source type * @param result type */ -public interface QueueKeeper { +public interface QueueKeeper extends AutoCloseable { /** type of message enqueue */ public enum QueueType { @@ -37,18 +31,6 @@ public interface QueueKeeper { /** unordered processing - bypass fair processing */ UNORDERED} - /** - * @param translatorMapping translators for message processing - */ - void setTranslatorMapping(Map>>> translatorMapping); - - /** - * enqueue message for processing using {@link QueueType#DEFAULT} - * @param message - * @param conductor source of message - */ - void push(IN message, ConnectionConductor conductor); - /** * enqueue message for processing * @param message @@ -58,7 +40,7 @@ public interface QueueKeeper { void push(IN message, ConnectionConductor conductor, QueueType queueType); /** - * @param popListenersMapping listeners invoked when processing done + * @return oldest item from queue - if available and remove it from queue */ - void setPopListenersMapping(Map, Collection>> popListenersMapping); + QueueItem poll(); } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFactory.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFactory.java new file mode 100644 index 0000000000..e023fd47ce --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFactory.java @@ -0,0 +1,34 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.queue; + +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; + +/** + * factory for {@link QueueKeeper} implementations + */ +public abstract class QueueKeeperFactory { + + /** + * @param sourceRegistrator + * @param capacity blocking queue capacity + * @return fair reading implementation of {@link QueueKeeper} + */ + @SuppressWarnings("resource") + public static QueueKeeper createFairQueueKeeper( + MessageSourcePollRegistrator> sourceRegistrator, int capacity) { + QueueKeeperFairImpl queueKeeper = new QueueKeeperFairImpl(); + queueKeeper.setCapacity(capacity); + queueKeeper.setHarvesterHandle(sourceRegistrator.getHarvesterHandle()); + queueKeeper.init(); + + AutoCloseable registration = sourceRegistrator.registerMessageSource(queueKeeper); + queueKeeper.setPollRegistration(registration); + return queueKeeper; + } +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFairImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFairImpl.java new file mode 100644 index 0000000000..30b4172786 --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperFairImpl.java @@ -0,0 +1,111 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.queue; + +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; +import org.opendaylight.openflowplugin.openflow.md.util.PollableQueuesZipper; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * QueueKeeper implementation based on {@link OfHeader} + */ +public class QueueKeeperFairImpl implements QueueKeeper { + + private static Logger LOG = LoggerFactory + .getLogger(QueueKeeperFairImpl.class); + + private Queue> queueDefault; + private BlockingQueue> queueUnordered; + private AutoCloseable pollRegistration; + private int capacity = 5000; + private HarvesterHandle harvesterHandle; + private PollableQueuesZipper> queueZipper; + + @Override + public void close() throws Exception { + Preconditions.checkNotNull(pollRegistration, "pollRegistration not available"); + pollRegistration.close(); + } + + @Override + public void push( + OfHeader message, + ConnectionConductor conductor, + QueueKeeper.QueueType queueType) { + QueueItemOFImpl qItem = new QueueItemOFImpl(message, conductor, queueType); + boolean enqueued = false; + + switch (queueType) { + case DEFAULT: + enqueued = queueDefault.offer(qItem); + break; + case UNORDERED: + enqueued = queueUnordered.offer(qItem); + break; + default: + LOG.warn("unsupported queue type: [{}] -> dropping message [{}]", queueType, message.getImplementedInterface()); + } + + if (enqueued) { + harvesterHandle.ping(); + } else { + LOG.debug("ingress throttling is use -> {}", queueType); + } + + // if enqueueing fails -> message will be dropped + } + + /** + * @return the ingressQueue + */ + @Override + public QueueItem poll() { + QueueItem nextQueueItem = queueZipper.poll(); + return nextQueueItem; + } + + /** + * @param processingRegistration the processingRegistration to set + */ + public void setPollRegistration(AutoCloseable processingRegistration) { + this.pollRegistration = processingRegistration; + } + + /** + * @param capacity the capacity of internal blocking queue + */ + public void setCapacity(int capacity) { + this.capacity = capacity; + } + + /** + * init blocking queue + */ + public void init() { + queueUnordered = new ArrayBlockingQueue<>(capacity); + queueDefault = new ArrayBlockingQueue<>(capacity); + queueZipper = new PollableQueuesZipper<>(); + queueZipper.addSource(queueDefault); + queueZipper.addSource(queueUnordered); + } + + /** + * @param harvesterHandle + */ + public void setHarvesterHandle(HarvesterHandle harvesterHandle) { + this.harvesterHandle = harvesterHandle; + } +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperHarvester.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperHarvester.java new file mode 100644 index 0000000000..e362aeaa6d --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperHarvester.java @@ -0,0 +1,90 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.queue; + +import java.util.Collection; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @param + * + */ +public class QueueKeeperHarvester implements Runnable, HarvesterHandle { + private static Logger LOG = LoggerFactory.getLogger(QueueKeeperHarvester.class); + + private Enqueuer> enqueuer; + private Collection> messageSources; + + private boolean finishing = false; + private boolean starving; + + private Object harvestLock; + + + /** + * @param enqueuer + * @param messageSources + * @param harvestLock + */ + public QueueKeeperHarvester(Enqueuer> enqueuer, + Collection> messageSources) { + this.enqueuer = enqueuer; + this.messageSources = messageSources; + harvestLock = new Object(); + } + + @Override + public void run() { + while (! finishing ) { + starving = true; + for (QueueKeeper source : messageSources) { + QueueItem qItem = source.poll(); + if (qItem != null) { + starving = false; + enqueuer.enqueueQueueItem(qItem); + } + } + + if (starving) { + synchronized (harvestLock) { + try { + if (starving) { + LOG.trace("messageHarvester is about to make a starve sleep"); + harvestLock.wait(); + LOG.trace("messageHarvester is waking up from a starve sleep"); + } + } catch (InterruptedException e) { + LOG.warn("message harvester has been interrupted during starve sleep", e); + } + } + } + } + } + + /** + * finish harvester + */ + public void shutdown() { + this.finishing = true; + } + + @Override + public void ping() { + if (starving) { + LOG.debug("pinging message harvester in starve status"); + synchronized (harvestLock) { + if (starving) { + starving = false; + harvestLock.notify(); + } + } + } + } +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperLightImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperLightImpl.java deleted file mode 100644 index 04a7ad78e9..0000000000 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperLightImpl.java +++ /dev/null @@ -1,239 +0,0 @@ -/** - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.openflowplugin.openflow.md.queue; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledThreadPoolExecutor; - -import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; -import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator; -import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher; -import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey; -import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; -import org.opendaylight.yangtools.yang.binding.DataContainer; -import org.opendaylight.yangtools.yang.binding.DataObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * {@link QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase. - *
- * There is internal thread pool of limited size ({@link QueueKeeperLightImpl#setPoolSize(int)}) - * dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners) - *
- * Workflow: - *
    - *
  1. upon message push ticket is created and enqueued
  2. - *
  3. available threads from internal pool translate the massage wrapped in ticket
  4. - *
  5. when translation of particular message is finished, result is set in future result of wrapping ticket
    - * (order of tickets in queue is not touched during translate) - *
  6. - *
  7. at the end of queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does: - *
      - *
    1. invoke blocking {@link BlockingQueue#take()} method in order to get the oldest ticket
    2. - *
    3. invoke blocking {@link Future#get()} on the dequeued ticket
    4. - *
    5. as soon as the result of translation is available, appropriate popListener is invoked
    6. - *
    - * and this way the order of messages is preserved and also multiple threads are used by translating - *
  8. - *
- * - * - */ -public class QueueKeeperLightImpl implements QueueKeeper { - - private static final Logger LOG = LoggerFactory - .getLogger(QueueKeeperLightImpl.class); - - private Map, Collection>> popListenersMapping; - private BlockingQueue> processQueue; - private ScheduledThreadPoolExecutor pool; - private int poolSize = 10; - private Map>>> translatorMapping; - private TicketProcessorFactory ticketProcessorFactory; - private MessageSpy messageSpy; - - private VersionExtractor versionExtractor = new VersionExtractor() { - @Override - public Short extractVersion(OfHeader message) { - return message.getVersion(); - } - }; - - private RegisteredTypeExtractor registeredSrcTypeExtractor = - new RegisteredTypeExtractor() { - @SuppressWarnings("unchecked") - @Override - public Class extractRegisteredType( - OfHeader message) { - return (Class) message.getImplementedInterface(); - } - }; - - private RegisteredTypeExtractor registeredOutTypeExtractor = - new RegisteredTypeExtractor() { - @SuppressWarnings("unchecked") - @Override - public Class extractRegisteredType( - DataObject message) { - return (Class) message.getImplementedInterface(); - } - }; - - /** - * prepare queue - */ - public void init() { - processQueue = new LinkedBlockingQueue<>(1000); - pool = new ScheduledThreadPoolExecutor(poolSize); - - ticketProcessorFactory = new TicketProcessorFactory<>(); - ticketProcessorFactory.setRegisteredTypeExtractor(registeredSrcTypeExtractor); - ticketProcessorFactory.setTranslatorMapping(translatorMapping); - ticketProcessorFactory.setVersionExtractor(versionExtractor); - ticketProcessorFactory.setSpy(messageSpy); - - TicketFinisher finisher = new TicketFinisher<>( - processQueue, popListenersMapping, registeredOutTypeExtractor); - new Thread(finisher).start(); - } - - /** - * stop processing queue - */ - public void shutdown() { - pool.shutdown(); - } - - @Override - public void push(OfHeader message, ConnectionConductor conductor) { - push(message,conductor,QueueKeeper.QueueType.DEFAULT); - } - - @Override - public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) { - messageSpy.spyMessage(message, STATISTIC_GROUP.FROM_SWITCH_ENQUEUED); - if(queueType == QueueKeeper.QueueType.DEFAULT) { - TicketImpl ticket = new TicketImpl<>(); - ticket.setConductor(conductor); - ticket.setMessage(message); - LOG.debug("ticket scheduling: {}, ticket: {}", - message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket)); - try { - processQueue.put(ticket); - scheduleTicket(ticket); - } catch (InterruptedException e) { - LOG.warn("message enqueing interrupted", e); - } - } else if (queueType == QueueKeeper.QueueType.UNORDERED){ - List processedMessages = translate(message,conductor); - pop(processedMessages,conductor); - } - } - - /** - * @param ticket - */ - private void scheduleTicket(Ticket ticket) { - Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket); - pool.execute(ticketProcessor); - } - - /** - * @param poolSize the poolSize to set - */ - public void setPoolSize(int poolSize) { - this.poolSize = poolSize; - } - - @Override - public void setTranslatorMapping( - Map>>> translatorMapping) { - this.translatorMapping = translatorMapping; - } - - @Override - public void setPopListenersMapping( - Map, Collection>> popListenersMapping) { - this.popListenersMapping = popListenersMapping; - } - - /** - * @param messageSpy the messageSpy to set - */ - public void setMessageSpy(MessageSpy messageSpy) { - this.messageSpy = messageSpy; - } - - private List translate(OfHeader message, ConnectionConductor conductor) { - List result = new ArrayList<>(); - Class messageType = registeredSrcTypeExtractor.extractRegisteredType(message); - Collection>> translators = null; - LOG.debug("translating message: {}", messageType.getSimpleName()); - - Short version = versionExtractor.extractVersion(message); - if (version == null) { - throw new IllegalArgumentException("version is NULL"); - } - TranslatorKey tKey = new TranslatorKey(version, messageType.getName()); - translators = translatorMapping.get(tKey); - - LOG.debug("translatorKey: {} + {}", version, messageType.getName()); - - if (translators != null) { - for (IMDMessageTranslator> translator : translators) { - SwitchConnectionDistinguisher cookie = null; - // Pass cookie only for PACKT_IN - if (messageType.equals("PacketInMessage.class")) { - cookie = conductor.getAuxiliaryKey(); - } - List translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message); - if(translatorOutput != null) { - result.addAll(translatorOutput); - } - } - if (messageSpy != null) { - messageSpy.spyIn(message); - for (DataObject outMsg : result) { - messageSpy.spyOut(outMsg); - } - } - } else { - LOG.warn("No translators for this message Type: {}", messageType); - messageSpy.spyMessage(message, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE); - } - return result; - } - - /** - * @param processedMessages - * @param conductor - */ - private void pop(List processedMessages, ConnectionConductor conductor) { - for (DataObject msg : processedMessages) { - Class registeredType = - registeredOutTypeExtractor.extractRegisteredType(msg); - Collection> popListeners = popListenersMapping.get(registeredType); - if (popListeners == null) { - LOG.warn("no popListener registered for type {}"+registeredType); - } else { - for (PopListener consumer : popListeners) { - consumer.onPop(msg); - } - } - } - } -} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessor.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessor.java new file mode 100644 index 0000000000..bd74c33a23 --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessor.java @@ -0,0 +1,42 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.queue; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; +import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator; +import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey; + +/** + * This processing mechanism based on queue. Processing consists of 2 steps: translate and publish. + * Proposed workflow (might slightly deviate in implementations): + *
    + *
  1. messages of input type are pushed in (via {@link QueueProcessor#push(Object, ConnectionConductor)} and similar)
  2. + *
  3. ticket (executable task) is build upon each pushed message and enqueued
  4. + *
  5. ticket is translated using appropriate translator
  6. + *
  7. ticket is dequeued and result is published by appropriate popListener
  8. + *
+ * Message order might be not important, e.g. when speed is of the essence + * @param source type + * @param result type + */ +public interface QueueProcessor extends MessageSourcePollRegistrator>, Enqueuer> { + + /** + * @param translatorMapping translators for message processing + */ + void setTranslatorMapping(Map>>> translatorMapping); + + /** + * @param popListenersMapping listeners invoked when processing done + */ + void setPopListenersMapping(Map, Collection>> popListenersMapping); +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessorLightImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessorLightImpl.java new file mode 100644 index 0000000000..ad62f014b1 --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueProcessorLightImpl.java @@ -0,0 +1,241 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.queue; + +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator; +import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor; +import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey; +import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; +import org.opendaylight.yangtools.yang.binding.DataContainer; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * {@link QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase. + *
+ * There is internal thread pool of limited size ({@link QueueProcessorLightImpl#setProcessingPoolSize(int)}) + * dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners) + *
+ * Workflow: + *
    + *
  1. upon message push ticket is created and enqueued
  2. + *
  3. available threads from internal pool translate the massage wrapped in ticket
  4. + *
  5. when translation of particular message is finished, result is set in future result of wrapping ticket
    + * (order of tickets in queue is not touched during translate) + *
  6. + *
  7. at the end of queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does: + *
      + *
    1. invoke blocking {@link BlockingQueue#take()} method in order to get the oldest ticket
    2. + *
    3. invoke blocking {@link Future#get()} on the dequeued ticket
    4. + *
    5. as soon as the result of translation is available, appropriate popListener is invoked
    6. + *
    + * and this way the order of messages is preserved and also multiple threads are used by translating + *
  8. + *
+ * + * + */ +public class QueueProcessorLightImpl implements QueueProcessor { + + protected static final Logger LOG = LoggerFactory + .getLogger(QueueProcessorLightImpl.class); + + private BlockingQueue> ticketQueue; + private ThreadPoolExecutor processorPool; + private int processingPoolSize = 4; + private ExecutorService harvesterPool; + private ExecutorService finisherPool; + + protected Map, Collection>> popListenersMapping; + private Map>>> translatorMapping; + private TicketProcessorFactory ticketProcessorFactory; + private MessageSpy messageSpy; + protected Collection> messageSources; + private QueueKeeperHarvester harvester; + + protected TicketFinisher finisher; + + /** + * prepare queue + */ + public void init() { + int ticketQueueCapacity = 1500; + ticketQueue = new ArrayBlockingQueue<>(ticketQueueCapacity); + messageSources = new ConcurrentSkipListSet<>( + new Comparator>() { + @Override + public int compare(QueueKeeper o1, + QueueKeeper o2) { + return Integer.valueOf(o1.hashCode()).compareTo(o2.hashCode()); + } + }); + + processorPool = new ThreadPoolLoggingExecutor(processingPoolSize, processingPoolSize, 0, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(ticketQueueCapacity), + "OFmsgProcessor"); + // force blocking when pool queue is full + processorPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + executor.getQueue().put(r); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + }); + + harvesterPool = new ThreadPoolLoggingExecutor(1, 1, 0, + TimeUnit.MILLISECONDS, new ArrayBlockingQueue(1), "OFmsgHarvester"); + finisherPool = new ThreadPoolLoggingExecutor(1, 1, 0, + TimeUnit.MILLISECONDS, new ArrayBlockingQueue(1), "OFmsgFinisher"); + finisher = new TicketFinisherImpl( + ticketQueue, popListenersMapping); + finisherPool.execute(finisher); + + harvester = new QueueKeeperHarvester(this, messageSources); + harvesterPool.execute(harvester); + + ticketProcessorFactory = new TicketProcessorFactoryImpl(); + ticketProcessorFactory.setTranslatorMapping(translatorMapping); + ticketProcessorFactory.setSpy(messageSpy); + ticketProcessorFactory.setTicketFinisher(finisher); + } + + /** + * stop processing queue + */ + public void shutdown() { + processorPool.shutdown(); + } + + @Override + public void enqueueQueueItem(QueueItem queueItem) { + messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED); + TicketImpl ticket = new TicketImpl<>(); + ticket.setConductor(queueItem.getConnectionConductor()); + ticket.setMessage(queueItem.getMessage()); + ticket.setQueueType(queueItem.getQueueType()); + + LOG.trace("ticket scheduling: {}, ticket: {}", + queueItem.getMessage().getImplementedInterface().getSimpleName(), + System.identityHashCode(queueItem)); + scheduleTicket(ticket); + } + + + @Override + public void directProcessQueueItem(QueueItem queueItem) { + messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED); + TicketImpl ticket = new TicketImpl<>(); + ticket.setConductor(queueItem.getConnectionConductor()); + ticket.setMessage(queueItem.getMessage()); + + LOG.debug("ticket scheduling: {}, ticket: {}", + queueItem.getMessage().getImplementedInterface().getSimpleName(), + System.identityHashCode(queueItem)); + + ticketProcessorFactory.createProcessor(ticket).run(); + + // publish notification + finisher.firePopNotification(ticket.getDirectResult()); + } + + /** + * @param ticket + */ + private void scheduleTicket(Ticket ticket) { + switch (ticket.getQueueType()) { + case DEFAULT: + Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket); + processorPool.execute(ticketProcessor); + try { + ticketQueue.put(ticket); + } catch (InterruptedException e) { + LOG.warn("enqeueue of unordered message ticket failed", e); + } + break; + case UNORDERED: + Runnable ticketProcessorSync = ticketProcessorFactory.createSyncProcessor(ticket); + processorPool.execute(ticketProcessorSync); + break; + default: + LOG.warn("unsupported enqueue type: {}", ticket.getQueueType()); + } + } + + /** + * @param poolSize the poolSize to set + */ + public void setProcessingPoolSize(int poolSize) { + this.processingPoolSize = poolSize; + } + + @Override + public void setTranslatorMapping( + Map>>> translatorMapping) { + this.translatorMapping = translatorMapping; + } + + @Override + public void setPopListenersMapping( + Map, Collection>> popListenersMapping) { + this.popListenersMapping = popListenersMapping; + } + + /** + * @param messageSpy the messageSpy to set + */ + public void setMessageSpy(MessageSpy messageSpy) { + this.messageSpy = messageSpy; + } + + @Override + public AutoCloseable registerMessageSource(QueueKeeper queue) { + boolean added = messageSources.add(queue); + if (! added) { + LOG.debug("registration of message source queue failed - already registered"); + } + MessageSourcePollRegistration> queuePollRegistration = + new MessageSourcePollRegistration<>(this, queue); + return queuePollRegistration; + } + + @Override + public boolean unregisterMessageSource(QueueKeeper queue) { + return messageSources.remove(queue); + } + + @Override + public Collection> getMessageSources() { + return messageSources; + } + + @Override + public HarvesterHandle getHarvesterHandle() { + return harvester; + } +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/RegisteredTypeExtractor.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/RegisteredTypeExtractor.java deleted file mode 100644 index 4d805d7b6f..0000000000 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/RegisteredTypeExtractor.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.openflowplugin.openflow.md.queue; - -/** - * @author mirehak - * @param parent type of message - */ -public interface RegisteredTypeExtractor { - - /** - * @param message - * @return registered message type - */ - public Class extractRegisteredType(T message); - -} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/Ticket.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/Ticket.java index 195284ccde..e778d612af 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/Ticket.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/Ticket.java @@ -8,6 +8,7 @@ package org.opendaylight.openflowplugin.openflow.md.queue; import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; +import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType; /** * @author mirehak @@ -26,4 +27,9 @@ public interface Ticket extends TicketResult { * @return processed message */ IN getMessage(); + + /** + * @return queue type associated with ticket + */ + QueueType getQueueType(); } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketFinisher.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketFinisher.java index deeb4d9382..0018b2da99 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketFinisher.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketFinisher.java @@ -7,67 +7,22 @@ */ package org.opendaylight.openflowplugin.openflow.md.queue; -import java.util.Collection; import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @param result type * */ -public class TicketFinisher implements Runnable { - - private static final Logger LOG = LoggerFactory - .getLogger(TicketFinisher.class); - - private final BlockingQueue> queue; - private final Map, Collection>> popListenersMapping; - private final RegisteredTypeExtractor registeredOutTypeExtractor; +public interface TicketFinisher extends Runnable { /** - * @param queue - * @param popListenersMapping - * @param registeredOutTypeExtractor + * initiate shutdown of this worker */ - public TicketFinisher(BlockingQueue> queue, - Map, Collection>> popListenersMapping, - RegisteredTypeExtractor registeredOutTypeExtractor) { - this.queue = queue; - this.popListenersMapping = popListenersMapping; - this.registeredOutTypeExtractor = registeredOutTypeExtractor; - } - - - @Override - public void run() { - while (true) { - try { - //TODO:: handle shutdown of queue - TicketResult result = queue.take(); - long before = System.nanoTime(); - LOG.debug("finishing ticket(before): {}, {} remain in queue, {} capacity remaining", System.identityHashCode(result),queue.size(), queue.remainingCapacity()); - List processedMessages = result.getResult().get(); - long after = System.nanoTime(); - LOG.debug("finishing ticket(after): {}, {} remain in queue, {} capacity remaining, processingTime {} ns", System.identityHashCode(result),queue.size(), queue.remainingCapacity(),after-before); - for (OUT msg : processedMessages) { - Class registeredType = - registeredOutTypeExtractor.extractRegisteredType(msg); - Collection> popListeners = popListenersMapping.get(registeredType); - if (popListeners == null) { - LOG.warn("no popListener registered for type {}"+registeredType); - } else { - for (PopListener consumer : popListeners) { - consumer.onPop(msg); - } - } - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - } - } + void finish(); + + /** + * notify popListeners + * @param processedMessages + */ + void firePopNotification(List processedMessages); } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketFinisherImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketFinisherImpl.java new file mode 100644 index 0000000000..aae4cad3db --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketFinisherImpl.java @@ -0,0 +1,80 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.queue; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class TicketFinisherImpl implements TicketFinisher { + + private static final Logger LOG = LoggerFactory + .getLogger(TicketFinisherImpl.class); + + private final Map, Collection>> popListenersMapping; + + private boolean finished; + + private BlockingQueue> queue; + + /** + * @param queue + * @param popListenersMapping + */ + public TicketFinisherImpl(BlockingQueue> queue, + Map, Collection>> popListenersMapping) { + this.queue = queue; + this.popListenersMapping = popListenersMapping; + } + + @Override + public void run() { + while (! finished ) { + try { + //TODO:: handle shutdown of queue + TicketResult result = queue.take(); + List processedMessages = result.getResult().get(); + firePopNotification(processedMessages); + } catch (Exception e) { + LOG.warn("processing (translate, publish) of ticket failed", e); + } + } + } + + @Override + public void firePopNotification(List processedMessages) { + for (DataObject msg : processedMessages) { + Class registeredType = + msg.getImplementedInterface(); + Collection> popListeners = popListenersMapping.get(registeredType); + if (popListeners == null) { + LOG.warn("no popListener registered for type {}", registeredType); + } else { + for (PopListener consumer : popListeners) { + consumer.onPop(msg); + } + } + } + } + + /** + * initiate shutdown of this worker + */ + @Override + public void finish() { + finished = true; + } +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketImpl.java index 81c72ef645..7a11f3f00c 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketImpl.java @@ -10,20 +10,21 @@ package org.opendaylight.openflowplugin.openflow.md.queue; import java.util.List; import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; +import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType; import com.google.common.util.concurrent.SettableFuture; /** - * @author mirehak * @param source type * @param result type - * */ public class TicketImpl implements Ticket { private IN message; private ConnectionConductor conductor; private SettableFuture> future; + private QueueType queueType; + private List directResult; /** * default ctor @@ -66,4 +67,32 @@ public class TicketImpl implements Ticket { public void setConductor(ConnectionConductor conductor) { this.conductor = conductor; } + + /** + * @param queueType the queueType to set + */ + public void setQueueType(QueueType queueType) { + this.queueType = queueType; + } + + @Override + public QueueType getQueueType() { + return queueType; + } + + /** + * @return the directResult + */ + @Override + public List getDirectResult() { + return directResult; + } + + /** + * @param directResult the directResult to set + */ + @Override + public void setDirectResult(List directResult) { + this.directResult = directResult; + } } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketProcessorFactory.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketProcessorFactory.java index ace8288829..17d7539343 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketProcessorFactory.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketProcessorFactory.java @@ -7,139 +7,52 @@ */ package org.opendaylight.openflowplugin.openflow.md.queue; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; -import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator; -import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey; import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.DataObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @param * @param */ -public class TicketProcessorFactory { - - protected static final Logger LOG = LoggerFactory - .getLogger(TicketProcessorFactory.class); - - protected VersionExtractor versionExtractor; - protected RegisteredTypeExtractor registeredTypeExtractor; - protected Map>>> translatorMapping; - protected MessageSpy spy; +public interface TicketProcessorFactory { /** - * @param versionExtractor the versionExtractor to set + * @param ticket + * @return runnable ticket processor */ - public void setVersionExtractor(VersionExtractor versionExtractor) { - this.versionExtractor = versionExtractor; - } + Runnable createProcessor(final Ticket ticket); /** - * @param registeredTypeExtractor the registeredTypeExtractor to set + * @param ticket + * @return runnable ticket processor */ - public void setRegisteredTypeExtractor( - RegisteredTypeExtractor registeredTypeExtractor) { - this.registeredTypeExtractor = registeredTypeExtractor; - } + Runnable createSyncProcessor(final Ticket ticket); /** - * @param translatorMapping the translatorMapping to set + * @param ticket + * @return translated messages + * */ - public void setTranslatorMapping( - Map>>> translatorMapping) { - this.translatorMapping = translatorMapping; - } + List translate(Ticket ticket); /** - * @param spy the spy to set + * @param ticketFinisher setter */ - public void setSpy(MessageSpy spy) { - this.spy = spy; - } - + void setTicketFinisher(TicketFinisher ticketFinisher); /** - * @param ticket - * @return runnable ticket processor + * @param spy setter */ - public Runnable createProcessor(final Ticket ticket) { - - Runnable ticketProcessor = new Runnable() { - @Override - public void run() { - LOG.debug("message received, type: {}", registeredTypeExtractor.extractRegisteredType( - ticket.getMessage()).getSimpleName()); - List translate; - try { - translate = translate(); - ticket.getResult().set(translate); - // spying on result - if (spy != null) { - spy.spyIn(ticket.getMessage()); - for (OUT outMessage : ticket.getResult().get()) { - spy.spyOut(outMessage); - } - } - } catch (Exception e) { - LOG.error("translation problem: {}", e.getMessage()); - ticket.getResult().setException(e); - } - LOG.debug("message processing done (type: {}, ticket: {})", - registeredTypeExtractor.extractRegisteredType(ticket.getMessage()).getSimpleName(), - System.identityHashCode(ticket)); - } - - /** - * - */ - private List translate() { - List result = new ArrayList<>(); - - IN message = ticket.getMessage(); - Class messageType = registeredTypeExtractor.extractRegisteredType(ticket.getMessage()); - ConnectionConductor conductor = ticket.getConductor(); - Collection>> translators = null; - LOG.debug("translating ticket: {}, ticket: {}", messageType.getSimpleName(), System.identityHashCode(ticket)); - - Short version = versionExtractor.extractVersion(message); - if (version == null) { - throw new IllegalArgumentException("version is NULL"); - } - TranslatorKey tKey = new TranslatorKey(version, messageType.getName()); - translators = translatorMapping.get(tKey); + void setSpy(MessageSpy spy); - LOG.debug("translatorKey: {} + {}", version, messageType.getName()); - - if (translators != null) { - for (IMDMessageTranslator> translator : translators) { - SwitchConnectionDistinguisher cookie = null; - // Pass cookie only for PACKT_IN - if (messageType.equals("PacketInMessage.class")) { - cookie = conductor.getAuxiliaryKey(); - } - long start = System.nanoTime(); - List translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message); - long end = System.nanoTime(); - LOG.debug("translator: {} elapsed time {} ns",translator,end-start); - if(translatorOutput != null && !translatorOutput.isEmpty()) { - result.addAll(translatorOutput); - } - } - } else { - LOG.warn("No translators for this message Type: {}", messageType); - } - return result; - } - }; - - return ticketProcessor; - } + /** + * @param translatorMapping setter + */ + void setTranslatorMapping(Map>>> translatorMapping); } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketProcessorFactoryImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketProcessorFactoryImpl.java new file mode 100644 index 0000000000..720c016617 --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketProcessorFactoryImpl.java @@ -0,0 +1,178 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.queue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; +import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator; +import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher; +import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; +import org.opendaylight.yangtools.yang.binding.DataContainer; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableMap; + +/** + * OfHeader to DataObject implementation + */ +public class TicketProcessorFactoryImpl implements TicketProcessorFactory { + + protected static final Logger LOG = LoggerFactory + .getLogger(TicketProcessorFactoryImpl.class); + + protected Map>>> translatorMapping; + protected MessageSpy spy; + protected TicketFinisher ticketFinisher; + + /** + * @param translatorMapping the translatorMapping to set + */ + @Override + public void setTranslatorMapping( + Map>>> translatorMapping) { + this.translatorMapping = ImmutableMap.copyOf(translatorMapping); + } + + /** + * @param spy the spy to set + */ + @Override + public void setSpy(MessageSpy spy) { + this.spy = spy; + } + + /** + * @param ticketFinisher the finisher to set + */ + @Override + public void setTicketFinisher(TicketFinisher ticketFinisher) { + this.ticketFinisher = ticketFinisher; + } + + /** + * @param ticket + * @return runnable ticket processor + */ + @Override + public Runnable createProcessor(final Ticket ticket) { + + Runnable ticketProcessor = new Runnable() { + @Override + public void run() { + LOG.debug("message received, type: {}", ticket.getMessage().getImplementedInterface().getSimpleName()); + List translate; + try { + translate = translate(ticket); + ticket.getResult().set(translate); + ticket.setDirectResult(translate); + // spying on result + if (spy != null) { + spy.spyIn(ticket.getMessage()); + for (DataObject outMessage : translate) { + spy.spyOut(outMessage); + } + } + } catch (Exception e) { + LOG.error("translation problem: {}", e.getMessage()); + ticket.getResult().setException(e); + } + LOG.debug("message processing done (type: {}, ticket: {})", + ticket.getMessage().getImplementedInterface().getSimpleName(), + System.identityHashCode(ticket)); + } + }; + + + return ticketProcessor; + } + + /** + * @param ticket + * @return runnable ticket processor + */ + @Override + public Runnable createSyncProcessor(final Ticket ticket) { + + Runnable ticketProcessor = new Runnable() { + @Override + public void run() { + List translate; + try { + translate = translate(ticket); + // spying on result + if (spy != null) { + spy.spyIn(ticket.getMessage()); + for (DataObject outMessage : translate) { + spy.spyOut(outMessage); + } + } + ticketFinisher.firePopNotification(translate); + } catch (Exception e) { + LOG.error("translation problem: {}", e.getMessage()); + ticket.getResult().setException(e); + } + } + }; + + + return ticketProcessor; + } + + + /** + * @param ticket + * + */ + @Override + public List translate(Ticket ticket) { + List result = new ArrayList<>(); + + OfHeader message = ticket.getMessage(); + Class messageType = ticket.getMessage().getImplementedInterface(); + ConnectionConductor conductor = ticket.getConductor(); + Collection>> translators = null; + LOG.trace("translating ticket: {}, ticket: {}", messageType.getSimpleName(), System.identityHashCode(ticket)); + + Short version = message.getVersion(); + if (version == null) { + throw new IllegalArgumentException("version is NULL"); + } + TranslatorKey tKey = new TranslatorKey(version, messageType.getName()); + translators = translatorMapping.get(tKey); + + LOG.debug("translatorKey: {} + {}", version, messageType.getName()); + + if (translators != null) { + for (IMDMessageTranslator> translator : translators) { + SwitchConnectionDistinguisher cookie = null; + // Pass cookie only for PACKT_OfHeader + if (messageType.equals("PacketInMessage.class")) { + cookie = conductor.getAuxiliaryKey(); + } + long start = System.nanoTime(); + List translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message); + long end = System.nanoTime(); + LOG.trace("translator: {} elapsed time {} ns",translator,end-start); + if(translatorOutput != null && !translatorOutput.isEmpty()) { + result.addAll(translatorOutput); + } + } + } else { + LOG.warn("No translators for this message Type: {}", messageType); + } + + return result; + } +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketResult.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketResult.java index c8dfd84c07..031d26ae2c 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketResult.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketResult.java @@ -22,4 +22,14 @@ public interface TicketResult { */ SettableFuture> getResult(); + /** + * @return direct access to result + */ + List getDirectResult(); + + /** + * @param directResult setter for direct result + */ + void setDirectResult(List directResult); + } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesZipper.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesZipper.java new file mode 100644 index 0000000000..eb161727dc --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesZipper.java @@ -0,0 +1,66 @@ +/** + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.util; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; + +import com.google.common.collect.Iterators; + +/** + * Zipper groups together a list of queues and exposes one poll method. Polling iterates through + * all groups and returns first not-null result of poll method on each queue. If after polling each + * grouped queue for one time there is still null result, poll will return null. + *
+ * Iterating keeps last position so this polling is supposed to be fairly distributed. + * + * @param common item type of zipped queues + */ +public class PollableQueuesZipper { + + private List> sources; + private Iterator> cursor; + + /** + * default ctor + */ + public PollableQueuesZipper() { + sources = new ArrayList<>(); + } + + /** + * Add all member queues before first invocation of {@link PollableQueuesZipper#poll()} + * @param queue to be added to group + */ + public void addSource(Queue queue) { + sources.add(queue); + } + + /** + * @return next common product of polling member groups + */ + public T poll() { + T item = null; + if (cursor == null) { + cursor = Iterators.cycle(sources); + } + + Queue queue; + for (int i = 0; i < sources.size(); i++) { + queue = cursor.next(); + item = queue.poll(); + if (item != null) { + break; + } + } + + return item; + } +} diff --git a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImplTest.java b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImplTest.java index 6f2e3a6b50..2ded7ad0e8 100644 --- a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImplTest.java +++ b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImplTest.java @@ -35,7 +35,7 @@ import org.opendaylight.openflowplugin.openflow.md.core.plan.SwitchTestEvent; import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy; import org.opendaylight.openflowplugin.openflow.md.queue.PopListener; -import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl; +import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessorLightImpl; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.ErrorType; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortFeatures; @@ -79,7 +79,7 @@ public class ConnectionConductorImplTest { private ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor( 8); - private QueueKeeperLightImpl queueKeeper; + private QueueProcessorLightImpl queueProcessor; private PopListener popListener; @@ -136,24 +136,24 @@ public class ConnectionConductorImplTest { popListener = new PopListenerCountingImpl<>(); - queueKeeper = new QueueKeeperLightImpl(); - queueKeeper.setMessageSpy(messageSpy); + controller = new MDController(); + controller.init(); + controller.getMessageTranslators().putAll(assembleTranslatorMapping()); + + queueProcessor = new QueueProcessorLightImpl(); + queueProcessor.setMessageSpy(messageSpy); + queueProcessor.setPopListenersMapping(assemblePopListenerMapping()); + queueProcessor.setTranslatorMapping(controller.getMessageTranslators()); + queueProcessor.init(); connectionConductor = new ConnectionConductorImpl(adapter); - connectionConductor.setQueueKeeper(queueKeeper); - connectionConductor.init(); + connectionConductor.setQueueProcessor(queueProcessor); connectionConductor.setErrorHandler(errorHandler); - controller = new MDController(); - controller.init(); - queueKeeper.setTranslatorMapping(controller.getMessageTranslators()); + connectionConductor.init(); eventPlan = new Stack<>(); adapter.setEventPlan(eventPlan); adapter.setProceedTimeout(5000L); adapter.checkListeners(); - - controller.getMessageTranslators().putAll(assembleTranslatorMapping()); - queueKeeper.setPopListenersMapping(assemblePopListenerMapping()); - queueKeeper.init(); } /** @@ -176,7 +176,7 @@ public class ConnectionConductorImplTest { if (libSimulation != null) { libSimulation.join(); } - queueKeeper.shutdown(); + queueProcessor.shutdown(); connectionConductor.shutdownPool(); for (Exception problem : adapter.getOccuredExceptions()) { diff --git a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImplTest.java b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImplTest.java index 8e45eef1c2..d01b8f8e4b 100644 --- a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImplTest.java +++ b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImplTest.java @@ -23,7 +23,7 @@ import org.opendaylight.openflowplugin.openflow.md.ModelDrivenSwitch; import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; import org.opendaylight.openflowplugin.openflow.md.core.ErrorHandler; import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher; -import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper; +import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessor; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput; @@ -397,7 +397,7 @@ class MockConnectionConductor implements ConnectionConductor { } @Override - public void setQueueKeeper(QueueKeeper queueKeeper) { + public void setQueueProcessor(QueueProcessor queueKeeper) { // NOOP } @@ -405,6 +405,11 @@ class MockConnectionConductor implements ConnectionConductor { public void setErrorHandler(ErrorHandler errorHandler) { // NOOP } + + @Override + public void setId(int conductorId) { + // NOOP + } } enum MessageType { @@ -590,5 +595,4 @@ class MockConnectionAdapter implements ConnectionAdapter { // TODO Auto-generated method stub return null; } - } diff --git a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesZipperTest.java b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesZipperTest.java new file mode 100644 index 0000000000..2b6ec937df --- /dev/null +++ b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/util/PollableQueuesZipperTest.java @@ -0,0 +1,56 @@ +/** + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.junit.Assert; +import org.junit.Test; +import org.opendaylight.openflowplugin.openflow.md.util.PollableQueuesZipper; + +import com.google.common.collect.Lists; + +/** + * test for {@link PollableQueuesZipper} + */ +public class PollableQueuesZipperTest { + + /** + * Test method for {@link org.opendaylight.openflowplugin.openflow.md.util.PollableQueuesZipper#poll()}. + */ + @Test + public void testPoll() { + Queue l1 = new LinkedBlockingQueue(Lists.newArrayList("1", "2", "3")); + Queue l2 = new LinkedBlockingQueue(Lists.newArrayList("a", "b", "c", "d")); + Queue l3 = new LinkedBlockingQueue(Lists.newArrayList("A", "B")); + + PollableQueuesZipper zipper = new PollableQueuesZipper<>(); + zipper.addSource(l1); + zipper.addSource(l2); + zipper.addSource(l3); + + String[] expected = new String[] { + "1", "a", "A", "2", "b", "B", "3", "c", "d", null, "XXX" + }; + List result = new ArrayList<>(); + while (true) { + String data = zipper.poll(); + result.add(data); + if (data == null) { + break; + } + } + l1.offer("XXX"); + result.add(zipper.poll()); + Assert.assertArrayEquals(expected, result.toArray()); + } + +} -- 2.36.6