3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
11 * @file DataPacketService.java
13 * @brief Implementation of Data Packet services in SAL
15 * Implementation of Data Packet services in SAL
18 package org.opendaylight.controller.sal.implementation.internal;
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.atomic.AtomicInteger;
31 import org.opendaylight.controller.sal.core.ConstructionException;
32 import org.opendaylight.controller.sal.core.NodeConnector;
33 import org.opendaylight.controller.sal.match.Match;
34 import org.opendaylight.controller.sal.packet.Ethernet;
35 import org.opendaylight.controller.sal.packet.IDataPacketService;
36 import org.opendaylight.controller.sal.packet.IListenDataPacket;
37 import org.opendaylight.controller.sal.packet.IPluginInDataPacketService;
38 import org.opendaylight.controller.sal.packet.IPluginOutDataPacketService;
39 import org.opendaylight.controller.sal.packet.LinkEncap;
40 import org.opendaylight.controller.sal.packet.Packet;
41 import org.opendaylight.controller.sal.packet.PacketResult;
42 import org.opendaylight.controller.sal.packet.RawPacket;
43 import org.opendaylight.controller.sal.utils.NetUtils;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
47 public class DataPacketService implements IPluginOutDataPacketService,
49 private int RXMAXQUEUESIZE = 1000;
50 private int TXMAXQUEUESIZE = 1000;
51 protected static final Logger logger = LoggerFactory
52 .getLogger(DataPacketService.class);
54 * Database that associates one NodeIDType to the
55 * IPluginDataPacketService, in fact we expect that there will be
56 * one instance of IPluginDataPacketService for each southbound
 * A ConcurrentHashMap is used because the threads that add a
 * service, remove a service and walk through all of the services
 * may be different.
62 private ConcurrentHashMap<String, IPluginInDataPacketService>
64 new ConcurrentHashMap<String, IPluginInDataPacketService>();
65 private Map<String, AtomicInteger> statistics = new HashMap<String, AtomicInteger>();
67 * Queue for packets received from Data Path
69 private LinkedBlockingQueue<RawPacket> rxQueue = new LinkedBlockingQueue<RawPacket>(
72 * Queue for packets that need to be transmitted to Data Path
74 private LinkedBlockingQueue<RawPacket> txQueue = new LinkedBlockingQueue<RawPacket>(
79 private Thread txThread = new Thread(new TxLoop(),
80 "DataPacketService TX thread");
84 private Thread rxThread = new Thread(new RxLoop(),
85 "DataPacketService RX thread");
88 * Representation of a Data Packet Listener including of its
92 private class DataPacketListener {
94 private String listenerName;
96 private IListenDataPacket listener;
97 private String dependency;
100 DataPacketListener(String name, IListenDataPacket s, String dependency,
102 this.listenerName = name;
104 this.dependency = dependency;
109 public boolean equals(Object obj) {
114 if (getClass() != obj.getClass())
116 DataPacketListener other = (DataPacketListener) obj;
117 if (!getOuterType().equals(other.getOuterType()))
119 if (listenerName == null) {
120 if (other.listenerName != null)
122 } else if (!listenerName.equals(other.listenerName))
128 public int hashCode() {
129 final int prime = 31;
131 result = prime * result + getOuterType().hashCode();
132 result = prime * result
133 + ((listenerName == null) ? 0 : listenerName.hashCode());
137 private DataPacketService getOuterType() {
138 return DataPacketService.this;
 * This very expensive version of List is used because it
 * works well in concurrent situations: we expect service
 * addition, service removal and walks over the services to happen
 * from different places
148 private List<List<DataPacketListener>> listenDataPacket = new CopyOnWriteArrayList<List<DataPacketListener>>();
149 // Quick index to make sure there are no duplicate elements
150 private Set<DataPacketListener> indexDataPacket = Collections
151 .synchronizedSet(new HashSet<DataPacketListener>());
154 * Loop for processing Received packets
157 private class RxLoop implements Runnable {
161 for (pkt = rxQueue.take(); pkt != null; pkt = rxQueue.take()) {
162 for (List<DataPacketListener> serialListeners : listenDataPacket) {
164 for (i = 0; i < serialListeners.size(); i++) {
165 RawPacket copyPkt = null;
167 copyPkt = new RawPacket(pkt);
168 } catch (ConstructionException cex) {
169 logger.debug("Error while cloning the packet");
171 if (copyPkt == null) {
172 increaseStat("RXPacketCopyFailed");
175 DataPacketListener l = serialListeners.get(i);
176 IListenDataPacket s = (l == null ? null
180 // TODO Make sure to filter based
181 // on the match too, later on
183 .receiveDataPacket(copyPkt);
184 increaseStat("RXPacketSuccess");
185 if (res.equals(PacketResult.CONSUME)) {
186 increaseStat("RXPacketSerialExit");
189 } catch (Exception e) {
190 increaseStat("RXPacketFailedForException");
196 } catch (InterruptedException e) {
203 * Loop for processing packets to be transmitted
206 private class TxLoop implements Runnable {
210 for (pkt = txQueue.take(); pkt != null; pkt = txQueue.take()) {
211 // Retrieve outgoing node connector so to send out
212 // the packet to corresponding node
213 NodeConnector p = pkt.getOutgoingNodeConnector();
215 String t = p.getNode()
217 // Now locate the TX dispatcher
218 IPluginInDataPacketService s = pluginInDataService
222 s.transmitDataPacket(pkt);
223 increaseStat("TXPacketSuccess");
224 } catch (Exception e) {
225 increaseStat("TXPacketFailedForException");
228 increaseStat("TXpluginNotFound");
232 } catch (InterruptedException e) {
238 void setPluginInDataService(Map props, IPluginInDataPacketService s) {
239 if (this.pluginInDataService == null) {
240 logger.error("pluginInDataService store null");
244 logger.trace("Received setPluginInDataService request");
245 for (Object e : props.entrySet()) {
246 Map.Entry entry = (Map.Entry) e;
247 logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
250 Object value = props.get("protocolPluginType");
251 if (value instanceof String) {
252 type = (String) value;
255 logger.error("Received a PluginInDataService without any "
256 + "protocolPluginType provided");
258 this.pluginInDataService.put(type, s);
259 logger.debug("Stored the PluginInDataService for type: {}", type);
263 void unsetPluginInDataService(Map props, IPluginInDataPacketService s) {
264 if (this.pluginInDataService == null) {
265 logger.error("pluginInDataService store null");
270 logger.trace("Received unsetPluginInDataService request");
271 for (Object e : props.entrySet()) {
272 Map.Entry entry = (Map.Entry) e;
273 logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
276 Object value = props.get("protocoloPluginType");
277 if (value instanceof String) {
278 type = (String) value;
281 logger.error("Received a PluginInDataService without any "
282 + "protocolPluginType provided");
283 } else if (this.pluginInDataService.get(type).equals(s)) {
284 this.pluginInDataService.remove(type);
285 logger.debug("Removed the PluginInDataService for type: {}", type);
289 void setListenDataPacket(Map props, IListenDataPacket s) {
290 if (this.listenDataPacket == null || this.indexDataPacket == null) {
291 logger.error("data structure to store data is NULL");
294 logger.trace("Received setListenDataPacket request");
295 for (Object e : props.entrySet()) {
296 Map.Entry entry = (Map.Entry) e;
297 logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
300 String listenerName = null;
301 String listenerDependency = null;
304 // Read the listenerName
305 value = props.get("salListenerName");
306 if (value instanceof String) {
307 listenerName = (String) value;
310 if (listenerName == null) {
311 logger.error("Trying to set a listener without a Name");
315 //Read the dependency
316 value = props.get("salListenerDependency");
317 if (value instanceof String) {
318 listenerDependency = (String) value;
321 //Read match filter if any
322 value = props.get("salListenerFilter");
323 if (value instanceof Match) {
324 filter = (Match) value;
327 DataPacketListener l = new DataPacketListener(listenerName, s,
328 listenerDependency, filter);
330 DataPacketListener lDependency = new DataPacketListener(
331 listenerDependency, null, null, null);
333 // Now let see if there is any dependency
334 if (listenerDependency == null) {
335 logger.debug("listener without any dependency");
336 if (this.indexDataPacket.contains(l)) {
337 logger.error("trying to add an existing element");
339 logger.debug("adding listener: {}", listenerName);
340 CopyOnWriteArrayList<DataPacketListener> serialListeners = new CopyOnWriteArrayList<DataPacketListener>();
341 serialListeners.add(l);
342 this.listenDataPacket.add(serialListeners);
343 this.indexDataPacket.add(l);
346 logger.debug("listener with dependency");
347 // Now search for the dependency and put things in order
348 if (this.indexDataPacket.contains(l)) {
349 logger.error("trying to add an existing element");
351 logger.debug("adding listener: {}", listenerName);
352 // Lets find the set with the dependency in it, if we
353 // find it lets just add our dependency at the end of
355 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
357 boolean done = false;
358 if (serialListeners.contains(lDependency)) {
359 serialListeners.add(l);
362 // If we did fine the element, lets break early
368 this.indexDataPacket.add(l);
373 void unsetListenDataPacket(Map props, IListenDataPacket s) {
374 if (this.listenDataPacket == null || this.indexDataPacket == null) {
375 logger.error("data structure to store data is NULL");
378 logger.trace("Received UNsetListenDataPacket request");
379 for (Object e : props.entrySet()) {
380 Map.Entry entry = (Map.Entry) e;
381 logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
384 String listenerName = null;
386 // Read the listenerName
387 value = props.get("salListenerName");
388 if (value instanceof String) {
389 listenerName = (String) value;
392 if (listenerName == null) {
393 logger.error("Trying to set a listener without a Name");
397 DataPacketListener l = new DataPacketListener(listenerName, s, null,
399 if (!this.indexDataPacket.contains(l)) {
400 logger.error("trying to remove a non-existing element");
402 logger.debug("removing listener: {}", listenerName);
403 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
405 boolean done = false;
406 for (i = 0; i < serialListeners.size(); i++) {
407 if (serialListeners.get(i).equals(l)) {
408 serialListeners.remove(i);
413 // Now remove a serialListener that maybe empty
414 if (serialListeners.isEmpty()) {
415 this.listenDataPacket.remove(serialListeners);
417 // If we did fine the element, lets break early
423 this.indexDataPacket.remove(l);
428 * Function called by the dependency manager when all the required
429 * dependencies are satisfied
433 this.txThread.start();
434 this.rxThread.start();
 * Function called by the dependency manager when at least one
 * dependency becomes unsatisfied or when the component is shutting
 * down because, for example, the bundle is being stopped.
444 // Make sure to cleanup the data structure we use to track
446 this.listenDataPacket.clear();
447 this.indexDataPacket.clear();
448 this.pluginInDataService.clear();
449 this.statistics.clear();
450 this.rxQueue.clear();
451 this.txQueue.clear();
452 this.txThread.interrupt();
453 this.rxThread.interrupt();
454 // Wait for them to be done
456 this.txThread.join();
457 this.rxThread.join();
458 } catch (InterruptedException ex) {
463 private void increaseStat(String name) {
464 if (this.statistics == null) {
468 AtomicInteger currValue = null;
469 synchronized (this.statistics) {
470 currValue = this.statistics.get(name);
472 if (currValue == null) {
473 this.statistics.put(name, new AtomicInteger(0));
477 currValue.incrementAndGet();
481 public PacketResult receiveDataPacket(RawPacket inPkt) {
482 if (inPkt.getIncomingNodeConnector() == null) {
483 increaseStat("nullIncomingNodeConnector");
484 return PacketResult.IGNORED;
487 // If the queue was full don't wait, rather increase a counter
489 if (!this.rxQueue.offer(inPkt)) {
490 increaseStat("fullRXQueue");
491 return PacketResult.IGNORED;
494 // Walk the chain of listener going first throw all the
495 // parallel ones and for each parallel in serial
496 return PacketResult.IGNORED;
500 public void transmitDataPacket(RawPacket outPkt) {
501 if (outPkt.getOutgoingNodeConnector() == null) {
502 increaseStat("nullOutgoingNodeConnector");
506 if (!this.txQueue.offer(outPkt)) {
507 increaseStat("fullTXQueue");
513 public Packet decodeDataPacket(RawPacket pkt) {
518 byte[] data = pkt.getPacketData();
519 if (data.length <= 0) {
522 if (pkt.getEncap().equals(LinkEncap.ETHERNET)) {
523 Ethernet res = new Ethernet();
525 res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
526 } catch (Exception e) {
527 logger.warn("Failed to decode packet: {}", e.getMessage());
535 public RawPacket encodeDataPacket(Packet pkt) {
542 data = pkt.serialize();
543 } catch (Exception e) {
547 if (data.length <= 0) {
551 RawPacket res = new RawPacket(data);
553 } catch (ConstructionException cex) {
555 // If something goes wrong then we have to return null