X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fsal%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fimplementation%2Finternal%2FDataPacketService.java;h=0c9ebab8c3c126a2442a0b8ef87ab30577602a10;hb=a2a56c27c18eeedff28796034be220ac049d6a26;hp=29d1c71204a68bde4edb0d66936b121590551ac2;hpb=ce75340226ce5f213a85572a744a8dde82864b9e;p=controller.git diff --git a/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java b/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java index 29d1c71204..0c9ebab8c3 100644 --- a/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java +++ b/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java @@ -1,6 +1,5 @@ - /* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2013-2014 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -46,7 +45,6 @@ import org.slf4j.LoggerFactory; public class DataPacketService implements IPluginOutDataPacketService, IDataPacketService { - private int RXMAXQUEUESIZE = 1000; private int TXMAXQUEUESIZE = 1000; protected static final Logger logger = LoggerFactory .getLogger(DataPacketService.class); @@ -59,35 +57,25 @@ public class DataPacketService implements IPluginOutDataPacketService, * adding a new service, removing a service, going through all of * them maybe different. */ - private ConcurrentHashMap + private ConcurrentHashMap> pluginInDataService = - new ConcurrentHashMap(); + new ConcurrentHashMap>(); private Map statistics = new HashMap(); - /** - * Queue for packets received from Data Path - */ - private LinkedBlockingQueue rxQueue = new LinkedBlockingQueue( - RXMAXQUEUESIZE); + /** * Queue for packets that need to be transmitted to Data Path */ private LinkedBlockingQueue txQueue = new LinkedBlockingQueue( - RXMAXQUEUESIZE); + TXMAXQUEUESIZE); /** * Transmission thread */ private Thread txThread = new Thread(new TxLoop(), "DataPacketService TX thread"); - /** - * Receiving thread - */ - private Thread rxThread = new Thread(new RxLoop(), - "DataPacketService RX thread"); /** * Representation of a Data Packet Listener including of its * properties - * */ private class DataPacketListener { // Key fields @@ -152,49 +140,30 @@ public class DataPacketService implements IPluginOutDataPacketService, /** * Loop for processing Received packets - * */ - private class RxLoop implements Runnable { - public void run() { - RawPacket pkt; - try { - for (pkt = rxQueue.take(); pkt != null; pkt = rxQueue.take()) { - for (List serialListeners : listenDataPacket) { - int i = 0; - for (i = 0; i < serialListeners.size(); i++) { - RawPacket copyPkt = null; - try { - copyPkt = new RawPacket(pkt); - } catch (ConstructionException cex) { - logger.debug("Error while cloning the packet"); - } - if (copyPkt == null) { - increaseStat("RXPacketCopyFailed"); - continue; - } - DataPacketListener l = serialListeners.get(i); - IListenDataPacket s = (l == null ? null - : l.listener); - if (s != null) { - try { - // TODO Make sure to filter based - // on the match too, later on - PacketResult res = s - .receiveDataPacket(copyPkt); - increaseStat("RXPacketSuccess"); - if (res.equals(PacketResult.CONSUME)) { - increaseStat("RXPacketSerialExit"); - break; - } - } catch (Exception e) { - increaseStat("RXPacketFailedForException"); - } - } + private void dispatchPacket(RawPacket pkt) { + + // for now we treat all listeners as serial listeners + for (List serialListeners : listenDataPacket) { + for (DataPacketListener l : serialListeners) { + + // TODO: possibly deal with read-only and read-write packet + // copies + IListenDataPacket s = (l == null ? null : l.listener); + if (s != null) { + try { + // TODO Make sure to filter based on the match too, + // later on + PacketResult res = s.receiveDataPacket(pkt); + increaseStat("RXPacketSuccess"); + if (res.equals(PacketResult.CONSUME)) { + increaseStat("RXPacketSerialExit"); + break; } + } catch (Exception e) { + increaseStat("RXPacketFailedForException"); } } - } catch (InterruptedException e) { - // Not a big deal } } } @@ -215,11 +184,11 @@ public class DataPacketService implements IPluginOutDataPacketService, String t = p.getNode() .getType(); // Now locate the TX dispatcher - IPluginInDataPacketService s = pluginInDataService - .get(t); - if (s != null) { + ProtocolService service = + pluginInDataService.get(t); + if (service != null) { try { - s.transmitDataPacket(pkt); + service.getService().transmitDataPacket(pkt); increaseStat("TXPacketSuccess"); } catch (Exception e) { increaseStat("TXPacketFailedForException"); @@ -236,54 +205,11 @@ public class DataPacketService implements IPluginOutDataPacketService, } void setPluginInDataService(Map props, IPluginInDataPacketService s) { - if (this.pluginInDataService == null) { - logger.error("pluginInDataService store null"); - return; - } - String type = null; - logger.trace("Received setPluginInDataService request"); - for (Object e : props.entrySet()) { - Map.Entry entry = (Map.Entry) e; - logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue()); - } - - Object value = props.get("protocolPluginType"); - if (value instanceof String) { - type = (String) value; - } - if (type == null) { - logger.error("Received a PluginInDataService without any " - + "protocolPluginType provided"); - } else { - this.pluginInDataService.put(type, s); - logger.debug("Stored the PluginInDataService for type: {}", type); - } + ProtocolService.set(this.pluginInDataService, props, s, logger); } void unsetPluginInDataService(Map props, IPluginInDataPacketService s) { - if (this.pluginInDataService == null) { - logger.error("pluginInDataService store null"); - return; - } - - String type = null; - logger.trace("Received unsetPluginInDataService request"); - for (Object e : props.entrySet()) { - Map.Entry entry = (Map.Entry) e; - logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue()); - } - - Object value = props.get("protocoloPluginType"); - if (value instanceof String) { - type = (String) value; - } - if (type == null) { - logger.error("Received a PluginInDataService without any " - + "protocolPluginType provided"); - } else if (this.pluginInDataService.get(type).equals(s)) { - this.pluginInDataService.remove(type); - logger.debug("Removed the PluginInDataService for type: {}", type); - } + ProtocolService.unset(this.pluginInDataService, props, s, logger); } void setListenDataPacket(Map props, IListenDataPacket s) { @@ -353,7 +279,6 @@ public class DataPacketService implements IPluginOutDataPacketService, // find it lets just add our dependency at the end of // the list. for (List serialListeners : this.listenDataPacket) { - int i = 0; boolean done = false; if (serialListeners.contains(lDependency)) { serialListeners.add(l); @@ -431,7 +356,6 @@ public class DataPacketService implements IPluginOutDataPacketService, */ void init() { this.txThread.start(); - this.rxThread.start(); } /** @@ -447,14 +371,11 @@ public class DataPacketService implements IPluginOutDataPacketService, this.indexDataPacket.clear(); this.pluginInDataService.clear(); this.statistics.clear(); - this.rxQueue.clear(); this.txQueue.clear(); this.txThread.interrupt(); - this.rxThread.interrupt(); // Wait for them to be done try { this.txThread.join(); - this.rxThread.join(); } catch (InterruptedException ex) { // Not a big deal } @@ -484,12 +405,8 @@ public class DataPacketService implements IPluginOutDataPacketService, return PacketResult.IGNORED; } - // If the queue was full don't wait, rather increase a counter - // for it - if (!this.rxQueue.offer(inPkt)) { - increaseStat("fullRXQueue"); - return PacketResult.IGNORED; - } + // send the packet off to be processed by listeners + this.dispatchPacket(inPkt); // Walk the chain of listener going first throw all the // parallel ones and for each parallel in serial