import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
-import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessor;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yangtools.yang.binding.DataObject;
* assign global queueKeeper
* @param queueKeeper
*/
- void setQueueKeeper(QueueKeeper<OfHeader, DataObject> queueKeeper);
+ void setQueueProcessor(QueueProcessor<OfHeader, DataObject> queueKeeper);
/**
* @param errorHandler for internal exception handling
*/
void setErrorHandler(ErrorHandler errorHandler);
+ /**
+ * @param conductorId
+ */
+ void setId(int conductorId);
+
}
package org.opendaylight.openflowplugin.openflow.md.core;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
-import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessor;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yangtools.yang.binding.DataObject;
* @author mirehak
*
*/
-public abstract class ConnectionConductorFactory {
+public final class ConnectionConductorFactory {
+
+ private static AtomicInteger conductorId = new AtomicInteger();
+
+ private ConnectionConductorFactory() {
+ throw new UnsupportedOperationException();
+ }
/**
* @param connectionAdapter
- * @param queueKeeper
+ * @param queueProcessor
* @return conductor for given connection
*/
public static ConnectionConductor createConductor(ConnectionAdapter connectionAdapter,
- QueueKeeper<OfHeader, DataObject> queueKeeper) {
+ QueueProcessor<OfHeader, DataObject> queueProcessor) {
ConnectionConductor connectionConductor = new ConnectionConductorImpl(connectionAdapter);
- connectionConductor.setQueueKeeper(queueKeeper);
+ connectionConductor.setQueueProcessor(queueProcessor);
+ connectionConductor.setId(conductorId.getAndIncrement());
connectionConductor.init();
return connectionConductor;
}
import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperFactory;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessor;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
public class ConnectionConductorImpl implements OpenflowProtocolListener,
SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener, HandshakeListener {
+ /** ingress queue limit */
+ private static final int INGRESS_QUEUE_MAX_SIZE = 200;
+
protected static final Logger LOG = LoggerFactory
.getLogger(ConnectionConductorImpl.class);
protected SessionContext sessionContext;
- private QueueKeeper<OfHeader, DataObject> queueKeeper;
+ private QueueProcessor<OfHeader, DataObject> queueProcessor;
+ private QueueKeeper<OfHeader> queue;
private ThreadPoolExecutor hsPool;
private HandshakeManager handshakeManager;
private PortFeaturesUtil portFeaturesUtils;
+ private int conductorId;
+
+ private int ingressMaxQueueSize;
+
+
/**
* @param connectionAdapter
*/
public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
+ this(connectionAdapter, INGRESS_QUEUE_MAX_SIZE);
+ }
+
+ /**
+ * @param connectionAdapter
+ * @param ingressMaxQueueSize ingress queue limit (blocking)
+ */
+ public ConnectionConductorImpl(ConnectionAdapter connectionAdapter, int ingressMaxQueueSize) {
this.connectionAdapter = connectionAdapter;
+ this.ingressMaxQueueSize = ingressMaxQueueSize;
conductorState = CONDUCTOR_STATE.HANDSHAKING;
- int handshakeThreadLimit = 1;
- hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit , handshakeThreadLimit, 0L,
- TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
firstHelloProcessed = false;
handshakeManager = new HandshakeManagerImpl(connectionAdapter,
ConnectionConductor.versionOrder.get(0), ConnectionConductor.versionOrder);
@Override
public void init() {
+ int handshakeThreadLimit = 1;
+ hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit , handshakeThreadLimit, 0L,
+ TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
+ "OFHandshake-"+conductorId);
+
connectionAdapter.setMessageListener(this);
connectionAdapter.setSystemListener(this);
connectionAdapter.setConnectionReadyListener(this);
+ queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor, ingressMaxQueueSize);
}
@Override
- public void setQueueKeeper(QueueKeeper<OfHeader, DataObject> queueKeeper) {
- this.queueKeeper = queueKeeper;
+ public void setQueueProcessor(QueueProcessor<OfHeader, DataObject> queueProcessor) {
+ this.queueProcessor = queueProcessor;
}
/**
@Override
public void onErrorMessage(ErrorMessage errorMessage) {
- queueKeeper.push(errorMessage, this);
+ enqueueMessage(errorMessage);
+ }
+
+
+ /**
+ * @param message
+ */
+ private void enqueueMessage(OfHeader message) {
+ enqueueMessage(message, QueueType.DEFAULT);
+ }
+
+ /**
+ * @param message
+ * @param queueType enqueue type
+ */
+ private void enqueueMessage(OfHeader message, QueueType queueType) {
+ queue.push(message, this, queueType);
}
@Override
public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
- queueKeeper.push(experimenterMessage, this);
+ enqueueMessage(experimenterMessage);
}
@Override
public void onFlowRemovedMessage(FlowRemovedMessage message) {
- queueKeeper.push(message, this);
+ enqueueMessage(message);
}
* 4. If Hello message received again with not supported version, just disconnect.
*/
@Override
- public synchronized void onHelloMessage(final HelloMessage hello) {
+ public void onHelloMessage(final HelloMessage hello) {
LOG.debug("processing HELLO.xid: {}", hello.getXid());
firstHelloProcessed = true;
checkState(CONDUCTOR_STATE.HANDSHAKING);
@Override
public void onMultipartReplyMessage(MultipartReplyMessage message) {
- queueKeeper.push(message, this);
+ enqueueMessage(message);
}
@Override
public void onPacketInMessage(PacketInMessage message) {
- queueKeeper.push(message, this, QueueKeeper.QueueType.UNORDERED);
+ enqueueMessage(message, QueueKeeper.QueueType.UNORDERED);
}
@Override
public void onPortStatusMessage(PortStatusMessage message) {
processPortStatusMsg(message);
- queueKeeper.push(message, this);
+ enqueueMessage(message);
}
protected void processPortStatusMsg(PortStatus msg) {
}
@Override
- public synchronized void onConnectionReady() {
+ public void onConnectionReady() {
LOG.debug("connection is ready-to-use");
if (! firstHelloProcessed) {
HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
// Because the GetFeaturesOutput contains information about the port
// in OF1.0 (that we would otherwise get from the PortDesc) we have to pass
// it up for parsing to convert into a NodeConnectorUpdate
- queueKeeper.push(featureOutput, this);
+ enqueueMessage(featureOutput);
}
requestDesc();
hsPool.shutdownNow();
LOG.debug("pool is terminated: {}", hsPool.isTerminated());
}
+
+ @Override
+ public void setId(int conductorId) {
+ this.conductorId = conductorId;
+ }
}
private Short version;
private ErrorHandler errorHandler;
- private long maxTimeout = 1000;
+ private long maxTimeout = 8000;
private TimeUnit maxTimeoutUnit = TimeUnit.MILLISECONDS;
private Short highestVersion;
}
@Override
- public synchronized void shake() {
+ public void shake() {
LOG.trace("handshake STARTED");
setActiveXid(20L);
HelloMessage receivedHelloLoc = receivedHello;
*/
protected void postHandshake(Short proposedVersion, Long xid) throws Exception {
// set version
- long maxTimeout = 3000;
version = proposedVersion;
LOG.debug("version set: {}", proposedVersion);
};
ThreadPoolLoggingExecutor rpcPool = new ThreadPoolLoggingExecutor(rpcThreadLimit, rpcThreadLimit, 0L,
- TimeUnit.MILLISECONDS, queue);
+ TimeUnit.MILLISECONDS, queue, "OFRpc");
rpcPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
-import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessorLightImpl;
import org.opendaylight.yangtools.yang.binding.DataContainer;
/**
private ScheduledThreadPoolExecutor spyPool;
- private QueueKeeperLightImpl queueKeeper;
+ private QueueProcessorLightImpl queueProcessor;
private ErrorHandler errorHandler;
private MessageSpy<DataContainer> messageSpy;
private int spyRate = 10;
*
*/
public SwitchConnectionHandlerImpl() {
- queueKeeper = new QueueKeeperLightImpl();
+ queueProcessor = new QueueProcessorLightImpl();
//TODO: implement shutdown invocation upon service stop event
spyPool = new ScheduledThreadPoolExecutor(1);
* wire all up
*/
public void init() {
- queueKeeper.setTranslatorMapping(OFSessionUtil.getTranslatorMap());
- queueKeeper.setPopListenersMapping(OFSessionUtil.getPopListenerMapping());
- queueKeeper.setMessageSpy(messageSpy);
+ queueProcessor.setTranslatorMapping(OFSessionUtil.getTranslatorMap());
+ queueProcessor.setPopListenersMapping(OFSessionUtil.getPopListenerMapping());
+ queueProcessor.setMessageSpy(messageSpy);
- queueKeeper.init();
+ queueProcessor.init();
spyPool.scheduleAtFixedRate(messageSpy, spyRate, spyRate, TimeUnit.SECONDS);
}
@Override
public void onSwitchConnected(ConnectionAdapter connectionAdapter) {
ConnectionConductor conductor = ConnectionConductorFactory.createConductor(
- connectionAdapter, queueKeeper);
+ connectionAdapter, queueProcessor);
conductor.setErrorHandler(errorHandler);
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
- *
+ * threadPoolExecutor implementation logging exceptions thrown by threads
*/
public class ThreadPoolLoggingExecutor extends ThreadPoolExecutor {
* @param keepAliveTime
* @param unit
* @param workQueue
+ * @param poolName thread name prefix
*/
public ThreadPoolLoggingExecutor(int corePoolSize, int maximumPoolSize,
- long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+ long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
+ final String poolName) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+ new ThreadFactoryBuilder().setNameFormat(poolName+"-%d").build());
}
@Override
/**
* @return singleton instance
*/
- public static synchronized SessionManager getInstance() {
+ public static SessionManager getInstance() {
if (instance == null) {
- instance = new SessionManagerOFImpl();
+ synchronized (SessionContextOFImpl.class) {
+ if (instance == null) {
+ instance = new SessionManagerOFImpl();
+ }
+ }
}
return instance;
}
/**
* close and release singleton instace
*/
- public static synchronized void releaseInstance() {
- instance.close();
- instance = null;
+ public static void releaseInstance() {
+ if (instance != null) {
+ synchronized (SessionManagerOFImpl.class) {
+ if (instance != null) {
+ instance.close();
+ instance = null;
+ }
+ }
+ }
}
private SessionManagerOFImpl() {
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
+/**
+ * @param <IN> type of queue items
+ */
+public interface Enqueuer<IN> {
+
+ /**
+ * @param queueItem item to be enqueued
+ */
+ void enqueueQueueItem(IN queueItem);
+
+ /**
+ * @param queueItem
+ * @deprecated for testing and comparing purposes - this strategy blocks netty threads
+ */
+ @Deprecated
+ void directProcessQueueItem(QueueItem<OfHeader> queueItem);
+}
/**
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
package org.opendaylight.openflowplugin.openflow.md.queue;
/**
- * @author mirehak
- * @param <T> message type
- *
+ * message harvester simple control
*/
-public interface VersionExtractor<T> {
+public interface HarvesterHandle {
/**
- * @param message
- * @return version of message
+ * wakeup harvester in case it is in phase of starving sleep
*/
- Short extractVersion(T message);
+ void ping();
+
}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+
+/**
+ * closable registration for {@link MessageSourcePollRegistrator}
+ * @param <IN> queue input message type
+ */
+public class MessageSourcePollRegistration<IN> implements AutoCloseable {
+
+ private MessageSourcePollRegistrator<IN> messageSourceRegistry;
+ private IN messageSource;
+
+ /**
+ * @param messageSourceRegistry
+ * @param messageSource
+ */
+ public MessageSourcePollRegistration(MessageSourcePollRegistrator<IN> messageSourceRegistry,
+ IN messageSource) {
+ this.messageSourceRegistry = messageSourceRegistry;
+ this.messageSource = messageSource;
+ }
+
+ @Override
+ public void close() throws Exception {
+ messageSourceRegistry.unregisterMessageSource(messageSource);
+ }
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.Collection;
+
+/**
+ * @param <IN> message wrapping type
+ *
+ */
+public interface MessageSourcePollRegistrator<IN> {
+
+ /**
+ * @param messageSource to read from during processing
+ * @return closeable registration
+ */
+ AutoCloseable registerMessageSource(IN messageSource);
+
+ /**
+ * @param messageSource to be unregistered
+ * @return true if successfully unregistered
+ */
+ boolean unregisterMessageSource(IN messageSource);
+
+ /**
+ * @return collection of registered message sources
+ */
+ Collection<IN> getMessageSources();
+
+ /**
+ * @return the harvest handle
+ */
+ HarvesterHandle getHarvesterHandle();
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType;
+
+/**
+ * @param <IN> input message type
+ */
+public interface QueueItem<IN> {
+
+ /**
+ * @return wrapped message
+ */
+ IN getMessage();
+
+ /**
+ * @return conductor the message arrived to
+ */
+ ConnectionConductor getConnectionConductor();
+
+ /**
+ * @return queue type associated to this item
+ */
+ QueueType getQueueType();
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
+/**
+ * QueueItem implementation based on {@link OfHeader}
+ */
+public class QueueItemOFImpl implements QueueItem<OfHeader> {
+
+ private OfHeader message;
+ private ConnectionConductor connectionConductor;
+ private QueueType queueType;
+
+
+
+ /**
+ * @param message
+ * @param connectionConductor
+ * @param queueType
+ */
+ public QueueItemOFImpl(OfHeader message,
+ ConnectionConductor connectionConductor, QueueType queueType) {
+ this.message = message;
+ this.connectionConductor = connectionConductor;
+ this.queueType = queueType;
+ }
+
+ @Override
+ public OfHeader getMessage() {
+ return message;
+ }
+
+ @Override
+ public ConnectionConductor getConnectionConductor() {
+ return connectionConductor;
+ }
+
+ @Override
+ public QueueType getQueueType() {
+ return queueType;
+ }
+}
*/
package org.opendaylight.openflowplugin.openflow.md.queue;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
-import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
-import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
/**
* This processing mechanism based on queue. Processing consists of 2 steps: translate and publish.
* @param <IN> source type
* @param <OUT> result type
*/
-public interface QueueKeeper<IN, OUT> {
+public interface QueueKeeper<IN> extends AutoCloseable {
/** type of message enqueue */
public enum QueueType {
/** unordered processing - bypass fair processing */
UNORDERED}
- /**
- * @param translatorMapping translators for message processing
- */
- void setTranslatorMapping(Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping);
-
- /**
- * enqueue message for processing using {@link QueueType#DEFAULT}
- * @param message
- * @param conductor source of message
- */
- void push(IN message, ConnectionConductor conductor);
-
/**
* enqueue message for processing
* @param message
void push(IN message, ConnectionConductor conductor, QueueType queueType);
/**
- * @param popListenersMapping listeners invoked when processing done
+ * @return oldest item from queue - if available and remove it from queue
*/
- void setPopListenersMapping(Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping);
+ QueueItem<IN> poll();
}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
+/**
+ * factory for {@link QueueKeeper} implementations
+ */
+public abstract class QueueKeeperFactory {
+
+ /**
+ * @param sourceRegistrator
+ * @param capacity blocking queue capacity
+ * @return fair reading implementation of {@link QueueKeeper}
+ */
+ @SuppressWarnings("resource")
+ public static QueueKeeper<OfHeader> createFairQueueKeeper(
+ MessageSourcePollRegistrator<QueueKeeper<OfHeader>> sourceRegistrator, int capacity) {
+ QueueKeeperFairImpl queueKeeper = new QueueKeeperFairImpl();
+ queueKeeper.setCapacity(capacity);
+ queueKeeper.setHarvesterHandle(sourceRegistrator.getHarvesterHandle());
+ queueKeeper.init();
+
+ AutoCloseable registration = sourceRegistrator.registerMessageSource(queueKeeper);
+ queueKeeper.setPollRegistration(registration);
+ return queueKeeper;
+ }
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.util.PollableQueuesZipper;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * QueueKeeper implementation based on {@link OfHeader}
+ */
+public class QueueKeeperFairImpl implements QueueKeeper<OfHeader> {
+
+ private static Logger LOG = LoggerFactory
+ .getLogger(QueueKeeperFairImpl.class);
+
+ private Queue<QueueItem<OfHeader>> queueDefault;
+ private BlockingQueue<QueueItem<OfHeader>> queueUnordered;
+ private AutoCloseable pollRegistration;
+ private int capacity = 5000;
+ private HarvesterHandle harvesterHandle;
+ private PollableQueuesZipper<QueueItem<OfHeader>> queueZipper;
+
+ @Override
+ public void close() throws Exception {
+ Preconditions.checkNotNull(pollRegistration, "pollRegistration not available");
+ pollRegistration.close();
+ }
+
+ @Override
+ public void push(
+ OfHeader message,
+ ConnectionConductor conductor,
+ QueueKeeper.QueueType queueType) {
+ QueueItemOFImpl qItem = new QueueItemOFImpl(message, conductor, queueType);
+ boolean enqueued = false;
+
+ switch (queueType) {
+ case DEFAULT:
+ enqueued = queueDefault.offer(qItem);
+ break;
+ case UNORDERED:
+ enqueued = queueUnordered.offer(qItem);
+ break;
+ default:
+ LOG.warn("unsupported queue type: [{}] -> dropping message [{}]", queueType, message.getImplementedInterface());
+ }
+
+ if (enqueued) {
+ harvesterHandle.ping();
+ } else {
+ LOG.debug("ingress throttling is use -> {}", queueType);
+ }
+
+ // if enqueueing fails -> message will be dropped
+ }
+
+ /**
+ * @return the ingressQueue
+ */
+ @Override
+ public QueueItem<OfHeader> poll() {
+ QueueItem<OfHeader> nextQueueItem = queueZipper.poll();
+ return nextQueueItem;
+ }
+
+ /**
+ * @param processingRegistration the processingRegistration to set
+ */
+ public void setPollRegistration(AutoCloseable processingRegistration) {
+ this.pollRegistration = processingRegistration;
+ }
+
+ /**
+ * @param capacity the capacity of internal blocking queue
+ */
+ public void setCapacity(int capacity) {
+ this.capacity = capacity;
+ }
+
+ /**
+ * init blocking queue
+ */
+ public void init() {
+ queueUnordered = new ArrayBlockingQueue<>(capacity);
+ queueDefault = new ArrayBlockingQueue<>(capacity);
+ queueZipper = new PollableQueuesZipper<>();
+ queueZipper.addSource(queueDefault);
+ queueZipper.addSource(queueUnordered);
+ }
+
+ /**
+ * @param harvesterHandle
+ */
+ public void setHarvesterHandle(HarvesterHandle harvesterHandle) {
+ this.harvesterHandle = harvesterHandle;
+ }
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @param <IN>
+ *
+ */
+public class QueueKeeperHarvester<IN> implements Runnable, HarvesterHandle {
+ private static Logger LOG = LoggerFactory.getLogger(QueueKeeperHarvester.class);
+
+ private Enqueuer<QueueItem<IN>> enqueuer;
+ private Collection<QueueKeeper<IN>> messageSources;
+
+ private boolean finishing = false;
+ private boolean starving;
+
+ private Object harvestLock;
+
+
+ /**
+ * @param enqueuer
+ * @param messageSources
+ * @param harvestLock
+ */
+ public QueueKeeperHarvester(Enqueuer<QueueItem<IN>> enqueuer,
+ Collection<QueueKeeper<IN>> messageSources) {
+ this.enqueuer = enqueuer;
+ this.messageSources = messageSources;
+ harvestLock = new Object();
+ }
+
+ @Override
+ public void run() {
+ while (! finishing ) {
+ starving = true;
+ for (QueueKeeper<IN> source : messageSources) {
+ QueueItem<IN> qItem = source.poll();
+ if (qItem != null) {
+ starving = false;
+ enqueuer.enqueueQueueItem(qItem);
+ }
+ }
+
+ if (starving) {
+ synchronized (harvestLock) {
+ try {
+ if (starving) {
+ LOG.trace("messageHarvester is about to make a starve sleep");
+ harvestLock.wait();
+ LOG.trace("messageHarvester is waking up from a starve sleep");
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("message harvester has been interrupted during starve sleep", e);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * finish harvester
+ */
+ public void shutdown() {
+ this.finishing = true;
+ }
+
+ @Override
+ public void ping() {
+ if (starving) {
+ LOG.debug("pinging message harvester in starve status");
+ synchronized (harvestLock) {
+ if (starving) {
+ starving = false;
+ harvestLock.notify();
+ }
+ }
+ }
+ }
+}
+++ /dev/null
-/**
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowplugin.openflow.md.queue;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
-import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
-import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
-import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
-import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
-import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
-import org.opendaylight.yangtools.yang.binding.DataContainer;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * {@link QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase.
- * <br/>
- * There is internal thread pool of limited size ({@link QueueKeeperLightImpl#setPoolSize(int)})
- * dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners)
- * <br/>
- * Workflow:
- * <ol>
- * <li>upon message push ticket is created and enqueued</li>
- * <li>available threads from internal pool translate the massage wrapped in ticket</li>
- * <li>when translation of particular message is finished, result is set in future result of wrapping ticket</br>
- * (order of tickets in queue is not touched during translate)
- * </li>
- * <li>at the end of queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does:
- * <ol>
- * <li>invoke blocking {@link BlockingQueue#take()} method in order to get the oldest ticket</li>
- * <li>invoke blocking {@link Future#get()} on the dequeued ticket</li>
- * <li>as soon as the result of translation is available, appropriate popListener is invoked</li>
- * </ol>
- * and this way the order of messages is preserved and also multiple threads are used by translating
- * </li>
- * </ol>
- *
- *
- */
-public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
-
- private static final Logger LOG = LoggerFactory
- .getLogger(QueueKeeperLightImpl.class);
-
- private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
- private BlockingQueue<TicketResult<DataObject>> processQueue;
- private ScheduledThreadPoolExecutor pool;
- private int poolSize = 10;
- private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
- private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
- private MessageSpy<DataContainer> messageSpy;
-
- private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
- @Override
- public Short extractVersion(OfHeader message) {
- return message.getVersion();
- }
- };
-
- private RegisteredTypeExtractor<OfHeader> registeredSrcTypeExtractor =
- new RegisteredTypeExtractor<OfHeader>() {
- @SuppressWarnings("unchecked")
- @Override
- public Class<? extends OfHeader> extractRegisteredType(
- OfHeader message) {
- return (Class<? extends OfHeader>) message.getImplementedInterface();
- }
- };
-
- private RegisteredTypeExtractor<DataObject> registeredOutTypeExtractor =
- new RegisteredTypeExtractor<DataObject>() {
- @SuppressWarnings("unchecked")
- @Override
- public Class<? extends DataObject> extractRegisteredType(
- DataObject message) {
- return (Class<? extends DataObject>) message.getImplementedInterface();
- }
- };
-
- /**
- * prepare queue
- */
- public void init() {
- processQueue = new LinkedBlockingQueue<>(1000);
- pool = new ScheduledThreadPoolExecutor(poolSize);
-
- ticketProcessorFactory = new TicketProcessorFactory<>();
- ticketProcessorFactory.setRegisteredTypeExtractor(registeredSrcTypeExtractor);
- ticketProcessorFactory.setTranslatorMapping(translatorMapping);
- ticketProcessorFactory.setVersionExtractor(versionExtractor);
- ticketProcessorFactory.setSpy(messageSpy);
-
- TicketFinisher<DataObject> finisher = new TicketFinisher<>(
- processQueue, popListenersMapping, registeredOutTypeExtractor);
- new Thread(finisher).start();
- }
-
- /**
- * stop processing queue
- */
- public void shutdown() {
- pool.shutdown();
- }
-
- @Override
- public void push(OfHeader message, ConnectionConductor conductor) {
- push(message,conductor,QueueKeeper.QueueType.DEFAULT);
- }
-
- @Override
- public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
- messageSpy.spyMessage(message, STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
- if(queueType == QueueKeeper.QueueType.DEFAULT) {
- TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
- ticket.setConductor(conductor);
- ticket.setMessage(message);
- LOG.debug("ticket scheduling: {}, ticket: {}",
- message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
- try {
- processQueue.put(ticket);
- scheduleTicket(ticket);
- } catch (InterruptedException e) {
- LOG.warn("message enqueing interrupted", e);
- }
- } else if (queueType == QueueKeeper.QueueType.UNORDERED){
- List<DataObject> processedMessages = translate(message,conductor);
- pop(processedMessages,conductor);
- }
- }
-
- /**
- * @param ticket
- */
- private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
- Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
- pool.execute(ticketProcessor);
- }
-
- /**
- * @param poolSize the poolSize to set
- */
- public void setPoolSize(int poolSize) {
- this.poolSize = poolSize;
- }
-
- @Override
- public void setTranslatorMapping(
- Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
- this.translatorMapping = translatorMapping;
- }
-
- @Override
- public void setPopListenersMapping(
- Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
- this.popListenersMapping = popListenersMapping;
- }
-
- /**
- * @param messageSpy the messageSpy to set
- */
- public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
- this.messageSpy = messageSpy;
- }
-
- private List<DataObject> translate(OfHeader message, ConnectionConductor conductor) {
- List<DataObject> result = new ArrayList<>();
- Class<? extends OfHeader> messageType = registeredSrcTypeExtractor.extractRegisteredType(message);
- Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> translators = null;
- LOG.debug("translating message: {}", messageType.getSimpleName());
-
- Short version = versionExtractor.extractVersion(message);
- if (version == null) {
- throw new IllegalArgumentException("version is NULL");
- }
- TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
- translators = translatorMapping.get(tKey);
-
- LOG.debug("translatorKey: {} + {}", version, messageType.getName());
-
- if (translators != null) {
- for (IMDMessageTranslator<OfHeader, List<DataObject>> translator : translators) {
- SwitchConnectionDistinguisher cookie = null;
- // Pass cookie only for PACKT_IN
- if (messageType.equals("PacketInMessage.class")) {
- cookie = conductor.getAuxiliaryKey();
- }
- List<DataObject> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
- if(translatorOutput != null) {
- result.addAll(translatorOutput);
- }
- }
- if (messageSpy != null) {
- messageSpy.spyIn(message);
- for (DataObject outMsg : result) {
- messageSpy.spyOut(outMsg);
- }
- }
- } else {
- LOG.warn("No translators for this message Type: {}", messageType);
- messageSpy.spyMessage(message, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
- }
- return result;
- }
-
- /**
- * @param processedMessages
- * @param conductor
- */
- private void pop(List<DataObject> processedMessages, ConnectionConductor conductor) {
- for (DataObject msg : processedMessages) {
- Class<? extends Object> registeredType =
- registeredOutTypeExtractor.extractRegisteredType(msg);
- Collection<PopListener<DataObject>> popListeners = popListenersMapping.get(registeredType);
- if (popListeners == null) {
- LOG.warn("no popListener registered for type {}"+registeredType);
- } else {
- for (PopListener<DataObject> consumer : popListeners) {
- consumer.onPop(msg);
- }
- }
- }
- }
-}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+
+/**
+ * This processing mechanism based on queue. Processing consists of 2 steps: translate and publish.
+ * Proposed workflow (might slightly deviate in implementations):
+ * <ol>
+ * <li>messages of input type are pushed in (via {@link QueueProcessor#push(Object, ConnectionConductor)} and similar)</li>
+ * <li>ticket (executable task) is build upon each pushed message and enqueued</li>
+ * <li>ticket is translated using appropriate translator</li>
+ * <li>ticket is dequeued and result is published by appropriate popListener</li>
+ * </ol>
+ * Message order might be not important, e.g. when speed is of the essence
+ * @param <IN> source type
+ * @param <OUT> result type
+ */
+public interface QueueProcessor<IN, OUT> extends MessageSourcePollRegistrator<QueueKeeper<IN>>, Enqueuer<QueueItem<IN>> {
+
+ /**
+ * @param translatorMapping translators for message processing
+ */
+ void setTranslatorMapping(Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping);
+
+ /**
+ * @param popListenersMapping listeners invoked when processing done
+ */
+ void setPopListenersMapping(Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping);
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
+import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase.
+ * <br/>
+ * There is internal thread pool of limited size ({@link QueueProcessorLightImpl#setProcessingPoolSize(int)})
+ * dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners)
+ * <br/>
+ * Workflow:
+ * <ol>
+ * <li>upon message push ticket is created and enqueued</li>
+ * <li>available threads from internal pool translate the massage wrapped in ticket</li>
+ * <li>when translation of particular message is finished, result is set in future result of wrapping ticket</br>
+ * (order of tickets in queue is not touched during translate)
+ * </li>
+ * <li>at the end of queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does:
+ * <ol>
+ * <li>invoke blocking {@link BlockingQueue#take()} method in order to get the oldest ticket</li>
+ * <li>invoke blocking {@link Future#get()} on the dequeued ticket</li>
+ * <li>as soon as the result of translation is available, appropriate popListener is invoked</li>
+ * </ol>
+ * and this way the order of messages is preserved and also multiple threads are used by translating
+ * </li>
+ * </ol>
+ *
+ *
+ */
+public class QueueProcessorLightImpl implements QueueProcessor<OfHeader, DataObject> {
+
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(QueueProcessorLightImpl.class);
+
+ private BlockingQueue<TicketResult<DataObject>> ticketQueue;
+ private ThreadPoolExecutor processorPool;
+ private int processingPoolSize = 4;
+ private ExecutorService harvesterPool;
+ private ExecutorService finisherPool;
+
+ protected Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
+ private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
+ private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
+ private MessageSpy<DataContainer> messageSpy;
+ protected Collection<QueueKeeper<OfHeader>> messageSources;
+ private QueueKeeperHarvester<OfHeader> harvester;
+
+ protected TicketFinisher<DataObject> finisher;
+
+ /**
+ * prepare queue
+ */
+ public void init() {
+ int ticketQueueCapacity = 1500;
+ ticketQueue = new ArrayBlockingQueue<>(ticketQueueCapacity);
+ messageSources = new ConcurrentSkipListSet<>(
+ new Comparator<QueueKeeper<OfHeader>>() {
+ @Override
+ public int compare(QueueKeeper<OfHeader> o1,
+ QueueKeeper<OfHeader> o2) {
+ return Integer.valueOf(o1.hashCode()).compareTo(o2.hashCode());
+ }
+ });
+
+ processorPool = new ThreadPoolLoggingExecutor(processingPoolSize, processingPoolSize, 0,
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<Runnable>(ticketQueueCapacity),
+ "OFmsgProcessor");
+ // force blocking when pool queue is full
+ processorPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ try {
+ executor.getQueue().put(r);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ }
+ }
+ });
+
+ harvesterPool = new ThreadPoolLoggingExecutor(1, 1, 0,
+ TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "OFmsgHarvester");
+ finisherPool = new ThreadPoolLoggingExecutor(1, 1, 0,
+ TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "OFmsgFinisher");
+ finisher = new TicketFinisherImpl(
+ ticketQueue, popListenersMapping);
+ finisherPool.execute(finisher);
+
+ harvester = new QueueKeeperHarvester<OfHeader>(this, messageSources);
+ harvesterPool.execute(harvester);
+
+ ticketProcessorFactory = new TicketProcessorFactoryImpl();
+ ticketProcessorFactory.setTranslatorMapping(translatorMapping);
+ ticketProcessorFactory.setSpy(messageSpy);
+ ticketProcessorFactory.setTicketFinisher(finisher);
+ }
+
+ /**
+ * stop processing queue
+ */
+ public void shutdown() {
+ processorPool.shutdown();
+ }
+
+ @Override
+ public void enqueueQueueItem(QueueItem<OfHeader> queueItem) {
+ messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
+ TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
+ ticket.setConductor(queueItem.getConnectionConductor());
+ ticket.setMessage(queueItem.getMessage());
+ ticket.setQueueType(queueItem.getQueueType());
+
+ LOG.trace("ticket scheduling: {}, ticket: {}",
+ queueItem.getMessage().getImplementedInterface().getSimpleName(),
+ System.identityHashCode(queueItem));
+ scheduleTicket(ticket);
+ }
+
+
+ @Override
+ public void directProcessQueueItem(QueueItem<OfHeader> queueItem) {
+ messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
+ TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
+ ticket.setConductor(queueItem.getConnectionConductor());
+ ticket.setMessage(queueItem.getMessage());
+
+ LOG.debug("ticket scheduling: {}, ticket: {}",
+ queueItem.getMessage().getImplementedInterface().getSimpleName(),
+ System.identityHashCode(queueItem));
+
+ ticketProcessorFactory.createProcessor(ticket).run();
+
+ // publish notification
+ finisher.firePopNotification(ticket.getDirectResult());
+ }
+
+ /**
+ * @param ticket
+ */
+ private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
+ switch (ticket.getQueueType()) {
+ case DEFAULT:
+ Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
+ processorPool.execute(ticketProcessor);
+ try {
+ ticketQueue.put(ticket);
+ } catch (InterruptedException e) {
+ LOG.warn("enqeueue of unordered message ticket failed", e);
+ }
+ break;
+ case UNORDERED:
+ Runnable ticketProcessorSync = ticketProcessorFactory.createSyncProcessor(ticket);
+ processorPool.execute(ticketProcessorSync);
+ break;
+ default:
+ LOG.warn("unsupported enqueue type: {}", ticket.getQueueType());
+ }
+ }
+
+ /**
+ * @param poolSize the poolSize to set
+ */
+ public void setProcessingPoolSize(int poolSize) {
+ this.processingPoolSize = poolSize;
+ }
+
+ @Override
+ public void setTranslatorMapping(
+ Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
+ this.translatorMapping = translatorMapping;
+ }
+
+ @Override
+ public void setPopListenersMapping(
+ Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
+ this.popListenersMapping = popListenersMapping;
+ }
+
+ /**
+ * @param messageSpy the messageSpy to set
+ */
+ public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
+ this.messageSpy = messageSpy;
+ }
+
+ @Override
+ public AutoCloseable registerMessageSource(QueueKeeper<OfHeader> queue) {
+ boolean added = messageSources.add(queue);
+ if (! added) {
+ LOG.debug("registration of message source queue failed - already registered");
+ }
+ MessageSourcePollRegistration<QueueKeeper<OfHeader>> queuePollRegistration =
+ new MessageSourcePollRegistration<>(this, queue);
+ return queuePollRegistration;
+ }
+
+ @Override
+ public boolean unregisterMessageSource(QueueKeeper<OfHeader> queue) {
+ return messageSources.remove(queue);
+ }
+
+ @Override
+ public Collection<QueueKeeper<OfHeader>> getMessageSources() {
+ return messageSources;
+ }
+
+ @Override
+ public HarvesterHandle getHarvesterHandle() {
+ return harvester;
+ }
+}
+++ /dev/null
-/**
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowplugin.openflow.md.queue;
-
-/**
- * @author mirehak
- * @param <T> parent type of message
- */
-public interface RegisteredTypeExtractor<T> {
-
- /**
- * @param message
- * @return registered message type
- */
- public Class<? extends T> extractRegisteredType(T message);
-
-}
package org.opendaylight.openflowplugin.openflow.md.queue;
import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType;
/**
* @author mirehak
* @return processed message
*/
IN getMessage();
+
+ /**
+ * @return queue type associated with ticket
+ */
+ QueueType getQueueType();
}
*/
package org.opendaylight.openflowplugin.openflow.md.queue;
-import java.util.Collection;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* @param <OUT> result type
*
*/
-public class TicketFinisher<OUT> implements Runnable {
-
- private static final Logger LOG = LoggerFactory
- .getLogger(TicketFinisher.class);
-
- private final BlockingQueue<TicketResult<OUT>> queue;
- private final Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping;
- private final RegisteredTypeExtractor<OUT> registeredOutTypeExtractor;
+public interface TicketFinisher<OUT> extends Runnable {
/**
- * @param queue
- * @param popListenersMapping
- * @param registeredOutTypeExtractor
+ * initiate shutdown of this worker
*/
- public TicketFinisher(BlockingQueue<TicketResult<OUT>> queue,
- Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping,
- RegisteredTypeExtractor<OUT> registeredOutTypeExtractor) {
- this.queue = queue;
- this.popListenersMapping = popListenersMapping;
- this.registeredOutTypeExtractor = registeredOutTypeExtractor;
- }
-
-
- @Override
- public void run() {
- while (true) {
- try {
- //TODO:: handle shutdown of queue
- TicketResult<OUT> result = queue.take();
- long before = System.nanoTime();
- LOG.debug("finishing ticket(before): {}, {} remain in queue, {} capacity remaining", System.identityHashCode(result),queue.size(), queue.remainingCapacity());
- List<OUT> processedMessages = result.getResult().get();
- long after = System.nanoTime();
- LOG.debug("finishing ticket(after): {}, {} remain in queue, {} capacity remaining, processingTime {} ns", System.identityHashCode(result),queue.size(), queue.remainingCapacity(),after-before);
- for (OUT msg : processedMessages) {
- Class<? extends Object> registeredType =
- registeredOutTypeExtractor.extractRegisteredType(msg);
- Collection<PopListener<OUT>> popListeners = popListenersMapping.get(registeredType);
- if (popListeners == null) {
- LOG.warn("no popListener registered for type {}"+registeredType);
- } else {
- for (PopListener<OUT> consumer : popListeners) {
- consumer.onPop(msg);
- }
- }
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
- }
+ void finish();
+
+ /**
+ * notify popListeners
+ * @param processedMessages
+ */
+ void firePopNotification(List<OUT> processedMessages);
}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class TicketFinisherImpl implements TicketFinisher<DataObject> {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TicketFinisherImpl.class);
+
+ private final Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
+
+ private boolean finished;
+
+ private BlockingQueue<TicketResult<DataObject>> queue;
+
+ /**
+ * @param queue
+ * @param popListenersMapping
+ */
+ public TicketFinisherImpl(BlockingQueue<TicketResult<DataObject>> queue,
+ Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
+ this.queue = queue;
+ this.popListenersMapping = popListenersMapping;
+ }
+
+ @Override
+ public void run() {
+ while (! finished ) {
+ try {
+ //TODO:: handle shutdown of queue
+ TicketResult<DataObject> result = queue.take();
+ List<DataObject> processedMessages = result.getResult().get();
+ firePopNotification(processedMessages);
+ } catch (Exception e) {
+ LOG.warn("processing (translate, publish) of ticket failed", e);
+ }
+ }
+ }
+
+ @Override
+ public void firePopNotification(List<DataObject> processedMessages) {
+ for (DataObject msg : processedMessages) {
+ Class<? extends Object> registeredType =
+ msg.getImplementedInterface();
+ Collection<PopListener<DataObject>> popListeners = popListenersMapping.get(registeredType);
+ if (popListeners == null) {
+ LOG.warn("no popListener registered for type {}", registeredType);
+ } else {
+ for (PopListener<DataObject> consumer : popListeners) {
+ consumer.onPop(msg);
+ }
+ }
+ }
+ }
+
+ /**
+ * initiate shutdown of this worker
+ */
+ @Override
+ public void finish() {
+ finished = true;
+ }
+}
import java.util.List;
import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType;
import com.google.common.util.concurrent.SettableFuture;
/**
- * @author mirehak
* @param <IN> source type
* @param <OUT> result type
- *
*/
public class TicketImpl<IN, OUT> implements Ticket<IN, OUT> {
private IN message;
private ConnectionConductor conductor;
private SettableFuture<List<OUT>> future;
+ private QueueType queueType;
+ private List<OUT> directResult;
/**
* default ctor
public void setConductor(ConnectionConductor conductor) {
this.conductor = conductor;
}
+
+ /**
+ * @param queueType the queueType to set
+ */
+ public void setQueueType(QueueType queueType) {
+ this.queueType = queueType;
+ }
+
+ @Override
+ public QueueType getQueueType() {
+ return queueType;
+ }
+
+ /**
+ * @return the directResult
+ */
+ @Override
+ public List<OUT> getDirectResult() {
+ return directResult;
+ }
+
+ /**
+ * @param directResult the directResult to set
+ */
+ @Override
+ public void setDirectResult(List<OUT> directResult) {
+ this.directResult = directResult;
+ }
}
*/
package org.opendaylight.openflowplugin.openflow.md.queue;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
-import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* @param <IN>
* @param <OUT>
*/
-public class TicketProcessorFactory<IN extends DataObject, OUT extends DataObject> {
-
- protected static final Logger LOG = LoggerFactory
- .getLogger(TicketProcessorFactory.class);
-
- protected VersionExtractor<IN> versionExtractor;
- protected RegisteredTypeExtractor<IN> registeredTypeExtractor;
- protected Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping;
- protected MessageSpy<DataContainer> spy;
+public interface TicketProcessorFactory<IN extends DataObject, OUT extends DataObject> {
/**
- * @param versionExtractor the versionExtractor to set
+ * @param ticket
+ * @return runnable ticket processor
*/
- public void setVersionExtractor(VersionExtractor<IN> versionExtractor) {
- this.versionExtractor = versionExtractor;
- }
+ Runnable createProcessor(final Ticket<IN, OUT> ticket);
/**
- * @param registeredTypeExtractor the registeredTypeExtractor to set
+ * @param ticket
+ * @return runnable ticket processor
*/
- public void setRegisteredTypeExtractor(
- RegisteredTypeExtractor<IN> registeredTypeExtractor) {
- this.registeredTypeExtractor = registeredTypeExtractor;
- }
+ Runnable createSyncProcessor(final Ticket<IN, OUT> ticket);
/**
- * @param translatorMapping the translatorMapping to set
+ * @param ticket
+ * @return translated messages
+ *
*/
- public void setTranslatorMapping(
- Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping) {
- this.translatorMapping = translatorMapping;
- }
+ List<OUT> translate(Ticket<IN, OUT> ticket);
/**
- * @param spy the spy to set
+ * @param ticketFinisher setter
*/
- public void setSpy(MessageSpy<DataContainer> spy) {
- this.spy = spy;
- }
-
+ void setTicketFinisher(TicketFinisher<OUT> ticketFinisher);
/**
- * @param ticket
- * @return runnable ticket processor
+ * @param spy setter
*/
- public Runnable createProcessor(final Ticket<IN, OUT> ticket) {
-
- Runnable ticketProcessor = new Runnable() {
- @Override
- public void run() {
- LOG.debug("message received, type: {}", registeredTypeExtractor.extractRegisteredType(
- ticket.getMessage()).getSimpleName());
- List<OUT> translate;
- try {
- translate = translate();
- ticket.getResult().set(translate);
- // spying on result
- if (spy != null) {
- spy.spyIn(ticket.getMessage());
- for (OUT outMessage : ticket.getResult().get()) {
- spy.spyOut(outMessage);
- }
- }
- } catch (Exception e) {
- LOG.error("translation problem: {}", e.getMessage());
- ticket.getResult().setException(e);
- }
- LOG.debug("message processing done (type: {}, ticket: {})",
- registeredTypeExtractor.extractRegisteredType(ticket.getMessage()).getSimpleName(),
- System.identityHashCode(ticket));
- }
-
- /**
- *
- */
- private List<OUT> translate() {
- List<OUT> result = new ArrayList<>();
-
- IN message = ticket.getMessage();
- Class<? extends IN> messageType = registeredTypeExtractor.extractRegisteredType(ticket.getMessage());
- ConnectionConductor conductor = ticket.getConductor();
- Collection<IMDMessageTranslator<IN, List<OUT>>> translators = null;
- LOG.debug("translating ticket: {}, ticket: {}", messageType.getSimpleName(), System.identityHashCode(ticket));
-
- Short version = versionExtractor.extractVersion(message);
- if (version == null) {
- throw new IllegalArgumentException("version is NULL");
- }
- TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
- translators = translatorMapping.get(tKey);
+ void setSpy(MessageSpy<DataContainer> spy);
- LOG.debug("translatorKey: {} + {}", version, messageType.getName());
-
- if (translators != null) {
- for (IMDMessageTranslator<IN, List<OUT>> translator : translators) {
- SwitchConnectionDistinguisher cookie = null;
- // Pass cookie only for PACKT_IN
- if (messageType.equals("PacketInMessage.class")) {
- cookie = conductor.getAuxiliaryKey();
- }
- long start = System.nanoTime();
- List<OUT> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
- long end = System.nanoTime();
- LOG.debug("translator: {} elapsed time {} ns",translator,end-start);
- if(translatorOutput != null && !translatorOutput.isEmpty()) {
- result.addAll(translatorOutput);
- }
- }
- } else {
- LOG.warn("No translators for this message Type: {}", messageType);
- }
- return result;
- }
- };
-
- return ticketProcessor;
- }
+ /**
+ * @param translatorMapping setter
+ */
+ void setTranslatorMapping(Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping);
}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
+import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * OfHeader to DataObject implementation
+ */
+public class TicketProcessorFactoryImpl implements TicketProcessorFactory<OfHeader, DataObject> {
+
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(TicketProcessorFactoryImpl.class);
+
+ protected Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
+ protected MessageSpy<DataContainer> spy;
+ protected TicketFinisher<DataObject> ticketFinisher;
+
+ /**
+ * @param translatorMapping the translatorMapping to set
+ */
+ @Override
+ public void setTranslatorMapping(
+ Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
+ this.translatorMapping = ImmutableMap.copyOf(translatorMapping);
+ }
+
+ /**
+ * @param spy the spy to set
+ */
+ @Override
+ public void setSpy(MessageSpy<DataContainer> spy) {
+ this.spy = spy;
+ }
+
+ /**
+ * @param ticketFinisher the finisher to set
+ */
+ @Override
+ public void setTicketFinisher(TicketFinisher<DataObject> ticketFinisher) {
+ this.ticketFinisher = ticketFinisher;
+ }
+
+ /**
+ * @param ticket
+ * @return runnable ticket processor
+ */
+ @Override
+ public Runnable createProcessor(final Ticket<OfHeader, DataObject> ticket) {
+
+ Runnable ticketProcessor = new Runnable() {
+ @Override
+ public void run() {
+ LOG.debug("message received, type: {}", ticket.getMessage().getImplementedInterface().getSimpleName());
+ List<DataObject> translate;
+ try {
+ translate = translate(ticket);
+ ticket.getResult().set(translate);
+ ticket.setDirectResult(translate);
+ // spying on result
+ if (spy != null) {
+ spy.spyIn(ticket.getMessage());
+ for (DataObject outMessage : translate) {
+ spy.spyOut(outMessage);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("translation problem: {}", e.getMessage());
+ ticket.getResult().setException(e);
+ }
+ LOG.debug("message processing done (type: {}, ticket: {})",
+ ticket.getMessage().getImplementedInterface().getSimpleName(),
+ System.identityHashCode(ticket));
+ }
+ };
+
+
+ return ticketProcessor;
+ }
+
+ /**
+ * @param ticket
+ * @return runnable ticket processor
+ */
+ @Override
+ public Runnable createSyncProcessor(final Ticket<OfHeader, DataObject> ticket) {
+
+ Runnable ticketProcessor = new Runnable() {
+ @Override
+ public void run() {
+ List<DataObject> translate;
+ try {
+ translate = translate(ticket);
+ // spying on result
+ if (spy != null) {
+ spy.spyIn(ticket.getMessage());
+ for (DataObject outMessage : translate) {
+ spy.spyOut(outMessage);
+ }
+ }
+ ticketFinisher.firePopNotification(translate);
+ } catch (Exception e) {
+ LOG.error("translation problem: {}", e.getMessage());
+ ticket.getResult().setException(e);
+ }
+ }
+ };
+
+
+ return ticketProcessor;
+ }
+
+
+ /**
+ * @param ticket
+ *
+ */
+ @Override
+ public List<DataObject> translate(Ticket<OfHeader, DataObject> ticket) {
+ List<DataObject> result = new ArrayList<>();
+
+ OfHeader message = ticket.getMessage();
+ Class<? extends DataContainer> messageType = ticket.getMessage().getImplementedInterface();
+ ConnectionConductor conductor = ticket.getConductor();
+ Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> translators = null;
+ LOG.trace("translating ticket: {}, ticket: {}", messageType.getSimpleName(), System.identityHashCode(ticket));
+
+ Short version = message.getVersion();
+ if (version == null) {
+ throw new IllegalArgumentException("version is NULL");
+ }
+ TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
+ translators = translatorMapping.get(tKey);
+
+ LOG.debug("translatorKey: {} + {}", version, messageType.getName());
+
+ if (translators != null) {
+ for (IMDMessageTranslator<OfHeader, List<DataObject>> translator : translators) {
+ SwitchConnectionDistinguisher cookie = null;
+ // Pass cookie only for PACKT_OfHeader
+ if (messageType.equals("PacketInMessage.class")) {
+ cookie = conductor.getAuxiliaryKey();
+ }
+ long start = System.nanoTime();
+ List<DataObject> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
+ long end = System.nanoTime();
+ LOG.trace("translator: {} elapsed time {} ns",translator,end-start);
+ if(translatorOutput != null && !translatorOutput.isEmpty()) {
+ result.addAll(translatorOutput);
+ }
+ }
+ } else {
+ LOG.warn("No translators for this message Type: {}", messageType);
+ }
+
+ return result;
+ }
+}
*/
SettableFuture<List<T>> getResult();
+ /**
+ * @return direct access to result
+ */
+ List<T> getDirectResult();
+
+ /**
+ * @param directResult setter for direct result
+ */
+ void setDirectResult(List<T> directResult);
+
}
--- /dev/null
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.util;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * Zipper groups together a list of queues and exposes one poll method. Polling iterates through
+ * all groups and returns first not-null result of poll method on each queue. If after polling each
+ * grouped queue for one time there is still null result, poll will return null.
+ * <br/>
+ * Iterating keeps last position so this polling is supposed to be fairly distributed.
+ *
+ * @param <T> common item type of zipped queues
+ */
+public class PollableQueuesZipper<T> {
+
+ private List<Queue<T>> sources;
+ private Iterator<Queue<T>> cursor;
+
+ /**
+ * default ctor
+ */
+ public PollableQueuesZipper() {
+ sources = new ArrayList<>();
+ }
+
+ /**
+ * Add all member queues before first invocation of {@link PollableQueuesZipper#poll()}
+ * @param queue to be added to group
+ */
+ public void addSource(Queue<T> queue) {
+ sources.add(queue);
+ }
+
+ /**
+ * @return next common product of polling member groups
+ */
+ public T poll() {
+ T item = null;
+ if (cursor == null) {
+ cursor = Iterators.cycle(sources);
+ }
+
+ Queue<T> queue;
+ for (int i = 0; i < sources.size(); i++) {
+ queue = cursor.next();
+ item = queue.poll();
+ if (item != null) {
+ break;
+ }
+ }
+
+ return item;
+ }
+}
import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
import org.opendaylight.openflowplugin.openflow.md.queue.PopListener;
-import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessorLightImpl;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.ErrorType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortFeatures;
private ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
8);
- private QueueKeeperLightImpl queueKeeper;
+ private QueueProcessorLightImpl queueProcessor;
private PopListener<DataObject> popListener;
popListener = new PopListenerCountingImpl<>();
- queueKeeper = new QueueKeeperLightImpl();
- queueKeeper.setMessageSpy(messageSpy);
+ controller = new MDController();
+ controller.init();
+ controller.getMessageTranslators().putAll(assembleTranslatorMapping());
+
+ queueProcessor = new QueueProcessorLightImpl();
+ queueProcessor.setMessageSpy(messageSpy);
+ queueProcessor.setPopListenersMapping(assemblePopListenerMapping());
+ queueProcessor.setTranslatorMapping(controller.getMessageTranslators());
+ queueProcessor.init();
connectionConductor = new ConnectionConductorImpl(adapter);
- connectionConductor.setQueueKeeper(queueKeeper);
- connectionConductor.init();
+ connectionConductor.setQueueProcessor(queueProcessor);
connectionConductor.setErrorHandler(errorHandler);
- controller = new MDController();
- controller.init();
- queueKeeper.setTranslatorMapping(controller.getMessageTranslators());
+ connectionConductor.init();
eventPlan = new Stack<>();
adapter.setEventPlan(eventPlan);
adapter.setProceedTimeout(5000L);
adapter.checkListeners();
-
- controller.getMessageTranslators().putAll(assembleTranslatorMapping());
- queueKeeper.setPopListenersMapping(assemblePopListenerMapping());
- queueKeeper.init();
}
/**
if (libSimulation != null) {
libSimulation.join();
}
- queueKeeper.shutdown();
+ queueProcessor.shutdown();
connectionConductor.shutdownPool();
for (Exception problem : adapter.getOccuredExceptions()) {
import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
import org.opendaylight.openflowplugin.openflow.md.core.ErrorHandler;
import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
-import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessor;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
}
@Override
- public void setQueueKeeper(QueueKeeper<OfHeader, DataObject> queueKeeper) {
+ public void setQueueProcessor(QueueProcessor<OfHeader, DataObject> queueKeeper) {
// NOOP
}
public void setErrorHandler(ErrorHandler errorHandler) {
// NOOP
}
+
+ @Override
+ public void setId(int conductorId) {
+ // NOOP
+ }
}
enum MessageType {
// TODO Auto-generated method stub
return null;
}
-
}
--- /dev/null
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.openflowplugin.openflow.md.util.PollableQueuesZipper;
+
+import com.google.common.collect.Lists;
+
+/**
+ * test for {@link PollableQueuesZipper}
+ */
+public class PollableQueuesZipperTest {
+
+ /**
+ * Test method for {@link org.opendaylight.openflowplugin.openflow.md.util.PollableQueuesZipper#poll()}.
+ */
+ @Test
+ public void testPoll() {
+ Queue<String> l1 = new LinkedBlockingQueue<String>(Lists.newArrayList("1", "2", "3"));
+ Queue<String> l2 = new LinkedBlockingQueue<String>(Lists.newArrayList("a", "b", "c", "d"));
+ Queue<String> l3 = new LinkedBlockingQueue<String>(Lists.newArrayList("A", "B"));
+
+ PollableQueuesZipper<String> zipper = new PollableQueuesZipper<>();
+ zipper.addSource(l1);
+ zipper.addSource(l2);
+ zipper.addSource(l3);
+
+ String[] expected = new String[] {
+ "1", "a", "A", "2", "b", "B", "3", "c", "d", null, "XXX"
+ };
+ List<String> result = new ArrayList<>();
+ while (true) {
+ String data = zipper.poll();
+ result.add(data);
+ if (data == null) {
+ break;
+ }
+ }
+ l1.offer("XXX");
+ result.add(zipper.poll());
+ Assert.assertArrayEquals(expected, result.toArray());
+ }
+
+}