3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
11 * @file DataPacketService.java
13 * @brief Implementation of Data Packet services in SAL
15 * Implementation of Data Packet services in SAL
18 package org.opendaylight.controller.sal.implementation.internal;
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.atomic.AtomicInteger;
31 import org.apache.commons.lang3.builder.EqualsBuilder;
32 import org.apache.commons.lang3.builder.HashCodeBuilder;
33 import org.opendaylight.controller.sal.core.ConstructionException;
34 import org.opendaylight.controller.sal.core.Node;
35 import org.opendaylight.controller.sal.core.NodeConnector;
36 import org.opendaylight.controller.sal.match.Match;
37 import org.opendaylight.controller.sal.packet.Ethernet;
38 import org.opendaylight.controller.sal.packet.IDataPacketService;
39 import org.opendaylight.controller.sal.packet.IListenDataPacket;
40 import org.opendaylight.controller.sal.packet.IPluginInDataPacketService;
41 import org.opendaylight.controller.sal.packet.IPluginOutDataPacketService;
42 import org.opendaylight.controller.sal.packet.LinkEncap;
43 import org.opendaylight.controller.sal.packet.Packet;
44 import org.opendaylight.controller.sal.packet.PacketResult;
45 import org.opendaylight.controller.sal.packet.RawPacket;
46 import org.opendaylight.controller.sal.utils.NetUtils;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
50 public class DataPacketService implements IPluginOutDataPacketService,
52 private int RXMAXQUEUESIZE = 1000;
53 private int TXMAXQUEUESIZE = 1000;
54 protected static final Logger logger = LoggerFactory
55 .getLogger(DataPacketService.class);
57 * Database that associates one NodeIDType to the
58 * IPluginDataPacketService, in fact we expect that there will be
59 * one instance of IPluginDataPacketService for each southbound
61 * Using the ConcurrentHashMap because the threads that will be
62 * adding a new service, removing a service, going through all of
63 * them maybe different.
65 private ConcurrentHashMap<String, IPluginInDataPacketService>
67 new ConcurrentHashMap<String, IPluginInDataPacketService>();
68 private Map<String, AtomicInteger> statistics = new HashMap<String, AtomicInteger>();
70 * Queue for packets received from Data Path
72 private LinkedBlockingQueue<RawPacket> rxQueue = new LinkedBlockingQueue<RawPacket>(
75 * Queue for packets that need to be transmitted to Data Path
77 private LinkedBlockingQueue<RawPacket> txQueue = new LinkedBlockingQueue<RawPacket>(
82 private Thread txThread = new Thread(new TxLoop(),
83 "DataPacketService TX thread");
87 private Thread rxThread = new Thread(new RxLoop(),
88 "DataPacketService RX thread");
91 * Representation of a Data Packet Listener including of its
95 private class DataPacketListener {
97 private String listenerName;
99 private IListenDataPacket listener;
100 private String dependency;
103 DataPacketListener(String name, IListenDataPacket s, String dependency,
105 this.listenerName = name;
107 this.dependency = dependency;
112 public boolean equals(Object obj) {
119 if (obj.getClass() != getClass()) {
122 DataPacketListener rhs = (DataPacketListener) obj;
123 return new EqualsBuilder().append(this.listenerName,
124 rhs.listenerName).isEquals();
128 public int hashCode() {
129 return new HashCodeBuilder(13, 31).append(listenerName)
135 * This very expensive version of List is being used because it
136 * work well in concurrent situation, as we expect new service
137 * addition, service removal and walk of the service will happen
138 * from different places
140 private List<List<DataPacketListener>> listenDataPacket = new CopyOnWriteArrayList<List<DataPacketListener>>();
141 // Quick index to make sure there are no duplicate elements
142 private Set<DataPacketListener> indexDataPacket = Collections
143 .synchronizedSet(new HashSet<DataPacketListener>());
146 * Loop for processing Received packets
149 private class RxLoop implements Runnable {
153 for (pkt = rxQueue.take(); pkt != null; pkt = rxQueue.take()) {
154 for (List<DataPacketListener> serialListeners : listenDataPacket) {
156 for (i = 0; i < serialListeners.size(); i++) {
157 RawPacket copyPkt = null;
159 copyPkt = new RawPacket(pkt);
160 } catch (ConstructionException cex) {
161 logger.debug("Error while cloning the packet");
163 if (copyPkt == null) {
164 increaseStat("RXPacketCopyFailed");
167 DataPacketListener l = serialListeners.get(i);
168 IListenDataPacket s = (l == null ? null
172 // TODO Make sure to filter based
173 // on the match too, later on
175 .receiveDataPacket(copyPkt);
176 increaseStat("RXPacketSuccess");
177 if (res.equals(PacketResult.CONSUME)) {
178 increaseStat("RXPacketSerialExit");
181 } catch (Exception e) {
182 increaseStat("RXPacketFailedForException");
188 } catch (InterruptedException e) {
195 * Loop for processing packets to be transmitted
198 private class TxLoop implements Runnable {
202 for (pkt = txQueue.take(); pkt != null; pkt = txQueue.take()) {
203 // Retrieve outgoing node connector so to send out
204 // the packet to corresponding node
205 NodeConnector p = pkt.getOutgoingNodeConnector();
207 String t = p.getNode()
209 // Now locate the TX dispatcher
210 IPluginInDataPacketService s = pluginInDataService
214 s.transmitDataPacket(pkt);
215 increaseStat("TXPacketSuccess");
216 } catch (Exception e) {
217 increaseStat("TXPacketFailedForException");
220 increaseStat("TXpluginNotFound");
224 } catch (InterruptedException e) {
230 void setPluginInDataService(Map props, IPluginInDataPacketService s) {
231 if (this.pluginInDataService == null) {
232 logger.error("pluginInDataService store null");
236 logger.trace("Received setPluginInDataService request");
237 for (Object e : props.entrySet()) {
238 Map.Entry entry = (Map.Entry) e;
239 logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
242 Object value = props.get("protocolPluginType");
243 if (value instanceof String) {
244 type = (String) value;
247 logger.error("Received a PluginInDataService without any "
248 + "protocolPluginType provided");
250 this.pluginInDataService.put(type, s);
251 logger.debug("Stored the PluginInDataService for type: {}", type);
255 void unsetPluginInDataService(Map props, IPluginInDataPacketService s) {
256 if (this.pluginInDataService == null) {
257 logger.error("pluginInDataService store null");
262 logger.trace("Received unsetPluginInDataService request");
263 for (Object e : props.entrySet()) {
264 Map.Entry entry = (Map.Entry) e;
265 logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
268 Object value = props.get("protocoloPluginType");
269 if (value instanceof String) {
270 type = (String) value;
273 logger.error("Received a PluginInDataService without any "
274 + "protocolPluginType provided");
275 } else if (this.pluginInDataService.get(type).equals(s)) {
276 this.pluginInDataService.remove(type);
277 logger.debug("Removed the PluginInDataService for type: {}", type);
281 void setListenDataPacket(Map props, IListenDataPacket s) {
282 if (this.listenDataPacket == null || this.indexDataPacket == null) {
283 logger.error("data structure to store data is NULL");
286 logger.trace("Received setListenDataPacket request");
287 for (Object e : props.entrySet()) {
288 Map.Entry entry = (Map.Entry) e;
289 logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
292 String listenerName = null;
293 String listenerDependency = null;
296 // Read the listenerName
297 value = props.get("salListenerName");
298 if (value instanceof String) {
299 listenerName = (String) value;
302 if (listenerName == null) {
303 logger.error("Trying to set a listener without a Name");
307 //Read the dependency
308 value = props.get("salListenerDependency");
309 if (value instanceof String) {
310 listenerDependency = (String) value;
313 //Read match filter if any
314 value = props.get("salListenerFilter");
315 if (value instanceof Match) {
316 filter = (Match) value;
319 DataPacketListener l = new DataPacketListener(listenerName, s,
320 listenerDependency, filter);
322 DataPacketListener lDependency = new DataPacketListener(
323 listenerDependency, null, null, null);
325 // Now let see if there is any dependency
326 if (listenerDependency == null) {
327 logger.debug("listener without any dependency");
328 if (this.indexDataPacket.contains(l)) {
329 logger.error("trying to add an existing element");
331 logger.debug("adding listener: {}", listenerName);
332 CopyOnWriteArrayList<DataPacketListener> serialListeners = new CopyOnWriteArrayList<DataPacketListener>();
333 serialListeners.add(l);
334 this.listenDataPacket.add(serialListeners);
335 this.indexDataPacket.add(l);
338 logger.debug("listener with dependency");
339 // Now search for the dependency and put things in order
340 if (this.indexDataPacket.contains(l)) {
341 logger.error("trying to add an existing element");
343 logger.debug("adding listener: {}", listenerName);
344 // Lets find the set with the dependency in it, if we
345 // find it lets just add our dependency at the end of
347 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
349 boolean done = false;
350 if (serialListeners.contains(lDependency)) {
351 serialListeners.add(l);
354 // If we did fine the element, lets break early
360 this.indexDataPacket.add(l);
365 void unsetListenDataPacket(Map props, IListenDataPacket s) {
366 if (this.listenDataPacket == null || this.indexDataPacket == null) {
367 logger.error("data structure to store data is NULL");
370 logger.trace("Received UNsetListenDataPacket request");
371 for (Object e : props.entrySet()) {
372 Map.Entry entry = (Map.Entry) e;
373 logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
376 String listenerName = null;
378 // Read the listenerName
379 value = props.get("salListenerName");
380 if (value instanceof String) {
381 listenerName = (String) value;
384 if (listenerName == null) {
385 logger.error("Trying to set a listener without a Name");
389 DataPacketListener l = new DataPacketListener(listenerName, s, null,
391 if (!this.indexDataPacket.contains(l)) {
392 logger.error("trying to remove a non-existing element");
394 logger.debug("removing listener: {}", listenerName);
395 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
397 boolean done = false;
398 for (i = 0; i < serialListeners.size(); i++) {
399 if (serialListeners.get(i).equals(l)) {
400 serialListeners.remove(i);
405 // Now remove a serialListener that maybe empty
406 if (serialListeners.isEmpty()) {
407 this.listenDataPacket.remove(serialListeners);
409 // If we did fine the element, lets break early
415 this.indexDataPacket.remove(l);
420 * Function called by the dependency manager when all the required
421 * dependencies are satisfied
425 this.txThread.start();
426 this.rxThread.start();
 * Function called by the dependency manager when at least one
 * dependency becomes unsatisfied or when the component is shutting
 * down, for example because the bundle is being stopped.
436 // Make sure to cleanup the data structure we use to track
438 this.listenDataPacket.clear();
439 this.indexDataPacket.clear();
440 this.pluginInDataService.clear();
441 this.statistics.clear();
442 this.rxQueue.clear();
443 this.txQueue.clear();
444 this.txThread.interrupt();
445 this.rxThread.interrupt();
446 // Wait for them to be done
448 this.txThread.join();
449 this.rxThread.join();
450 } catch (InterruptedException ex) {
455 private void increaseStat(String name) {
456 if (this.statistics == null) {
460 AtomicInteger currValue = null;
461 synchronized (this.statistics) {
462 currValue = this.statistics.get(name);
464 if (currValue == null) {
465 this.statistics.put(name, new AtomicInteger(0));
469 currValue.incrementAndGet();
473 public PacketResult receiveDataPacket(RawPacket inPkt) {
474 if (inPkt.getIncomingNodeConnector() == null) {
475 increaseStat("nullIncomingNodeConnector");
476 return PacketResult.IGNORED;
479 // If the queue was full don't wait, rather increase a counter
481 if (!this.rxQueue.offer(inPkt)) {
482 increaseStat("fullRXQueue");
483 return PacketResult.IGNORED;
486 // Walk the chain of listener going first throw all the
487 // parallel ones and for each parallel in serial
488 return PacketResult.IGNORED;
492 public void transmitDataPacket(RawPacket outPkt) {
493 if (outPkt.getOutgoingNodeConnector() == null) {
494 increaseStat("nullOutgoingNodeConnector");
498 if (!this.txQueue.offer(outPkt)) {
499 increaseStat("fullTXQueue");
505 public Packet decodeDataPacket(RawPacket pkt) {
510 byte[] data = pkt.getPacketData();
511 if (data.length <= 0) {
514 if (pkt.getEncap().equals(LinkEncap.ETHERNET)) {
515 Ethernet res = new Ethernet();
517 res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
518 } catch (Exception e) {
527 public RawPacket encodeDataPacket(Packet pkt) {
534 data = pkt.serialize();
535 } catch (Exception e) {
539 if (data.length <= 0) {
543 RawPacket res = new RawPacket(data);
545 } catch (ConstructionException cex) {
547 // If something goes wrong then we have to return null