2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.openflow.md.queue;
10 import java.util.ArrayList;
11 import java.util.Collection;
12 import java.util.List;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.ScheduledThreadPoolExecutor;
19 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
20 import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
21 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
22 import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
23 import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
25 import org.opendaylight.yangtools.yang.binding.DataContainer;
26 import org.opendaylight.yangtools.yang.binding.DataObject;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
32 * {@link QueueKeeper} implementation focused on preserving message order while using multiple threads for the translation phase.
34 * There is internal thread pool of limited size ({@link QueueKeeperLightImpl#setPoolSize(int)})
35 * dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners)
39 * <li>upon message push ticket is created and enqueued</li>
40 * <li>available threads from internal pool translate the message wrapped in ticket</li>
41 * <li>when translation of particular message is finished, result is set in future result of wrapping ticket<br/>
42 * (order of tickets in queue is not touched during translate)
44 * <li>at the end of queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does:
46 * <li>invoke blocking {@link BlockingQueue#take()} method in order to get the oldest ticket</li>
47 * <li>invoke blocking {@link Future#get()} on the dequeued ticket</li>
48 * <li>as soon as the result of translation is available, appropriate popListener is invoked</li>
50 * and this way the order of messages is preserved and also multiple threads are used for translation
56 public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
58 private static final Logger LOG = LoggerFactory
59 .getLogger(QueueKeeperLightImpl.class);
61 private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
62 private BlockingQueue<TicketResult<DataObject>> processQueue;
63 private ScheduledThreadPoolExecutor pool;
64 private int poolSize = 10;
65 private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
66 private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
67 private MessageSpy<DataContainer> messageSpy;
69 private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
71 public Short extractVersion(OfHeader message) {
72 return message.getVersion();
76 private RegisteredTypeExtractor<OfHeader> registeredSrcTypeExtractor =
77 new RegisteredTypeExtractor<OfHeader>() {
78 @SuppressWarnings("unchecked")
80 public Class<? extends OfHeader> extractRegisteredType(
82 return (Class<? extends OfHeader>) message.getImplementedInterface();
86 private RegisteredTypeExtractor<DataObject> registeredOutTypeExtractor =
87 new RegisteredTypeExtractor<DataObject>() {
88 @SuppressWarnings("unchecked")
90 public Class<? extends DataObject> extractRegisteredType(
92 return (Class<? extends DataObject>) message.getImplementedInterface();
// Set-up statements for the queue machinery; the enclosing method header is not part of this excerpt.
// Bounded queue: at most 1000 tickets can be pending at once.
100 processQueue = new LinkedBlockingQueue<>(1000);
// Pool sized from the poolSize field as it is at this moment.
101 pool = new ScheduledThreadPoolExecutor(poolSize);
// Factory producing the per-ticket processors; it receives the references held by the
// fields right now - presumably the setters have to be called before this code runs (TODO confirm).
103 ticketProcessorFactory = new TicketProcessorFactory<>();
104 ticketProcessorFactory.setRegisteredTypeExtractor(registeredSrcTypeExtractor);
105 ticketProcessorFactory.setTranslatorMapping(translatorMapping);
106 ticketProcessorFactory.setVersionExtractor(versionExtractor);
107 ticketProcessorFactory.setSpy(messageSpy);
// The finisher is constructed with the same queue, the listener mapping and the output type extractor.
109 TicketFinisher<DataObject> finisher = new TicketFinisher<>(
110 processQueue, popListenersMapping, registeredOutTypeExtractor);
// NOTE(review): the finisher is started on a raw, unnamed Thread whose reference is not kept;
// nothing visible in this excerpt stops it - confirm how it is terminated on shutdown.
111 new Thread(finisher).start();
115 * stop processing queue
// NOTE(review): the statements of this method are not visible in this excerpt, so what it
// actually stops cannot be verified here - confirm it ends both the pool and the finisher thread.
117 public void shutdown() {
122 public void push(OfHeader message, ConnectionConductor conductor) {
123 push(message,conductor,QueueKeeper.QueueType.DEFAULT);
127 public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
128 messageSpy.spyMessage(message, STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
129 if(queueType == QueueKeeper.QueueType.DEFAULT) {
130 TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
131 ticket.setConductor(conductor);
132 ticket.setMessage(message);
133 LOG.debug("ticket scheduling: {}, ticket: {}",
134 message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
136 processQueue.put(ticket);
137 scheduleTicket(ticket);
138 } catch (InterruptedException e) {
139 LOG.warn("message enqueing interrupted", e);
141 } else if (queueType == QueueKeeper.QueueType.UNORDERED){
142 List<DataObject> processedMessages = translate(message,conductor);
143 pop(processedMessages,conductor);
150 private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
151 Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
152 pool.execute(ticketProcessor);
156 * @param poolSize the poolSize to set
158 public void setPoolSize(int poolSize) {
159 this.poolSize = poolSize;
163 public void setTranslatorMapping(
164 Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
165 this.translatorMapping = translatorMapping;
169 public void setPopListenersMapping(
170 Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
171 this.popListenersMapping = popListenersMapping;
175 * @param messageSpy the messageSpy to set
177 public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
178 this.messageSpy = messageSpy;
181 private List<DataObject> translate(OfHeader message, ConnectionConductor conductor) {
182 List<DataObject> result = new ArrayList<>();
183 Class<? extends OfHeader> messageType = registeredSrcTypeExtractor.extractRegisteredType(message);
184 Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> translators = null;
185 LOG.debug("translating message: {}", messageType.getSimpleName());
187 Short version = versionExtractor.extractVersion(message);
188 if (version == null) {
189 throw new IllegalArgumentException("version is NULL");
191 TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
192 translators = translatorMapping.get(tKey);
194 LOG.debug("translatorKey: {} + {}", version, messageType.getName());
196 if (translators != null) {
197 for (IMDMessageTranslator<OfHeader, List<DataObject>> translator : translators) {
198 SwitchConnectionDistinguisher cookie = null;
199 // Pass cookie only for PACKT_IN
200 if (messageType.equals("PacketInMessage.class")) {
201 cookie = conductor.getAuxiliaryKey();
203 List<DataObject> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
204 if(translatorOutput != null) {
205 result.addAll(translatorOutput);
208 if (messageSpy != null) {
209 messageSpy.spyIn(message);
210 for (DataObject outMsg : result) {
211 messageSpy.spyOut(outMsg);
215 LOG.warn("No translators for this message Type: {}", messageType);
216 messageSpy.spyMessage(message, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
222 * @param processedMessages
// Walks the translated messages and looks up the pop listeners registered for each one.
// NOTE(review): this definition continues past the end of the visible excerpt; within the
// visible part the conductor parameter is not used.
225 private void pop(List<DataObject> processedMessages, ConnectionConductor conductor) {
226 for (DataObject msg : processedMessages) {
// The lookup key is the type reported by the output type extractor.
227 Class<? extends Object> registeredType =
228 registeredOutTypeExtractor.extractRegisteredType(msg);
229 Collection<PopListener<DataObject>> popListeners = popListenersMapping.get(registeredType);
230 if (popListeners == null) {
// NOTE(review): the type is string-concatenated behind the "{}" placeholder instead of being
// passed as a logging argument, so the placeholder is printed literally - likely meant to be
// LOG.warn("no popListener registered for type {}", registeredType).
231 LOG.warn("no popListener registered for type {}"+registeredType);
233 for (PopListener<DataObject> consumer : popListeners) {