X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fsal%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fimplementation%2Finternal%2FDataPacketService.java;h=0c9ebab8c3c126a2442a0b8ef87ab30577602a10;hp=3f36beaa27b7772c441b43ac417015b62ec5f1b9;hb=e5a5a73c0ed9f298f539cfda61aa6ba8d3f65c9e;hpb=d192c699590d441eb96a697b9e8ab7a028f18860 diff --git a/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java b/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java index 3f36beaa27..0c9ebab8c3 100644 --- a/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java +++ b/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java @@ -1,6 +1,5 @@ - /* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2013-2014 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -28,10 +27,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; import org.opendaylight.controller.sal.core.ConstructionException; -import org.opendaylight.controller.sal.core.Node; import org.opendaylight.controller.sal.core.NodeConnector; import org.opendaylight.controller.sal.match.Match; import org.opendaylight.controller.sal.packet.Ethernet; @@ -49,7 +45,6 @@ import org.slf4j.LoggerFactory; public class DataPacketService implements IPluginOutDataPacketService, IDataPacketService { - private int RXMAXQUEUESIZE = 1000; private int TXMAXQUEUESIZE = 1000; protected static final Logger logger = LoggerFactory .getLogger(DataPacketService.class); @@ -62,35 +57,25 @@ public class DataPacketService implements IPluginOutDataPacketService, * adding a new service, removing a service, going through all of * them maybe different. */ - private ConcurrentHashMap + private ConcurrentHashMap> pluginInDataService = - new ConcurrentHashMap(); + new ConcurrentHashMap>(); private Map statistics = new HashMap(); - /** - * Queue for packets received from Data Path - */ - private LinkedBlockingQueue rxQueue = new LinkedBlockingQueue( - RXMAXQUEUESIZE); + /** * Queue for packets that need to be transmitted to Data Path */ private LinkedBlockingQueue txQueue = new LinkedBlockingQueue( - RXMAXQUEUESIZE); + TXMAXQUEUESIZE); /** * Transmission thread */ private Thread txThread = new Thread(new TxLoop(), "DataPacketService TX thread"); - /** - * Receiving thread - */ - private Thread rxThread = new Thread(new RxLoop(), - "DataPacketService RX thread"); /** * Representation of a Data Packet Listener including of its * properties - * */ private class DataPacketListener { // Key fields @@ -110,24 +95,35 @@ public class DataPacketService implements IPluginOutDataPacketService, @Override public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (obj == this) { + if (this == obj) return true; - } - if (obj.getClass() != getClass()) { + if (obj == null) return false; - } - DataPacketListener rhs = (DataPacketListener) obj; - return new EqualsBuilder().append(this.listenerName, - rhs.listenerName).isEquals(); + if (getClass() != obj.getClass()) + return false; + DataPacketListener other = (DataPacketListener) obj; + if (!getOuterType().equals(other.getOuterType())) + return false; + if (listenerName == null) { + if (other.listenerName != null) + return false; + } else if (!listenerName.equals(other.listenerName)) + return false; + return true; } @Override public int hashCode() { - return new HashCodeBuilder(13, 31).append(listenerName) - .toHashCode(); + final int prime = 31; + int result = 1; + result = prime * result + getOuterType().hashCode(); + result = prime * result + + ((listenerName == null) ? 0 : listenerName.hashCode()); + return result; + } + + private DataPacketService getOuterType() { + return DataPacketService.this; } } @@ -144,49 +140,30 @@ public class DataPacketService implements IPluginOutDataPacketService, /** * Loop for processing Received packets - * */ - private class RxLoop implements Runnable { - public void run() { - RawPacket pkt; - try { - for (pkt = rxQueue.take(); pkt != null; pkt = rxQueue.take()) { - for (List serialListeners : listenDataPacket) { - int i = 0; - for (i = 0; i < serialListeners.size(); i++) { - RawPacket copyPkt = null; - try { - copyPkt = new RawPacket(pkt); - } catch (ConstructionException cex) { - logger.debug("Error while cloning the packet"); - } - if (copyPkt == null) { - increaseStat("RXPacketCopyFailed"); - continue; - } - DataPacketListener l = serialListeners.get(i); - IListenDataPacket s = (l == null ? null - : l.listener); - if (s != null) { - try { - // TODO Make sure to filter based - // on the match too, later on - PacketResult res = s - .receiveDataPacket(copyPkt); - increaseStat("RXPacketSuccess"); - if (res.equals(PacketResult.CONSUME)) { - increaseStat("RXPacketSerialExit"); - break; - } - } catch (Exception e) { - increaseStat("RXPacketFailedForException"); - } - } + private void dispatchPacket(RawPacket pkt) { + + // for now we treat all listeners as serial listeners + for (List serialListeners : listenDataPacket) { + for (DataPacketListener l : serialListeners) { + + // TODO: possibly deal with read-only and read-write packet + // copies + IListenDataPacket s = (l == null ? null : l.listener); + if (s != null) { + try { + // TODO Make sure to filter based on the match too, + // later on + PacketResult res = s.receiveDataPacket(pkt); + increaseStat("RXPacketSuccess"); + if (res.equals(PacketResult.CONSUME)) { + increaseStat("RXPacketSerialExit"); + break; } + } catch (Exception e) { + increaseStat("RXPacketFailedForException"); } } - } catch (InterruptedException e) { - // Not a big deal } } } @@ -207,11 +184,11 @@ public class DataPacketService implements IPluginOutDataPacketService, String t = p.getNode() .getType(); // Now locate the TX dispatcher - IPluginInDataPacketService s = pluginInDataService - .get(t); - if (s != null) { + ProtocolService service = + pluginInDataService.get(t); + if (service != null) { try { - s.transmitDataPacket(pkt); + service.getService().transmitDataPacket(pkt); increaseStat("TXPacketSuccess"); } catch (Exception e) { increaseStat("TXPacketFailedForException"); @@ -228,54 +205,11 @@ public class DataPacketService implements IPluginOutDataPacketService, } void setPluginInDataService(Map props, IPluginInDataPacketService s) { - if (this.pluginInDataService == null) { - logger.error("pluginInDataService store null"); - return; - } - String type = null; - logger.trace("Received setPluginInDataService request"); - for (Object e : props.entrySet()) { - Map.Entry entry = (Map.Entry) e; - logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue()); - } - - Object value = props.get("protocolPluginType"); - if (value instanceof String) { - type = (String) value; - } - if (type == null) { - logger.error("Received a PluginInDataService without any " - + "protocolPluginType provided"); - } else { - this.pluginInDataService.put(type, s); - logger.debug("Stored the PluginInDataService for type: {}", type); - } + ProtocolService.set(this.pluginInDataService, props, s, logger); } void unsetPluginInDataService(Map props, IPluginInDataPacketService s) { - if (this.pluginInDataService == null) { - logger.error("pluginInDataService store null"); - return; - } - - String type = null; - logger.trace("Received unsetPluginInDataService request"); - for (Object e : props.entrySet()) { - Map.Entry entry = (Map.Entry) e; - logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue()); - } - - Object value = props.get("protocoloPluginType"); - if (value instanceof String) { - type = (String) value; - } - if (type == null) { - logger.error("Received a PluginInDataService without any " - + "protocolPluginType provided"); - } else if (this.pluginInDataService.get(type).equals(s)) { - this.pluginInDataService.remove(type); - logger.debug("Removed the PluginInDataService for type: {}", type); - } + ProtocolService.unset(this.pluginInDataService, props, s, logger); } void setListenDataPacket(Map props, IListenDataPacket s) { @@ -345,7 +279,6 @@ public class DataPacketService implements IPluginOutDataPacketService, // find it lets just add our dependency at the end of // the list. for (List serialListeners : this.listenDataPacket) { - int i = 0; boolean done = false; if (serialListeners.contains(lDependency)) { serialListeners.add(l); @@ -423,7 +356,6 @@ public class DataPacketService implements IPluginOutDataPacketService, */ void init() { this.txThread.start(); - this.rxThread.start(); } /** @@ -439,14 +371,11 @@ public class DataPacketService implements IPluginOutDataPacketService, this.indexDataPacket.clear(); this.pluginInDataService.clear(); this.statistics.clear(); - this.rxQueue.clear(); this.txQueue.clear(); this.txThread.interrupt(); - this.rxThread.interrupt(); // Wait for them to be done try { this.txThread.join(); - this.rxThread.join(); } catch (InterruptedException ex) { // Not a big deal } @@ -476,12 +405,8 @@ public class DataPacketService implements IPluginOutDataPacketService, return PacketResult.IGNORED; } - // If the queue was full don't wait, rather increase a counter - // for it - if (!this.rxQueue.offer(inPkt)) { - increaseStat("fullRXQueue"); - return PacketResult.IGNORED; - } + // send the packet off to be processed by listeners + this.dispatchPacket(inPkt); // Walk the chain of listener going first throw all the // parallel ones and for each parallel in serial @@ -516,7 +441,7 @@ public class DataPacketService implements IPluginOutDataPacketService, try { res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte); } catch (Exception e) { - logger.warn("", e); + logger.warn("Failed to decode packet: {}", e.getMessage()); } return res; }