X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fsal%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fimplementation%2Finternal%2FDataPacketService.java;h=a9eefd3a8a689d8b4d92bd76fc4ec0edf1198193;hb=refs%2Fchanges%2F59%2F459%2F2;hp=7741945e73db55a56a496444375747f146adaca0;hpb=65dcba73cc07b3f29e6a6ad547ab5105f1ab9bde;p=controller.git diff --git a/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java b/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java index 7741945e73..a9eefd3a8a 100644 --- a/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java +++ b/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java @@ -28,8 +28,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; import org.opendaylight.controller.sal.core.ConstructionException; import org.opendaylight.controller.sal.core.NodeConnector; import org.opendaylight.controller.sal.match.Match; @@ -48,7 +46,6 @@ import org.slf4j.LoggerFactory; public class DataPacketService implements IPluginOutDataPacketService, IDataPacketService { - private int RXMAXQUEUESIZE = 1000; private int TXMAXQUEUESIZE = 1000; protected static final Logger logger = LoggerFactory .getLogger(DataPacketService.class); @@ -65,31 +62,21 @@ public class DataPacketService implements IPluginOutDataPacketService, pluginInDataService = new ConcurrentHashMap(); private Map statistics = new HashMap(); - /** - * Queue for packets received from Data Path - */ - private LinkedBlockingQueue rxQueue = new LinkedBlockingQueue( - RXMAXQUEUESIZE); + /** * Queue for packets that need to be transmitted to Data Path */ private LinkedBlockingQueue txQueue = new LinkedBlockingQueue( - RXMAXQUEUESIZE); + TXMAXQUEUESIZE); /** * Transmission thread */ private Thread txThread = new Thread(new TxLoop(), "DataPacketService TX thread"); - /** - * Receiving thread - */ - private Thread rxThread = new Thread(new RxLoop(), - "DataPacketService RX thread"); /** * Representation of a Data Packet Listener including of its * properties - * */ private class DataPacketListener { // Key fields @@ -109,24 +96,35 @@ public class DataPacketService implements IPluginOutDataPacketService, @Override public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (obj == this) { + if (this == obj) return true; - } - if (obj.getClass() != getClass()) { + if (obj == null) return false; - } - DataPacketListener rhs = (DataPacketListener) obj; - return new EqualsBuilder().append(this.listenerName, - rhs.listenerName).isEquals(); + if (getClass() != obj.getClass()) + return false; + DataPacketListener other = (DataPacketListener) obj; + if (!getOuterType().equals(other.getOuterType())) + return false; + if (listenerName == null) { + if (other.listenerName != null) + return false; + } else if (!listenerName.equals(other.listenerName)) + return false; + return true; } @Override public int hashCode() { - return new HashCodeBuilder(13, 31).append(listenerName) - .toHashCode(); + final int prime = 31; + int result = 1; + result = prime * result + getOuterType().hashCode(); + result = prime * result + + ((listenerName == null) ? 0 : listenerName.hashCode()); + return result; + } + + private DataPacketService getOuterType() { + return DataPacketService.this; } } @@ -143,49 +141,30 @@ public class DataPacketService implements IPluginOutDataPacketService, /** * Loop for processing Received packets - * */ - private class RxLoop implements Runnable { - public void run() { - RawPacket pkt; - try { - for (pkt = rxQueue.take(); pkt != null; pkt = rxQueue.take()) { - for (List serialListeners : listenDataPacket) { - int i = 0; - for (i = 0; i < serialListeners.size(); i++) { - RawPacket copyPkt = null; - try { - copyPkt = new RawPacket(pkt); - } catch (ConstructionException cex) { - logger.debug("Error while cloning the packet"); - } - if (copyPkt == null) { - increaseStat("RXPacketCopyFailed"); - continue; - } - DataPacketListener l = serialListeners.get(i); - IListenDataPacket s = (l == null ? null - : l.listener); - if (s != null) { - try { - // TODO Make sure to filter based - // on the match too, later on - PacketResult res = s - .receiveDataPacket(copyPkt); - increaseStat("RXPacketSuccess"); - if (res.equals(PacketResult.CONSUME)) { - increaseStat("RXPacketSerialExit"); - break; - } - } catch (Exception e) { - increaseStat("RXPacketFailedForException"); - } - } + private void dispatchPacket(RawPacket pkt) { + + // for now we treat all listeners as serial listeners + for (List serialListeners : listenDataPacket) { + for (DataPacketListener l : serialListeners) { + + // TODO: possibly deal with read-only and read-write packet + // copies + IListenDataPacket s = (l == null ? null : l.listener); + if (s != null) { + try { + // TODO Make sure to filter based on the match too, + // later on + PacketResult res = s.receiveDataPacket(pkt); + increaseStat("RXPacketSuccess"); + if (res.equals(PacketResult.CONSUME)) { + increaseStat("RXPacketSerialExit"); + break; } + } catch (Exception e) { + increaseStat("RXPacketFailedForException"); } } - } catch (InterruptedException e) { - // Not a big deal } } } @@ -422,7 +401,6 @@ public class DataPacketService implements IPluginOutDataPacketService, */ void init() { this.txThread.start(); - this.rxThread.start(); } /** @@ -438,14 +416,11 @@ public class DataPacketService implements IPluginOutDataPacketService, this.indexDataPacket.clear(); this.pluginInDataService.clear(); this.statistics.clear(); - this.rxQueue.clear(); this.txQueue.clear(); this.txThread.interrupt(); - this.rxThread.interrupt(); // Wait for them to be done try { this.txThread.join(); - this.rxThread.join(); } catch (InterruptedException ex) { // Not a big deal } @@ -475,12 +450,8 @@ public class DataPacketService implements IPluginOutDataPacketService, return PacketResult.IGNORED; } - // If the queue was full don't wait, rather increase a counter - // for it - if (!this.rxQueue.offer(inPkt)) { - increaseStat("fullRXQueue"); - return PacketResult.IGNORED; - } + // send the packet off to be processed by listeners + this.dispatchPacket(inPkt); // Walk the chain of listener going first throw all the // parallel ones and for each parallel in serial