import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
-import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageListener;
+import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @author mirehak
- * @param <T> result type
- *
*/
-public class QueueKeeperLightImpl<T> implements QueueKeeper<T> {
+public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(QueueKeeperLightImpl.class);
- private Set<PopListener<T>> listeners;
- private BlockingQueue<Ticket<T>> processQueue;
+ private Set<PopListener<DataObject>> listeners;
+ private BlockingQueue<TicketResult<DataObject>> processQueue;
private ScheduledThreadPoolExecutor pool;
private int poolSize = 10;
- private Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping;
+ private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, DataObject>>> translatorMapping;
+
+ private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
+ @Override
+ public Short extractVersion(OfHeader message) {
+ return message.getVersion();
+ }
+ };
/**
* prepare queue
*/
public void init() {
- listeners = Collections.synchronizedSet(new HashSet<PopListener<T>>());
+ listeners = Collections.synchronizedSet(new HashSet<PopListener<DataObject>>());
processQueue = new LinkedBlockingQueue<>(100);
pool = new ScheduledThreadPoolExecutor(poolSize);
- TicketFinisher<T> finisher = new TicketFinisher<>(processQueue, listeners);
+ TicketFinisher<DataObject> finisher = new TicketFinisher<>(processQueue, listeners);
new Thread(finisher).start();
}
}
@Override
- public void push(Class<? extends DataObject> registeredMessageType, DataObject message, ConnectionConductor conductor) {
- TicketImpl<T> ticket = new TicketImpl<>();
+ public void push(Class<? extends OfHeader> registeredMessageType, OfHeader message, ConnectionConductor conductor) {
+ TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
ticket.setConductor(conductor);
ticket.setMessage(message);
ticket.setRegisteredMessageType(registeredMessageType);
+ LOG.debug("ticket scheduling: {}, ticket: {}", registeredMessageType.getSimpleName(), System.identityHashCode(ticket));
//TODO: block if queue limit reached
processQueue.add(ticket);
scheduleTicket(ticket);
/**
* @param ticket
*/
- private void scheduleTicket(Ticket<T> ticket) {
- pool.execute(TicketProcessorFactory.createProcessor(ticket, listenerMapping));
+ private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
+ pool.execute(TicketProcessorFactory.createProcessor(ticket, versionExtractor, translatorMapping));
}
@Override
- public synchronized void addPopListener(PopListener<T> listener) {
+ public synchronized void addPopListener(PopListener<DataObject> listener) {
listeners.add(listener);
}
@Override
- public synchronized boolean removePopListener(PopListener<T> listener) {
+ public synchronized boolean removePopListener(PopListener<DataObject> listener) {
return listeners.remove(listener);
}
}
@Override
- public void setListenerMapping(
- Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping) {
- this.listenerMapping = listenerMapping;
+ public void setTranslatorMapping(
+ Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, DataObject>>> translatorMapping) {
+ this.translatorMapping = translatorMapping;
}
}