Bug 1185 :of-flow: push-mpls-action different in config and operational data store
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperLightImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.queue;
9
10 import java.util.ArrayList;
11 import java.util.Collection;
12 import java.util.List;
13 import java.util.Map;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.ScheduledThreadPoolExecutor;
18
19 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
20 import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
21 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
22 import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
23 import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
25 import org.opendaylight.yangtools.yang.binding.DataContainer;
26 import org.opendaylight.yangtools.yang.binding.DataObject;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30
31 /**
32  * {@link QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase.
33  * <br/>
34  * There is internal thread pool of limited size ({@link QueueKeeperLightImpl#setPoolSize(int)}) 
35  * dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners)
36  * <br/>
37  * Workflow:
38  * <ol>
39  * <li>upon message push ticket is created and enqueued</li>
40  * <li>available threads from internal pool translate the massage wrapped in ticket</li>
41  * <li>when translation of particular message is finished, result is set in future result of wrapping ticket</br>
42  *     (order of tickets in queue is not touched during translate)
43  * </li>
44  * <li>at the end of queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does:
45  *    <ol>
46  *      <li>invoke blocking {@link BlockingQueue#take()} method in order to get the oldest ticket</li>
47  *      <li>invoke blocking {@link Future#get()} on the dequeued ticket</li>
48  *      <li>as soon as the result of translation is available, appropriate popListener is invoked</li>
49  *    </ol>
50  *    and this way the order of messages is preserved and also multiple threads are used by translating 
51  * </li>
52  * </ol>
53  * 
54  * 
55  */
56 public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
57
58     private static final Logger LOG = LoggerFactory
59             .getLogger(QueueKeeperLightImpl.class);
60
61     private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
62     private BlockingQueue<TicketResult<DataObject>> processQueue;
63     private ScheduledThreadPoolExecutor pool;
64     private int poolSize = 10;
65     private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
66     private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
67     private MessageSpy<DataContainer> messageSpy;
68
69     private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
70         @Override
71         public Short extractVersion(OfHeader message) {
72             return message.getVersion();
73         }
74     };
75
76     private RegisteredTypeExtractor<OfHeader> registeredSrcTypeExtractor =
77             new RegisteredTypeExtractor<OfHeader>() {
78         @SuppressWarnings("unchecked")
79         @Override
80         public Class<? extends OfHeader> extractRegisteredType(
81                 OfHeader message) {
82             return (Class<? extends OfHeader>) message.getImplementedInterface();
83         }
84     };
85
86     private RegisteredTypeExtractor<DataObject> registeredOutTypeExtractor =
87             new RegisteredTypeExtractor<DataObject>() {
88         @SuppressWarnings("unchecked")
89         @Override
90         public Class<? extends DataObject> extractRegisteredType(
91                 DataObject message) {
92             return (Class<? extends DataObject>) message.getImplementedInterface();
93         }
94     };
95
96     /**
97      * prepare queue
98      */
99     public void init() {
100         processQueue = new LinkedBlockingQueue<>(1000);
101         pool = new ScheduledThreadPoolExecutor(poolSize);
102
103         ticketProcessorFactory = new TicketProcessorFactory<>();
104         ticketProcessorFactory.setRegisteredTypeExtractor(registeredSrcTypeExtractor);
105         ticketProcessorFactory.setTranslatorMapping(translatorMapping);
106         ticketProcessorFactory.setVersionExtractor(versionExtractor);
107         ticketProcessorFactory.setSpy(messageSpy);
108
109         TicketFinisher<DataObject> finisher = new TicketFinisher<>(
110                 processQueue, popListenersMapping, registeredOutTypeExtractor);
111         new Thread(finisher).start();
112     }
113
114     /**
115      * stop processing queue
116      */
117     public void shutdown() {
118         pool.shutdown();
119     }
120
121     @Override
122     public void push(OfHeader message, ConnectionConductor conductor) {
123         push(message,conductor,QueueKeeper.QueueType.DEFAULT);
124     }
125
126     @Override
127     public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
128         messageSpy.spyMessage(message, STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
129         if(queueType == QueueKeeper.QueueType.DEFAULT) {
130             TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
131             ticket.setConductor(conductor);
132             ticket.setMessage(message);
133             LOG.debug("ticket scheduling: {}, ticket: {}",
134                     message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
135             try {
136                 processQueue.put(ticket);
137                 scheduleTicket(ticket);
138             } catch (InterruptedException e) {
139                 LOG.warn("message enqueing interrupted", e);
140             }
141         } else if (queueType == QueueKeeper.QueueType.UNORDERED){
142             List<DataObject> processedMessages = translate(message,conductor);
143             pop(processedMessages,conductor);
144         }
145     }
146
147     /**
148      * @param ticket
149      */
150     private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
151         Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
152         pool.execute(ticketProcessor);
153     }
154
155     /**
156      * @param poolSize the poolSize to set
157      */
158     public void setPoolSize(int poolSize) {
159         this.poolSize = poolSize;
160     }
161
162     @Override
163     public void setTranslatorMapping(
164             Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
165         this.translatorMapping = translatorMapping;
166     }
167
168     @Override
169     public void setPopListenersMapping(
170             Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
171         this.popListenersMapping = popListenersMapping;
172     }
173
174     /**
175      * @param messageSpy the messageSpy to set
176      */
177     public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
178         this.messageSpy = messageSpy;
179     }
180
181     private List<DataObject> translate(OfHeader message, ConnectionConductor conductor) {
182         List<DataObject> result = new ArrayList<>();
183         Class<? extends OfHeader> messageType = registeredSrcTypeExtractor.extractRegisteredType(message);
184         Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> translators = null;
185         LOG.debug("translating message: {}", messageType.getSimpleName());
186
187         Short version = versionExtractor.extractVersion(message);
188         if (version == null) {
189            throw new IllegalArgumentException("version is NULL");
190         }
191         TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
192         translators = translatorMapping.get(tKey);
193
194         LOG.debug("translatorKey: {} + {}", version, messageType.getName());
195
196         if (translators != null) {
197             for (IMDMessageTranslator<OfHeader, List<DataObject>> translator : translators) {
198                 SwitchConnectionDistinguisher cookie = null;
199                 // Pass cookie only for PACKT_IN
200                 if (messageType.equals("PacketInMessage.class")) {
201                     cookie = conductor.getAuxiliaryKey();
202                 }
203                 List<DataObject> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
204                 if(translatorOutput != null) {
205                     result.addAll(translatorOutput);
206                 }
207             }
208             if (messageSpy != null) {
209                 messageSpy.spyIn(message);
210                 for (DataObject outMsg : result) {
211                     messageSpy.spyOut(outMsg);
212                 }
213             }
214         } else {
215             LOG.warn("No translators for this message Type: {}", messageType);
216             messageSpy.spyMessage(message, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
217         }
218         return result;
219     }
220
221     /**
222      * @param processedMessages
223      * @param conductor
224      */
225     private void pop(List<DataObject> processedMessages, ConnectionConductor conductor) {
226         for (DataObject msg : processedMessages) {
227             Class<? extends Object> registeredType =
228                     registeredOutTypeExtractor.extractRegisteredType(msg);
229             Collection<PopListener<DataObject>> popListeners = popListenersMapping.get(registeredType);
230             if (popListeners == null) {
231                 LOG.warn("no popListener registered for type {}"+registeredType);
232             } else {
233                 for (PopListener<DataObject> consumer : popListeners) {
234                     consumer.onPop(msg);
235                 }
236             }
237         }
238     }
239 }