78fedee36ad420fa75957268e0a9ae01219790b9
[controller.git] / opendaylight / sal / implementation / src / main / java / org / opendaylight / controller / sal / implementation / internal / DataPacketService.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 /**
11  * @file   DataPacketService.java
12  *
13  * @brief  Implementation of Data Packet services in SAL
14  *
15  * Implementation of Data Packet services in SAL
16  */
17
18 package org.opendaylight.controller.sal.implementation.internal;
19
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import org.apache.commons.lang3.builder.EqualsBuilder;
32 import org.apache.commons.lang3.builder.HashCodeBuilder;
33 import org.opendaylight.controller.sal.core.ConstructionException;
34 import org.opendaylight.controller.sal.core.Node;
35 import org.opendaylight.controller.sal.core.NodeConnector;
36 import org.opendaylight.controller.sal.match.Match;
37 import org.opendaylight.controller.sal.packet.Ethernet;
38 import org.opendaylight.controller.sal.packet.IDataPacketService;
39 import org.opendaylight.controller.sal.packet.IListenDataPacket;
40 import org.opendaylight.controller.sal.packet.IPluginInDataPacketService;
41 import org.opendaylight.controller.sal.packet.IPluginOutDataPacketService;
42 import org.opendaylight.controller.sal.packet.LinkEncap;
43 import org.opendaylight.controller.sal.packet.Packet;
44 import org.opendaylight.controller.sal.packet.PacketResult;
45 import org.opendaylight.controller.sal.packet.RawPacket;
46 import org.opendaylight.controller.sal.utils.NetUtils;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 public class DataPacketService implements IPluginOutDataPacketService,
51         IDataPacketService {
52     private int RXMAXQUEUESIZE = 1000;
53     private int TXMAXQUEUESIZE = 1000;
54     protected static final Logger logger = LoggerFactory
55             .getLogger(DataPacketService.class);
56     /**
57      * Database that associates one NodeIDType to the
58      * IPluginDataPacketService, in fact we expect that there will be
59      * one instance of IPluginDataPacketService for each southbound
60      * plugin.
61      * Using the ConcurrentHashMap because the threads that will be
62      * adding a new service, removing a service, going through all of
63      * them maybe different.
64      */
65     private ConcurrentHashMap<String, IPluginInDataPacketService>
66         pluginInDataService =
67         new ConcurrentHashMap<String, IPluginInDataPacketService>();
68     private Map<String, AtomicInteger> statistics = new HashMap<String, AtomicInteger>();
69     /**
70      * Queue for packets received from Data Path
71      */
72     private LinkedBlockingQueue<RawPacket> rxQueue = new LinkedBlockingQueue<RawPacket>(
73             RXMAXQUEUESIZE);
74     /**
75      * Queue for packets that need to be transmitted to Data Path
76      */
77     private LinkedBlockingQueue<RawPacket> txQueue = new LinkedBlockingQueue<RawPacket>(
78             RXMAXQUEUESIZE);
79     /**
80      * Transmission thread
81      */
82     private Thread txThread = new Thread(new TxLoop(),
83             "DataPacketService TX thread");
84     /**
85      * Receiving thread
86      */
87     private Thread rxThread = new Thread(new RxLoop(),
88             "DataPacketService RX thread");
89
90     /**
91      * Representation of a Data Packet Listener including of its
92      * properties
93      *
94      */
95     private class DataPacketListener {
96         // Key fields
97         private String listenerName;
98         // Attribute fields
99         private IListenDataPacket listener;
100         private String dependency;
101         private Match match;
102
103         DataPacketListener(String name, IListenDataPacket s, String dependency,
104                 Match match) {
105             this.listenerName = name;
106             this.listener = s;
107             this.dependency = dependency;
108             this.match = match;
109         }
110
111         @Override
112         public boolean equals(Object obj) {
113             if (obj == null) {
114                 return false;
115             }
116             if (obj == this) {
117                 return true;
118             }
119             if (obj.getClass() != getClass()) {
120                 return false;
121             }
122             DataPacketListener rhs = (DataPacketListener) obj;
123             return new EqualsBuilder().append(this.listenerName,
124                     rhs.listenerName).isEquals();
125         }
126
127         @Override
128         public int hashCode() {
129             return new HashCodeBuilder(13, 31).append(listenerName)
130                     .toHashCode();
131         }
132     }
133
134     /**
135      * This very expensive version of List is being used because it
136      * work well in concurrent situation, as we expect new service
137      * addition, service removal and walk of the service will happen
138      * from different places
139      */
140     private List<List<DataPacketListener>> listenDataPacket = new CopyOnWriteArrayList<List<DataPacketListener>>();
141     // Quick index to make sure there are no duplicate elements
142     private Set<DataPacketListener> indexDataPacket = Collections
143             .synchronizedSet(new HashSet<DataPacketListener>());
144
145     /**
146      * Loop for processing Received packets
147      *
148      */
149     private class RxLoop implements Runnable {
150         public void run() {
151             RawPacket pkt;
152             try {
153                 for (pkt = rxQueue.take(); pkt != null; pkt = rxQueue.take()) {
154                     for (List<DataPacketListener> serialListeners : listenDataPacket) {
155                         int i = 0;
156                         for (i = 0; i < serialListeners.size(); i++) {
157                             RawPacket copyPkt = null;
158                             try {
159                                 copyPkt = new RawPacket(pkt);
160                             } catch (ConstructionException cex) {
161                                 logger.debug("Error while cloning the packet");
162                             }
163                             if (copyPkt == null) {
164                                 increaseStat("RXPacketCopyFailed");
165                                 continue;
166                             }
167                             DataPacketListener l = serialListeners.get(i);
168                             IListenDataPacket s = (l == null ? null
169                                     : l.listener);
170                             if (s != null) {
171                                 try {
172                                     // TODO Make sure to filter based
173                                     // on the match too, later on
174                                     PacketResult res = s
175                                             .receiveDataPacket(copyPkt);
176                                     increaseStat("RXPacketSuccess");
177                                     if (res.equals(PacketResult.CONSUME)) {
178                                         increaseStat("RXPacketSerialExit");
179                                         break;
180                                     }
181                                 } catch (Exception e) {
182                                     increaseStat("RXPacketFailedForException");
183                                 }
184                             }
185                         }
186                     }
187                 }
188             } catch (InterruptedException e) {
189                 // Not a big deal
190             }
191         }
192     }
193
194     /**
195      * Loop for processing packets to be transmitted
196      *
197      */
198     private class TxLoop implements Runnable {
199         public void run() {
200             RawPacket pkt;
201             try {
202                 for (pkt = txQueue.take(); pkt != null; pkt = txQueue.take()) {
203                     // Retrieve outgoing node connector so to send out
204                     // the packet to corresponding node
205                     NodeConnector p = pkt.getOutgoingNodeConnector();
206                     if (p != null) {
207                         String t = p.getNode()
208                                 .getType();
209                         // Now locate the TX dispatcher
210                         IPluginInDataPacketService s = pluginInDataService
211                                 .get(t);
212                         if (s != null) {
213                             try {
214                                 s.transmitDataPacket(pkt);
215                                 increaseStat("TXPacketSuccess");
216                             } catch (Exception e) {
217                                 increaseStat("TXPacketFailedForException");
218                             }
219                         } else {
220                             increaseStat("TXpluginNotFound");
221                         }
222                     }
223                 }
224             } catch (InterruptedException e) {
225                 // Not a big deal
226             }
227         }
228     }
229
230     void setPluginInDataService(Map props, IPluginInDataPacketService s) {
231         if (this.pluginInDataService == null) {
232             logger.error("pluginInDataService store null");
233             return;
234         }
235         String type = null;
236         logger.trace("Received setPluginInDataService request");
237         for (Object e : props.entrySet()) {
238             Map.Entry entry = (Map.Entry) e;
239             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
240         }
241
242         Object value = props.get("protocolPluginType");
243         if (value instanceof String) {
244             type = (String) value;
245         }
246         if (type == null) {
247             logger.error("Received a PluginInDataService without any "
248                     + "protocolPluginType provided");
249         } else {
250             this.pluginInDataService.put(type, s);
251             logger.debug("Stored the PluginInDataService for type: {}", type);
252         }
253     }
254
255     void unsetPluginInDataService(Map props, IPluginInDataPacketService s) {
256         if (this.pluginInDataService == null) {
257             logger.error("pluginInDataService store null");
258             return;
259         }
260
261         String type = null;
262         logger.trace("Received unsetPluginInDataService request");
263         for (Object e : props.entrySet()) {
264             Map.Entry entry = (Map.Entry) e;
265             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
266         }
267
268         Object value = props.get("protocoloPluginType");
269         if (value instanceof String) {
270             type = (String) value;
271         }
272         if (type == null) {
273             logger.error("Received a PluginInDataService without any "
274                     + "protocolPluginType provided");
275         } else if (this.pluginInDataService.get(type).equals(s)) {
276             this.pluginInDataService.remove(type);
277             logger.debug("Removed the PluginInDataService for type: {}", type);
278         }
279     }
280
281     void setListenDataPacket(Map props, IListenDataPacket s) {
282         if (this.listenDataPacket == null || this.indexDataPacket == null) {
283             logger.error("data structure to store data is NULL");
284             return;
285         }
286         logger.trace("Received setListenDataPacket request");
287         for (Object e : props.entrySet()) {
288             Map.Entry entry = (Map.Entry) e;
289             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
290         }
291
292         String listenerName = null;
293         String listenerDependency = null;
294         Match filter = null;
295         Object value;
296         // Read the listenerName
297         value = props.get("salListenerName");
298         if (value instanceof String) {
299             listenerName = (String) value;
300         }
301
302         if (listenerName == null) {
303             logger.error("Trying to set a listener without a Name");
304             return;
305         }
306
307         //Read the dependency
308         value = props.get("salListenerDependency");
309         if (value instanceof String) {
310             listenerDependency = (String) value;
311         }
312
313         //Read match filter if any
314         value = props.get("salListenerFilter");
315         if (value instanceof Match) {
316             filter = (Match) value;
317         }
318
319         DataPacketListener l = new DataPacketListener(listenerName, s,
320                 listenerDependency, filter);
321
322         DataPacketListener lDependency = new DataPacketListener(
323                 listenerDependency, null, null, null);
324
325         // Now let see if there is any dependency
326         if (listenerDependency == null) {
327             logger.debug("listener without any dependency");
328             if (this.indexDataPacket.contains(l)) {
329                 logger.error("trying to add an existing element");
330             } else {
331                 logger.debug("adding listener: {}", listenerName);
332                 CopyOnWriteArrayList<DataPacketListener> serialListeners = new CopyOnWriteArrayList<DataPacketListener>();
333                 serialListeners.add(l);
334                 this.listenDataPacket.add(serialListeners);
335                 this.indexDataPacket.add(l);
336             }
337         } else {
338             logger.debug("listener with dependency");
339             // Now search for the dependency and put things in order
340             if (this.indexDataPacket.contains(l)) {
341                 logger.error("trying to add an existing element");
342             } else {
343                 logger.debug("adding listener: {}", listenerName);
344                 // Lets find the set with the dependency in it, if we
345                 // find it lets just add our dependency at the end of
346                 // the list.
347                 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
348                     int i = 0;
349                     boolean done = false;
350                     if (serialListeners.contains(lDependency)) {
351                         serialListeners.add(l);
352                         done = true;
353                     }
354                     // If we did fine the element, lets break early
355                     if (done) {
356                         break;
357                     }
358                 }
359
360                 this.indexDataPacket.add(l);
361             }
362         }
363     }
364
365     void unsetListenDataPacket(Map props, IListenDataPacket s) {
366         if (this.listenDataPacket == null || this.indexDataPacket == null) {
367             logger.error("data structure to store data is NULL");
368             return;
369         }
370         logger.trace("Received UNsetListenDataPacket request");
371         for (Object e : props.entrySet()) {
372             Map.Entry entry = (Map.Entry) e;
373             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
374         }
375
376         String listenerName = null;
377         Object value;
378         // Read the listenerName
379         value = props.get("salListenerName");
380         if (value instanceof String) {
381             listenerName = (String) value;
382         }
383
384         if (listenerName == null) {
385             logger.error("Trying to set a listener without a Name");
386             return;
387         }
388
389         DataPacketListener l = new DataPacketListener(listenerName, s, null,
390                 null);
391         if (!this.indexDataPacket.contains(l)) {
392             logger.error("trying to remove a non-existing element");
393         } else {
394             logger.debug("removing listener: {}", listenerName);
395             for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
396                 int i = 0;
397                 boolean done = false;
398                 for (i = 0; i < serialListeners.size(); i++) {
399                     if (serialListeners.get(i).equals(l)) {
400                         serialListeners.remove(i);
401                         done = true;
402                         break;
403                     }
404                 }
405                 // Now remove a serialListener that maybe empty
406                 if (serialListeners.isEmpty()) {
407                     this.listenDataPacket.remove(serialListeners);
408                 }
409                 // If we did fine the element, lets break early
410                 if (done) {
411                     break;
412                 }
413             }
414
415             this.indexDataPacket.remove(l);
416         }
417     }
418
419     /**
420      * Function called by the dependency manager when all the required
421      * dependencies are satisfied
422      *
423      */
424     void init() {
425         this.txThread.start();
426         this.rxThread.start();
427     }
428
429     /**
430      * Function called by the dependency manager when at least one
431      * dependency become unsatisfied or when the component is shutting
432      * down because for example bundle is being stopped.
433      *
434      */
435     void destroy() {
436         // Make sure to cleanup the data structure we use to track
437         // services
438         this.listenDataPacket.clear();
439         this.indexDataPacket.clear();
440         this.pluginInDataService.clear();
441         this.statistics.clear();
442         this.rxQueue.clear();
443         this.txQueue.clear();
444         this.txThread.interrupt();
445         this.rxThread.interrupt();
446         // Wait for them to be done
447         try {
448             this.txThread.join();
449             this.rxThread.join();
450         } catch (InterruptedException ex) {
451             // Not a big deal
452         }
453     }
454
455     private void increaseStat(String name) {
456         if (this.statistics == null) {
457             return;
458         }
459
460         AtomicInteger currValue = null;
461         synchronized (this.statistics) {
462             currValue = this.statistics.get(name);
463
464             if (currValue == null) {
465                 this.statistics.put(name, new AtomicInteger(0));
466                 return;
467             }
468         }
469         currValue.incrementAndGet();
470     }
471
472     @Override
473     public PacketResult receiveDataPacket(RawPacket inPkt) {
474         if (inPkt.getIncomingNodeConnector() == null) {
475             increaseStat("nullIncomingNodeConnector");
476             return PacketResult.IGNORED;
477         }
478
479         // If the queue was full don't wait, rather increase a counter
480         // for it
481         if (!this.rxQueue.offer(inPkt)) {
482             increaseStat("fullRXQueue");
483             return PacketResult.IGNORED;
484         }
485
486         // Walk the chain of listener going first throw all the
487         // parallel ones and for each parallel in serial
488         return PacketResult.IGNORED;
489     }
490
491     @Override
492     public void transmitDataPacket(RawPacket outPkt) {
493         if (outPkt.getOutgoingNodeConnector() == null) {
494             increaseStat("nullOutgoingNodeConnector");
495             return;
496         }
497
498         if (!this.txQueue.offer(outPkt)) {
499             increaseStat("fullTXQueue");
500             return;
501         }
502     }
503
504     @Override
505     public Packet decodeDataPacket(RawPacket pkt) {
506         // Sanity checks
507         if (pkt == null) {
508             return null;
509         }
510         byte[] data = pkt.getPacketData();
511         if (data.length <= 0) {
512             return null;
513         }
514         if (pkt.getEncap().equals(LinkEncap.ETHERNET)) {
515             Ethernet res = new Ethernet();
516             try {
517                 res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
518             } catch (Exception e) {
519                 logger.warn("", e);
520             }
521             return res;
522         }
523         return null;
524     }
525
526     @Override
527     public RawPacket encodeDataPacket(Packet pkt) {
528         // Sanity checks
529         if (pkt == null) {
530             return null;
531         }
532         byte[] data;
533         try {
534             data = pkt.serialize();
535         } catch (Exception e) {
536             e.printStackTrace();
537             return null;
538         }
539         if (data.length <= 0) {
540             return null;
541         }
542         try {
543             RawPacket res = new RawPacket(data);
544             return res;
545         } catch (ConstructionException cex) {
546         }
547         // If something goes wrong then we have to return null
548         return null;
549     }
550 }