2 * Copyright (c) 2013-2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 * @file DataPacketService.java
12 * @brief Implementation of Data Packet services in SAL
14 * Implementation of Data Packet services in SAL
17 package org.opendaylight.controller.sal.implementation.internal;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.HashSet;
22 import java.util.List;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.CopyOnWriteArrayList;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.atomic.AtomicInteger;
30 import org.opendaylight.controller.sal.core.ConstructionException;
31 import org.opendaylight.controller.sal.core.NodeConnector;
32 import org.opendaylight.controller.sal.match.Match;
33 import org.opendaylight.controller.sal.packet.Ethernet;
34 import org.opendaylight.controller.sal.packet.IDataPacketService;
35 import org.opendaylight.controller.sal.packet.IListenDataPacket;
36 import org.opendaylight.controller.sal.packet.IPluginInDataPacketService;
37 import org.opendaylight.controller.sal.packet.IPluginOutDataPacketService;
38 import org.opendaylight.controller.sal.packet.LinkEncap;
39 import org.opendaylight.controller.sal.packet.Packet;
40 import org.opendaylight.controller.sal.packet.PacketResult;
41 import org.opendaylight.controller.sal.packet.RawPacket;
42 import org.opendaylight.controller.sal.utils.NetUtils;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
46 public class DataPacketService implements IPluginOutDataPacketService,
48 private int TXMAXQUEUESIZE = 1000;
49 protected static final Logger logger = LoggerFactory
50 .getLogger(DataPacketService.class);
52 * Database that associates one NodeIDType to the
53 * IPluginDataPacketService, in fact we expect that there will be
54 * one instance of IPluginDataPacketService for each southbound
56 * Using the ConcurrentHashMap because the threads that will be
57 * adding a new service, removing a service, going through all of
58 * them maybe different.
60 private ConcurrentHashMap<String, ProtocolService<IPluginInDataPacketService>>
62 new ConcurrentHashMap<String, ProtocolService<IPluginInDataPacketService>>();
63 private Map<String, AtomicInteger> statistics = new HashMap<String, AtomicInteger>();
66 * Queue for packets that need to be transmitted to Data Path
68 private LinkedBlockingQueue<RawPacket> txQueue = new LinkedBlockingQueue<RawPacket>(
73 private Thread txThread = new Thread(new TxLoop(),
74 "DataPacketService TX thread");
77 * Representation of a Data Packet Listener including of its
80 private class DataPacketListener {
82 private String listenerName;
84 private IListenDataPacket listener;
85 private String dependency;
88 DataPacketListener(String name, IListenDataPacket s, String dependency,
90 this.listenerName = name;
92 this.dependency = dependency;
97 public boolean equals(Object obj) {
102 if (getClass() != obj.getClass())
104 DataPacketListener other = (DataPacketListener) obj;
105 if (!getOuterType().equals(other.getOuterType()))
107 if (listenerName == null) {
108 if (other.listenerName != null)
110 } else if (!listenerName.equals(other.listenerName))
116 public int hashCode() {
117 final int prime = 31;
119 result = prime * result + getOuterType().hashCode();
120 result = prime * result
121 + ((listenerName == null) ? 0 : listenerName.hashCode());
125 private DataPacketService getOuterType() {
126 return DataPacketService.this;
131 * This very expensive version of List is being used because it
132 * work well in concurrent situation, as we expect new service
133 * addition, service removal and walk of the service will happen
134 * from different places
136 private List<List<DataPacketListener>> listenDataPacket = new CopyOnWriteArrayList<List<DataPacketListener>>();
137 // Quick index to make sure there are no duplicate elements
138 private Set<DataPacketListener> indexDataPacket = Collections
139 .synchronizedSet(new HashSet<DataPacketListener>());
142 * Loop for processing Received packets
144 private void dispatchPacket(RawPacket pkt) {
146 // for now we treat all listeners as serial listeners
147 for (List<DataPacketListener> serialListeners : listenDataPacket) {
148 for (DataPacketListener l : serialListeners) {
150 // TODO: possibly deal with read-only and read-write packet
152 IListenDataPacket s = (l == null ? null : l.listener);
155 // TODO Make sure to filter based on the match too,
157 PacketResult res = s.receiveDataPacket(pkt);
158 increaseStat("RXPacketSuccess");
159 if (res.equals(PacketResult.CONSUME)) {
160 increaseStat("RXPacketSerialExit");
163 } catch (Exception e) {
164 increaseStat("RXPacketFailedForException");
172 * Loop for processing packets to be transmitted
175 private class TxLoop implements Runnable {
179 for (pkt = txQueue.take(); pkt != null; pkt = txQueue.take()) {
180 // Retrieve outgoing node connector so to send out
181 // the packet to corresponding node
182 NodeConnector p = pkt.getOutgoingNodeConnector();
184 String t = p.getNode()
186 // Now locate the TX dispatcher
187 ProtocolService<IPluginInDataPacketService> service =
188 pluginInDataService.get(t);
189 if (service != null) {
191 service.getService().transmitDataPacket(pkt);
192 increaseStat("TXPacketSuccess");
193 } catch (Exception e) {
194 increaseStat("TXPacketFailedForException");
197 increaseStat("TXpluginNotFound");
201 } catch (InterruptedException e) {
207 void setPluginInDataService(Map<?, ?> props, IPluginInDataPacketService s) {
208 ProtocolService.set(this.pluginInDataService, props, s, logger);
211 void unsetPluginInDataService(Map<?, ?> props, IPluginInDataPacketService s) {
212 ProtocolService.unset(this.pluginInDataService, props, s, logger);
215 void setListenDataPacket(Map<?, ?> props, IListenDataPacket s) {
216 if (this.listenDataPacket == null || this.indexDataPacket == null) {
217 logger.error("data structure to store data is NULL");
220 logger.trace("Received setListenDataPacket request");
221 for (Map.Entry<?, ?> e : props.entrySet()) {
222 logger.trace("Prop key:({}) value:({})",e.getKey(), e.getValue());
225 String listenerName = null;
226 String listenerDependency = null;
229 // Read the listenerName
230 value = props.get("salListenerName");
231 if (value instanceof String) {
232 listenerName = (String) value;
235 if (listenerName == null) {
236 logger.error("Trying to set a listener without a Name");
240 //Read the dependency
241 value = props.get("salListenerDependency");
242 if (value instanceof String) {
243 listenerDependency = (String) value;
246 //Read match filter if any
247 value = props.get("salListenerFilter");
248 if (value instanceof Match) {
249 filter = (Match) value;
252 DataPacketListener l = new DataPacketListener(listenerName, s,
253 listenerDependency, filter);
255 DataPacketListener lDependency = new DataPacketListener(
256 listenerDependency, null, null, null);
258 // Now let see if there is any dependency
259 if (listenerDependency == null) {
260 logger.debug("listener without any dependency");
261 if (this.indexDataPacket.contains(l)) {
262 logger.error("trying to add an existing element");
264 logger.debug("adding listener: {}", listenerName);
265 CopyOnWriteArrayList<DataPacketListener> serialListeners = new CopyOnWriteArrayList<DataPacketListener>();
266 serialListeners.add(l);
267 this.listenDataPacket.add(serialListeners);
268 this.indexDataPacket.add(l);
271 logger.debug("listener with dependency");
272 // Now search for the dependency and put things in order
273 if (this.indexDataPacket.contains(l)) {
274 logger.error("trying to add an existing element");
276 logger.debug("adding listener: {}", listenerName);
277 // Lets find the set with the dependency in it, if we
278 // find it lets just add our dependency at the end of
280 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
281 boolean done = false;
282 if (serialListeners.contains(lDependency)) {
283 serialListeners.add(l);
286 // If we did fine the element, lets break early
292 this.indexDataPacket.add(l);
297 void unsetListenDataPacket(Map<?, ?> props, IListenDataPacket s) {
298 if (this.listenDataPacket == null || this.indexDataPacket == null) {
299 logger.error("data structure to store data is NULL");
302 logger.trace("Received UNsetListenDataPacket request");
303 for (Map.Entry<?, ?> e : props.entrySet()) {
304 logger.trace("Prop key:({}) value:({})",e.getKey(), e.getValue());
307 String listenerName = null;
309 // Read the listenerName
310 value = props.get("salListenerName");
311 if (value instanceof String) {
312 listenerName = (String) value;
315 if (listenerName == null) {
316 logger.error("Trying to set a listener without a Name");
320 DataPacketListener l = new DataPacketListener(listenerName, s, null,
322 if (!this.indexDataPacket.contains(l)) {
323 logger.error("trying to remove a non-existing element");
325 logger.debug("removing listener: {}", listenerName);
326 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
328 boolean done = false;
329 for (i = 0; i < serialListeners.size(); i++) {
330 if (serialListeners.get(i).equals(l)) {
331 serialListeners.remove(i);
336 // Now remove a serialListener that maybe empty
337 if (serialListeners.isEmpty()) {
338 this.listenDataPacket.remove(serialListeners);
340 // If we did fine the element, lets break early
346 this.indexDataPacket.remove(l);
351 * Function called by the dependency manager when all the required
352 * dependencies are satisfied
356 this.txThread.start();
360 * Function called by the dependency manager when at least one
361 * dependency become unsatisfied or when the component is shutting
362 * down because for example bundle is being stopped.
366 // Make sure to cleanup the data structure we use to track
368 this.listenDataPacket.clear();
369 this.indexDataPacket.clear();
370 this.pluginInDataService.clear();
371 this.statistics.clear();
372 this.txQueue.clear();
373 this.txThread.interrupt();
374 // Wait for them to be done
376 this.txThread.join();
377 } catch (InterruptedException ex) {
382 private void increaseStat(String name) {
383 if (this.statistics == null) {
387 AtomicInteger currValue = null;
388 synchronized (this.statistics) {
389 currValue = this.statistics.get(name);
391 if (currValue == null) {
392 this.statistics.put(name, new AtomicInteger(0));
396 currValue.incrementAndGet();
400 public PacketResult receiveDataPacket(RawPacket inPkt) {
401 if (inPkt.getIncomingNodeConnector() == null) {
402 increaseStat("nullIncomingNodeConnector");
403 return PacketResult.IGNORED;
406 // send the packet off to be processed by listeners
407 this.dispatchPacket(inPkt);
409 // Walk the chain of listener going first throw all the
410 // parallel ones and for each parallel in serial
411 return PacketResult.IGNORED;
415 public void transmitDataPacket(RawPacket outPkt) {
416 if (outPkt.getOutgoingNodeConnector() == null) {
417 increaseStat("nullOutgoingNodeConnector");
421 if (!this.txQueue.offer(outPkt)) {
422 increaseStat("fullTXQueue");
428 public Packet decodeDataPacket(RawPacket pkt) {
433 byte[] data = pkt.getPacketData();
434 if (data.length <= 0) {
437 if (pkt.getEncap().equals(LinkEncap.ETHERNET)) {
438 Ethernet res = new Ethernet();
440 res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
441 } catch (Exception e) {
442 logger.warn("Failed to decode packet: {}", e.getMessage());
450 public RawPacket encodeDataPacket(Packet pkt) {
457 data = pkt.serialize();
458 } catch (Exception e) {
462 if (data.length <= 0) {
466 RawPacket res = new RawPacket(data);
468 } catch (ConstructionException cex) {
470 // If something goes wrong then we have to return null