3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
11 * @file DataPacketService.java
13 * @brief Implementation of Data Packet services in SAL
15 * Implementation of Data Packet services in SAL
18 package org.opendaylight.controller.sal.implementation.internal;
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.atomic.AtomicInteger;
31 import org.apache.commons.lang3.builder.EqualsBuilder;
32 import org.apache.commons.lang3.builder.HashCodeBuilder;
33 import org.opendaylight.controller.sal.core.ConstructionException;
34 import org.opendaylight.controller.sal.core.Node;
35 import org.opendaylight.controller.sal.core.NodeConnector;
36 import org.opendaylight.controller.sal.match.Match;
37 import org.opendaylight.controller.sal.packet.Ethernet;
38 import org.opendaylight.controller.sal.packet.IDataPacketService;
39 import org.opendaylight.controller.sal.packet.IListenDataPacket;
40 import org.opendaylight.controller.sal.packet.IPluginInDataPacketService;
41 import org.opendaylight.controller.sal.packet.IPluginOutDataPacketService;
42 import org.opendaylight.controller.sal.packet.LinkEncap;
43 import org.opendaylight.controller.sal.packet.Packet;
44 import org.opendaylight.controller.sal.packet.PacketResult;
45 import org.opendaylight.controller.sal.packet.RawPacket;
46 import org.opendaylight.controller.sal.utils.NetUtils;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
50 public class DataPacketService implements IPluginOutDataPacketService,
52 private int RXMAXQUEUESIZE = 1000;
53 private int TXMAXQUEUESIZE = 1000;
54 protected static final Logger logger = LoggerFactory
55 .getLogger(DataPacketService.class);
57 * Database that associates one NodeIDType to the
58 * IPluginDataPacketService, in fact we expect that there will be
59 * one instance of IPluginDataPacketService for each southbound
61 * Using the ConcurrentHashMap because the threads that will be
62 * adding a new service, removing a service, going through all of
63 * them maybe different.
65 private ConcurrentHashMap<String, IPluginInDataPacketService>
67 new ConcurrentHashMap<String, IPluginInDataPacketService>();
68 private Map<String, AtomicInteger> statistics = new HashMap<String, AtomicInteger>();
70 * Queue for packets received from Data Path
72 private LinkedBlockingQueue<RawPacket> rxQueue = new LinkedBlockingQueue<RawPacket>(
75 * Queue for packets that need to be transmitted to Data Path
77 private LinkedBlockingQueue<RawPacket> txQueue = new LinkedBlockingQueue<RawPacket>(
82 private Thread txThread = new Thread(new TxLoop(),
83 "DataPacketService TX thread");
87 private Thread rxThread = new Thread(new RxLoop(),
88 "DataPacketService RX thread");
91 * Representation of a Data Packet Listener including of its
95 private class DataPacketListener {
97 private String listenerName;
99 private IListenDataPacket listener;
100 private String dependency;
103 DataPacketListener(String name, IListenDataPacket s, String dependency,
105 this.listenerName = name;
107 this.dependency = dependency;
112 public boolean equals(Object obj) {
119 if (obj.getClass() != getClass()) {
122 DataPacketListener rhs = (DataPacketListener) obj;
123 return new EqualsBuilder().append(this.listenerName,
124 rhs.listenerName).isEquals();
128 public int hashCode() {
129 return new HashCodeBuilder(13, 31).append(listenerName)
135 * This very expensive version of List is being used because it
136 * work well in concurrent situation, as we expect new service
137 * addition, service removal and walk of the service will happen
138 * from different places
140 private List<List<DataPacketListener>> listenDataPacket = new CopyOnWriteArrayList<List<DataPacketListener>>();
141 // Quick index to make sure there are no duplicate elements
142 private Set<DataPacketListener> indexDataPacket = Collections
143 .synchronizedSet(new HashSet<DataPacketListener>());
146 * Loop for processing Received packets
149 private class RxLoop implements Runnable {
153 for (pkt = rxQueue.take(); pkt != null; pkt = rxQueue.take()) {
154 for (List<DataPacketListener> serialListeners : listenDataPacket) {
156 for (i = 0; i < serialListeners.size(); i++) {
157 RawPacket copyPkt = null;
159 copyPkt = new RawPacket(pkt);
160 } catch (ConstructionException cex) {
161 logger.debug("Error while cloning the packet");
163 if (copyPkt == null) {
164 increaseStat("RXPacketCopyFailed");
167 DataPacketListener l = serialListeners.get(i);
168 IListenDataPacket s = (l == null ? null
172 // TODO Make sure to filter based
173 // on the match too, later on
175 .receiveDataPacket(copyPkt);
176 increaseStat("RXPacketSuccess");
177 if (res.equals(PacketResult.CONSUME)) {
178 increaseStat("RXPacketSerialExit");
181 } catch (Exception e) {
182 increaseStat("RXPacketFailedForException");
188 } catch (InterruptedException e) {
195 * Loop for processing packets to be transmitted
198 private class TxLoop implements Runnable {
202 for (pkt = txQueue.take(); pkt != null; pkt = txQueue.take()) {
203 // Retrieve outgoing node connector so to send out
204 // the packet to corresponding node
205 NodeConnector p = pkt.getOutgoingNodeConnector();
207 String t = p.getNode()
209 // Now locate the TX dispatcher
210 IPluginInDataPacketService s = pluginInDataService
214 s.transmitDataPacket(pkt);
215 increaseStat("TXPacketSuccess");
216 } catch (Exception e) {
217 increaseStat("TXPacketFailedForException");
220 increaseStat("TXpluginNotFound");
224 } catch (InterruptedException e) {
230 void setPluginInDataService(Map props, IPluginInDataPacketService s) {
231 if (this.pluginInDataService == null) {
232 logger.error("pluginInDataService store null");
236 logger.trace("Received setPluginInDataService request");
237 for (Object e : props.entrySet()) {
238 Map.Entry entry = (Map.Entry) e;
239 logger.trace("Prop key:(" + entry.getKey() + ") value:("
240 + entry.getValue() + ")");
243 Object value = props.get("protocolPluginType");
244 if (value instanceof String) {
245 type = (String) value;
248 logger.error("Received a PluginInDataService without any "
249 + "protocolPluginType provided");
251 this.pluginInDataService.put(type, s);
252 logger.debug("Stored the PluginInDataService for type:" + type);
256 void unsetPluginInDataService(Map props, IPluginInDataPacketService s) {
257 if (this.pluginInDataService == null) {
258 logger.error("pluginInDataService store null");
263 logger.trace("Received unsetPluginInDataService request");
264 for (Object e : props.entrySet()) {
265 Map.Entry entry = (Map.Entry) e;
266 logger.trace("Prop key:(" + entry.getKey() + ") value:("
267 + entry.getValue() + ")");
270 Object value = props.get("protocoloPluginType");
271 if (value instanceof String) {
272 type = (String) value;
275 logger.error("Received a PluginInDataService without any "
276 + "protocolPluginType provided");
277 } else if (this.pluginInDataService.get(type).equals(s)) {
278 this.pluginInDataService.remove(type);
279 logger.debug("Removed the PluginInDataService for type:" + type);
283 void setListenDataPacket(Map props, IListenDataPacket s) {
284 if (this.listenDataPacket == null || this.indexDataPacket == null) {
285 logger.error("data structure to store data is NULL");
288 logger.trace("Received setListenDataPacket request");
289 for (Object e : props.entrySet()) {
290 Map.Entry entry = (Map.Entry) e;
291 logger.trace("Prop key:(" + entry.getKey() + ") value:("
292 + entry.getValue() + ")");
295 String listenerName = null;
296 String listenerDependency = null;
299 // Read the listenerName
300 value = props.get("salListenerName");
301 if (value instanceof String) {
302 listenerName = (String) value;
305 if (listenerName == null) {
306 logger.error("Trying to set a listener without a Name");
310 //Read the dependency
311 value = props.get("salListenerDependency");
312 if (value instanceof String) {
313 listenerDependency = (String) value;
316 //Read match filter if any
317 value = props.get("salListenerFilter");
318 if (value instanceof Match) {
319 filter = (Match) value;
322 DataPacketListener l = new DataPacketListener(listenerName, s,
323 listenerDependency, filter);
325 DataPacketListener lDependency = new DataPacketListener(
326 listenerDependency, null, null, null);
328 // Now let see if there is any dependency
329 if (listenerDependency == null) {
330 logger.debug("listener without any dependency");
331 if (this.indexDataPacket.contains(l)) {
332 logger.error("trying to add an existing element");
334 logger.debug("adding listener: " + listenerName);
335 CopyOnWriteArrayList<DataPacketListener> serialListeners = new CopyOnWriteArrayList<DataPacketListener>();
336 serialListeners.add(l);
337 this.listenDataPacket.add(serialListeners);
338 this.indexDataPacket.add(l);
341 logger.debug("listener with dependency");
342 // Now search for the dependency and put things in order
343 if (this.indexDataPacket.contains(l)) {
344 logger.error("trying to add an existing element");
346 logger.debug("adding listener: " + listenerName);
347 // Lets find the set with the dependency in it, if we
348 // find it lets just add our dependency at the end of
350 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
352 boolean done = false;
353 if (serialListeners.contains(lDependency)) {
354 serialListeners.add(l);
357 // If we did fine the element, lets break early
363 this.indexDataPacket.add(l);
368 void unsetListenDataPacket(Map props, IListenDataPacket s) {
369 if (this.listenDataPacket == null || this.indexDataPacket == null) {
370 logger.error("data structure to store data is NULL");
373 logger.trace("Received UNsetListenDataPacket request");
374 for (Object e : props.entrySet()) {
375 Map.Entry entry = (Map.Entry) e;
376 logger.trace("Prop key:(" + entry.getKey() + ") value:("
377 + entry.getValue() + ")");
380 String listenerName = null;
382 // Read the listenerName
383 value = props.get("salListenerName");
384 if (value instanceof String) {
385 listenerName = (String) value;
388 if (listenerName == null) {
389 logger.error("Trying to set a listener without a Name");
393 DataPacketListener l = new DataPacketListener(listenerName, s, null,
395 if (!this.indexDataPacket.contains(l)) {
396 logger.error("trying to remove a non-existing element");
398 logger.debug("removing listener: " + listenerName);
399 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
401 boolean done = false;
402 for (i = 0; i < serialListeners.size(); i++) {
403 if (serialListeners.get(i).equals(l)) {
404 serialListeners.remove(i);
409 // Now remove a serialListener that maybe empty
410 if (serialListeners.isEmpty()) {
411 this.listenDataPacket.remove(serialListeners);
413 // If we did fine the element, lets break early
419 this.indexDataPacket.remove(l);
424 * Function called by the dependency manager when all the required
425 * dependencies are satisfied
429 this.txThread.start();
430 this.rxThread.start();
434 * Function called by the dependency manager when at least one
435 * dependency become unsatisfied or when the component is shutting
436 * down because for example bundle is being stopped.
440 // Make sure to cleanup the data structure we use to track
442 this.listenDataPacket.clear();
443 this.indexDataPacket.clear();
444 this.pluginInDataService.clear();
445 this.statistics.clear();
446 this.rxQueue.clear();
447 this.txQueue.clear();
448 this.txThread.interrupt();
449 this.rxThread.interrupt();
450 // Wait for them to be done
452 this.txThread.join();
453 this.rxThread.join();
454 } catch (InterruptedException ex) {
459 private void increaseStat(String name) {
460 if (this.statistics == null) {
464 AtomicInteger currValue = null;
465 synchronized (this.statistics) {
466 currValue = this.statistics.get(name);
468 if (currValue == null) {
469 this.statistics.put(name, new AtomicInteger(0));
473 currValue.incrementAndGet();
477 public PacketResult receiveDataPacket(RawPacket inPkt) {
478 if (inPkt.getIncomingNodeConnector() == null) {
479 increaseStat("nullIncomingNodeConnector");
480 return PacketResult.IGNORED;
483 // If the queue was full don't wait, rather increase a counter
485 if (!this.rxQueue.offer(inPkt)) {
486 increaseStat("fullRXQueue");
487 return PacketResult.IGNORED;
490 // Walk the chain of listener going first throw all the
491 // parallel ones and for each parallel in serial
492 return PacketResult.IGNORED;
496 public void transmitDataPacket(RawPacket outPkt) {
497 if (outPkt.getOutgoingNodeConnector() == null) {
498 increaseStat("nullOutgoingNodeConnector");
502 if (!this.txQueue.offer(outPkt)) {
503 increaseStat("fullTXQueue");
509 public Packet decodeDataPacket(RawPacket pkt) {
514 byte[] data = pkt.getPacketData();
515 if (data.length <= 0) {
518 if (pkt.getEncap().equals(LinkEncap.ETHERNET)) {
519 Ethernet res = new Ethernet();
521 res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
522 } catch (Exception e) {
531 public RawPacket encodeDataPacket(Packet pkt) {
538 data = pkt.serialize();
539 } catch (Exception e) {
543 if (data.length <= 0) {
547 RawPacket res = new RawPacket(data);
549 } catch (ConstructionException cex) {
551 // If something goes wrong then we have to return null