X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fsal%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fimplementation%2Finternal%2FDataPacketService.java;h=a9eefd3a8a689d8b4d92bd76fc4ec0edf1198193;hb=refs%2Fchanges%2F59%2F459%2F2;hp=fe525bb081f4db9c568da0975249f1f46879de9a;hpb=42210c03b0a4c54706320ba9f55794c0abd4d201;p=controller.git diff --git a/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java b/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java index fe525bb081..a9eefd3a8a 100644 --- a/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java +++ b/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java @@ -28,10 +28,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; import org.opendaylight.controller.sal.core.ConstructionException; -import org.opendaylight.controller.sal.core.Node; import org.opendaylight.controller.sal.core.NodeConnector; import org.opendaylight.controller.sal.match.Match; import org.opendaylight.controller.sal.packet.Ethernet; @@ -49,7 +46,6 @@ import org.slf4j.LoggerFactory; public class DataPacketService implements IPluginOutDataPacketService, IDataPacketService { - private int RXMAXQUEUESIZE = 1000; private int TXMAXQUEUESIZE = 1000; protected static final Logger logger = LoggerFactory .getLogger(DataPacketService.class); @@ -66,31 +62,21 @@ public class DataPacketService implements IPluginOutDataPacketService, pluginInDataService = new ConcurrentHashMap(); private Map statistics = new HashMap(); - /** - * Queue for packets received from Data Path - */ - private LinkedBlockingQueue rxQueue = new LinkedBlockingQueue( - RXMAXQUEUESIZE); + /** * Queue for packets that need to be transmitted to Data Path */ private LinkedBlockingQueue txQueue = new LinkedBlockingQueue( - RXMAXQUEUESIZE); + TXMAXQUEUESIZE); /** * Transmission thread */ private Thread txThread = new Thread(new TxLoop(), "DataPacketService TX thread"); - /** - * Receiving thread - */ - private Thread rxThread = new Thread(new RxLoop(), - "DataPacketService RX thread"); /** * Representation of a Data Packet Listener including of its * properties - * */ private class DataPacketListener { // Key fields @@ -110,24 +96,35 @@ public class DataPacketService implements IPluginOutDataPacketService, @Override public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (obj == this) { + if (this == obj) return true; - } - if (obj.getClass() != getClass()) { + if (obj == null) return false; - } - DataPacketListener rhs = (DataPacketListener) obj; - return new EqualsBuilder().append(this.listenerName, - rhs.listenerName).isEquals(); + if (getClass() != obj.getClass()) + return false; + DataPacketListener other = (DataPacketListener) obj; + if (!getOuterType().equals(other.getOuterType())) + return false; + if (listenerName == null) { + if (other.listenerName != null) + return false; + } else if (!listenerName.equals(other.listenerName)) + return false; + return true; } @Override public int hashCode() { - return new HashCodeBuilder(13, 31).append(listenerName) - .toHashCode(); + final int prime = 31; + int result = 1; + result = prime * result + getOuterType().hashCode(); + result = prime * result + + ((listenerName == null) ? 0 : listenerName.hashCode()); + return result; + } + + private DataPacketService getOuterType() { + return DataPacketService.this; } } @@ -144,49 +141,30 @@ public class DataPacketService implements IPluginOutDataPacketService, /** * Loop for processing Received packets - * */ - private class RxLoop implements Runnable { - public void run() { - RawPacket pkt; - try { - for (pkt = rxQueue.take(); pkt != null; pkt = rxQueue.take()) { - for (List serialListeners : listenDataPacket) { - int i = 0; - for (i = 0; i < serialListeners.size(); i++) { - RawPacket copyPkt = null; - try { - copyPkt = new RawPacket(pkt); - } catch (ConstructionException cex) { - logger.debug("Error while cloning the packet"); - } - if (copyPkt == null) { - increaseStat("RXPacketCopyFailed"); - continue; - } - DataPacketListener l = serialListeners.get(i); - IListenDataPacket s = (l == null ? null - : l.listener); - if (s != null) { - try { - // TODO Make sure to filter based - // on the match too, later on - PacketResult res = s - .receiveDataPacket(copyPkt); - increaseStat("RXPacketSuccess"); - if (res.equals(PacketResult.CONSUME)) { - increaseStat("RXPacketSerialExit"); - break; - } - } catch (Exception e) { - increaseStat("RXPacketFailedForException"); - } - } + private void dispatchPacket(RawPacket pkt) { + + // for now we treat all listeners as serial listeners + for (List serialListeners : listenDataPacket) { + for (DataPacketListener l : serialListeners) { + + // TODO: possibly deal with read-only and read-write packet + // copies + IListenDataPacket s = (l == null ? null : l.listener); + if (s != null) { + try { + // TODO Make sure to filter based on the match too, + // later on + PacketResult res = s.receiveDataPacket(pkt); + increaseStat("RXPacketSuccess"); + if (res.equals(PacketResult.CONSUME)) { + increaseStat("RXPacketSerialExit"); + break; } + } catch (Exception e) { + increaseStat("RXPacketFailedForException"); } } - } catch (InterruptedException e) { - // Not a big deal } } } @@ -236,8 +214,7 @@ public class DataPacketService implements IPluginOutDataPacketService, logger.trace("Received setPluginInDataService request"); for (Object e : props.entrySet()) { Map.Entry entry = (Map.Entry) e; - logger.trace("Prop key:(" + entry.getKey() + ") value:(" - + entry.getValue() + ")"); + logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue()); } Object value = props.get("protocolPluginType"); @@ -249,7 +226,7 @@ public class DataPacketService implements IPluginOutDataPacketService, + "protocolPluginType provided"); } else { this.pluginInDataService.put(type, s); - logger.debug("Stored the PluginInDataService for type:" + type); + logger.debug("Stored the PluginInDataService for type: {}", type); } } @@ -263,8 +240,7 @@ public class DataPacketService implements IPluginOutDataPacketService, logger.trace("Received unsetPluginInDataService request"); for (Object e : props.entrySet()) { Map.Entry entry = (Map.Entry) e; - logger.trace("Prop key:(" + entry.getKey() + ") value:(" - + entry.getValue() + ")"); + logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue()); } Object value = props.get("protocoloPluginType"); @@ -276,7 +252,7 @@ public class DataPacketService implements IPluginOutDataPacketService, + "protocolPluginType provided"); } else if (this.pluginInDataService.get(type).equals(s)) { this.pluginInDataService.remove(type); - logger.debug("Removed the PluginInDataService for type:" + type); + logger.debug("Removed the PluginInDataService for type: {}", type); } } @@ -288,8 +264,7 @@ public class DataPacketService implements IPluginOutDataPacketService, logger.trace("Received setListenDataPacket request"); for (Object e : props.entrySet()) { Map.Entry entry = (Map.Entry) e; - logger.trace("Prop key:(" + entry.getKey() + ") value:(" - + entry.getValue() + ")"); + logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue()); } String listenerName = null; @@ -331,7 +306,7 @@ public class DataPacketService implements IPluginOutDataPacketService, if (this.indexDataPacket.contains(l)) { logger.error("trying to add an existing element"); } else { - logger.debug("adding listener: " + listenerName); + logger.debug("adding listener: {}", listenerName); CopyOnWriteArrayList serialListeners = new CopyOnWriteArrayList(); serialListeners.add(l); this.listenDataPacket.add(serialListeners); @@ -343,7 +318,7 @@ public class DataPacketService implements IPluginOutDataPacketService, if (this.indexDataPacket.contains(l)) { logger.error("trying to add an existing element"); } else { - logger.debug("adding listener: " + listenerName); + logger.debug("adding listener: {}", listenerName); // Lets find the set with the dependency in it, if we // find it lets just add our dependency at the end of // the list. @@ -373,8 +348,7 @@ public class DataPacketService implements IPluginOutDataPacketService, logger.trace("Received UNsetListenDataPacket request"); for (Object e : props.entrySet()) { Map.Entry entry = (Map.Entry) e; - logger.trace("Prop key:(" + entry.getKey() + ") value:(" - + entry.getValue() + ")"); + logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue()); } String listenerName = null; @@ -395,7 +369,7 @@ public class DataPacketService implements IPluginOutDataPacketService, if (!this.indexDataPacket.contains(l)) { logger.error("trying to remove a non-existing element"); } else { - logger.debug("removing listener: " + listenerName); + logger.debug("removing listener: {}", listenerName); for (List serialListeners : this.listenDataPacket) { int i = 0; boolean done = false; @@ -427,7 +401,6 @@ public class DataPacketService implements IPluginOutDataPacketService, */ void init() { this.txThread.start(); - this.rxThread.start(); } /** @@ -443,14 +416,11 @@ public class DataPacketService implements IPluginOutDataPacketService, this.indexDataPacket.clear(); this.pluginInDataService.clear(); this.statistics.clear(); - this.rxQueue.clear(); this.txQueue.clear(); this.txThread.interrupt(); - this.rxThread.interrupt(); // Wait for them to be done try { this.txThread.join(); - this.rxThread.join(); } catch (InterruptedException ex) { // Not a big deal } @@ -480,12 +450,8 @@ public class DataPacketService implements IPluginOutDataPacketService, return PacketResult.IGNORED; } - // If the queue was full don't wait, rather increase a counter - // for it - if (!this.rxQueue.offer(inPkt)) { - increaseStat("fullRXQueue"); - return PacketResult.IGNORED; - } + // send the packet off to be processed by listeners + this.dispatchPacket(inPkt); // Walk the chain of listener going first throw all the // parallel ones and for each parallel in serial @@ -520,7 +486,7 @@ public class DataPacketService implements IPluginOutDataPacketService, try { res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte); } catch (Exception e) { - logger.warn("", e); + logger.warn("Failed to decode packet: {}", e.getMessage()); } return res; } @@ -537,7 +503,7 @@ public class DataPacketService implements IPluginOutDataPacketService, try { data = pkt.serialize(); } catch (Exception e) { - e.printStackTrace(); + logger.error("",e); return null; } if (data.length <= 0) {