Move adsal into its own subdirectory.
[controller.git] / opendaylight / adsal / sal / implementation / src / main / java / org / opendaylight / controller / sal / implementation / internal / DataPacketService.java
1 /*
2  * Copyright (c) 2013-2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 /**
10  * @file   DataPacketService.java
11  *
12  * @brief  Implementation of Data Packet services in SAL
13  *
14  * Implementation of Data Packet services in SAL
15  */
16
17 package org.opendaylight.controller.sal.implementation.internal;
18
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.HashSet;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.CopyOnWriteArrayList;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import org.opendaylight.controller.sal.core.ConstructionException;
31 import org.opendaylight.controller.sal.core.NodeConnector;
32 import org.opendaylight.controller.sal.match.Match;
33 import org.opendaylight.controller.sal.packet.Ethernet;
34 import org.opendaylight.controller.sal.packet.IDataPacketService;
35 import org.opendaylight.controller.sal.packet.IListenDataPacket;
36 import org.opendaylight.controller.sal.packet.IPluginInDataPacketService;
37 import org.opendaylight.controller.sal.packet.IPluginOutDataPacketService;
38 import org.opendaylight.controller.sal.packet.LinkEncap;
39 import org.opendaylight.controller.sal.packet.Packet;
40 import org.opendaylight.controller.sal.packet.PacketResult;
41 import org.opendaylight.controller.sal.packet.RawPacket;
42 import org.opendaylight.controller.sal.utils.NetUtils;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 public class DataPacketService implements IPluginOutDataPacketService,
47         IDataPacketService {
48     private int TXMAXQUEUESIZE = 1000;
49     protected static final Logger logger = LoggerFactory
50             .getLogger(DataPacketService.class);
51     /**
52      * Database that associates one NodeIDType to the
53      * IPluginDataPacketService, in fact we expect that there will be
54      * one instance of IPluginDataPacketService for each southbound
55      * plugin.
56      * Using the ConcurrentHashMap because the threads that will be
57      * adding a new service, removing a service, going through all of
58      * them maybe different.
59      */
60     private ConcurrentHashMap<String, ProtocolService<IPluginInDataPacketService>>
61         pluginInDataService =
62         new ConcurrentHashMap<String, ProtocolService<IPluginInDataPacketService>>();
63     private Map<String, AtomicInteger> statistics = new HashMap<String, AtomicInteger>();
64
65     /**
66      * Queue for packets that need to be transmitted to Data Path
67      */
68     private LinkedBlockingQueue<RawPacket> txQueue = new LinkedBlockingQueue<RawPacket>(
69             TXMAXQUEUESIZE);
70     /**
71      * Transmission thread
72      */
73     private Thread txThread = new Thread(new TxLoop(),
74             "DataPacketService TX thread");
75
76     /**
77      * Representation of a Data Packet Listener including of its
78      * properties
79      */
80     private class DataPacketListener {
81         // Key fields
82         private String listenerName;
83         // Attribute fields
84         private IListenDataPacket listener;
85         private String dependency;
86         private Match match;
87
88         DataPacketListener(String name, IListenDataPacket s, String dependency,
89                 Match match) {
90             this.listenerName = name;
91             this.listener = s;
92             this.dependency = dependency;
93             this.match = match;
94         }
95
96         @Override
97         public boolean equals(Object obj) {
98             if (this == obj)
99                 return true;
100             if (obj == null)
101                 return false;
102             if (getClass() != obj.getClass())
103                 return false;
104             DataPacketListener other = (DataPacketListener) obj;
105             if (!getOuterType().equals(other.getOuterType()))
106                 return false;
107             if (listenerName == null) {
108                 if (other.listenerName != null)
109                     return false;
110             } else if (!listenerName.equals(other.listenerName))
111                 return false;
112             return true;
113         }
114
115         @Override
116         public int hashCode() {
117             final int prime = 31;
118             int result = 1;
119             result = prime * result + getOuterType().hashCode();
120             result = prime * result
121                     + ((listenerName == null) ? 0 : listenerName.hashCode());
122             return result;
123         }
124
125         private DataPacketService getOuterType() {
126             return DataPacketService.this;
127         }
128     }
129
130     /**
131      * This very expensive version of List is being used because it
132      * work well in concurrent situation, as we expect new service
133      * addition, service removal and walk of the service will happen
134      * from different places
135      */
136     private List<List<DataPacketListener>> listenDataPacket = new CopyOnWriteArrayList<List<DataPacketListener>>();
137     // Quick index to make sure there are no duplicate elements
138     private Set<DataPacketListener> indexDataPacket = Collections
139             .synchronizedSet(new HashSet<DataPacketListener>());
140
141     /**
142      * Loop for processing Received packets
143      */
144     private void dispatchPacket(RawPacket pkt) {
145
146         // for now we treat all listeners as serial listeners
147         for (List<DataPacketListener> serialListeners : listenDataPacket) {
148             for (DataPacketListener l : serialListeners) {
149
150                 // TODO: possibly deal with read-only and read-write packet
151                 // copies
152                 IListenDataPacket s = (l == null ? null : l.listener);
153                 if (s != null) {
154                     try {
155                         // TODO Make sure to filter based on the match too,
156                         // later on
157                         PacketResult res = s.receiveDataPacket(pkt);
158                         increaseStat("RXPacketSuccess");
159                         if (res.equals(PacketResult.CONSUME)) {
160                             increaseStat("RXPacketSerialExit");
161                             break;
162                         }
163                     } catch (Exception e) {
164                         increaseStat("RXPacketFailedForException");
165                     }
166                 }
167             }
168         }
169     }
170
171     /**
172      * Loop for processing packets to be transmitted
173      *
174      */
175     private class TxLoop implements Runnable {
176         public void run() {
177             RawPacket pkt;
178             try {
179                 for (pkt = txQueue.take(); pkt != null; pkt = txQueue.take()) {
180                     // Retrieve outgoing node connector so to send out
181                     // the packet to corresponding node
182                     NodeConnector p = pkt.getOutgoingNodeConnector();
183                     if (p != null) {
184                         String t = p.getNode()
185                                 .getType();
186                         // Now locate the TX dispatcher
187                         ProtocolService<IPluginInDataPacketService> service =
188                             pluginInDataService.get(t);
189                         if (service != null) {
190                             try {
191                                 service.getService().transmitDataPacket(pkt);
192                                 increaseStat("TXPacketSuccess");
193                             } catch (Exception e) {
194                                 increaseStat("TXPacketFailedForException");
195                             }
196                         } else {
197                             increaseStat("TXpluginNotFound");
198                         }
199                     }
200                 }
201             } catch (InterruptedException e) {
202                 // Not a big deal
203             }
204         }
205     }
206
207     void setPluginInDataService(Map<?, ?> props, IPluginInDataPacketService s) {
208         ProtocolService.set(this.pluginInDataService, props, s, logger);
209     }
210
211     void unsetPluginInDataService(Map<?, ?> props, IPluginInDataPacketService s) {
212         ProtocolService.unset(this.pluginInDataService, props, s, logger);
213     }
214
215     void setListenDataPacket(Map<?, ?> props, IListenDataPacket s) {
216         if (this.listenDataPacket == null || this.indexDataPacket == null) {
217             logger.error("data structure to store data is NULL");
218             return;
219         }
220         logger.trace("Received setListenDataPacket request");
221         for (Map.Entry<?, ?> e : props.entrySet()) {
222             logger.trace("Prop key:({}) value:({})",e.getKey(), e.getValue());
223         }
224
225         String listenerName = null;
226         String listenerDependency = null;
227         Match filter = null;
228         Object value;
229         // Read the listenerName
230         value = props.get("salListenerName");
231         if (value instanceof String) {
232             listenerName = (String) value;
233         }
234
235         if (listenerName == null) {
236             logger.error("Trying to set a listener without a Name");
237             return;
238         }
239
240         //Read the dependency
241         value = props.get("salListenerDependency");
242         if (value instanceof String) {
243             listenerDependency = (String) value;
244         }
245
246         //Read match filter if any
247         value = props.get("salListenerFilter");
248         if (value instanceof Match) {
249             filter = (Match) value;
250         }
251
252         DataPacketListener l = new DataPacketListener(listenerName, s,
253                 listenerDependency, filter);
254
255         DataPacketListener lDependency = new DataPacketListener(
256                 listenerDependency, null, null, null);
257
258         // Now let see if there is any dependency
259         if (listenerDependency == null) {
260             logger.debug("listener without any dependency");
261             if (this.indexDataPacket.contains(l)) {
262                 logger.error("trying to add an existing element");
263             } else {
264                 logger.debug("adding listener: {}", listenerName);
265                 CopyOnWriteArrayList<DataPacketListener> serialListeners = new CopyOnWriteArrayList<DataPacketListener>();
266                 serialListeners.add(l);
267                 this.listenDataPacket.add(serialListeners);
268                 this.indexDataPacket.add(l);
269             }
270         } else {
271             logger.debug("listener with dependency");
272             // Now search for the dependency and put things in order
273             if (this.indexDataPacket.contains(l)) {
274                 logger.error("trying to add an existing element");
275             } else {
276                 logger.debug("adding listener: {}", listenerName);
277                 // Lets find the set with the dependency in it, if we
278                 // find it lets just add our dependency at the end of
279                 // the list.
280                 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
281                     boolean done = false;
282                     if (serialListeners.contains(lDependency)) {
283                         serialListeners.add(l);
284                         done = true;
285                     }
286                     // If we did fine the element, lets break early
287                     if (done) {
288                         break;
289                     }
290                 }
291
292                 this.indexDataPacket.add(l);
293             }
294         }
295     }
296
297     void unsetListenDataPacket(Map<?, ?> props, IListenDataPacket s) {
298         if (this.listenDataPacket == null || this.indexDataPacket == null) {
299             logger.error("data structure to store data is NULL");
300             return;
301         }
302         logger.trace("Received UNsetListenDataPacket request");
303         for (Map.Entry<?, ?> e : props.entrySet()) {
304             logger.trace("Prop key:({}) value:({})",e.getKey(), e.getValue());
305         }
306
307         String listenerName = null;
308         Object value;
309         // Read the listenerName
310         value = props.get("salListenerName");
311         if (value instanceof String) {
312             listenerName = (String) value;
313         }
314
315         if (listenerName == null) {
316             logger.error("Trying to set a listener without a Name");
317             return;
318         }
319
320         DataPacketListener l = new DataPacketListener(listenerName, s, null,
321                 null);
322         if (!this.indexDataPacket.contains(l)) {
323             logger.error("trying to remove a non-existing element");
324         } else {
325             logger.debug("removing listener: {}", listenerName);
326             for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
327                 int i = 0;
328                 boolean done = false;
329                 for (i = 0; i < serialListeners.size(); i++) {
330                     if (serialListeners.get(i).equals(l)) {
331                         serialListeners.remove(i);
332                         done = true;
333                         break;
334                     }
335                 }
336                 // Now remove a serialListener that maybe empty
337                 if (serialListeners.isEmpty()) {
338                     this.listenDataPacket.remove(serialListeners);
339                 }
340                 // If we did fine the element, lets break early
341                 if (done) {
342                     break;
343                 }
344             }
345
346             this.indexDataPacket.remove(l);
347         }
348     }
349
350     /**
351      * Function called by the dependency manager when all the required
352      * dependencies are satisfied
353      *
354      */
355     void init() {
356         this.txThread.start();
357     }
358
359     /**
360      * Function called by the dependency manager when at least one
361      * dependency become unsatisfied or when the component is shutting
362      * down because for example bundle is being stopped.
363      *
364      */
365     void destroy() {
366         // Make sure to cleanup the data structure we use to track
367         // services
368         this.listenDataPacket.clear();
369         this.indexDataPacket.clear();
370         this.pluginInDataService.clear();
371         this.statistics.clear();
372         this.txQueue.clear();
373         this.txThread.interrupt();
374         // Wait for them to be done
375         try {
376             this.txThread.join();
377         } catch (InterruptedException ex) {
378             // Not a big deal
379         }
380     }
381
382     private void increaseStat(String name) {
383         if (this.statistics == null) {
384             return;
385         }
386
387         AtomicInteger currValue = null;
388         synchronized (this.statistics) {
389             currValue = this.statistics.get(name);
390
391             if (currValue == null) {
392                 this.statistics.put(name, new AtomicInteger(0));
393                 return;
394             }
395         }
396         currValue.incrementAndGet();
397     }
398
399     @Override
400     public PacketResult receiveDataPacket(RawPacket inPkt) {
401         if (inPkt.getIncomingNodeConnector() == null) {
402             increaseStat("nullIncomingNodeConnector");
403             return PacketResult.IGNORED;
404         }
405
406         // send the packet off to be processed by listeners
407         this.dispatchPacket(inPkt);
408
409         // Walk the chain of listener going first throw all the
410         // parallel ones and for each parallel in serial
411         return PacketResult.IGNORED;
412     }
413
414     @Override
415     public void transmitDataPacket(RawPacket outPkt) {
416         if (outPkt.getOutgoingNodeConnector() == null) {
417             increaseStat("nullOutgoingNodeConnector");
418             return;
419         }
420
421         if (!this.txQueue.offer(outPkt)) {
422             increaseStat("fullTXQueue");
423             return;
424         }
425     }
426
427     @Override
428     public Packet decodeDataPacket(RawPacket pkt) {
429         // Sanity checks
430         if (pkt == null) {
431             return null;
432         }
433         byte[] data = pkt.getPacketData();
434         if (data.length <= 0) {
435             return null;
436         }
437         if (pkt.getEncap().equals(LinkEncap.ETHERNET)) {
438             Ethernet res = new Ethernet();
439             try {
440                 res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
441             } catch (Exception e) {
442                 logger.warn("Failed to decode packet: {}", e.getMessage());
443             }
444             return res;
445         }
446         return null;
447     }
448
449     @Override
450     public RawPacket encodeDataPacket(Packet pkt) {
451         // Sanity checks
452         if (pkt == null) {
453             return null;
454         }
455         byte[] data;
456         try {
457             data = pkt.serialize();
458         } catch (Exception e) {
459             logger.error("",e);
460             return null;
461         }
462         if (data.length <= 0) {
463             return null;
464         }
465         try {
466             RawPacket res = new RawPacket(data);
467             return res;
468         } catch (ConstructionException cex) {
469         }
470         // If something goes wrong then we have to return null
471         return null;
472     }
473 }