From 02386e04eb4f544c93ccd4e4f19bc0a454480eb5 Mon Sep 17 00:00:00 2001 From: Colin Dixon Date: Fri, 7 Jun 2013 19:53:21 -0700 Subject: [PATCH] removing DataPacketService's receive queue Change-Id: Ib1d0e929f6679935f9f0d5a86afafecea4e3ce5d Signed-off-by: Colin Dixon --- .../internal/DataPacketService.java | 86 ++++++------------- 1 file changed, 24 insertions(+), 62 deletions(-) diff --git a/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java b/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java index 29d1c71204..a9eefd3a8a 100644 --- a/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java +++ b/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java @@ -46,7 +46,6 @@ import org.slf4j.LoggerFactory; public class DataPacketService implements IPluginOutDataPacketService, IDataPacketService { - private int RXMAXQUEUESIZE = 1000; private int TXMAXQUEUESIZE = 1000; protected static final Logger logger = LoggerFactory .getLogger(DataPacketService.class); @@ -63,31 +62,21 @@ public class DataPacketService implements IPluginOutDataPacketService, pluginInDataService = new ConcurrentHashMap(); private Map statistics = new HashMap(); - /** - * Queue for packets received from Data Path - */ - private LinkedBlockingQueue rxQueue = new LinkedBlockingQueue( - RXMAXQUEUESIZE); + /** * Queue for packets that need to be transmitted to Data Path */ private LinkedBlockingQueue txQueue = new LinkedBlockingQueue( - RXMAXQUEUESIZE); + TXMAXQUEUESIZE); /** * Transmission thread */ private Thread txThread = new Thread(new TxLoop(), "DataPacketService TX thread"); - /** - * Receiving thread - */ - private Thread rxThread = new Thread(new RxLoop(), - "DataPacketService RX thread"); /** * Representation of a Data Packet Listener including of its * properties - * */ private class DataPacketListener { // Key fields @@ -152,49 +141,30 @@ public class DataPacketService implements IPluginOutDataPacketService, /** * Loop for processing Received packets - * */ - private class RxLoop implements Runnable { - public void run() { - RawPacket pkt; - try { - for (pkt = rxQueue.take(); pkt != null; pkt = rxQueue.take()) { - for (List serialListeners : listenDataPacket) { - int i = 0; - for (i = 0; i < serialListeners.size(); i++) { - RawPacket copyPkt = null; - try { - copyPkt = new RawPacket(pkt); - } catch (ConstructionException cex) { - logger.debug("Error while cloning the packet"); - } - if (copyPkt == null) { - increaseStat("RXPacketCopyFailed"); - continue; - } - DataPacketListener l = serialListeners.get(i); - IListenDataPacket s = (l == null ? null - : l.listener); - if (s != null) { - try { - // TODO Make sure to filter based - // on the match too, later on - PacketResult res = s - .receiveDataPacket(copyPkt); - increaseStat("RXPacketSuccess"); - if (res.equals(PacketResult.CONSUME)) { - increaseStat("RXPacketSerialExit"); - break; - } - } catch (Exception e) { - increaseStat("RXPacketFailedForException"); - } - } + private void dispatchPacket(RawPacket pkt) { + + // for now we treat all listeners as serial listeners + for (List serialListeners : listenDataPacket) { + for (DataPacketListener l : serialListeners) { + + // TODO: possibly deal with read-only and read-write packet + // copies + IListenDataPacket s = (l == null ? null : l.listener); + if (s != null) { + try { + // TODO Make sure to filter based on the match too, + // later on + PacketResult res = s.receiveDataPacket(pkt); + increaseStat("RXPacketSuccess"); + if (res.equals(PacketResult.CONSUME)) { + increaseStat("RXPacketSerialExit"); + break; } + } catch (Exception e) { + increaseStat("RXPacketFailedForException"); } } - } catch (InterruptedException e) { - // Not a big deal } } } @@ -431,7 +401,6 @@ public class DataPacketService implements IPluginOutDataPacketService, */ void init() { this.txThread.start(); - this.rxThread.start(); } /** @@ -447,14 +416,11 @@ public class DataPacketService implements IPluginOutDataPacketService, this.indexDataPacket.clear(); this.pluginInDataService.clear(); this.statistics.clear(); - this.rxQueue.clear(); this.txQueue.clear(); this.txThread.interrupt(); - this.rxThread.interrupt(); // Wait for them to be done try { this.txThread.join(); - this.rxThread.join(); } catch (InterruptedException ex) { // Not a big deal } @@ -484,12 +450,8 @@ public class DataPacketService implements IPluginOutDataPacketService, return PacketResult.IGNORED; } - // If the queue was full don't wait, rather increase a counter - // for it - if (!this.rxQueue.offer(inPkt)) { - increaseStat("fullRXQueue"); - return PacketResult.IGNORED; - } + // send the packet off to be processed by listeners + this.dispatchPacket(inPkt); // Walk the chain of listener going first throw all the // parallel ones and for each parallel in serial -- 2.36.6