7741945e73db55a56a496444375747f146adaca0
[controller.git] / opendaylight / sal / implementation / src / main / java / org / opendaylight / controller / sal / implementation / internal / DataPacketService.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 /**
11  * @file   DataPacketService.java
12  *
13  * @brief  Implementation of Data Packet services in SAL
14  *
15  * Implementation of Data Packet services in SAL
16  */
17
18 package org.opendaylight.controller.sal.implementation.internal;
19
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import org.apache.commons.lang3.builder.EqualsBuilder;
32 import org.apache.commons.lang3.builder.HashCodeBuilder;
33 import org.opendaylight.controller.sal.core.ConstructionException;
34 import org.opendaylight.controller.sal.core.NodeConnector;
35 import org.opendaylight.controller.sal.match.Match;
36 import org.opendaylight.controller.sal.packet.Ethernet;
37 import org.opendaylight.controller.sal.packet.IDataPacketService;
38 import org.opendaylight.controller.sal.packet.IListenDataPacket;
39 import org.opendaylight.controller.sal.packet.IPluginInDataPacketService;
40 import org.opendaylight.controller.sal.packet.IPluginOutDataPacketService;
41 import org.opendaylight.controller.sal.packet.LinkEncap;
42 import org.opendaylight.controller.sal.packet.Packet;
43 import org.opendaylight.controller.sal.packet.PacketResult;
44 import org.opendaylight.controller.sal.packet.RawPacket;
45 import org.opendaylight.controller.sal.utils.NetUtils;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 public class DataPacketService implements IPluginOutDataPacketService,
50         IDataPacketService {
51     private int RXMAXQUEUESIZE = 1000;
52     private int TXMAXQUEUESIZE = 1000;
53     protected static final Logger logger = LoggerFactory
54             .getLogger(DataPacketService.class);
55     /**
56      * Database that associates one NodeIDType to the
57      * IPluginDataPacketService, in fact we expect that there will be
58      * one instance of IPluginDataPacketService for each southbound
59      * plugin.
60      * Using the ConcurrentHashMap because the threads that will be
61      * adding a new service, removing a service, going through all of
62      * them maybe different.
63      */
64     private ConcurrentHashMap<String, IPluginInDataPacketService>
65         pluginInDataService =
66         new ConcurrentHashMap<String, IPluginInDataPacketService>();
67     private Map<String, AtomicInteger> statistics = new HashMap<String, AtomicInteger>();
68     /**
69      * Queue for packets received from Data Path
70      */
71     private LinkedBlockingQueue<RawPacket> rxQueue = new LinkedBlockingQueue<RawPacket>(
72             RXMAXQUEUESIZE);
73     /**
74      * Queue for packets that need to be transmitted to Data Path
75      */
76     private LinkedBlockingQueue<RawPacket> txQueue = new LinkedBlockingQueue<RawPacket>(
77             RXMAXQUEUESIZE);
78     /**
79      * Transmission thread
80      */
81     private Thread txThread = new Thread(new TxLoop(),
82             "DataPacketService TX thread");
83     /**
84      * Receiving thread
85      */
86     private Thread rxThread = new Thread(new RxLoop(),
87             "DataPacketService RX thread");
88
89     /**
90      * Representation of a Data Packet Listener including of its
91      * properties
92      *
93      */
94     private class DataPacketListener {
95         // Key fields
96         private String listenerName;
97         // Attribute fields
98         private IListenDataPacket listener;
99         private String dependency;
100         private Match match;
101
102         DataPacketListener(String name, IListenDataPacket s, String dependency,
103                 Match match) {
104             this.listenerName = name;
105             this.listener = s;
106             this.dependency = dependency;
107             this.match = match;
108         }
109
110         @Override
111         public boolean equals(Object obj) {
112             if (obj == null) {
113                 return false;
114             }
115             if (obj == this) {
116                 return true;
117             }
118             if (obj.getClass() != getClass()) {
119                 return false;
120             }
121             DataPacketListener rhs = (DataPacketListener) obj;
122             return new EqualsBuilder().append(this.listenerName,
123                     rhs.listenerName).isEquals();
124         }
125
126         @Override
127         public int hashCode() {
128             return new HashCodeBuilder(13, 31).append(listenerName)
129                     .toHashCode();
130         }
131     }
132
133     /**
134      * This very expensive version of List is being used because it
135      * work well in concurrent situation, as we expect new service
136      * addition, service removal and walk of the service will happen
137      * from different places
138      */
139     private List<List<DataPacketListener>> listenDataPacket = new CopyOnWriteArrayList<List<DataPacketListener>>();
140     // Quick index to make sure there are no duplicate elements
141     private Set<DataPacketListener> indexDataPacket = Collections
142             .synchronizedSet(new HashSet<DataPacketListener>());
143
144     /**
145      * Loop for processing Received packets
146      *
147      */
148     private class RxLoop implements Runnable {
149         public void run() {
150             RawPacket pkt;
151             try {
152                 for (pkt = rxQueue.take(); pkt != null; pkt = rxQueue.take()) {
153                     for (List<DataPacketListener> serialListeners : listenDataPacket) {
154                         int i = 0;
155                         for (i = 0; i < serialListeners.size(); i++) {
156                             RawPacket copyPkt = null;
157                             try {
158                                 copyPkt = new RawPacket(pkt);
159                             } catch (ConstructionException cex) {
160                                 logger.debug("Error while cloning the packet");
161                             }
162                             if (copyPkt == null) {
163                                 increaseStat("RXPacketCopyFailed");
164                                 continue;
165                             }
166                             DataPacketListener l = serialListeners.get(i);
167                             IListenDataPacket s = (l == null ? null
168                                     : l.listener);
169                             if (s != null) {
170                                 try {
171                                     // TODO Make sure to filter based
172                                     // on the match too, later on
173                                     PacketResult res = s
174                                             .receiveDataPacket(copyPkt);
175                                     increaseStat("RXPacketSuccess");
176                                     if (res.equals(PacketResult.CONSUME)) {
177                                         increaseStat("RXPacketSerialExit");
178                                         break;
179                                     }
180                                 } catch (Exception e) {
181                                     increaseStat("RXPacketFailedForException");
182                                 }
183                             }
184                         }
185                     }
186                 }
187             } catch (InterruptedException e) {
188                 // Not a big deal
189             }
190         }
191     }
192
193     /**
194      * Loop for processing packets to be transmitted
195      *
196      */
197     private class TxLoop implements Runnable {
198         public void run() {
199             RawPacket pkt;
200             try {
201                 for (pkt = txQueue.take(); pkt != null; pkt = txQueue.take()) {
202                     // Retrieve outgoing node connector so to send out
203                     // the packet to corresponding node
204                     NodeConnector p = pkt.getOutgoingNodeConnector();
205                     if (p != null) {
206                         String t = p.getNode()
207                                 .getType();
208                         // Now locate the TX dispatcher
209                         IPluginInDataPacketService s = pluginInDataService
210                                 .get(t);
211                         if (s != null) {
212                             try {
213                                 s.transmitDataPacket(pkt);
214                                 increaseStat("TXPacketSuccess");
215                             } catch (Exception e) {
216                                 increaseStat("TXPacketFailedForException");
217                             }
218                         } else {
219                             increaseStat("TXpluginNotFound");
220                         }
221                     }
222                 }
223             } catch (InterruptedException e) {
224                 // Not a big deal
225             }
226         }
227     }
228
229     void setPluginInDataService(Map props, IPluginInDataPacketService s) {
230         if (this.pluginInDataService == null) {
231             logger.error("pluginInDataService store null");
232             return;
233         }
234         String type = null;
235         logger.trace("Received setPluginInDataService request");
236         for (Object e : props.entrySet()) {
237             Map.Entry entry = (Map.Entry) e;
238             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
239         }
240
241         Object value = props.get("protocolPluginType");
242         if (value instanceof String) {
243             type = (String) value;
244         }
245         if (type == null) {
246             logger.error("Received a PluginInDataService without any "
247                     + "protocolPluginType provided");
248         } else {
249             this.pluginInDataService.put(type, s);
250             logger.debug("Stored the PluginInDataService for type: {}", type);
251         }
252     }
253
254     void unsetPluginInDataService(Map props, IPluginInDataPacketService s) {
255         if (this.pluginInDataService == null) {
256             logger.error("pluginInDataService store null");
257             return;
258         }
259
260         String type = null;
261         logger.trace("Received unsetPluginInDataService request");
262         for (Object e : props.entrySet()) {
263             Map.Entry entry = (Map.Entry) e;
264             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
265         }
266
267         Object value = props.get("protocoloPluginType");
268         if (value instanceof String) {
269             type = (String) value;
270         }
271         if (type == null) {
272             logger.error("Received a PluginInDataService without any "
273                     + "protocolPluginType provided");
274         } else if (this.pluginInDataService.get(type).equals(s)) {
275             this.pluginInDataService.remove(type);
276             logger.debug("Removed the PluginInDataService for type: {}", type);
277         }
278     }
279
280     void setListenDataPacket(Map props, IListenDataPacket s) {
281         if (this.listenDataPacket == null || this.indexDataPacket == null) {
282             logger.error("data structure to store data is NULL");
283             return;
284         }
285         logger.trace("Received setListenDataPacket request");
286         for (Object e : props.entrySet()) {
287             Map.Entry entry = (Map.Entry) e;
288             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
289         }
290
291         String listenerName = null;
292         String listenerDependency = null;
293         Match filter = null;
294         Object value;
295         // Read the listenerName
296         value = props.get("salListenerName");
297         if (value instanceof String) {
298             listenerName = (String) value;
299         }
300
301         if (listenerName == null) {
302             logger.error("Trying to set a listener without a Name");
303             return;
304         }
305
306         //Read the dependency
307         value = props.get("salListenerDependency");
308         if (value instanceof String) {
309             listenerDependency = (String) value;
310         }
311
312         //Read match filter if any
313         value = props.get("salListenerFilter");
314         if (value instanceof Match) {
315             filter = (Match) value;
316         }
317
318         DataPacketListener l = new DataPacketListener(listenerName, s,
319                 listenerDependency, filter);
320
321         DataPacketListener lDependency = new DataPacketListener(
322                 listenerDependency, null, null, null);
323
324         // Now let see if there is any dependency
325         if (listenerDependency == null) {
326             logger.debug("listener without any dependency");
327             if (this.indexDataPacket.contains(l)) {
328                 logger.error("trying to add an existing element");
329             } else {
330                 logger.debug("adding listener: {}", listenerName);
331                 CopyOnWriteArrayList<DataPacketListener> serialListeners = new CopyOnWriteArrayList<DataPacketListener>();
332                 serialListeners.add(l);
333                 this.listenDataPacket.add(serialListeners);
334                 this.indexDataPacket.add(l);
335             }
336         } else {
337             logger.debug("listener with dependency");
338             // Now search for the dependency and put things in order
339             if (this.indexDataPacket.contains(l)) {
340                 logger.error("trying to add an existing element");
341             } else {
342                 logger.debug("adding listener: {}", listenerName);
343                 // Lets find the set with the dependency in it, if we
344                 // find it lets just add our dependency at the end of
345                 // the list.
346                 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
347                     int i = 0;
348                     boolean done = false;
349                     if (serialListeners.contains(lDependency)) {
350                         serialListeners.add(l);
351                         done = true;
352                     }
353                     // If we did fine the element, lets break early
354                     if (done) {
355                         break;
356                     }
357                 }
358
359                 this.indexDataPacket.add(l);
360             }
361         }
362     }
363
364     void unsetListenDataPacket(Map props, IListenDataPacket s) {
365         if (this.listenDataPacket == null || this.indexDataPacket == null) {
366             logger.error("data structure to store data is NULL");
367             return;
368         }
369         logger.trace("Received UNsetListenDataPacket request");
370         for (Object e : props.entrySet()) {
371             Map.Entry entry = (Map.Entry) e;
372             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
373         }
374
375         String listenerName = null;
376         Object value;
377         // Read the listenerName
378         value = props.get("salListenerName");
379         if (value instanceof String) {
380             listenerName = (String) value;
381         }
382
383         if (listenerName == null) {
384             logger.error("Trying to set a listener without a Name");
385             return;
386         }
387
388         DataPacketListener l = new DataPacketListener(listenerName, s, null,
389                 null);
390         if (!this.indexDataPacket.contains(l)) {
391             logger.error("trying to remove a non-existing element");
392         } else {
393             logger.debug("removing listener: {}", listenerName);
394             for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
395                 int i = 0;
396                 boolean done = false;
397                 for (i = 0; i < serialListeners.size(); i++) {
398                     if (serialListeners.get(i).equals(l)) {
399                         serialListeners.remove(i);
400                         done = true;
401                         break;
402                     }
403                 }
404                 // Now remove a serialListener that maybe empty
405                 if (serialListeners.isEmpty()) {
406                     this.listenDataPacket.remove(serialListeners);
407                 }
408                 // If we did fine the element, lets break early
409                 if (done) {
410                     break;
411                 }
412             }
413
414             this.indexDataPacket.remove(l);
415         }
416     }
417
418     /**
419      * Function called by the dependency manager when all the required
420      * dependencies are satisfied
421      *
422      */
423     void init() {
424         this.txThread.start();
425         this.rxThread.start();
426     }
427
428     /**
429      * Function called by the dependency manager when at least one
430      * dependency become unsatisfied or when the component is shutting
431      * down because for example bundle is being stopped.
432      *
433      */
434     void destroy() {
435         // Make sure to cleanup the data structure we use to track
436         // services
437         this.listenDataPacket.clear();
438         this.indexDataPacket.clear();
439         this.pluginInDataService.clear();
440         this.statistics.clear();
441         this.rxQueue.clear();
442         this.txQueue.clear();
443         this.txThread.interrupt();
444         this.rxThread.interrupt();
445         // Wait for them to be done
446         try {
447             this.txThread.join();
448             this.rxThread.join();
449         } catch (InterruptedException ex) {
450             // Not a big deal
451         }
452     }
453
454     private void increaseStat(String name) {
455         if (this.statistics == null) {
456             return;
457         }
458
459         AtomicInteger currValue = null;
460         synchronized (this.statistics) {
461             currValue = this.statistics.get(name);
462
463             if (currValue == null) {
464                 this.statistics.put(name, new AtomicInteger(0));
465                 return;
466             }
467         }
468         currValue.incrementAndGet();
469     }
470
471     @Override
472     public PacketResult receiveDataPacket(RawPacket inPkt) {
473         if (inPkt.getIncomingNodeConnector() == null) {
474             increaseStat("nullIncomingNodeConnector");
475             return PacketResult.IGNORED;
476         }
477
478         // If the queue was full don't wait, rather increase a counter
479         // for it
480         if (!this.rxQueue.offer(inPkt)) {
481             increaseStat("fullRXQueue");
482             return PacketResult.IGNORED;
483         }
484
485         // Walk the chain of listener going first throw all the
486         // parallel ones and for each parallel in serial
487         return PacketResult.IGNORED;
488     }
489
490     @Override
491     public void transmitDataPacket(RawPacket outPkt) {
492         if (outPkt.getOutgoingNodeConnector() == null) {
493             increaseStat("nullOutgoingNodeConnector");
494             return;
495         }
496
497         if (!this.txQueue.offer(outPkt)) {
498             increaseStat("fullTXQueue");
499             return;
500         }
501     }
502
503     @Override
504     public Packet decodeDataPacket(RawPacket pkt) {
505         // Sanity checks
506         if (pkt == null) {
507             return null;
508         }
509         byte[] data = pkt.getPacketData();
510         if (data.length <= 0) {
511             return null;
512         }
513         if (pkt.getEncap().equals(LinkEncap.ETHERNET)) {
514             Ethernet res = new Ethernet();
515             try {
516                 res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
517             } catch (Exception e) {
518                 logger.warn("Failed to decode packet: {}", e.getMessage());
519             }
520             return res;
521         }
522         return null;
523     }
524
525     @Override
526     public RawPacket encodeDataPacket(Packet pkt) {
527         // Sanity checks
528         if (pkt == null) {
529             return null;
530         }
531         byte[] data;
532         try {
533             data = pkt.serialize();
534         } catch (Exception e) {
535             logger.error("",e);
536             return null;
537         }
538         if (data.length <= 0) {
539             return null;
540         }
541         try {
542             RawPacket res = new RawPacket(data);
543             return res;
544         } catch (ConstructionException cex) {
545         }
546         // If something goes wrong then we have to return null
547         return null;
548     }
549 }