3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
11 * @file DataPacketService.java
13 * @brief Implementation of Data Packet services in SAL
15 * Implementation of Data Packet services in SAL
18 package org.opendaylight.controller.sal.implementation.internal;
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.atomic.AtomicInteger;
31 import org.apache.commons.lang3.builder.EqualsBuilder;
32 import org.apache.commons.lang3.builder.HashCodeBuilder;
33 import org.opendaylight.controller.sal.core.ConstructionException;
34 import org.opendaylight.controller.sal.core.NodeConnector;
35 import org.opendaylight.controller.sal.match.Match;
36 import org.opendaylight.controller.sal.packet.Ethernet;
37 import org.opendaylight.controller.sal.packet.IDataPacketService;
38 import org.opendaylight.controller.sal.packet.IListenDataPacket;
39 import org.opendaylight.controller.sal.packet.IPluginInDataPacketService;
40 import org.opendaylight.controller.sal.packet.IPluginOutDataPacketService;
41 import org.opendaylight.controller.sal.packet.LinkEncap;
42 import org.opendaylight.controller.sal.packet.Packet;
43 import org.opendaylight.controller.sal.packet.PacketResult;
44 import org.opendaylight.controller.sal.packet.RawPacket;
45 import org.opendaylight.controller.sal.utils.NetUtils;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
49 public class DataPacketService implements IPluginOutDataPacketService,
51 private int RXMAXQUEUESIZE = 1000;
52 private int TXMAXQUEUESIZE = 1000;
53 protected static final Logger logger = LoggerFactory
54 .getLogger(DataPacketService.class);
56 * Database that associates one NodeIDType to the
57 * IPluginDataPacketService, in fact we expect that there will be
58 * one instance of IPluginDataPacketService for each southbound
60 * Using the ConcurrentHashMap because the threads that will be
61 * adding a new service, removing a service, going through all of
62 * them maybe different.
64 private ConcurrentHashMap<String, IPluginInDataPacketService>
66 new ConcurrentHashMap<String, IPluginInDataPacketService>();
67 private Map<String, AtomicInteger> statistics = new HashMap<String, AtomicInteger>();
69 * Queue for packets received from Data Path
71 private LinkedBlockingQueue<RawPacket> rxQueue = new LinkedBlockingQueue<RawPacket>(
74 * Queue for packets that need to be transmitted to Data Path
76 private LinkedBlockingQueue<RawPacket> txQueue = new LinkedBlockingQueue<RawPacket>(
81 private Thread txThread = new Thread(new TxLoop(),
82 "DataPacketService TX thread");
86 private Thread rxThread = new Thread(new RxLoop(),
87 "DataPacketService RX thread");
90 * Representation of a Data Packet Listener including of its
94 private class DataPacketListener {
96 private String listenerName;
98 private IListenDataPacket listener;
99 private String dependency;
102 DataPacketListener(String name, IListenDataPacket s, String dependency,
104 this.listenerName = name;
106 this.dependency = dependency;
111 public boolean equals(Object obj) {
118 if (obj.getClass() != getClass()) {
121 DataPacketListener rhs = (DataPacketListener) obj;
122 return new EqualsBuilder().append(this.listenerName,
123 rhs.listenerName).isEquals();
127 public int hashCode() {
128 return new HashCodeBuilder(13, 31).append(listenerName)
134 * This very expensive version of List is being used because it
135 * work well in concurrent situation, as we expect new service
136 * addition, service removal and walk of the service will happen
137 * from different places
139 private List<List<DataPacketListener>> listenDataPacket = new CopyOnWriteArrayList<List<DataPacketListener>>();
140 // Quick index to make sure there are no duplicate elements
141 private Set<DataPacketListener> indexDataPacket = Collections
142 .synchronizedSet(new HashSet<DataPacketListener>());
145 * Loop for processing Received packets
148 private class RxLoop implements Runnable {
152 for (pkt = rxQueue.take(); pkt != null; pkt = rxQueue.take()) {
153 for (List<DataPacketListener> serialListeners : listenDataPacket) {
155 for (i = 0; i < serialListeners.size(); i++) {
156 RawPacket copyPkt = null;
158 copyPkt = new RawPacket(pkt);
159 } catch (ConstructionException cex) {
160 logger.debug("Error while cloning the packet");
162 if (copyPkt == null) {
163 increaseStat("RXPacketCopyFailed");
166 DataPacketListener l = serialListeners.get(i);
167 IListenDataPacket s = (l == null ? null
171 // TODO Make sure to filter based
172 // on the match too, later on
174 .receiveDataPacket(copyPkt);
175 increaseStat("RXPacketSuccess");
176 if (res.equals(PacketResult.CONSUME)) {
177 increaseStat("RXPacketSerialExit");
180 } catch (Exception e) {
181 increaseStat("RXPacketFailedForException");
187 } catch (InterruptedException e) {
194 * Loop for processing packets to be transmitted
197 private class TxLoop implements Runnable {
201 for (pkt = txQueue.take(); pkt != null; pkt = txQueue.take()) {
202 // Retrieve outgoing node connector so to send out
203 // the packet to corresponding node
204 NodeConnector p = pkt.getOutgoingNodeConnector();
206 String t = p.getNode()
208 // Now locate the TX dispatcher
209 IPluginInDataPacketService s = pluginInDataService
213 s.transmitDataPacket(pkt);
214 increaseStat("TXPacketSuccess");
215 } catch (Exception e) {
216 increaseStat("TXPacketFailedForException");
219 increaseStat("TXpluginNotFound");
223 } catch (InterruptedException e) {
229 void setPluginInDataService(Map props, IPluginInDataPacketService s) {
230 if (this.pluginInDataService == null) {
231 logger.error("pluginInDataService store null");
235 logger.trace("Received setPluginInDataService request");
236 for (Object e : props.entrySet()) {
237 Map.Entry entry = (Map.Entry) e;
238 logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
241 Object value = props.get("protocolPluginType");
242 if (value instanceof String) {
243 type = (String) value;
246 logger.error("Received a PluginInDataService without any "
247 + "protocolPluginType provided");
249 this.pluginInDataService.put(type, s);
250 logger.debug("Stored the PluginInDataService for type: {}", type);
254 void unsetPluginInDataService(Map props, IPluginInDataPacketService s) {
255 if (this.pluginInDataService == null) {
256 logger.error("pluginInDataService store null");
261 logger.trace("Received unsetPluginInDataService request");
262 for (Object e : props.entrySet()) {
263 Map.Entry entry = (Map.Entry) e;
264 logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
267 Object value = props.get("protocoloPluginType");
268 if (value instanceof String) {
269 type = (String) value;
272 logger.error("Received a PluginInDataService without any "
273 + "protocolPluginType provided");
274 } else if (this.pluginInDataService.get(type).equals(s)) {
275 this.pluginInDataService.remove(type);
276 logger.debug("Removed the PluginInDataService for type: {}", type);
280 void setListenDataPacket(Map props, IListenDataPacket s) {
281 if (this.listenDataPacket == null || this.indexDataPacket == null) {
282 logger.error("data structure to store data is NULL");
285 logger.trace("Received setListenDataPacket request");
286 for (Object e : props.entrySet()) {
287 Map.Entry entry = (Map.Entry) e;
288 logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
291 String listenerName = null;
292 String listenerDependency = null;
295 // Read the listenerName
296 value = props.get("salListenerName");
297 if (value instanceof String) {
298 listenerName = (String) value;
301 if (listenerName == null) {
302 logger.error("Trying to set a listener without a Name");
306 //Read the dependency
307 value = props.get("salListenerDependency");
308 if (value instanceof String) {
309 listenerDependency = (String) value;
312 //Read match filter if any
313 value = props.get("salListenerFilter");
314 if (value instanceof Match) {
315 filter = (Match) value;
318 DataPacketListener l = new DataPacketListener(listenerName, s,
319 listenerDependency, filter);
321 DataPacketListener lDependency = new DataPacketListener(
322 listenerDependency, null, null, null);
324 // Now let see if there is any dependency
325 if (listenerDependency == null) {
326 logger.debug("listener without any dependency");
327 if (this.indexDataPacket.contains(l)) {
328 logger.error("trying to add an existing element");
330 logger.debug("adding listener: {}", listenerName);
331 CopyOnWriteArrayList<DataPacketListener> serialListeners = new CopyOnWriteArrayList<DataPacketListener>();
332 serialListeners.add(l);
333 this.listenDataPacket.add(serialListeners);
334 this.indexDataPacket.add(l);
337 logger.debug("listener with dependency");
338 // Now search for the dependency and put things in order
339 if (this.indexDataPacket.contains(l)) {
340 logger.error("trying to add an existing element");
342 logger.debug("adding listener: {}", listenerName);
343 // Lets find the set with the dependency in it, if we
344 // find it lets just add our dependency at the end of
346 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
348 boolean done = false;
349 if (serialListeners.contains(lDependency)) {
350 serialListeners.add(l);
353 // If we did fine the element, lets break early
359 this.indexDataPacket.add(l);
364 void unsetListenDataPacket(Map props, IListenDataPacket s) {
365 if (this.listenDataPacket == null || this.indexDataPacket == null) {
366 logger.error("data structure to store data is NULL");
369 logger.trace("Received UNsetListenDataPacket request");
370 for (Object e : props.entrySet()) {
371 Map.Entry entry = (Map.Entry) e;
372 logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
375 String listenerName = null;
377 // Read the listenerName
378 value = props.get("salListenerName");
379 if (value instanceof String) {
380 listenerName = (String) value;
383 if (listenerName == null) {
384 logger.error("Trying to set a listener without a Name");
388 DataPacketListener l = new DataPacketListener(listenerName, s, null,
390 if (!this.indexDataPacket.contains(l)) {
391 logger.error("trying to remove a non-existing element");
393 logger.debug("removing listener: {}", listenerName);
394 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
396 boolean done = false;
397 for (i = 0; i < serialListeners.size(); i++) {
398 if (serialListeners.get(i).equals(l)) {
399 serialListeners.remove(i);
404 // Now remove a serialListener that maybe empty
405 if (serialListeners.isEmpty()) {
406 this.listenDataPacket.remove(serialListeners);
408 // If we did fine the element, lets break early
414 this.indexDataPacket.remove(l);
419 * Function called by the dependency manager when all the required
420 * dependencies are satisfied
424 this.txThread.start();
425 this.rxThread.start();
429 * Function called by the dependency manager when at least one
430 * dependency become unsatisfied or when the component is shutting
431 * down because for example bundle is being stopped.
435 // Make sure to cleanup the data structure we use to track
437 this.listenDataPacket.clear();
438 this.indexDataPacket.clear();
439 this.pluginInDataService.clear();
440 this.statistics.clear();
441 this.rxQueue.clear();
442 this.txQueue.clear();
443 this.txThread.interrupt();
444 this.rxThread.interrupt();
445 // Wait for them to be done
447 this.txThread.join();
448 this.rxThread.join();
449 } catch (InterruptedException ex) {
454 private void increaseStat(String name) {
455 if (this.statistics == null) {
459 AtomicInteger currValue = null;
460 synchronized (this.statistics) {
461 currValue = this.statistics.get(name);
463 if (currValue == null) {
464 this.statistics.put(name, new AtomicInteger(0));
468 currValue.incrementAndGet();
472 public PacketResult receiveDataPacket(RawPacket inPkt) {
473 if (inPkt.getIncomingNodeConnector() == null) {
474 increaseStat("nullIncomingNodeConnector");
475 return PacketResult.IGNORED;
478 // If the queue was full don't wait, rather increase a counter
480 if (!this.rxQueue.offer(inPkt)) {
481 increaseStat("fullRXQueue");
482 return PacketResult.IGNORED;
485 // Walk the chain of listener going first throw all the
486 // parallel ones and for each parallel in serial
487 return PacketResult.IGNORED;
491 public void transmitDataPacket(RawPacket outPkt) {
492 if (outPkt.getOutgoingNodeConnector() == null) {
493 increaseStat("nullOutgoingNodeConnector");
497 if (!this.txQueue.offer(outPkt)) {
498 increaseStat("fullTXQueue");
504 public Packet decodeDataPacket(RawPacket pkt) {
509 byte[] data = pkt.getPacketData();
510 if (data.length <= 0) {
513 if (pkt.getEncap().equals(LinkEncap.ETHERNET)) {
514 Ethernet res = new Ethernet();
516 res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
517 } catch (Exception e) {
518 logger.warn("Failed to decode packet: {}", e.getMessage());
526 public RawPacket encodeDataPacket(Packet pkt) {
533 data = pkt.serialize();
534 } catch (Exception e) {
538 if (data.length <= 0) {
542 RawPacket res = new RawPacket(data);
544 } catch (ConstructionException cex) {
546 // If something goes wrong then we have to return null