/**
* Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.openflowplugin.openflow.md.queue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@link QueueKeeper} implementation focused on preserving message order while using multiple threads for the translation phase.
*
* There is an internal thread pool of limited size ({@link QueueKeeperLightImpl#setPoolSize(int)})
* dedicated to translation. In addition, a single dedicated thread handles publishing (via popListeners).
*
* Workflow:
*
* - upon message push ticket is created and enqueued
* - available threads from the internal pool translate the message wrapped in the ticket
* - when translation of particular message is finished, result is set in future result of wrapping ticket
* (order of tickets in queue is not touched during translate)
*
* - at the end of the queue there is a {@link TicketFinisher} running in its own single thread; for each ticket it does:
*
* - invoke blocking {@link BlockingQueue#take()} method in order to get the oldest ticket
* - invoke blocking {@link Future#get()} on the dequeued ticket
* - as soon as the result of translation is available, appropriate popListener is invoked
*
* and this way the order of messages is preserved while multiple threads are still used for translation
*
*
*
*
*/
public class QueueKeeperLightImpl implements QueueKeeper {
// Class-wide SLF4J logger.
private static final Logger LOG = LoggerFactory
.getLogger(QueueKeeperLightImpl.class);
// NOTE(review): several declarations below show stray '>' characters and missing
// type arguments; the generic parameters appear to have been lost and must be
// restored from the original source before this file can compile.

// Listeners looked up in pop() by the registered type of each translated message;
// the same mapping is handed to the TicketFinisher in init().
private Map, Collection>> popListenersMapping;
// Bounded queue (capacity 1000, created in init()) that push() puts tickets into
// and that is handed to the TicketFinisher.
private BlockingQueue> processQueue;
// Executor that runs the ticket processors (see scheduleTicket()); created in init().
private ScheduledThreadPoolExecutor pool;
// Core size used when init() creates the pool; 10 unless setPoolSize() is called first.
private int poolSize = 10;
// Translators looked up in translate() by TranslatorKey (version + message type name);
// also passed to the ticket processor factory in init().
private Map>>> translatorMapping;
// Creates one Runnable processor per ticket; created and configured in init().
private TicketProcessorFactory ticketProcessorFactory;
// Statistics hook; translate() null-checks it before spyIn/spyOut, so it is
// presumably optional - confirm against the other call sites, which do not check.
private MessageSpy messageSpy;
/**
 * Extracts the protocol version of an incoming message by returning
 * {@code message.getVersion()}.
 *
 * NOTE(review): the {@code <OfHeader>} type argument is reconstructed from the
 * parameter type of the overriding method - confirm against the declaration of
 * {@link VersionExtractor}.
 */
private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
    @Override
    public Short extractVersion(OfHeader message) {
        return message.getVersion();
    }
};
private RegisteredTypeExtractor registeredSrcTypeExtractor =
new RegisteredTypeExtractor() {
@SuppressWarnings("unchecked")
@Override
public Class extends OfHeader> extractRegisteredType(
OfHeader message) {
return (Class extends OfHeader>) message.getImplementedInterface();
}
};
private RegisteredTypeExtractor registeredOutTypeExtractor =
new RegisteredTypeExtractor() {
@SuppressWarnings("unchecked")
@Override
public Class extends DataObject> extractRegisteredType(
DataObject message) {
return (Class extends DataObject>) message.getImplementedInterface();
}
};
/**
 * Prepares the queue for use: creates the bounded ticket queue and the translation
 * pool, configures the ticket processor factory from the current field values, and
 * starts the {@link TicketFinisher} on a new thread.
 *
 * The translator mapping, pop-listener mapping, message spy and pool size are read
 * here, so they should be set before this method is called.
 *
 * NOTE(review): the finisher thread is not stored anywhere and {@link #shutdown()}
 * only stops the pool, so nothing in this class stops that thread; calling this
 * method twice starts a second finisher and replaces the queue and the pool.
 */
public void init() {
// bounded queue: push() blocks in put() once 1000 tickets are waiting
processQueue = new LinkedBlockingQueue<>(1000);
pool = new ScheduledThreadPoolExecutor(poolSize);
// the factory receives the collaborators as they are at this moment
ticketProcessorFactory = new TicketProcessorFactory<>();
ticketProcessorFactory.setRegisteredTypeExtractor(registeredSrcTypeExtractor);
ticketProcessorFactory.setTranslatorMapping(translatorMapping);
ticketProcessorFactory.setVersionExtractor(versionExtractor);
ticketProcessorFactory.setSpy(messageSpy);
// the finisher gets the same queue that push() fills, plus the listener mapping
TicketFinisher finisher = new TicketFinisher<>(
processQueue, popListenersMapping, registeredOutTypeExtractor);
// runs on its own plain thread, outside the translation pool
new Thread(finisher).start();
}
/**
 * Stops the translation pool. Tasks already submitted are still executed
 * ({@link ScheduledThreadPoolExecutor#shutdown()} semantics); no new ones are accepted.
 *
 * Safe to call before {@link #init()}: the pool does not exist yet in that case
 * and the call is a no-op.
 */
public void shutdown() {
    // pool is only assigned in init(); avoid a NullPointerException on early shutdown
    if (pool != null) {
        pool.shutdown();
    }
}
/**
 * Enqueues a message using the {@code DEFAULT} queue type.
 *
 * @param message message received from the switch
 * @param conductor connection the message arrived on
 */
@Override
public void push(OfHeader message, ConnectionConductor conductor) {
    push(message, conductor, QueueKeeper.QueueType.DEFAULT);
}
/**
 * Accepts a message from a switch and processes it according to the queue type.
 *
 * {@code DEFAULT}: the message is wrapped in a ticket, the ticket is put on the
 * bounded process queue (this blocks while the queue is full) and then scheduled
 * for translation on the pool.
 *
 * {@code UNORDERED}: the message is translated and handed to the pop listeners
 * synchronously on the calling thread, bypassing the queue.
 *
 * Any other queue type is ignored.
 *
 * @param message message received from the switch
 * @param conductor connection the message arrived on
 * @param queueType selects ordered (queued) or unordered (inline) processing
 */
@Override
public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
    // translate() treats the spy as optional; do the same here instead of failing
    if (messageSpy != null) {
        messageSpy.spyMessage(message, STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
    }
    if (queueType == QueueKeeper.QueueType.DEFAULT) {
        TicketImpl ticket = new TicketImpl<>();
        ticket.setConductor(conductor);
        ticket.setMessage(message);
        LOG.debug("ticket scheduling: {}, ticket: {}",
                message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
        try {
            // enqueue first so the ticket's position fixes the publishing order
            processQueue.put(ticket);
            scheduleTicket(ticket);
        } catch (InterruptedException e) {
            LOG.warn("message enqueing interrupted", e);
            // restore the interrupt flag so callers up the stack can react to it
            Thread.currentThread().interrupt();
        }
    } else if (queueType == QueueKeeper.QueueType.UNORDERED) {
        pop(translate(message, conductor), conductor);
    }
}
/**
 * Submits the given ticket for translation: asks the factory for a processor
 * bound to the ticket and runs it on the pool.
 *
 * @param ticket ticket whose message is to be translated
 */
private void scheduleTicket(Ticket ticket) {
    pool.execute(ticketProcessorFactory.createProcessor(ticket));
}
/**
 * Sets the size of the translation pool. The value is read when {@link #init()}
 * creates the pool, so it has to be set before init() to take effect.
 *
 * @param poolSize the poolSize to set
 */
public void setPoolSize(int poolSize) {
    this.poolSize = poolSize;
}
@Override
public void setTranslatorMapping(
Map>>> translatorMapping) {
this.translatorMapping = translatorMapping;
}
@Override
public void setPopListenersMapping(
Map, Collection>> popListenersMapping) {
this.popListenersMapping = popListenersMapping;
}
/**
 * Sets the statistics hook. The reference is also passed to the ticket processor
 * factory when {@link #init()} runs.
 *
 * @param messageSpy the messageSpy to set
 */
public void setMessageSpy(MessageSpy messageSpy) {
    this.messageSpy = messageSpy;
}
private List translate(OfHeader message, ConnectionConductor conductor) {
List result = new ArrayList<>();
Class extends OfHeader> messageType = registeredSrcTypeExtractor.extractRegisteredType(message);
Collection>> translators = null;
LOG.debug("translating message: {}", messageType.getSimpleName());
Short version = versionExtractor.extractVersion(message);
if (version == null) {
throw new IllegalArgumentException("version is NULL");
}
TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
translators = translatorMapping.get(tKey);
LOG.debug("translatorKey: {} + {}", version, messageType.getName());
if (translators != null) {
for (IMDMessageTranslator> translator : translators) {
SwitchConnectionDistinguisher cookie = null;
// Pass cookie only for PACKT_IN
if (messageType.equals("PacketInMessage.class")) {
cookie = conductor.getAuxiliaryKey();
}
List translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
if(translatorOutput != null) {
result.addAll(translatorOutput);
}
}
if (messageSpy != null) {
messageSpy.spyIn(message);
for (DataObject outMsg : result) {
messageSpy.spyOut(outMsg);
}
}
} else {
LOG.warn("No translators for this message Type: {}", messageType);
messageSpy.spyMessage(message, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
}
return result;
}
/**
* @param processedMessages
* @param conductor
*/
private void pop(List processedMessages, ConnectionConductor conductor) {
for (DataObject msg : processedMessages) {
Class extends Object> registeredType =
registeredOutTypeExtractor.extractRegisteredType(msg);
Collection> popListeners = popListenersMapping.get(registeredType);
if (popListeners == null) {
LOG.warn("no popListener registered for type {}"+registeredType);
} else {
for (PopListener consumer : popListeners) {
consumer.onPop(msg);
}
}
}
}
}