3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
11 * @file DataPacketService.java
13 * @brief Implementation of Data Packet services in SAL
15 * Implementation of Data Packet services in SAL
18 package org.opendaylight.controller.sal.implementation.internal;
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.atomic.AtomicInteger;
31 import org.opendaylight.controller.sal.core.ConstructionException;
32 import org.opendaylight.controller.sal.core.NodeConnector;
33 import org.opendaylight.controller.sal.match.Match;
34 import org.opendaylight.controller.sal.packet.Ethernet;
35 import org.opendaylight.controller.sal.packet.IDataPacketService;
36 import org.opendaylight.controller.sal.packet.IListenDataPacket;
37 import org.opendaylight.controller.sal.packet.IPluginInDataPacketService;
38 import org.opendaylight.controller.sal.packet.IPluginOutDataPacketService;
39 import org.opendaylight.controller.sal.packet.LinkEncap;
40 import org.opendaylight.controller.sal.packet.Packet;
41 import org.opendaylight.controller.sal.packet.PacketResult;
42 import org.opendaylight.controller.sal.packet.RawPacket;
43 import org.opendaylight.controller.sal.utils.GlobalConstants;
44 import org.opendaylight.controller.sal.utils.NetUtils;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
48 public class DataPacketService implements IPluginOutDataPacketService,
50 private int TXMAXQUEUESIZE = 1000;
51 protected static final Logger logger = LoggerFactory
52 .getLogger(DataPacketService.class);
54 * Database that associates one NodeIDType to the
55 * IPluginDataPacketService, in fact we expect that there will be
56 * one instance of IPluginDataPacketService for each southbound
58 * Using the ConcurrentHashMap because the threads that will be
59 * adding a new service, removing a service, going through all of
60 * them maybe different.
62 private ConcurrentHashMap<String, IPluginInDataPacketService>
64 new ConcurrentHashMap<String, IPluginInDataPacketService>();
65 private Map<String, AtomicInteger> statistics = new HashMap<String, AtomicInteger>();
68 * Queue for packets that need to be transmitted to Data Path
70 private LinkedBlockingQueue<RawPacket> txQueue = new LinkedBlockingQueue<RawPacket>(
75 private Thread txThread = new Thread(new TxLoop(),
76 "DataPacketService TX thread");
79 * Representation of a Data Packet Listener including of its
82 private class DataPacketListener {
84 private String listenerName;
86 private IListenDataPacket listener;
87 private String dependency;
90 DataPacketListener(String name, IListenDataPacket s, String dependency,
92 this.listenerName = name;
94 this.dependency = dependency;
99 public boolean equals(Object obj) {
104 if (getClass() != obj.getClass())
106 DataPacketListener other = (DataPacketListener) obj;
107 if (!getOuterType().equals(other.getOuterType()))
109 if (listenerName == null) {
110 if (other.listenerName != null)
112 } else if (!listenerName.equals(other.listenerName))
118 public int hashCode() {
119 final int prime = 31;
121 result = prime * result + getOuterType().hashCode();
122 result = prime * result
123 + ((listenerName == null) ? 0 : listenerName.hashCode());
127 private DataPacketService getOuterType() {
128 return DataPacketService.this;
133 * This very expensive version of List is being used because it
134 * work well in concurrent situation, as we expect new service
135 * addition, service removal and walk of the service will happen
136 * from different places
138 private List<List<DataPacketListener>> listenDataPacket = new CopyOnWriteArrayList<List<DataPacketListener>>();
139 // Quick index to make sure there are no duplicate elements
140 private Set<DataPacketListener> indexDataPacket = Collections
141 .synchronizedSet(new HashSet<DataPacketListener>());
144 * Loop for processing Received packets
146 private void dispatchPacket(RawPacket pkt) {
148 // for now we treat all listeners as serial listeners
149 for (List<DataPacketListener> serialListeners : listenDataPacket) {
150 for (DataPacketListener l : serialListeners) {
152 // TODO: possibly deal with read-only and read-write packet
154 IListenDataPacket s = (l == null ? null : l.listener);
157 // TODO Make sure to filter based on the match too,
159 PacketResult res = s.receiveDataPacket(pkt);
160 increaseStat("RXPacketSuccess");
161 if (res.equals(PacketResult.CONSUME)) {
162 increaseStat("RXPacketSerialExit");
165 } catch (Exception e) {
166 increaseStat("RXPacketFailedForException");
174 * Loop for processing packets to be transmitted
177 private class TxLoop implements Runnable {
181 for (pkt = txQueue.take(); pkt != null; pkt = txQueue.take()) {
182 // Retrieve outgoing node connector so to send out
183 // the packet to corresponding node
184 NodeConnector p = pkt.getOutgoingNodeConnector();
186 String t = p.getNode()
188 // Now locate the TX dispatcher
189 IPluginInDataPacketService s = pluginInDataService
193 s.transmitDataPacket(pkt);
194 increaseStat("TXPacketSuccess");
195 } catch (Exception e) {
196 increaseStat("TXPacketFailedForException");
199 increaseStat("TXpluginNotFound");
203 } catch (InterruptedException e) {
209 void setPluginInDataService(Map props, IPluginInDataPacketService s) {
210 if (this.pluginInDataService == null) {
211 logger.error("pluginInDataService store null");
215 logger.trace("Received setPluginInDataService request");
216 for (Object e : props.entrySet()) {
217 Map.Entry entry = (Map.Entry) e;
218 logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
221 Object value = props.get(GlobalConstants.PROTOCOLPLUGINTYPE.toString());
222 if (value instanceof String) {
223 type = (String) value;
226 logger.error("Received a PluginInDataService without any "
227 + "protocolPluginType provided");
229 this.pluginInDataService.put(type, s);
230 logger.debug("Stored the PluginInDataService for type: {}", type);
234 void unsetPluginInDataService(Map props, IPluginInDataPacketService s) {
235 if (this.pluginInDataService == null) {
236 logger.error("pluginInDataService store null");
241 logger.trace("Received unsetPluginInDataService request");
242 for (Object e : props.entrySet()) {
243 Map.Entry entry = (Map.Entry) e;
244 logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
247 Object value = props.get(GlobalConstants.PROTOCOLPLUGINTYPE.toString());
248 if (value instanceof String) {
249 type = (String) value;
252 logger.error("Received a PluginInDataService without any "
253 + "protocolPluginType provided");
254 } else if (this.pluginInDataService.get(type).equals(s)) {
255 this.pluginInDataService.remove(type);
256 logger.debug("Removed the PluginInDataService for type: {}", type);
260 void setListenDataPacket(Map props, IListenDataPacket s) {
261 if (this.listenDataPacket == null || this.indexDataPacket == null) {
262 logger.error("data structure to store data is NULL");
265 logger.trace("Received setListenDataPacket request");
266 for (Object e : props.entrySet()) {
267 Map.Entry entry = (Map.Entry) e;
268 logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
271 String listenerName = null;
272 String listenerDependency = null;
275 // Read the listenerName
276 value = props.get("salListenerName");
277 if (value instanceof String) {
278 listenerName = (String) value;
281 if (listenerName == null) {
282 logger.error("Trying to set a listener without a Name");
286 //Read the dependency
287 value = props.get("salListenerDependency");
288 if (value instanceof String) {
289 listenerDependency = (String) value;
292 //Read match filter if any
293 value = props.get("salListenerFilter");
294 if (value instanceof Match) {
295 filter = (Match) value;
298 DataPacketListener l = new DataPacketListener(listenerName, s,
299 listenerDependency, filter);
301 DataPacketListener lDependency = new DataPacketListener(
302 listenerDependency, null, null, null);
304 // Now let see if there is any dependency
305 if (listenerDependency == null) {
306 logger.debug("listener without any dependency");
307 if (this.indexDataPacket.contains(l)) {
308 logger.error("trying to add an existing element");
310 logger.debug("adding listener: {}", listenerName);
311 CopyOnWriteArrayList<DataPacketListener> serialListeners = new CopyOnWriteArrayList<DataPacketListener>();
312 serialListeners.add(l);
313 this.listenDataPacket.add(serialListeners);
314 this.indexDataPacket.add(l);
317 logger.debug("listener with dependency");
318 // Now search for the dependency and put things in order
319 if (this.indexDataPacket.contains(l)) {
320 logger.error("trying to add an existing element");
322 logger.debug("adding listener: {}", listenerName);
323 // Lets find the set with the dependency in it, if we
324 // find it lets just add our dependency at the end of
326 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
328 boolean done = false;
329 if (serialListeners.contains(lDependency)) {
330 serialListeners.add(l);
333 // If we did fine the element, lets break early
339 this.indexDataPacket.add(l);
344 void unsetListenDataPacket(Map props, IListenDataPacket s) {
345 if (this.listenDataPacket == null || this.indexDataPacket == null) {
346 logger.error("data structure to store data is NULL");
349 logger.trace("Received UNsetListenDataPacket request");
350 for (Object e : props.entrySet()) {
351 Map.Entry entry = (Map.Entry) e;
352 logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
355 String listenerName = null;
357 // Read the listenerName
358 value = props.get("salListenerName");
359 if (value instanceof String) {
360 listenerName = (String) value;
363 if (listenerName == null) {
364 logger.error("Trying to set a listener without a Name");
368 DataPacketListener l = new DataPacketListener(listenerName, s, null,
370 if (!this.indexDataPacket.contains(l)) {
371 logger.error("trying to remove a non-existing element");
373 logger.debug("removing listener: {}", listenerName);
374 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
376 boolean done = false;
377 for (i = 0; i < serialListeners.size(); i++) {
378 if (serialListeners.get(i).equals(l)) {
379 serialListeners.remove(i);
384 // Now remove a serialListener that maybe empty
385 if (serialListeners.isEmpty()) {
386 this.listenDataPacket.remove(serialListeners);
388 // If we did fine the element, lets break early
394 this.indexDataPacket.remove(l);
399 * Function called by the dependency manager when all the required
400 * dependencies are satisfied
404 this.txThread.start();
408 * Function called by the dependency manager when at least one
409 * dependency become unsatisfied or when the component is shutting
410 * down because for example bundle is being stopped.
414 // Make sure to cleanup the data structure we use to track
416 this.listenDataPacket.clear();
417 this.indexDataPacket.clear();
418 this.pluginInDataService.clear();
419 this.statistics.clear();
420 this.txQueue.clear();
421 this.txThread.interrupt();
422 // Wait for them to be done
424 this.txThread.join();
425 } catch (InterruptedException ex) {
430 private void increaseStat(String name) {
431 if (this.statistics == null) {
435 AtomicInteger currValue = null;
436 synchronized (this.statistics) {
437 currValue = this.statistics.get(name);
439 if (currValue == null) {
440 this.statistics.put(name, new AtomicInteger(0));
444 currValue.incrementAndGet();
448 public PacketResult receiveDataPacket(RawPacket inPkt) {
449 if (inPkt.getIncomingNodeConnector() == null) {
450 increaseStat("nullIncomingNodeConnector");
451 return PacketResult.IGNORED;
454 // send the packet off to be processed by listeners
455 this.dispatchPacket(inPkt);
457 // Walk the chain of listener going first throw all the
458 // parallel ones and for each parallel in serial
459 return PacketResult.IGNORED;
463 public void transmitDataPacket(RawPacket outPkt) {
464 if (outPkt.getOutgoingNodeConnector() == null) {
465 increaseStat("nullOutgoingNodeConnector");
469 if (!this.txQueue.offer(outPkt)) {
470 increaseStat("fullTXQueue");
476 public Packet decodeDataPacket(RawPacket pkt) {
481 byte[] data = pkt.getPacketData();
482 if (data.length <= 0) {
485 if (pkt.getEncap().equals(LinkEncap.ETHERNET)) {
486 Ethernet res = new Ethernet();
488 res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
489 } catch (Exception e) {
490 logger.warn("Failed to decode packet: {}", e.getMessage());
498 public RawPacket encodeDataPacket(Packet pkt) {
505 data = pkt.serialize();
506 } catch (Exception e) {
510 if (data.length <= 0) {
514 RawPacket res = new RawPacket(data);
516 } catch (ConstructionException cex) {
518 // If something goes wrong then we have to return null