Initial opendaylight infrastructure commit!!
[controller.git] / opendaylight / sal / implementation / src / main / java / org / opendaylight / controller / sal / implementation / internal / DataPacketService.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 /**
11  * @file   DataPacketService.java
12  *
13  * @brief  Implementation of Data Packet services in SAL
14  *
15  * Implementation of Data Packet services in SAL
16  */
17
18 package org.opendaylight.controller.sal.implementation.internal;
19
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import org.apache.commons.lang3.builder.EqualsBuilder;
32 import org.apache.commons.lang3.builder.HashCodeBuilder;
33 import org.opendaylight.controller.sal.core.ConstructionException;
34 import org.opendaylight.controller.sal.core.Node;
35 import org.opendaylight.controller.sal.core.NodeConnector;
36 import org.opendaylight.controller.sal.match.Match;
37 import org.opendaylight.controller.sal.packet.Ethernet;
38 import org.opendaylight.controller.sal.packet.IDataPacketService;
39 import org.opendaylight.controller.sal.packet.IListenDataPacket;
40 import org.opendaylight.controller.sal.packet.IPluginInDataPacketService;
41 import org.opendaylight.controller.sal.packet.IPluginOutDataPacketService;
42 import org.opendaylight.controller.sal.packet.LinkEncap;
43 import org.opendaylight.controller.sal.packet.Packet;
44 import org.opendaylight.controller.sal.packet.PacketResult;
45 import org.opendaylight.controller.sal.packet.RawPacket;
46 import org.opendaylight.controller.sal.utils.NetUtils;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 public class DataPacketService implements IPluginOutDataPacketService,
51         IDataPacketService {
52     private int RXMAXQUEUESIZE = 1000;
53     private int TXMAXQUEUESIZE = 1000;
54     protected static final Logger logger = LoggerFactory
55             .getLogger(DataPacketService.class);
56     /**
57      * Database that associates one NodeIDType to the
58      * IPluginDataPacketService, in fact we expect that there will be
59      * one instance of IPluginDataPacketService for each southbound
60      * plugin.
61      * Using the ConcurrentHashMap because the threads that will be
62      * adding a new service, removing a service, going through all of
63      * them maybe different.
64      */
65     private ConcurrentHashMap<String, IPluginInDataPacketService>
66         pluginInDataService =
67         new ConcurrentHashMap<String, IPluginInDataPacketService>();
68     private Map<String, AtomicInteger> statistics = new HashMap<String, AtomicInteger>();
69     /**
70      * Queue for packets received from Data Path
71      */
72     private LinkedBlockingQueue<RawPacket> rxQueue = new LinkedBlockingQueue<RawPacket>(
73             RXMAXQUEUESIZE);
74     /**
75      * Queue for packets that need to be transmitted to Data Path
76      */
77     private LinkedBlockingQueue<RawPacket> txQueue = new LinkedBlockingQueue<RawPacket>(
78             RXMAXQUEUESIZE);
79     /**
80      * Transmission thread
81      */
82     private Thread txThread = new Thread(new TxLoop(),
83             "DataPacketService TX thread");
84     /**
85      * Receiving thread
86      */
87     private Thread rxThread = new Thread(new RxLoop(),
88             "DataPacketService RX thread");
89
90     /**
91      * Representation of a Data Packet Listener including of its
92      * properties
93      *
94      */
95     private class DataPacketListener {
96         // Key fields
97         private String listenerName;
98         // Attribute fields
99         private IListenDataPacket listener;
100         private String dependency;
101         private Match match;
102
103         DataPacketListener(String name, IListenDataPacket s, String dependency,
104                 Match match) {
105             this.listenerName = name;
106             this.listener = s;
107             this.dependency = dependency;
108             this.match = match;
109         }
110
111         @Override
112         public boolean equals(Object obj) {
113             if (obj == null) {
114                 return false;
115             }
116             if (obj == this) {
117                 return true;
118             }
119             if (obj.getClass() != getClass()) {
120                 return false;
121             }
122             DataPacketListener rhs = (DataPacketListener) obj;
123             return new EqualsBuilder().append(this.listenerName,
124                     rhs.listenerName).isEquals();
125         }
126
127         @Override
128         public int hashCode() {
129             return new HashCodeBuilder(13, 31).append(listenerName)
130                     .toHashCode();
131         }
132     }
133
134     /**
135      * This very expensive version of List is being used because it
136      * work well in concurrent situation, as we expect new service
137      * addition, service removal and walk of the service will happen
138      * from different places
139      */
140     private List<List<DataPacketListener>> listenDataPacket = new CopyOnWriteArrayList<List<DataPacketListener>>();
141     // Quick index to make sure there are no duplicate elements
142     private Set<DataPacketListener> indexDataPacket = Collections
143             .synchronizedSet(new HashSet<DataPacketListener>());
144
145     /**
146      * Loop for processing Received packets
147      *
148      */
149     private class RxLoop implements Runnable {
150         public void run() {
151             RawPacket pkt;
152             try {
153                 for (pkt = rxQueue.take(); pkt != null; pkt = rxQueue.take()) {
154                     for (List<DataPacketListener> serialListeners : listenDataPacket) {
155                         int i = 0;
156                         for (i = 0; i < serialListeners.size(); i++) {
157                             RawPacket copyPkt = null;
158                             try {
159                                 copyPkt = new RawPacket(pkt);
160                             } catch (ConstructionException cex) {
161                                 logger.debug("Error while cloning the packet");
162                             }
163                             if (copyPkt == null) {
164                                 increaseStat("RXPacketCopyFailed");
165                                 continue;
166                             }
167                             DataPacketListener l = serialListeners.get(i);
168                             IListenDataPacket s = (l == null ? null
169                                     : l.listener);
170                             if (s != null) {
171                                 try {
172                                     // TODO Make sure to filter based
173                                     // on the match too, later on
174                                     PacketResult res = s
175                                             .receiveDataPacket(copyPkt);
176                                     increaseStat("RXPacketSuccess");
177                                     if (res.equals(PacketResult.CONSUME)) {
178                                         increaseStat("RXPacketSerialExit");
179                                         break;
180                                     }
181                                 } catch (Exception e) {
182                                     increaseStat("RXPacketFailedForException");
183                                 }
184                             }
185                         }
186                     }
187                 }
188             } catch (InterruptedException e) {
189                 // Not a big deal
190             }
191         }
192     }
193
194     /**
195      * Loop for processing packets to be transmitted
196      *
197      */
198     private class TxLoop implements Runnable {
199         public void run() {
200             RawPacket pkt;
201             try {
202                 for (pkt = txQueue.take(); pkt != null; pkt = txQueue.take()) {
203                     // Retrieve outgoing node connector so to send out
204                     // the packet to corresponding node
205                     NodeConnector p = pkt.getOutgoingNodeConnector();
206                     if (p != null) {
207                         String t = p.getNode()
208                                 .getType();
209                         // Now locate the TX dispatcher
210                         IPluginInDataPacketService s = pluginInDataService
211                                 .get(t);
212                         if (s != null) {
213                             try {
214                                 s.transmitDataPacket(pkt);
215                                 increaseStat("TXPacketSuccess");
216                             } catch (Exception e) {
217                                 increaseStat("TXPacketFailedForException");
218                             }
219                         } else {
220                             increaseStat("TXpluginNotFound");
221                         }
222                     }
223                 }
224             } catch (InterruptedException e) {
225                 // Not a big deal
226             }
227         }
228     }
229
230     void setPluginInDataService(Map props, IPluginInDataPacketService s) {
231         if (this.pluginInDataService == null) {
232             logger.error("pluginInDataService store null");
233             return;
234         }
235         String type = null;
236         logger.trace("Received setPluginInDataService request");
237         for (Object e : props.entrySet()) {
238             Map.Entry entry = (Map.Entry) e;
239             logger.trace("Prop key:(" + entry.getKey() + ") value:("
240                     + entry.getValue() + ")");
241         }
242
243         Object value = props.get("protocolPluginType");
244         if (value instanceof String) {
245             type = (String) value;
246         }
247         if (type == null) {
248             logger.error("Received a PluginInDataService without any "
249                     + "protocolPluginType provided");
250         } else {
251             this.pluginInDataService.put(type, s);
252             logger.debug("Stored the PluginInDataService for type:" + type);
253         }
254     }
255
256     void unsetPluginInDataService(Map props, IPluginInDataPacketService s) {
257         if (this.pluginInDataService == null) {
258             logger.error("pluginInDataService store null");
259             return;
260         }
261
262         String type = null;
263         logger.trace("Received unsetPluginInDataService request");
264         for (Object e : props.entrySet()) {
265             Map.Entry entry = (Map.Entry) e;
266             logger.trace("Prop key:(" + entry.getKey() + ") value:("
267                     + entry.getValue() + ")");
268         }
269
270         Object value = props.get("protocoloPluginType");
271         if (value instanceof String) {
272             type = (String) value;
273         }
274         if (type == null) {
275             logger.error("Received a PluginInDataService without any "
276                     + "protocolPluginType provided");
277         } else if (this.pluginInDataService.get(type).equals(s)) {
278             this.pluginInDataService.remove(type);
279             logger.debug("Removed the PluginInDataService for type:" + type);
280         }
281     }
282
283     void setListenDataPacket(Map props, IListenDataPacket s) {
284         if (this.listenDataPacket == null || this.indexDataPacket == null) {
285             logger.error("data structure to store data is NULL");
286             return;
287         }
288         logger.trace("Received setListenDataPacket request");
289         for (Object e : props.entrySet()) {
290             Map.Entry entry = (Map.Entry) e;
291             logger.trace("Prop key:(" + entry.getKey() + ") value:("
292                     + entry.getValue() + ")");
293         }
294
295         String listenerName = null;
296         String listenerDependency = null;
297         Match filter = null;
298         Object value;
299         // Read the listenerName
300         value = props.get("salListenerName");
301         if (value instanceof String) {
302             listenerName = (String) value;
303         }
304
305         if (listenerName == null) {
306             logger.error("Trying to set a listener without a Name");
307             return;
308         }
309
310         //Read the dependency
311         value = props.get("salListenerDependency");
312         if (value instanceof String) {
313             listenerDependency = (String) value;
314         }
315
316         //Read match filter if any
317         value = props.get("salListenerFilter");
318         if (value instanceof Match) {
319             filter = (Match) value;
320         }
321
322         DataPacketListener l = new DataPacketListener(listenerName, s,
323                 listenerDependency, filter);
324
325         DataPacketListener lDependency = new DataPacketListener(
326                 listenerDependency, null, null, null);
327
328         // Now let see if there is any dependency
329         if (listenerDependency == null) {
330             logger.debug("listener without any dependency");
331             if (this.indexDataPacket.contains(l)) {
332                 logger.error("trying to add an existing element");
333             } else {
334                 logger.debug("adding listener: " + listenerName);
335                 CopyOnWriteArrayList<DataPacketListener> serialListeners = new CopyOnWriteArrayList<DataPacketListener>();
336                 serialListeners.add(l);
337                 this.listenDataPacket.add(serialListeners);
338                 this.indexDataPacket.add(l);
339             }
340         } else {
341             logger.debug("listener with dependency");
342             // Now search for the dependency and put things in order
343             if (this.indexDataPacket.contains(l)) {
344                 logger.error("trying to add an existing element");
345             } else {
346                 logger.debug("adding listener: " + listenerName);
347                 // Lets find the set with the dependency in it, if we
348                 // find it lets just add our dependency at the end of
349                 // the list.
350                 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
351                     int i = 0;
352                     boolean done = false;
353                     if (serialListeners.contains(lDependency)) {
354                         serialListeners.add(l);
355                         done = true;
356                     }
357                     // If we did fine the element, lets break early
358                     if (done) {
359                         break;
360                     }
361                 }
362
363                 this.indexDataPacket.add(l);
364             }
365         }
366     }
367
368     void unsetListenDataPacket(Map props, IListenDataPacket s) {
369         if (this.listenDataPacket == null || this.indexDataPacket == null) {
370             logger.error("data structure to store data is NULL");
371             return;
372         }
373         logger.trace("Received UNsetListenDataPacket request");
374         for (Object e : props.entrySet()) {
375             Map.Entry entry = (Map.Entry) e;
376             logger.trace("Prop key:(" + entry.getKey() + ") value:("
377                     + entry.getValue() + ")");
378         }
379
380         String listenerName = null;
381         Object value;
382         // Read the listenerName
383         value = props.get("salListenerName");
384         if (value instanceof String) {
385             listenerName = (String) value;
386         }
387
388         if (listenerName == null) {
389             logger.error("Trying to set a listener without a Name");
390             return;
391         }
392
393         DataPacketListener l = new DataPacketListener(listenerName, s, null,
394                 null);
395         if (!this.indexDataPacket.contains(l)) {
396             logger.error("trying to remove a non-existing element");
397         } else {
398             logger.debug("removing listener: " + listenerName);
399             for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
400                 int i = 0;
401                 boolean done = false;
402                 for (i = 0; i < serialListeners.size(); i++) {
403                     if (serialListeners.get(i).equals(l)) {
404                         serialListeners.remove(i);
405                         done = true;
406                         break;
407                     }
408                 }
409                 // Now remove a serialListener that maybe empty
410                 if (serialListeners.isEmpty()) {
411                     this.listenDataPacket.remove(serialListeners);
412                 }
413                 // If we did fine the element, lets break early
414                 if (done) {
415                     break;
416                 }
417             }
418
419             this.indexDataPacket.remove(l);
420         }
421     }
422
423     /**
424      * Function called by the dependency manager when all the required
425      * dependencies are satisfied
426      *
427      */
428     void init() {
429         this.txThread.start();
430         this.rxThread.start();
431     }
432
433     /**
434      * Function called by the dependency manager when at least one
435      * dependency become unsatisfied or when the component is shutting
436      * down because for example bundle is being stopped.
437      *
438      */
439     void destroy() {
440         // Make sure to cleanup the data structure we use to track
441         // services
442         this.listenDataPacket.clear();
443         this.indexDataPacket.clear();
444         this.pluginInDataService.clear();
445         this.statistics.clear();
446         this.rxQueue.clear();
447         this.txQueue.clear();
448         this.txThread.interrupt();
449         this.rxThread.interrupt();
450         // Wait for them to be done
451         try {
452             this.txThread.join();
453             this.rxThread.join();
454         } catch (InterruptedException ex) {
455             // Not a big deal
456         }
457     }
458
459     private void increaseStat(String name) {
460         if (this.statistics == null) {
461             return;
462         }
463
464         AtomicInteger currValue = null;
465         synchronized (this.statistics) {
466             currValue = this.statistics.get(name);
467
468             if (currValue == null) {
469                 this.statistics.put(name, new AtomicInteger(0));
470                 return;
471             }
472         }
473         currValue.incrementAndGet();
474     }
475
476     @Override
477     public PacketResult receiveDataPacket(RawPacket inPkt) {
478         if (inPkt.getIncomingNodeConnector() == null) {
479             increaseStat("nullIncomingNodeConnector");
480             return PacketResult.IGNORED;
481         }
482
483         // If the queue was full don't wait, rather increase a counter
484         // for it
485         if (!this.rxQueue.offer(inPkt)) {
486             increaseStat("fullRXQueue");
487             return PacketResult.IGNORED;
488         }
489
490         // Walk the chain of listener going first throw all the
491         // parallel ones and for each parallel in serial
492         return PacketResult.IGNORED;
493     }
494
495     @Override
496     public void transmitDataPacket(RawPacket outPkt) {
497         if (outPkt.getOutgoingNodeConnector() == null) {
498             increaseStat("nullOutgoingNodeConnector");
499             return;
500         }
501
502         if (!this.txQueue.offer(outPkt)) {
503             increaseStat("fullTXQueue");
504             return;
505         }
506     }
507
508     @Override
509     public Packet decodeDataPacket(RawPacket pkt) {
510         // Sanity checks
511         if (pkt == null) {
512             return null;
513         }
514         byte[] data = pkt.getPacketData();
515         if (data.length <= 0) {
516             return null;
517         }
518         if (pkt.getEncap().equals(LinkEncap.ETHERNET)) {
519             Ethernet res = new Ethernet();
520             try {
521                 res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
522             } catch (Exception e) {
523                 logger.warn("", e);
524             }
525             return res;
526         }
527         return null;
528     }
529
530     @Override
531     public RawPacket encodeDataPacket(Packet pkt) {
532         // Sanity checks
533         if (pkt == null) {
534             return null;
535         }
536         byte[] data;
537         try {
538             data = pkt.serialize();
539         } catch (Exception e) {
540             e.printStackTrace();
541             return null;
542         }
543         if (data.length <= 0) {
544             return null;
545         }
546         try {
547             RawPacket res = new RawPacket(data);
548             return res;
549         } catch (ConstructionException cex) {
550         }
551         // If something goes wrong then we have to return null
552         return null;
553     }
554 }