Move stats caching to FM StatisticsManager
[controller.git] / opendaylight / sal / implementation / src / main / java / org / opendaylight / controller / sal / implementation / internal / DataPacketService.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 /**
11  * @file   DataPacketService.java
12  *
13  * @brief  Implementation of Data Packet services in SAL
14  *
15  * Implementation of Data Packet services in SAL
16  */
17
18 package org.opendaylight.controller.sal.implementation.internal;
19
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import org.opendaylight.controller.sal.core.ConstructionException;
32 import org.opendaylight.controller.sal.core.NodeConnector;
33 import org.opendaylight.controller.sal.match.Match;
34 import org.opendaylight.controller.sal.packet.Ethernet;
35 import org.opendaylight.controller.sal.packet.IDataPacketService;
36 import org.opendaylight.controller.sal.packet.IListenDataPacket;
37 import org.opendaylight.controller.sal.packet.IPluginInDataPacketService;
38 import org.opendaylight.controller.sal.packet.IPluginOutDataPacketService;
39 import org.opendaylight.controller.sal.packet.LinkEncap;
40 import org.opendaylight.controller.sal.packet.Packet;
41 import org.opendaylight.controller.sal.packet.PacketResult;
42 import org.opendaylight.controller.sal.packet.RawPacket;
43 import org.opendaylight.controller.sal.utils.GlobalConstants;
44 import org.opendaylight.controller.sal.utils.NetUtils;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 public class DataPacketService implements IPluginOutDataPacketService,
49         IDataPacketService {
50     private int TXMAXQUEUESIZE = 1000;
51     protected static final Logger logger = LoggerFactory
52             .getLogger(DataPacketService.class);
53     /**
54      * Database that associates one NodeIDType to the
55      * IPluginDataPacketService, in fact we expect that there will be
56      * one instance of IPluginDataPacketService for each southbound
57      * plugin.
58      * Using the ConcurrentHashMap because the threads that will be
59      * adding a new service, removing a service, going through all of
60      * them maybe different.
61      */
62     private ConcurrentHashMap<String, IPluginInDataPacketService>
63         pluginInDataService =
64         new ConcurrentHashMap<String, IPluginInDataPacketService>();
65     private Map<String, AtomicInteger> statistics = new HashMap<String, AtomicInteger>();
66
67     /**
68      * Queue for packets that need to be transmitted to Data Path
69      */
70     private LinkedBlockingQueue<RawPacket> txQueue = new LinkedBlockingQueue<RawPacket>(
71             TXMAXQUEUESIZE);
72     /**
73      * Transmission thread
74      */
75     private Thread txThread = new Thread(new TxLoop(),
76             "DataPacketService TX thread");
77
78     /**
79      * Representation of a Data Packet Listener including of its
80      * properties
81      */
82     private class DataPacketListener {
83         // Key fields
84         private String listenerName;
85         // Attribute fields
86         private IListenDataPacket listener;
87         private String dependency;
88         private Match match;
89
90         DataPacketListener(String name, IListenDataPacket s, String dependency,
91                 Match match) {
92             this.listenerName = name;
93             this.listener = s;
94             this.dependency = dependency;
95             this.match = match;
96         }
97
98         @Override
99         public boolean equals(Object obj) {
100             if (this == obj)
101                 return true;
102             if (obj == null)
103                 return false;
104             if (getClass() != obj.getClass())
105                 return false;
106             DataPacketListener other = (DataPacketListener) obj;
107             if (!getOuterType().equals(other.getOuterType()))
108                 return false;
109             if (listenerName == null) {
110                 if (other.listenerName != null)
111                     return false;
112             } else if (!listenerName.equals(other.listenerName))
113                 return false;
114             return true;
115         }
116
117         @Override
118         public int hashCode() {
119             final int prime = 31;
120             int result = 1;
121             result = prime * result + getOuterType().hashCode();
122             result = prime * result
123                     + ((listenerName == null) ? 0 : listenerName.hashCode());
124             return result;
125         }
126
127         private DataPacketService getOuterType() {
128             return DataPacketService.this;
129         }
130     }
131
132     /**
133      * This very expensive version of List is being used because it
134      * work well in concurrent situation, as we expect new service
135      * addition, service removal and walk of the service will happen
136      * from different places
137      */
138     private List<List<DataPacketListener>> listenDataPacket = new CopyOnWriteArrayList<List<DataPacketListener>>();
139     // Quick index to make sure there are no duplicate elements
140     private Set<DataPacketListener> indexDataPacket = Collections
141             .synchronizedSet(new HashSet<DataPacketListener>());
142
143     /**
144      * Loop for processing Received packets
145      */
146     private void dispatchPacket(RawPacket pkt) {
147
148         // for now we treat all listeners as serial listeners
149         for (List<DataPacketListener> serialListeners : listenDataPacket) {
150             for (DataPacketListener l : serialListeners) {
151
152                 // TODO: possibly deal with read-only and read-write packet
153                 // copies
154                 IListenDataPacket s = (l == null ? null : l.listener);
155                 if (s != null) {
156                     try {
157                         // TODO Make sure to filter based on the match too,
158                         // later on
159                         PacketResult res = s.receiveDataPacket(pkt);
160                         increaseStat("RXPacketSuccess");
161                         if (res.equals(PacketResult.CONSUME)) {
162                             increaseStat("RXPacketSerialExit");
163                             break;
164                         }
165                     } catch (Exception e) {
166                         increaseStat("RXPacketFailedForException");
167                     }
168                 }
169             }
170         }
171     }
172
173     /**
174      * Loop for processing packets to be transmitted
175      *
176      */
177     private class TxLoop implements Runnable {
178         public void run() {
179             RawPacket pkt;
180             try {
181                 for (pkt = txQueue.take(); pkt != null; pkt = txQueue.take()) {
182                     // Retrieve outgoing node connector so to send out
183                     // the packet to corresponding node
184                     NodeConnector p = pkt.getOutgoingNodeConnector();
185                     if (p != null) {
186                         String t = p.getNode()
187                                 .getType();
188                         // Now locate the TX dispatcher
189                         IPluginInDataPacketService s = pluginInDataService
190                                 .get(t);
191                         if (s != null) {
192                             try {
193                                 s.transmitDataPacket(pkt);
194                                 increaseStat("TXPacketSuccess");
195                             } catch (Exception e) {
196                                 increaseStat("TXPacketFailedForException");
197                             }
198                         } else {
199                             increaseStat("TXpluginNotFound");
200                         }
201                     }
202                 }
203             } catch (InterruptedException e) {
204                 // Not a big deal
205             }
206         }
207     }
208
209     void setPluginInDataService(Map props, IPluginInDataPacketService s) {
210         if (this.pluginInDataService == null) {
211             logger.error("pluginInDataService store null");
212             return;
213         }
214         String type = null;
215         logger.trace("Received setPluginInDataService request");
216         for (Object e : props.entrySet()) {
217             Map.Entry entry = (Map.Entry) e;
218             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
219         }
220
221         Object value = props.get(GlobalConstants.PROTOCOLPLUGINTYPE.toString());
222         if (value instanceof String) {
223             type = (String) value;
224         }
225         if (type == null) {
226             logger.error("Received a PluginInDataService without any "
227                     + "protocolPluginType provided");
228         } else {
229             this.pluginInDataService.put(type, s);
230             logger.debug("Stored the PluginInDataService for type: {}", type);
231         }
232     }
233
234     void unsetPluginInDataService(Map props, IPluginInDataPacketService s) {
235         if (this.pluginInDataService == null) {
236             logger.error("pluginInDataService store null");
237             return;
238         }
239
240         String type = null;
241         logger.trace("Received unsetPluginInDataService request");
242         for (Object e : props.entrySet()) {
243             Map.Entry entry = (Map.Entry) e;
244             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
245         }
246
247         Object value = props.get(GlobalConstants.PROTOCOLPLUGINTYPE.toString());
248         if (value instanceof String) {
249             type = (String) value;
250         }
251         if (type == null) {
252             logger.error("Received a PluginInDataService without any "
253                     + "protocolPluginType provided");
254         } else if (this.pluginInDataService.get(type).equals(s)) {
255             this.pluginInDataService.remove(type);
256             logger.debug("Removed the PluginInDataService for type: {}", type);
257         }
258     }
259
260     void setListenDataPacket(Map props, IListenDataPacket s) {
261         if (this.listenDataPacket == null || this.indexDataPacket == null) {
262             logger.error("data structure to store data is NULL");
263             return;
264         }
265         logger.trace("Received setListenDataPacket request");
266         for (Object e : props.entrySet()) {
267             Map.Entry entry = (Map.Entry) e;
268             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
269         }
270
271         String listenerName = null;
272         String listenerDependency = null;
273         Match filter = null;
274         Object value;
275         // Read the listenerName
276         value = props.get("salListenerName");
277         if (value instanceof String) {
278             listenerName = (String) value;
279         }
280
281         if (listenerName == null) {
282             logger.error("Trying to set a listener without a Name");
283             return;
284         }
285
286         //Read the dependency
287         value = props.get("salListenerDependency");
288         if (value instanceof String) {
289             listenerDependency = (String) value;
290         }
291
292         //Read match filter if any
293         value = props.get("salListenerFilter");
294         if (value instanceof Match) {
295             filter = (Match) value;
296         }
297
298         DataPacketListener l = new DataPacketListener(listenerName, s,
299                 listenerDependency, filter);
300
301         DataPacketListener lDependency = new DataPacketListener(
302                 listenerDependency, null, null, null);
303
304         // Now let see if there is any dependency
305         if (listenerDependency == null) {
306             logger.debug("listener without any dependency");
307             if (this.indexDataPacket.contains(l)) {
308                 logger.error("trying to add an existing element");
309             } else {
310                 logger.debug("adding listener: {}", listenerName);
311                 CopyOnWriteArrayList<DataPacketListener> serialListeners = new CopyOnWriteArrayList<DataPacketListener>();
312                 serialListeners.add(l);
313                 this.listenDataPacket.add(serialListeners);
314                 this.indexDataPacket.add(l);
315             }
316         } else {
317             logger.debug("listener with dependency");
318             // Now search for the dependency and put things in order
319             if (this.indexDataPacket.contains(l)) {
320                 logger.error("trying to add an existing element");
321             } else {
322                 logger.debug("adding listener: {}", listenerName);
323                 // Lets find the set with the dependency in it, if we
324                 // find it lets just add our dependency at the end of
325                 // the list.
326                 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
327                     int i = 0;
328                     boolean done = false;
329                     if (serialListeners.contains(lDependency)) {
330                         serialListeners.add(l);
331                         done = true;
332                     }
333                     // If we did fine the element, lets break early
334                     if (done) {
335                         break;
336                     }
337                 }
338
339                 this.indexDataPacket.add(l);
340             }
341         }
342     }
343
344     void unsetListenDataPacket(Map props, IListenDataPacket s) {
345         if (this.listenDataPacket == null || this.indexDataPacket == null) {
346             logger.error("data structure to store data is NULL");
347             return;
348         }
349         logger.trace("Received UNsetListenDataPacket request");
350         for (Object e : props.entrySet()) {
351             Map.Entry entry = (Map.Entry) e;
352             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
353         }
354
355         String listenerName = null;
356         Object value;
357         // Read the listenerName
358         value = props.get("salListenerName");
359         if (value instanceof String) {
360             listenerName = (String) value;
361         }
362
363         if (listenerName == null) {
364             logger.error("Trying to set a listener without a Name");
365             return;
366         }
367
368         DataPacketListener l = new DataPacketListener(listenerName, s, null,
369                 null);
370         if (!this.indexDataPacket.contains(l)) {
371             logger.error("trying to remove a non-existing element");
372         } else {
373             logger.debug("removing listener: {}", listenerName);
374             for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
375                 int i = 0;
376                 boolean done = false;
377                 for (i = 0; i < serialListeners.size(); i++) {
378                     if (serialListeners.get(i).equals(l)) {
379                         serialListeners.remove(i);
380                         done = true;
381                         break;
382                     }
383                 }
384                 // Now remove a serialListener that maybe empty
385                 if (serialListeners.isEmpty()) {
386                     this.listenDataPacket.remove(serialListeners);
387                 }
388                 // If we did fine the element, lets break early
389                 if (done) {
390                     break;
391                 }
392             }
393
394             this.indexDataPacket.remove(l);
395         }
396     }
397
398     /**
399      * Function called by the dependency manager when all the required
400      * dependencies are satisfied
401      *
402      */
403     void init() {
404         this.txThread.start();
405     }
406
407     /**
408      * Function called by the dependency manager when at least one
409      * dependency become unsatisfied or when the component is shutting
410      * down because for example bundle is being stopped.
411      *
412      */
413     void destroy() {
414         // Make sure to cleanup the data structure we use to track
415         // services
416         this.listenDataPacket.clear();
417         this.indexDataPacket.clear();
418         this.pluginInDataService.clear();
419         this.statistics.clear();
420         this.txQueue.clear();
421         this.txThread.interrupt();
422         // Wait for them to be done
423         try {
424             this.txThread.join();
425         } catch (InterruptedException ex) {
426             // Not a big deal
427         }
428     }
429
430     private void increaseStat(String name) {
431         if (this.statistics == null) {
432             return;
433         }
434
435         AtomicInteger currValue = null;
436         synchronized (this.statistics) {
437             currValue = this.statistics.get(name);
438
439             if (currValue == null) {
440                 this.statistics.put(name, new AtomicInteger(0));
441                 return;
442             }
443         }
444         currValue.incrementAndGet();
445     }
446
447     @Override
448     public PacketResult receiveDataPacket(RawPacket inPkt) {
449         if (inPkt.getIncomingNodeConnector() == null) {
450             increaseStat("nullIncomingNodeConnector");
451             return PacketResult.IGNORED;
452         }
453
454         // send the packet off to be processed by listeners
455         this.dispatchPacket(inPkt);
456
457         // Walk the chain of listener going first throw all the
458         // parallel ones and for each parallel in serial
459         return PacketResult.IGNORED;
460     }
461
462     @Override
463     public void transmitDataPacket(RawPacket outPkt) {
464         if (outPkt.getOutgoingNodeConnector() == null) {
465             increaseStat("nullOutgoingNodeConnector");
466             return;
467         }
468
469         if (!this.txQueue.offer(outPkt)) {
470             increaseStat("fullTXQueue");
471             return;
472         }
473     }
474
475     @Override
476     public Packet decodeDataPacket(RawPacket pkt) {
477         // Sanity checks
478         if (pkt == null) {
479             return null;
480         }
481         byte[] data = pkt.getPacketData();
482         if (data.length <= 0) {
483             return null;
484         }
485         if (pkt.getEncap().equals(LinkEncap.ETHERNET)) {
486             Ethernet res = new Ethernet();
487             try {
488                 res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
489             } catch (Exception e) {
490                 logger.warn("Failed to decode packet: {}", e.getMessage());
491             }
492             return res;
493         }
494         return null;
495     }
496
497     @Override
498     public RawPacket encodeDataPacket(Packet pkt) {
499         // Sanity checks
500         if (pkt == null) {
501             return null;
502         }
503         byte[] data;
504         try {
505             data = pkt.serialize();
506         } catch (Exception e) {
507             logger.error("",e);
508             return null;
509         }
510         if (data.length <= 0) {
511             return null;
512         }
513         try {
514             RawPacket res = new RawPacket(data);
515             return res;
516         } catch (ConstructionException cex) {
517         }
518         // If something goes wrong then we have to return null
519         return null;
520     }
521 }