-
/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2013-2014 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.opendaylight.controller.sal.core.ConstructionException;
-import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.core.NodeConnector;
import org.opendaylight.controller.sal.match.Match;
import org.opendaylight.controller.sal.packet.Ethernet;
public class DataPacketService implements IPluginOutDataPacketService,
IDataPacketService {
- private int RXMAXQUEUESIZE = 1000;
private int TXMAXQUEUESIZE = 1000;
protected static final Logger logger = LoggerFactory
.getLogger(DataPacketService.class);
* adding a new service, removing a service, going through all of
* them maybe different.
*/
- private ConcurrentHashMap<String, IPluginInDataPacketService>
+ private ConcurrentHashMap<String, ProtocolService<IPluginInDataPacketService>>
pluginInDataService =
- new ConcurrentHashMap<String, IPluginInDataPacketService>();
+ new ConcurrentHashMap<String, ProtocolService<IPluginInDataPacketService>>();
private Map<String, AtomicInteger> statistics = new HashMap<String, AtomicInteger>();
- /**
- * Queue for packets received from Data Path
- */
- private LinkedBlockingQueue<RawPacket> rxQueue = new LinkedBlockingQueue<RawPacket>(
- RXMAXQUEUESIZE);
+
/**
* Queue for packets that need to be transmitted to Data Path
*/
private LinkedBlockingQueue<RawPacket> txQueue = new LinkedBlockingQueue<RawPacket>(
- RXMAXQUEUESIZE);
+ TXMAXQUEUESIZE);
/**
* Transmission thread
*/
private Thread txThread = new Thread(new TxLoop(),
"DataPacketService TX thread");
- /**
- * Receiving thread
- */
- private Thread rxThread = new Thread(new RxLoop(),
- "DataPacketService RX thread");
/**
* Representation of a Data Packet Listener including of its
* properties
- *
*/
private class DataPacketListener {
// Key fields
@Override
public boolean equals(Object obj) {
- if (obj == null) {
- return false;
- }
- if (obj == this) {
+ if (this == obj)
return true;
- }
- if (obj.getClass() != getClass()) {
+ if (obj == null)
return false;
- }
- DataPacketListener rhs = (DataPacketListener) obj;
- return new EqualsBuilder().append(this.listenerName,
- rhs.listenerName).isEquals();
+ if (getClass() != obj.getClass())
+ return false;
+ DataPacketListener other = (DataPacketListener) obj;
+ if (!getOuterType().equals(other.getOuterType()))
+ return false;
+ if (listenerName == null) {
+ if (other.listenerName != null)
+ return false;
+ } else if (!listenerName.equals(other.listenerName))
+ return false;
+ return true;
}
@Override
public int hashCode() {
- return new HashCodeBuilder(13, 31).append(listenerName)
- .toHashCode();
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + getOuterType().hashCode();
+ result = prime * result
+ + ((listenerName == null) ? 0 : listenerName.hashCode());
+ return result;
+ }
+
+ private DataPacketService getOuterType() {
+ return DataPacketService.this;
}
}
/**
* Loop for processing Received packets
- *
*/
- private class RxLoop implements Runnable {
- public void run() {
- RawPacket pkt;
- try {
- for (pkt = rxQueue.take(); pkt != null; pkt = rxQueue.take()) {
- for (List<DataPacketListener> serialListeners : listenDataPacket) {
- int i = 0;
- for (i = 0; i < serialListeners.size(); i++) {
- RawPacket copyPkt = null;
- try {
- copyPkt = new RawPacket(pkt);
- } catch (ConstructionException cex) {
- logger.debug("Error while cloning the packet");
- }
- if (copyPkt == null) {
- increaseStat("RXPacketCopyFailed");
- continue;
- }
- DataPacketListener l = serialListeners.get(i);
- IListenDataPacket s = (l == null ? null
- : l.listener);
- if (s != null) {
- try {
- // TODO Make sure to filter based
- // on the match too, later on
- PacketResult res = s
- .receiveDataPacket(copyPkt);
- increaseStat("RXPacketSuccess");
- if (res.equals(PacketResult.CONSUME)) {
- increaseStat("RXPacketSerialExit");
- break;
- }
- } catch (Exception e) {
- increaseStat("RXPacketFailedForException");
- }
- }
+ private void dispatchPacket(RawPacket pkt) {
+
+ // for now we treat all listeners as serial listeners
+ for (List<DataPacketListener> serialListeners : listenDataPacket) {
+ for (DataPacketListener l : serialListeners) {
+
+ // TODO: possibly deal with read-only and read-write packet
+ // copies
+ IListenDataPacket s = (l == null ? null : l.listener);
+ if (s != null) {
+ try {
+ // TODO Make sure to filter based on the match too,
+ // later on
+ PacketResult res = s.receiveDataPacket(pkt);
+ increaseStat("RXPacketSuccess");
+ if (res.equals(PacketResult.CONSUME)) {
+ increaseStat("RXPacketSerialExit");
+ break;
}
+ } catch (Exception e) {
+ increaseStat("RXPacketFailedForException");
}
}
- } catch (InterruptedException e) {
- // Not a big deal
}
}
}
String t = p.getNode()
.getType();
// Now locate the TX dispatcher
- IPluginInDataPacketService s = pluginInDataService
- .get(t);
- if (s != null) {
+ ProtocolService<IPluginInDataPacketService> service =
+ pluginInDataService.get(t);
+ if (service != null) {
try {
- s.transmitDataPacket(pkt);
+ service.getService().transmitDataPacket(pkt);
increaseStat("TXPacketSuccess");
} catch (Exception e) {
increaseStat("TXPacketFailedForException");
}
}
- void setPluginInDataService(Map props, IPluginInDataPacketService s) {
- if (this.pluginInDataService == null) {
- logger.error("pluginInDataService store null");
- return;
- }
- String type = null;
- logger.trace("Received setPluginInDataService request");
- for (Object e : props.entrySet()) {
- Map.Entry entry = (Map.Entry) e;
- logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
- }
-
- Object value = props.get("protocolPluginType");
- if (value instanceof String) {
- type = (String) value;
- }
- if (type == null) {
- logger.error("Received a PluginInDataService without any "
- + "protocolPluginType provided");
- } else {
- this.pluginInDataService.put(type, s);
- logger.debug("Stored the PluginInDataService for type: {}", type);
- }
+ void setPluginInDataService(Map<?, ?> props, IPluginInDataPacketService s) {
+ ProtocolService.set(this.pluginInDataService, props, s, logger);
}
- void unsetPluginInDataService(Map props, IPluginInDataPacketService s) {
- if (this.pluginInDataService == null) {
- logger.error("pluginInDataService store null");
- return;
- }
-
- String type = null;
- logger.trace("Received unsetPluginInDataService request");
- for (Object e : props.entrySet()) {
- Map.Entry entry = (Map.Entry) e;
- logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
- }
-
- Object value = props.get("protocoloPluginType");
- if (value instanceof String) {
- type = (String) value;
- }
- if (type == null) {
- logger.error("Received a PluginInDataService without any "
- + "protocolPluginType provided");
- } else if (this.pluginInDataService.get(type).equals(s)) {
- this.pluginInDataService.remove(type);
- logger.debug("Removed the PluginInDataService for type: {}", type);
- }
+ void unsetPluginInDataService(Map<?, ?> props, IPluginInDataPacketService s) {
+ ProtocolService.unset(this.pluginInDataService, props, s, logger);
}
- void setListenDataPacket(Map props, IListenDataPacket s) {
+ void setListenDataPacket(Map<?, ?> props, IListenDataPacket s) {
if (this.listenDataPacket == null || this.indexDataPacket == null) {
logger.error("data structure to store data is NULL");
return;
}
logger.trace("Received setListenDataPacket request");
- for (Object e : props.entrySet()) {
- Map.Entry entry = (Map.Entry) e;
- logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
+ for (Map.Entry<?, ?> e : props.entrySet()) {
+ logger.trace("Prop key:({}) value:({})",e.getKey(), e.getValue());
}
String listenerName = null;
// find it lets just add our dependency at the end of
// the list.
for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
- int i = 0;
boolean done = false;
if (serialListeners.contains(lDependency)) {
serialListeners.add(l);
}
}
- void unsetListenDataPacket(Map props, IListenDataPacket s) {
+ void unsetListenDataPacket(Map<?, ?> props, IListenDataPacket s) {
if (this.listenDataPacket == null || this.indexDataPacket == null) {
logger.error("data structure to store data is NULL");
return;
}
logger.trace("Received UNsetListenDataPacket request");
- for (Object e : props.entrySet()) {
- Map.Entry entry = (Map.Entry) e;
- logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
+ for (Map.Entry<?, ?> e : props.entrySet()) {
+ logger.trace("Prop key:({}) value:({})",e.getKey(), e.getValue());
}
String listenerName = null;
*/
void init() {
this.txThread.start();
- this.rxThread.start();
}
/**
this.indexDataPacket.clear();
this.pluginInDataService.clear();
this.statistics.clear();
- this.rxQueue.clear();
this.txQueue.clear();
this.txThread.interrupt();
- this.rxThread.interrupt();
// Wait for them to be done
try {
this.txThread.join();
- this.rxThread.join();
} catch (InterruptedException ex) {
// Not a big deal
}
return PacketResult.IGNORED;
}
- // If the queue was full don't wait, rather increase a counter
- // for it
- if (!this.rxQueue.offer(inPkt)) {
- increaseStat("fullRXQueue");
- return PacketResult.IGNORED;
- }
+ // send the packet off to be processed by listeners
+ this.dispatchPacket(inPkt);
// Walk the chain of listener going first throw all the
// parallel ones and for each parallel in serial
try {
res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
} catch (Exception e) {
- logger.warn("", e);
+ logger.warn("Failed to decode packet: {}", e.getMessage());
}
return res;
}
try {
data = pkt.serialize();
} catch (Exception e) {
- e.printStackTrace();
+ logger.error("",e);
return null;
}
if (data.length <= 0) {