import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.opendaylight.controller.sal.core.ConstructionException;
-import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.core.NodeConnector;
import org.opendaylight.controller.sal.match.Match;
import org.opendaylight.controller.sal.packet.Ethernet;
public class DataPacketService implements IPluginOutDataPacketService,
IDataPacketService {
- private int RXMAXQUEUESIZE = 1000;
private int TXMAXQUEUESIZE = 1000;
protected static final Logger logger = LoggerFactory
.getLogger(DataPacketService.class);
pluginInDataService =
new ConcurrentHashMap<String, IPluginInDataPacketService>();
private Map<String, AtomicInteger> statistics = new HashMap<String, AtomicInteger>();
- /**
- * Queue for packets received from Data Path
- */
- private LinkedBlockingQueue<RawPacket> rxQueue = new LinkedBlockingQueue<RawPacket>(
- RXMAXQUEUESIZE);
+
/**
* Queue for packets that need to be transmitted to Data Path
*/
private LinkedBlockingQueue<RawPacket> txQueue = new LinkedBlockingQueue<RawPacket>(
- RXMAXQUEUESIZE);
+ TXMAXQUEUESIZE);
/**
* Transmission thread
*/
private Thread txThread = new Thread(new TxLoop(),
"DataPacketService TX thread");
- /**
- * Receiving thread
- */
- private Thread rxThread = new Thread(new RxLoop(),
- "DataPacketService RX thread");
/**
* Representation of a Data Packet Listener including of its
* properties
- *
*/
private class DataPacketListener {
// Key fields
@Override
public boolean equals(Object obj) {
- if (obj == null) {
- return false;
- }
- if (obj == this) {
+ if (this == obj)
return true;
- }
- if (obj.getClass() != getClass()) {
+ if (obj == null)
return false;
- }
- DataPacketListener rhs = (DataPacketListener) obj;
- return new EqualsBuilder().append(this.listenerName,
- rhs.listenerName).isEquals();
+ if (getClass() != obj.getClass())
+ return false;
+ DataPacketListener other = (DataPacketListener) obj;
+ if (!getOuterType().equals(other.getOuterType()))
+ return false;
+ if (listenerName == null) {
+ if (other.listenerName != null)
+ return false;
+ } else if (!listenerName.equals(other.listenerName))
+ return false;
+ return true;
}
@Override
public int hashCode() {
- return new HashCodeBuilder(13, 31).append(listenerName)
- .toHashCode();
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + getOuterType().hashCode();
+ result = prime * result
+ + ((listenerName == null) ? 0 : listenerName.hashCode());
+ return result;
+ }
+
+ private DataPacketService getOuterType() {
+ return DataPacketService.this;
}
}
/**
* Loop for processing Received packets
- *
*/
- private class RxLoop implements Runnable {
- public void run() {
- RawPacket pkt;
- try {
- for (pkt = rxQueue.take(); pkt != null; pkt = rxQueue.take()) {
- for (List<DataPacketListener> serialListeners : listenDataPacket) {
- int i = 0;
- for (i = 0; i < serialListeners.size(); i++) {
- RawPacket copyPkt = null;
- try {
- copyPkt = new RawPacket(pkt);
- } catch (ConstructionException cex) {
- logger.debug("Error while cloning the packet");
- }
- if (copyPkt == null) {
- increaseStat("RXPacketCopyFailed");
- continue;
- }
- DataPacketListener l = serialListeners.get(i);
- IListenDataPacket s = (l == null ? null
- : l.listener);
- if (s != null) {
- try {
- // TODO Make sure to filter based
- // on the match too, later on
- PacketResult res = s
- .receiveDataPacket(copyPkt);
- increaseStat("RXPacketSuccess");
- if (res.equals(PacketResult.CONSUME)) {
- increaseStat("RXPacketSerialExit");
- break;
- }
- } catch (Exception e) {
- increaseStat("RXPacketFailedForException");
- }
- }
+ private void dispatchPacket(RawPacket pkt) {
+
+ // for now we treat all listeners as serial listeners
+ for (List<DataPacketListener> serialListeners : listenDataPacket) {
+ for (DataPacketListener l : serialListeners) {
+
+ // TODO: possibly deal with read-only and read-write packet
+ // copies
+ IListenDataPacket s = (l == null ? null : l.listener);
+ if (s != null) {
+ try {
+ // TODO Make sure to filter based on the match too,
+ // later on
+ PacketResult res = s.receiveDataPacket(pkt);
+ increaseStat("RXPacketSuccess");
+ if (res.equals(PacketResult.CONSUME)) {
+ increaseStat("RXPacketSerialExit");
+ break;
}
+ } catch (Exception e) {
+ increaseStat("RXPacketFailedForException");
}
}
- } catch (InterruptedException e) {
- // Not a big deal
}
}
}
*/
void init() {
this.txThread.start();
- this.rxThread.start();
}
/**
this.indexDataPacket.clear();
this.pluginInDataService.clear();
this.statistics.clear();
- this.rxQueue.clear();
this.txQueue.clear();
this.txThread.interrupt();
- this.rxThread.interrupt();
// Wait for them to be done
try {
this.txThread.join();
- this.rxThread.join();
} catch (InterruptedException ex) {
// Not a big deal
}
return PacketResult.IGNORED;
}
- // If the queue was full don't wait, rather increase a counter
- // for it
- if (!this.rxQueue.offer(inPkt)) {
- increaseStat("fullRXQueue");
- return PacketResult.IGNORED;
- }
+ // send the packet off to be processed by listeners
+ this.dispatchPacket(inPkt);
// Walk the chain of listener going first throw all the
// parallel ones and for each parallel in serial
try {
res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
} catch (Exception e) {
- logger.warn("", e);
+ logger.warn("Failed to decode packet: {}", e.getMessage());
}
return res;
}
try {
data = pkt.serialize();
} catch (Exception e) {
- e.printStackTrace();
+ logger.error("",e);
return null;
}
if (data.length <= 0) {