2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.openflow.md.queue;
10 import java.util.ArrayList;
11 import java.util.Collection;
12 import java.util.List;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.LinkedBlockingQueue;
16 import java.util.concurrent.ScheduledThreadPoolExecutor;
18 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
19 import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
20 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
21 import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
22 import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
24 import org.opendaylight.yangtools.yang.binding.DataContainer;
25 import org.opendaylight.yangtools.yang.binding.DataObject;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
32 public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
34 private static final Logger LOG = LoggerFactory
35 .getLogger(QueueKeeperLightImpl.class);
37 private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
38 private BlockingQueue<TicketResult<DataObject>> processQueue;
39 private ScheduledThreadPoolExecutor pool;
40 private int poolSize = 10;
41 private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
42 private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
43 private MessageSpy<DataContainer> messageSpy;
45 private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
47 public Short extractVersion(OfHeader message) {
48 return message.getVersion();
52 private RegisteredTypeExtractor<OfHeader> registeredSrcTypeExtractor =
53 new RegisteredTypeExtractor<OfHeader>() {
54 @SuppressWarnings("unchecked")
56 public Class<? extends OfHeader> extractRegisteredType(
58 return (Class<? extends OfHeader>) message.getImplementedInterface();
62 private RegisteredTypeExtractor<DataObject> registeredOutTypeExtractor =
63 new RegisteredTypeExtractor<DataObject>() {
64 @SuppressWarnings("unchecked")
66 public Class<? extends DataObject> extractRegisteredType(
68 return (Class<? extends DataObject>) message.getImplementedInterface();
// Bounded queue (capacity 1000): push(...) uses put(), so callers block while it is full.
76 processQueue = new LinkedBlockingQueue<>(1000);
// The pool is sized from poolSize as it is at this moment; in the code visible here
// the pool is not rebuilt afterwards, so a later setPoolSize(...) does not resize it.
77 pool = new ScheduledThreadPoolExecutor(poolSize);
// Wire the processor factory with the source-type extractor, the translator table,
// the version extractor and the (possibly null) message spy.
79 ticketProcessorFactory = new TicketProcessorFactory<>();
80 ticketProcessorFactory.setRegisteredTypeExtractor(registeredSrcTypeExtractor);
81 ticketProcessorFactory.setTranslatorMapping(translatorMapping);
82 ticketProcessorFactory.setVersionExtractor(versionExtractor);
83 ticketProcessorFactory.setSpy(messageSpy);
// The finisher receives the queue, the pop-listener table and the output-type extractor
// and runs on its own, newly started thread.
// NOTE(review): presumably it drains processQueue and notifies pop listeners - confirm in TicketFinisher.
// NOTE(review): no reference to this thread is kept here, so nothing visible in this block can stop it.
85 TicketFinisher<DataObject> finisher = new TicketFinisher<>(
86 processQueue, popListenersMapping, registeredOutTypeExtractor);
87 new Thread(finisher).start();
91 * stop processing queue
93 public void shutdown() {
98 public void push(OfHeader message, ConnectionConductor conductor) {
99 push(message,conductor,QueueKeeper.QueueType.DEFAULT);
103 public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
104 messageSpy.spyMessage(message, STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
105 if(queueType == QueueKeeper.QueueType.DEFAULT) {
106 TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
107 ticket.setConductor(conductor);
108 ticket.setMessage(message);
109 LOG.debug("ticket scheduling: {}, ticket: {}",
110 message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
112 processQueue.put(ticket);
113 scheduleTicket(ticket);
114 } catch (InterruptedException e) {
115 LOG.warn("message enqueing interrupted", e);
117 } else if (queueType == QueueKeeper.QueueType.UNORDERED){
118 List<DataObject> processedMessages = translate(message,conductor);
119 pop(processedMessages,conductor);
126 private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
127 Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
128 pool.execute(ticketProcessor);
132 * @param poolSize the poolSize to set
134 public void setPoolSize(int poolSize) {
135 this.poolSize = poolSize;
139 public void setTranslatorMapping(
140 Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
141 this.translatorMapping = translatorMapping;
145 public void setPopListenersMapping(
146 Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
147 this.popListenersMapping = popListenersMapping;
151 * @param messageSpy the messageSpy to set
153 public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
154 this.messageSpy = messageSpy;
157 private List<DataObject> translate(OfHeader message, ConnectionConductor conductor) {
158 List<DataObject> result = new ArrayList<>();
159 Class<? extends OfHeader> messageType = registeredSrcTypeExtractor.extractRegisteredType(message);
160 Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> translators = null;
161 LOG.debug("translating message: {}", messageType.getSimpleName());
163 Short version = versionExtractor.extractVersion(message);
164 if (version == null) {
165 throw new IllegalArgumentException("version is NULL");
167 TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
168 translators = translatorMapping.get(tKey);
170 LOG.debug("translatorKey: {} + {}", version, messageType.getName());
172 if (translators != null) {
173 for (IMDMessageTranslator<OfHeader, List<DataObject>> translator : translators) {
174 SwitchConnectionDistinguisher cookie = null;
175 // Pass cookie only for PACKT_IN
176 if (messageType.equals("PacketInMessage.class")) {
177 cookie = conductor.getAuxiliaryKey();
179 List<DataObject> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
180 if(translatorOutput != null) {
181 result.addAll(translatorOutput);
184 if (messageSpy != null) {
185 messageSpy.spyIn(message);
186 for (DataObject outMsg : result) {
187 messageSpy.spyOut(outMsg);
191 LOG.warn("No translators for this message Type: {}", messageType);
192 messageSpy.spyMessage(message, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
198 * @param processedMessages
201 private void pop(List<DataObject> processedMessages, ConnectionConductor conductor) {
202 for (DataObject msg : processedMessages) {
203 Class<? extends Object> registeredType =
204 registeredOutTypeExtractor.extractRegisteredType(msg);
205 Collection<PopListener<DataObject>> popListeners = popListenersMapping.get(registeredType);
206 if (popListeners == null) {
207 LOG.warn("no popListener registered for type {}"+registeredType);
209 for (PopListener<DataObject> consumer : popListeners) {