Merge "Bug 1234: Force to use legacy OF plugin if it is installed."
[controller.git] / opendaylight / sal / implementation / src / main / java / org / opendaylight / controller / sal / implementation / internal / DataPacketService.java
1 /*
2  * Copyright (c) 2013-2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 /**
10  * @file   DataPacketService.java
11  *
12  * @brief  Implementation of Data Packet services in SAL
13  *
14  * Implementation of Data Packet services in SAL
15  */
16
17 package org.opendaylight.controller.sal.implementation.internal;
18
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.HashSet;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.CopyOnWriteArrayList;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import org.opendaylight.controller.sal.core.ConstructionException;
31 import org.opendaylight.controller.sal.core.NodeConnector;
32 import org.opendaylight.controller.sal.match.Match;
33 import org.opendaylight.controller.sal.packet.Ethernet;
34 import org.opendaylight.controller.sal.packet.IDataPacketService;
35 import org.opendaylight.controller.sal.packet.IListenDataPacket;
36 import org.opendaylight.controller.sal.packet.IPluginInDataPacketService;
37 import org.opendaylight.controller.sal.packet.IPluginOutDataPacketService;
38 import org.opendaylight.controller.sal.packet.LinkEncap;
39 import org.opendaylight.controller.sal.packet.Packet;
40 import org.opendaylight.controller.sal.packet.PacketResult;
41 import org.opendaylight.controller.sal.packet.RawPacket;
42 import org.opendaylight.controller.sal.utils.NetUtils;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 public class DataPacketService implements IPluginOutDataPacketService,
47         IDataPacketService {
48     private int TXMAXQUEUESIZE = 1000;
49     protected static final Logger logger = LoggerFactory
50             .getLogger(DataPacketService.class);
51     /**
52      * Database that associates one NodeIDType to the
53      * IPluginDataPacketService, in fact we expect that there will be
54      * one instance of IPluginDataPacketService for each southbound
55      * plugin.
56      * Using the ConcurrentHashMap because the threads that will be
57      * adding a new service, removing a service, going through all of
58      * them maybe different.
59      */
60     private ConcurrentHashMap<String, ProtocolService<IPluginInDataPacketService>>
61         pluginInDataService =
62         new ConcurrentHashMap<String, ProtocolService<IPluginInDataPacketService>>();
63     private Map<String, AtomicInteger> statistics = new HashMap<String, AtomicInteger>();
64
65     /**
66      * Queue for packets that need to be transmitted to Data Path
67      */
68     private LinkedBlockingQueue<RawPacket> txQueue = new LinkedBlockingQueue<RawPacket>(
69             TXMAXQUEUESIZE);
70     /**
71      * Transmission thread
72      */
73     private Thread txThread = new Thread(new TxLoop(),
74             "DataPacketService TX thread");
75
76     /**
77      * Representation of a Data Packet Listener including of its
78      * properties
79      */
80     private class DataPacketListener {
81         // Key fields
82         private String listenerName;
83         // Attribute fields
84         private IListenDataPacket listener;
85         private String dependency;
86         private Match match;
87
88         DataPacketListener(String name, IListenDataPacket s, String dependency,
89                 Match match) {
90             this.listenerName = name;
91             this.listener = s;
92             this.dependency = dependency;
93             this.match = match;
94         }
95
96         @Override
97         public boolean equals(Object obj) {
98             if (this == obj)
99                 return true;
100             if (obj == null)
101                 return false;
102             if (getClass() != obj.getClass())
103                 return false;
104             DataPacketListener other = (DataPacketListener) obj;
105             if (!getOuterType().equals(other.getOuterType()))
106                 return false;
107             if (listenerName == null) {
108                 if (other.listenerName != null)
109                     return false;
110             } else if (!listenerName.equals(other.listenerName))
111                 return false;
112             return true;
113         }
114
115         @Override
116         public int hashCode() {
117             final int prime = 31;
118             int result = 1;
119             result = prime * result + getOuterType().hashCode();
120             result = prime * result
121                     + ((listenerName == null) ? 0 : listenerName.hashCode());
122             return result;
123         }
124
125         private DataPacketService getOuterType() {
126             return DataPacketService.this;
127         }
128     }
129
130     /**
131      * This very expensive version of List is being used because it
132      * work well in concurrent situation, as we expect new service
133      * addition, service removal and walk of the service will happen
134      * from different places
135      */
136     private List<List<DataPacketListener>> listenDataPacket = new CopyOnWriteArrayList<List<DataPacketListener>>();
137     // Quick index to make sure there are no duplicate elements
138     private Set<DataPacketListener> indexDataPacket = Collections
139             .synchronizedSet(new HashSet<DataPacketListener>());
140
141     /**
142      * Loop for processing Received packets
143      */
144     private void dispatchPacket(RawPacket pkt) {
145
146         // for now we treat all listeners as serial listeners
147         for (List<DataPacketListener> serialListeners : listenDataPacket) {
148             for (DataPacketListener l : serialListeners) {
149
150                 // TODO: possibly deal with read-only and read-write packet
151                 // copies
152                 IListenDataPacket s = (l == null ? null : l.listener);
153                 if (s != null) {
154                     try {
155                         // TODO Make sure to filter based on the match too,
156                         // later on
157                         PacketResult res = s.receiveDataPacket(pkt);
158                         increaseStat("RXPacketSuccess");
159                         if (res.equals(PacketResult.CONSUME)) {
160                             increaseStat("RXPacketSerialExit");
161                             break;
162                         }
163                     } catch (Exception e) {
164                         increaseStat("RXPacketFailedForException");
165                     }
166                 }
167             }
168         }
169     }
170
171     /**
172      * Loop for processing packets to be transmitted
173      *
174      */
175     private class TxLoop implements Runnable {
176         public void run() {
177             RawPacket pkt;
178             try {
179                 for (pkt = txQueue.take(); pkt != null; pkt = txQueue.take()) {
180                     // Retrieve outgoing node connector so to send out
181                     // the packet to corresponding node
182                     NodeConnector p = pkt.getOutgoingNodeConnector();
183                     if (p != null) {
184                         String t = p.getNode()
185                                 .getType();
186                         // Now locate the TX dispatcher
187                         ProtocolService<IPluginInDataPacketService> service =
188                             pluginInDataService.get(t);
189                         if (service != null) {
190                             try {
191                                 service.getService().transmitDataPacket(pkt);
192                                 increaseStat("TXPacketSuccess");
193                             } catch (Exception e) {
194                                 increaseStat("TXPacketFailedForException");
195                             }
196                         } else {
197                             increaseStat("TXpluginNotFound");
198                         }
199                     }
200                 }
201             } catch (InterruptedException e) {
202                 // Not a big deal
203             }
204         }
205     }
206
207     void setPluginInDataService(Map props, IPluginInDataPacketService s) {
208         ProtocolService.set(this.pluginInDataService, props, s, logger);
209     }
210
211     void unsetPluginInDataService(Map props, IPluginInDataPacketService s) {
212         ProtocolService.unset(this.pluginInDataService, props, s, logger);
213     }
214
215     void setListenDataPacket(Map props, IListenDataPacket s) {
216         if (this.listenDataPacket == null || this.indexDataPacket == null) {
217             logger.error("data structure to store data is NULL");
218             return;
219         }
220         logger.trace("Received setListenDataPacket request");
221         for (Object e : props.entrySet()) {
222             Map.Entry entry = (Map.Entry) e;
223             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
224         }
225
226         String listenerName = null;
227         String listenerDependency = null;
228         Match filter = null;
229         Object value;
230         // Read the listenerName
231         value = props.get("salListenerName");
232         if (value instanceof String) {
233             listenerName = (String) value;
234         }
235
236         if (listenerName == null) {
237             logger.error("Trying to set a listener without a Name");
238             return;
239         }
240
241         //Read the dependency
242         value = props.get("salListenerDependency");
243         if (value instanceof String) {
244             listenerDependency = (String) value;
245         }
246
247         //Read match filter if any
248         value = props.get("salListenerFilter");
249         if (value instanceof Match) {
250             filter = (Match) value;
251         }
252
253         DataPacketListener l = new DataPacketListener(listenerName, s,
254                 listenerDependency, filter);
255
256         DataPacketListener lDependency = new DataPacketListener(
257                 listenerDependency, null, null, null);
258
259         // Now let see if there is any dependency
260         if (listenerDependency == null) {
261             logger.debug("listener without any dependency");
262             if (this.indexDataPacket.contains(l)) {
263                 logger.error("trying to add an existing element");
264             } else {
265                 logger.debug("adding listener: {}", listenerName);
266                 CopyOnWriteArrayList<DataPacketListener> serialListeners = new CopyOnWriteArrayList<DataPacketListener>();
267                 serialListeners.add(l);
268                 this.listenDataPacket.add(serialListeners);
269                 this.indexDataPacket.add(l);
270             }
271         } else {
272             logger.debug("listener with dependency");
273             // Now search for the dependency and put things in order
274             if (this.indexDataPacket.contains(l)) {
275                 logger.error("trying to add an existing element");
276             } else {
277                 logger.debug("adding listener: {}", listenerName);
278                 // Lets find the set with the dependency in it, if we
279                 // find it lets just add our dependency at the end of
280                 // the list.
281                 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
282                     boolean done = false;
283                     if (serialListeners.contains(lDependency)) {
284                         serialListeners.add(l);
285                         done = true;
286                     }
287                     // If we did fine the element, lets break early
288                     if (done) {
289                         break;
290                     }
291                 }
292
293                 this.indexDataPacket.add(l);
294             }
295         }
296     }
297
298     void unsetListenDataPacket(Map props, IListenDataPacket s) {
299         if (this.listenDataPacket == null || this.indexDataPacket == null) {
300             logger.error("data structure to store data is NULL");
301             return;
302         }
303         logger.trace("Received UNsetListenDataPacket request");
304         for (Object e : props.entrySet()) {
305             Map.Entry entry = (Map.Entry) e;
306             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
307         }
308
309         String listenerName = null;
310         Object value;
311         // Read the listenerName
312         value = props.get("salListenerName");
313         if (value instanceof String) {
314             listenerName = (String) value;
315         }
316
317         if (listenerName == null) {
318             logger.error("Trying to set a listener without a Name");
319             return;
320         }
321
322         DataPacketListener l = new DataPacketListener(listenerName, s, null,
323                 null);
324         if (!this.indexDataPacket.contains(l)) {
325             logger.error("trying to remove a non-existing element");
326         } else {
327             logger.debug("removing listener: {}", listenerName);
328             for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
329                 int i = 0;
330                 boolean done = false;
331                 for (i = 0; i < serialListeners.size(); i++) {
332                     if (serialListeners.get(i).equals(l)) {
333                         serialListeners.remove(i);
334                         done = true;
335                         break;
336                     }
337                 }
338                 // Now remove a serialListener that maybe empty
339                 if (serialListeners.isEmpty()) {
340                     this.listenDataPacket.remove(serialListeners);
341                 }
342                 // If we did fine the element, lets break early
343                 if (done) {
344                     break;
345                 }
346             }
347
348             this.indexDataPacket.remove(l);
349         }
350     }
351
352     /**
353      * Function called by the dependency manager when all the required
354      * dependencies are satisfied
355      *
356      */
357     void init() {
358         this.txThread.start();
359     }
360
361     /**
362      * Function called by the dependency manager when at least one
363      * dependency become unsatisfied or when the component is shutting
364      * down because for example bundle is being stopped.
365      *
366      */
367     void destroy() {
368         // Make sure to cleanup the data structure we use to track
369         // services
370         this.listenDataPacket.clear();
371         this.indexDataPacket.clear();
372         this.pluginInDataService.clear();
373         this.statistics.clear();
374         this.txQueue.clear();
375         this.txThread.interrupt();
376         // Wait for them to be done
377         try {
378             this.txThread.join();
379         } catch (InterruptedException ex) {
380             // Not a big deal
381         }
382     }
383
384     private void increaseStat(String name) {
385         if (this.statistics == null) {
386             return;
387         }
388
389         AtomicInteger currValue = null;
390         synchronized (this.statistics) {
391             currValue = this.statistics.get(name);
392
393             if (currValue == null) {
394                 this.statistics.put(name, new AtomicInteger(0));
395                 return;
396             }
397         }
398         currValue.incrementAndGet();
399     }
400
401     @Override
402     public PacketResult receiveDataPacket(RawPacket inPkt) {
403         if (inPkt.getIncomingNodeConnector() == null) {
404             increaseStat("nullIncomingNodeConnector");
405             return PacketResult.IGNORED;
406         }
407
408         // send the packet off to be processed by listeners
409         this.dispatchPacket(inPkt);
410
411         // Walk the chain of listener going first throw all the
412         // parallel ones and for each parallel in serial
413         return PacketResult.IGNORED;
414     }
415
416     @Override
417     public void transmitDataPacket(RawPacket outPkt) {
418         if (outPkt.getOutgoingNodeConnector() == null) {
419             increaseStat("nullOutgoingNodeConnector");
420             return;
421         }
422
423         if (!this.txQueue.offer(outPkt)) {
424             increaseStat("fullTXQueue");
425             return;
426         }
427     }
428
429     @Override
430     public Packet decodeDataPacket(RawPacket pkt) {
431         // Sanity checks
432         if (pkt == null) {
433             return null;
434         }
435         byte[] data = pkt.getPacketData();
436         if (data.length <= 0) {
437             return null;
438         }
439         if (pkt.getEncap().equals(LinkEncap.ETHERNET)) {
440             Ethernet res = new Ethernet();
441             try {
442                 res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
443             } catch (Exception e) {
444                 logger.warn("Failed to decode packet: {}", e.getMessage());
445             }
446             return res;
447         }
448         return null;
449     }
450
451     @Override
452     public RawPacket encodeDataPacket(Packet pkt) {
453         // Sanity checks
454         if (pkt == null) {
455             return null;
456         }
457         byte[] data;
458         try {
459             data = pkt.serialize();
460         } catch (Exception e) {
461             logger.error("",e);
462             return null;
463         }
464         if (data.length <= 0) {
465             return null;
466         }
467         try {
468             RawPacket res = new RawPacket(data);
469             return res;
470         } catch (ConstructionException cex) {
471         }
472         // If something goes wrong then we have to return null
473         return null;
474     }
475 }