removing DataPacketService's receive queue
[controller.git] / opendaylight / sal / implementation / src / main / java / org / opendaylight / controller / sal / implementation / internal / DataPacketService.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 /**
11  * @file   DataPacketService.java
12  *
13  * @brief  Implementation of Data Packet services in SAL
14  *
15  * Implementation of Data Packet services in SAL
16  */
17
18 package org.opendaylight.controller.sal.implementation.internal;
19
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import org.opendaylight.controller.sal.core.ConstructionException;
32 import org.opendaylight.controller.sal.core.NodeConnector;
33 import org.opendaylight.controller.sal.match.Match;
34 import org.opendaylight.controller.sal.packet.Ethernet;
35 import org.opendaylight.controller.sal.packet.IDataPacketService;
36 import org.opendaylight.controller.sal.packet.IListenDataPacket;
37 import org.opendaylight.controller.sal.packet.IPluginInDataPacketService;
38 import org.opendaylight.controller.sal.packet.IPluginOutDataPacketService;
39 import org.opendaylight.controller.sal.packet.LinkEncap;
40 import org.opendaylight.controller.sal.packet.Packet;
41 import org.opendaylight.controller.sal.packet.PacketResult;
42 import org.opendaylight.controller.sal.packet.RawPacket;
43 import org.opendaylight.controller.sal.utils.NetUtils;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 public class DataPacketService implements IPluginOutDataPacketService,
48         IDataPacketService {
49     private int TXMAXQUEUESIZE = 1000;
50     protected static final Logger logger = LoggerFactory
51             .getLogger(DataPacketService.class);
52     /**
53      * Database that associates one NodeIDType to the
54      * IPluginDataPacketService, in fact we expect that there will be
55      * one instance of IPluginDataPacketService for each southbound
56      * plugin.
57      * Using the ConcurrentHashMap because the threads that will be
58      * adding a new service, removing a service, going through all of
59      * them maybe different.
60      */
61     private ConcurrentHashMap<String, IPluginInDataPacketService>
62         pluginInDataService =
63         new ConcurrentHashMap<String, IPluginInDataPacketService>();
64     private Map<String, AtomicInteger> statistics = new HashMap<String, AtomicInteger>();
65
66     /**
67      * Queue for packets that need to be transmitted to Data Path
68      */
69     private LinkedBlockingQueue<RawPacket> txQueue = new LinkedBlockingQueue<RawPacket>(
70             TXMAXQUEUESIZE);
71     /**
72      * Transmission thread
73      */
74     private Thread txThread = new Thread(new TxLoop(),
75             "DataPacketService TX thread");
76
77     /**
78      * Representation of a Data Packet Listener including of its
79      * properties
80      */
81     private class DataPacketListener {
82         // Key fields
83         private String listenerName;
84         // Attribute fields
85         private IListenDataPacket listener;
86         private String dependency;
87         private Match match;
88
89         DataPacketListener(String name, IListenDataPacket s, String dependency,
90                 Match match) {
91             this.listenerName = name;
92             this.listener = s;
93             this.dependency = dependency;
94             this.match = match;
95         }
96
97         @Override
98         public boolean equals(Object obj) {
99             if (this == obj)
100                 return true;
101             if (obj == null)
102                 return false;
103             if (getClass() != obj.getClass())
104                 return false;
105             DataPacketListener other = (DataPacketListener) obj;
106             if (!getOuterType().equals(other.getOuterType()))
107                 return false;
108             if (listenerName == null) {
109                 if (other.listenerName != null)
110                     return false;
111             } else if (!listenerName.equals(other.listenerName))
112                 return false;
113             return true;
114         }
115
116         @Override
117         public int hashCode() {
118             final int prime = 31;
119             int result = 1;
120             result = prime * result + getOuterType().hashCode();
121             result = prime * result
122                     + ((listenerName == null) ? 0 : listenerName.hashCode());
123             return result;
124         }
125
126         private DataPacketService getOuterType() {
127             return DataPacketService.this;
128         }
129     }
130
131     /**
132      * This very expensive version of List is being used because it
133      * work well in concurrent situation, as we expect new service
134      * addition, service removal and walk of the service will happen
135      * from different places
136      */
137     private List<List<DataPacketListener>> listenDataPacket = new CopyOnWriteArrayList<List<DataPacketListener>>();
138     // Quick index to make sure there are no duplicate elements
139     private Set<DataPacketListener> indexDataPacket = Collections
140             .synchronizedSet(new HashSet<DataPacketListener>());
141
142     /**
143      * Loop for processing Received packets
144      */
145     private void dispatchPacket(RawPacket pkt) {
146
147         // for now we treat all listeners as serial listeners
148         for (List<DataPacketListener> serialListeners : listenDataPacket) {
149             for (DataPacketListener l : serialListeners) {
150
151                 // TODO: possibly deal with read-only and read-write packet
152                 // copies
153                 IListenDataPacket s = (l == null ? null : l.listener);
154                 if (s != null) {
155                     try {
156                         // TODO Make sure to filter based on the match too,
157                         // later on
158                         PacketResult res = s.receiveDataPacket(pkt);
159                         increaseStat("RXPacketSuccess");
160                         if (res.equals(PacketResult.CONSUME)) {
161                             increaseStat("RXPacketSerialExit");
162                             break;
163                         }
164                     } catch (Exception e) {
165                         increaseStat("RXPacketFailedForException");
166                     }
167                 }
168             }
169         }
170     }
171
172     /**
173      * Loop for processing packets to be transmitted
174      *
175      */
176     private class TxLoop implements Runnable {
177         public void run() {
178             RawPacket pkt;
179             try {
180                 for (pkt = txQueue.take(); pkt != null; pkt = txQueue.take()) {
181                     // Retrieve outgoing node connector so to send out
182                     // the packet to corresponding node
183                     NodeConnector p = pkt.getOutgoingNodeConnector();
184                     if (p != null) {
185                         String t = p.getNode()
186                                 .getType();
187                         // Now locate the TX dispatcher
188                         IPluginInDataPacketService s = pluginInDataService
189                                 .get(t);
190                         if (s != null) {
191                             try {
192                                 s.transmitDataPacket(pkt);
193                                 increaseStat("TXPacketSuccess");
194                             } catch (Exception e) {
195                                 increaseStat("TXPacketFailedForException");
196                             }
197                         } else {
198                             increaseStat("TXpluginNotFound");
199                         }
200                     }
201                 }
202             } catch (InterruptedException e) {
203                 // Not a big deal
204             }
205         }
206     }
207
208     void setPluginInDataService(Map props, IPluginInDataPacketService s) {
209         if (this.pluginInDataService == null) {
210             logger.error("pluginInDataService store null");
211             return;
212         }
213         String type = null;
214         logger.trace("Received setPluginInDataService request");
215         for (Object e : props.entrySet()) {
216             Map.Entry entry = (Map.Entry) e;
217             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
218         }
219
220         Object value = props.get("protocolPluginType");
221         if (value instanceof String) {
222             type = (String) value;
223         }
224         if (type == null) {
225             logger.error("Received a PluginInDataService without any "
226                     + "protocolPluginType provided");
227         } else {
228             this.pluginInDataService.put(type, s);
229             logger.debug("Stored the PluginInDataService for type: {}", type);
230         }
231     }
232
233     void unsetPluginInDataService(Map props, IPluginInDataPacketService s) {
234         if (this.pluginInDataService == null) {
235             logger.error("pluginInDataService store null");
236             return;
237         }
238
239         String type = null;
240         logger.trace("Received unsetPluginInDataService request");
241         for (Object e : props.entrySet()) {
242             Map.Entry entry = (Map.Entry) e;
243             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
244         }
245
246         Object value = props.get("protocoloPluginType");
247         if (value instanceof String) {
248             type = (String) value;
249         }
250         if (type == null) {
251             logger.error("Received a PluginInDataService without any "
252                     + "protocolPluginType provided");
253         } else if (this.pluginInDataService.get(type).equals(s)) {
254             this.pluginInDataService.remove(type);
255             logger.debug("Removed the PluginInDataService for type: {}", type);
256         }
257     }
258
259     void setListenDataPacket(Map props, IListenDataPacket s) {
260         if (this.listenDataPacket == null || this.indexDataPacket == null) {
261             logger.error("data structure to store data is NULL");
262             return;
263         }
264         logger.trace("Received setListenDataPacket request");
265         for (Object e : props.entrySet()) {
266             Map.Entry entry = (Map.Entry) e;
267             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
268         }
269
270         String listenerName = null;
271         String listenerDependency = null;
272         Match filter = null;
273         Object value;
274         // Read the listenerName
275         value = props.get("salListenerName");
276         if (value instanceof String) {
277             listenerName = (String) value;
278         }
279
280         if (listenerName == null) {
281             logger.error("Trying to set a listener without a Name");
282             return;
283         }
284
285         //Read the dependency
286         value = props.get("salListenerDependency");
287         if (value instanceof String) {
288             listenerDependency = (String) value;
289         }
290
291         //Read match filter if any
292         value = props.get("salListenerFilter");
293         if (value instanceof Match) {
294             filter = (Match) value;
295         }
296
297         DataPacketListener l = new DataPacketListener(listenerName, s,
298                 listenerDependency, filter);
299
300         DataPacketListener lDependency = new DataPacketListener(
301                 listenerDependency, null, null, null);
302
303         // Now let see if there is any dependency
304         if (listenerDependency == null) {
305             logger.debug("listener without any dependency");
306             if (this.indexDataPacket.contains(l)) {
307                 logger.error("trying to add an existing element");
308             } else {
309                 logger.debug("adding listener: {}", listenerName);
310                 CopyOnWriteArrayList<DataPacketListener> serialListeners = new CopyOnWriteArrayList<DataPacketListener>();
311                 serialListeners.add(l);
312                 this.listenDataPacket.add(serialListeners);
313                 this.indexDataPacket.add(l);
314             }
315         } else {
316             logger.debug("listener with dependency");
317             // Now search for the dependency and put things in order
318             if (this.indexDataPacket.contains(l)) {
319                 logger.error("trying to add an existing element");
320             } else {
321                 logger.debug("adding listener: {}", listenerName);
322                 // Lets find the set with the dependency in it, if we
323                 // find it lets just add our dependency at the end of
324                 // the list.
325                 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
326                     int i = 0;
327                     boolean done = false;
328                     if (serialListeners.contains(lDependency)) {
329                         serialListeners.add(l);
330                         done = true;
331                     }
332                     // If we did fine the element, lets break early
333                     if (done) {
334                         break;
335                     }
336                 }
337
338                 this.indexDataPacket.add(l);
339             }
340         }
341     }
342
343     void unsetListenDataPacket(Map props, IListenDataPacket s) {
344         if (this.listenDataPacket == null || this.indexDataPacket == null) {
345             logger.error("data structure to store data is NULL");
346             return;
347         }
348         logger.trace("Received UNsetListenDataPacket request");
349         for (Object e : props.entrySet()) {
350             Map.Entry entry = (Map.Entry) e;
351             logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
352         }
353
354         String listenerName = null;
355         Object value;
356         // Read the listenerName
357         value = props.get("salListenerName");
358         if (value instanceof String) {
359             listenerName = (String) value;
360         }
361
362         if (listenerName == null) {
363             logger.error("Trying to set a listener without a Name");
364             return;
365         }
366
367         DataPacketListener l = new DataPacketListener(listenerName, s, null,
368                 null);
369         if (!this.indexDataPacket.contains(l)) {
370             logger.error("trying to remove a non-existing element");
371         } else {
372             logger.debug("removing listener: {}", listenerName);
373             for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
374                 int i = 0;
375                 boolean done = false;
376                 for (i = 0; i < serialListeners.size(); i++) {
377                     if (serialListeners.get(i).equals(l)) {
378                         serialListeners.remove(i);
379                         done = true;
380                         break;
381                     }
382                 }
383                 // Now remove a serialListener that maybe empty
384                 if (serialListeners.isEmpty()) {
385                     this.listenDataPacket.remove(serialListeners);
386                 }
387                 // If we did fine the element, lets break early
388                 if (done) {
389                     break;
390                 }
391             }
392
393             this.indexDataPacket.remove(l);
394         }
395     }
396
397     /**
398      * Function called by the dependency manager when all the required
399      * dependencies are satisfied
400      *
401      */
402     void init() {
403         this.txThread.start();
404     }
405
406     /**
407      * Function called by the dependency manager when at least one
408      * dependency become unsatisfied or when the component is shutting
409      * down because for example bundle is being stopped.
410      *
411      */
412     void destroy() {
413         // Make sure to cleanup the data structure we use to track
414         // services
415         this.listenDataPacket.clear();
416         this.indexDataPacket.clear();
417         this.pluginInDataService.clear();
418         this.statistics.clear();
419         this.txQueue.clear();
420         this.txThread.interrupt();
421         // Wait for them to be done
422         try {
423             this.txThread.join();
424         } catch (InterruptedException ex) {
425             // Not a big deal
426         }
427     }
428
429     private void increaseStat(String name) {
430         if (this.statistics == null) {
431             return;
432         }
433
434         AtomicInteger currValue = null;
435         synchronized (this.statistics) {
436             currValue = this.statistics.get(name);
437
438             if (currValue == null) {
439                 this.statistics.put(name, new AtomicInteger(0));
440                 return;
441             }
442         }
443         currValue.incrementAndGet();
444     }
445
446     @Override
447     public PacketResult receiveDataPacket(RawPacket inPkt) {
448         if (inPkt.getIncomingNodeConnector() == null) {
449             increaseStat("nullIncomingNodeConnector");
450             return PacketResult.IGNORED;
451         }
452
453         // send the packet off to be processed by listeners
454         this.dispatchPacket(inPkt);
455
456         // Walk the chain of listener going first throw all the
457         // parallel ones and for each parallel in serial
458         return PacketResult.IGNORED;
459     }
460
461     @Override
462     public void transmitDataPacket(RawPacket outPkt) {
463         if (outPkt.getOutgoingNodeConnector() == null) {
464             increaseStat("nullOutgoingNodeConnector");
465             return;
466         }
467
468         if (!this.txQueue.offer(outPkt)) {
469             increaseStat("fullTXQueue");
470             return;
471         }
472     }
473
474     @Override
475     public Packet decodeDataPacket(RawPacket pkt) {
476         // Sanity checks
477         if (pkt == null) {
478             return null;
479         }
480         byte[] data = pkt.getPacketData();
481         if (data.length <= 0) {
482             return null;
483         }
484         if (pkt.getEncap().equals(LinkEncap.ETHERNET)) {
485             Ethernet res = new Ethernet();
486             try {
487                 res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
488             } catch (Exception e) {
489                 logger.warn("Failed to decode packet: {}", e.getMessage());
490             }
491             return res;
492         }
493         return null;
494     }
495
496     @Override
497     public RawPacket encodeDataPacket(Packet pkt) {
498         // Sanity checks
499         if (pkt == null) {
500             return null;
501         }
502         byte[] data;
503         try {
504             data = pkt.serialize();
505         } catch (Exception e) {
506             logger.error("",e);
507             return null;
508         }
509         if (data.length <= 0) {
510             return null;
511         }
512         try {
513             RawPacket res = new RawPacket(data);
514             return res;
515         } catch (ConstructionException cex) {
516         }
517         // If something goes wrong then we have to return null
518         return null;
519     }
520 }