29d1c71204a68bde4edb0d66936b121590551ac2
[controller.git] / opendaylight / sal / implementation / src / main / java / org / opendaylight / controller / sal / implementation / internal / DataPacketService.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 /**
11  * @file   DataPacketService.java
12  *
13  * @brief  Implementation of Data Packet services in SAL
14  *
15  * Implementation of Data Packet services in SAL
16  */
17
18 package org.opendaylight.controller.sal.implementation.internal;
19
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import org.opendaylight.controller.sal.core.ConstructionException;
32 import org.opendaylight.controller.sal.core.NodeConnector;
33 import org.opendaylight.controller.sal.match.Match;
34 import org.opendaylight.controller.sal.packet.Ethernet;
35 import org.opendaylight.controller.sal.packet.IDataPacketService;
36 import org.opendaylight.controller.sal.packet.IListenDataPacket;
37 import org.opendaylight.controller.sal.packet.IPluginInDataPacketService;
38 import org.opendaylight.controller.sal.packet.IPluginOutDataPacketService;
39 import org.opendaylight.controller.sal.packet.LinkEncap;
40 import org.opendaylight.controller.sal.packet.Packet;
41 import org.opendaylight.controller.sal.packet.PacketResult;
42 import org.opendaylight.controller.sal.packet.RawPacket;
43 import org.opendaylight.controller.sal.utils.NetUtils;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 public class DataPacketService implements IPluginOutDataPacketService,
48         IDataPacketService {
49     private int RXMAXQUEUESIZE = 1000;
50     private int TXMAXQUEUESIZE = 1000;
51     protected static final Logger logger = LoggerFactory
52             .getLogger(DataPacketService.class);
53     /**
54      * Database that associates one NodeIDType to the
55      * IPluginDataPacketService, in fact we expect that there will be
56      * one instance of IPluginDataPacketService for each southbound
57      * plugin.
58      * Using the ConcurrentHashMap because the threads that will be
59      * adding a new service, removing a service, going through all of
60      * them maybe different.
61      */
62     private ConcurrentHashMap<String, IPluginInDataPacketService>
63         pluginInDataService =
64         new ConcurrentHashMap<String, IPluginInDataPacketService>();
65     private Map<String, AtomicInteger> statistics = new HashMap<String, AtomicInteger>();
66     /**
67      * Queue for packets received from Data Path
68      */
69     private LinkedBlockingQueue<RawPacket> rxQueue = new LinkedBlockingQueue<RawPacket>(
70             RXMAXQUEUESIZE);
71     /**
72      * Queue for packets that need to be transmitted to Data Path
73      */
74     private LinkedBlockingQueue<RawPacket> txQueue = new LinkedBlockingQueue<RawPacket>(
75             RXMAXQUEUESIZE);
76     /**
77      * Transmission thread
78      */
79     private Thread txThread = new Thread(new TxLoop(),
80             "DataPacketService TX thread");
81     /**
82      * Receiving thread
83      */
84     private Thread rxThread = new Thread(new RxLoop(),
85             "DataPacketService RX thread");
86
87     /**
88      * Representation of a Data Packet Listener including of its
89      * properties
90      *
91      */
92     private class DataPacketListener {
93         // Key fields
94         private String listenerName;
95         // Attribute fields
96         private IListenDataPacket listener;
97         private String dependency;
98         private Match match;
99
100         DataPacketListener(String name, IListenDataPacket s, String dependency,
101                 Match match) {
102             this.listenerName = name;
103             this.listener = s;
104             this.dependency = dependency;
105             this.match = match;
106         }
107
108         @Override
109         public boolean equals(Object obj) {
110             if (this == obj)
111                 return true;
112             if (obj == null)
113                 return false;
114             if (getClass() != obj.getClass())
115                 return false;
116             DataPacketListener other = (DataPacketListener) obj;
117             if (!getOuterType().equals(other.getOuterType()))
118                 return false;
119             if (listenerName == null) {
120                 if (other.listenerName != null)
121                     return false;
122             } else if (!listenerName.equals(other.listenerName))
123                 return false;
124             return true;
125         }
126
127         @Override
128         public int hashCode() {
129             final int prime = 31;
130             int result = 1;
131             result = prime * result + getOuterType().hashCode();
132             result = prime * result
133                     + ((listenerName == null) ? 0 : listenerName.hashCode());
134             return result;
135         }
136
137         private DataPacketService getOuterType() {
138             return DataPacketService.this;
139         }
140     }
141
142     /**
143      * This very expensive version of List is being used because it
144      * work well in concurrent situation, as we expect new service
145      * addition, service removal and walk of the service will happen
146      * from different places
147      */
148     private List<List<DataPacketListener>> listenDataPacket = new CopyOnWriteArrayList<List<DataPacketListener>>();
149     // Quick index to make sure there are no duplicate elements
150     private Set<DataPacketListener> indexDataPacket = Collections
151             .synchronizedSet(new HashSet<DataPacketListener>());
152
153     /**
154      * Loop for processing Received packets
155      *
156      */
157     private class RxLoop implements Runnable {
158         public void run() {
159             RawPacket pkt;
160             try {
161                 for (pkt = rxQueue.take(); pkt != null; pkt = rxQueue.take()) {
162                     for (List<DataPacketListener> serialListeners : listenDataPacket) {
163                         int i = 0;
164                         for (i = 0; i < serialListeners.size(); i++) {
165                             RawPacket copyPkt = null;
166                             try {
167                                 copyPkt = new RawPacket(pkt);
168                             } catch (ConstructionException cex) {
169                                 logger.debug("Error while cloning the packet");
170                             }
171                             if (copyPkt == null) {
172                                 increaseStat("RXPacketCopyFailed");
173                                 continue;
174                             }
175                             DataPacketListener l = serialListeners.get(i);
176                             IListenDataPacket s = (l == null ? null
177                                     : l.listener);
178                             if (s != null) {
179                                 try {
180                                     // TODO Make sure to filter based
181                                     // on the match too, later on
182                                     PacketResult res = s
183                                             .receiveDataPacket(copyPkt);
184                                     increaseStat("RXPacketSuccess");
185                                     if (res.equals(PacketResult.CONSUME)) {
186                                         increaseStat("RXPacketSerialExit");
187                                         break;
188                                     }
189                                 } catch (Exception e) {
190                                     increaseStat("RXPacketFailedForException");
191                                 }
192                             }
193                         }
194                     }
195                 }
196             } catch (InterruptedException e) {
197                 // Not a big deal
198             }
199         }
200     }
201
202     /**
203      * Loop for processing packets to be transmitted
204      *
205      */
206     private class TxLoop implements Runnable {
207         public void run() {
208             RawPacket pkt;
209             try {
210                 for (pkt = txQueue.take(); pkt != null; pkt = txQueue.take()) {
211                     // Retrieve outgoing node connector so to send out
212                     // the packet to corresponding node
213                     NodeConnector p = pkt.getOutgoingNodeConnector();
214                     if (p != null) {
215                         String t = p.getNode()
216                                 .getType();
217                         // Now locate the TX dispatcher
218                         IPluginInDataPacketService s = pluginInDataService
219                                 .get(t);
220                         if (s != null) {
221                             try {
222                                 s.transmitDataPacket(pkt);
223                                 increaseStat("TXPacketSuccess");
224                             } catch (Exception e) {
225                                 increaseStat("TXPacketFailedForException");
226                             }
227                         } else {
228                             increaseStat("TXpluginNotFound");
229                         }
230                     }
231                 }
232             } catch (InterruptedException e) {
233                 // Not a big deal
234             }
235         }
236     }
237
238     void setPluginInDataService(Map props, IPluginInDataPacketService s) {
239         if (this.pluginInDataService == null) {
240             logger.error("pluginInDataService store null");
241             return;
242         }
243         String type = null;
244         logger.trace("Received setPluginInDataService request");
245         for (Object e : props.entrySet()) {
246             Map.Entry entry = (Map.Entry) e;
247             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
248         }
249
250         Object value = props.get("protocolPluginType");
251         if (value instanceof String) {
252             type = (String) value;
253         }
254         if (type == null) {
255             logger.error("Received a PluginInDataService without any "
256                     + "protocolPluginType provided");
257         } else {
258             this.pluginInDataService.put(type, s);
259             logger.debug("Stored the PluginInDataService for type: {}", type);
260         }
261     }
262
263     void unsetPluginInDataService(Map props, IPluginInDataPacketService s) {
264         if (this.pluginInDataService == null) {
265             logger.error("pluginInDataService store null");
266             return;
267         }
268
269         String type = null;
270         logger.trace("Received unsetPluginInDataService request");
271         for (Object e : props.entrySet()) {
272             Map.Entry entry = (Map.Entry) e;
273             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
274         }
275
276         Object value = props.get("protocoloPluginType");
277         if (value instanceof String) {
278             type = (String) value;
279         }
280         if (type == null) {
281             logger.error("Received a PluginInDataService without any "
282                     + "protocolPluginType provided");
283         } else if (this.pluginInDataService.get(type).equals(s)) {
284             this.pluginInDataService.remove(type);
285             logger.debug("Removed the PluginInDataService for type: {}", type);
286         }
287     }
288
289     void setListenDataPacket(Map props, IListenDataPacket s) {
290         if (this.listenDataPacket == null || this.indexDataPacket == null) {
291             logger.error("data structure to store data is NULL");
292             return;
293         }
294         logger.trace("Received setListenDataPacket request");
295         for (Object e : props.entrySet()) {
296             Map.Entry entry = (Map.Entry) e;
297             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
298         }
299
300         String listenerName = null;
301         String listenerDependency = null;
302         Match filter = null;
303         Object value;
304         // Read the listenerName
305         value = props.get("salListenerName");
306         if (value instanceof String) {
307             listenerName = (String) value;
308         }
309
310         if (listenerName == null) {
311             logger.error("Trying to set a listener without a Name");
312             return;
313         }
314
315         //Read the dependency
316         value = props.get("salListenerDependency");
317         if (value instanceof String) {
318             listenerDependency = (String) value;
319         }
320
321         //Read match filter if any
322         value = props.get("salListenerFilter");
323         if (value instanceof Match) {
324             filter = (Match) value;
325         }
326
327         DataPacketListener l = new DataPacketListener(listenerName, s,
328                 listenerDependency, filter);
329
330         DataPacketListener lDependency = new DataPacketListener(
331                 listenerDependency, null, null, null);
332
333         // Now let see if there is any dependency
334         if (listenerDependency == null) {
335             logger.debug("listener without any dependency");
336             if (this.indexDataPacket.contains(l)) {
337                 logger.error("trying to add an existing element");
338             } else {
339                 logger.debug("adding listener: {}", listenerName);
340                 CopyOnWriteArrayList<DataPacketListener> serialListeners = new CopyOnWriteArrayList<DataPacketListener>();
341                 serialListeners.add(l);
342                 this.listenDataPacket.add(serialListeners);
343                 this.indexDataPacket.add(l);
344             }
345         } else {
346             logger.debug("listener with dependency");
347             // Now search for the dependency and put things in order
348             if (this.indexDataPacket.contains(l)) {
349                 logger.error("trying to add an existing element");
350             } else {
351                 logger.debug("adding listener: {}", listenerName);
352                 // Lets find the set with the dependency in it, if we
353                 // find it lets just add our dependency at the end of
354                 // the list.
355                 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
356                     int i = 0;
357                     boolean done = false;
358                     if (serialListeners.contains(lDependency)) {
359                         serialListeners.add(l);
360                         done = true;
361                     }
362                     // If we did fine the element, lets break early
363                     if (done) {
364                         break;
365                     }
366                 }
367
368                 this.indexDataPacket.add(l);
369             }
370         }
371     }
372
373     void unsetListenDataPacket(Map props, IListenDataPacket s) {
374         if (this.listenDataPacket == null || this.indexDataPacket == null) {
375             logger.error("data structure to store data is NULL");
376             return;
377         }
378         logger.trace("Received UNsetListenDataPacket request");
379         for (Object e : props.entrySet()) {
380             Map.Entry entry = (Map.Entry) e;
381             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
382         }
383
384         String listenerName = null;
385         Object value;
386         // Read the listenerName
387         value = props.get("salListenerName");
388         if (value instanceof String) {
389             listenerName = (String) value;
390         }
391
392         if (listenerName == null) {
393             logger.error("Trying to set a listener without a Name");
394             return;
395         }
396
397         DataPacketListener l = new DataPacketListener(listenerName, s, null,
398                 null);
399         if (!this.indexDataPacket.contains(l)) {
400             logger.error("trying to remove a non-existing element");
401         } else {
402             logger.debug("removing listener: {}", listenerName);
403             for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
404                 int i = 0;
405                 boolean done = false;
406                 for (i = 0; i < serialListeners.size(); i++) {
407                     if (serialListeners.get(i).equals(l)) {
408                         serialListeners.remove(i);
409                         done = true;
410                         break;
411                     }
412                 }
413                 // Now remove a serialListener that maybe empty
414                 if (serialListeners.isEmpty()) {
415                     this.listenDataPacket.remove(serialListeners);
416                 }
417                 // If we did fine the element, lets break early
418                 if (done) {
419                     break;
420                 }
421             }
422
423             this.indexDataPacket.remove(l);
424         }
425     }
426
427     /**
428      * Function called by the dependency manager when all the required
429      * dependencies are satisfied
430      *
431      */
432     void init() {
433         this.txThread.start();
434         this.rxThread.start();
435     }
436
437     /**
438      * Function called by the dependency manager when at least one
439      * dependency become unsatisfied or when the component is shutting
440      * down because for example bundle is being stopped.
441      *
442      */
443     void destroy() {
444         // Make sure to cleanup the data structure we use to track
445         // services
446         this.listenDataPacket.clear();
447         this.indexDataPacket.clear();
448         this.pluginInDataService.clear();
449         this.statistics.clear();
450         this.rxQueue.clear();
451         this.txQueue.clear();
452         this.txThread.interrupt();
453         this.rxThread.interrupt();
454         // Wait for them to be done
455         try {
456             this.txThread.join();
457             this.rxThread.join();
458         } catch (InterruptedException ex) {
459             // Not a big deal
460         }
461     }
462
463     private void increaseStat(String name) {
464         if (this.statistics == null) {
465             return;
466         }
467
468         AtomicInteger currValue = null;
469         synchronized (this.statistics) {
470             currValue = this.statistics.get(name);
471
472             if (currValue == null) {
473                 this.statistics.put(name, new AtomicInteger(0));
474                 return;
475             }
476         }
477         currValue.incrementAndGet();
478     }
479
480     @Override
481     public PacketResult receiveDataPacket(RawPacket inPkt) {
482         if (inPkt.getIncomingNodeConnector() == null) {
483             increaseStat("nullIncomingNodeConnector");
484             return PacketResult.IGNORED;
485         }
486
487         // If the queue was full don't wait, rather increase a counter
488         // for it
489         if (!this.rxQueue.offer(inPkt)) {
490             increaseStat("fullRXQueue");
491             return PacketResult.IGNORED;
492         }
493
494         // Walk the chain of listener going first throw all the
495         // parallel ones and for each parallel in serial
496         return PacketResult.IGNORED;
497     }
498
499     @Override
500     public void transmitDataPacket(RawPacket outPkt) {
501         if (outPkt.getOutgoingNodeConnector() == null) {
502             increaseStat("nullOutgoingNodeConnector");
503             return;
504         }
505
506         if (!this.txQueue.offer(outPkt)) {
507             increaseStat("fullTXQueue");
508             return;
509         }
510     }
511
512     @Override
513     public Packet decodeDataPacket(RawPacket pkt) {
514         // Sanity checks
515         if (pkt == null) {
516             return null;
517         }
518         byte[] data = pkt.getPacketData();
519         if (data.length <= 0) {
520             return null;
521         }
522         if (pkt.getEncap().equals(LinkEncap.ETHERNET)) {
523             Ethernet res = new Ethernet();
524             try {
525                 res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
526             } catch (Exception e) {
527                 logger.warn("Failed to decode packet: {}", e.getMessage());
528             }
529             return res;
530         }
531         return null;
532     }
533
534     @Override
535     public RawPacket encodeDataPacket(Packet pkt) {
536         // Sanity checks
537         if (pkt == null) {
538             return null;
539         }
540         byte[] data;
541         try {
542             data = pkt.serialize();
543         } catch (Exception e) {
544             logger.error("",e);
545             return null;
546         }
547         if (data.length <= 0) {
548             return null;
549         }
550         try {
551             RawPacket res = new RawPacket(data);
552             return res;
553         } catch (ConstructionException cex) {
554         }
555         // If something goes wrong then we have to return null
556         return null;
557     }
558 }