removing DataPacketService's receive queue 59/459/2
authorColin Dixon <ckd@us.ibm.com>
Sat, 8 Jun 2013 02:53:21 +0000 (19:53 -0700)
committerGiovanni Meo <gmeo@cisco.com>
Thu, 13 Jun 2013 14:31:40 +0000 (16:31 +0200)
Change-Id: Ib1d0e929f6679935f9f0d5a86afafecea4e3ce5d
Signed-off-by: Colin Dixon <ckd@us.ibm.com>
opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/DataPacketService.java

index 29d1c71204a68bde4edb0d66936b121590551ac2..a9eefd3a8a689d8b4d92bd76fc4ec0edf1198193 100644 (file)
@@ -46,7 +46,6 @@ import org.slf4j.LoggerFactory;
 
 public class DataPacketService implements IPluginOutDataPacketService,
         IDataPacketService {
 
 public class DataPacketService implements IPluginOutDataPacketService,
         IDataPacketService {
-    private int RXMAXQUEUESIZE = 1000;
     private int TXMAXQUEUESIZE = 1000;
     protected static final Logger logger = LoggerFactory
             .getLogger(DataPacketService.class);
     private int TXMAXQUEUESIZE = 1000;
     protected static final Logger logger = LoggerFactory
             .getLogger(DataPacketService.class);
@@ -63,31 +62,21 @@ public class DataPacketService implements IPluginOutDataPacketService,
         pluginInDataService =
         new ConcurrentHashMap<String, IPluginInDataPacketService>();
     private Map<String, AtomicInteger> statistics = new HashMap<String, AtomicInteger>();
         pluginInDataService =
         new ConcurrentHashMap<String, IPluginInDataPacketService>();
     private Map<String, AtomicInteger> statistics = new HashMap<String, AtomicInteger>();
-    /**
-     * Queue for packets received from Data Path
-     */
-    private LinkedBlockingQueue<RawPacket> rxQueue = new LinkedBlockingQueue<RawPacket>(
-            RXMAXQUEUESIZE);
+
     /**
      * Queue for packets that need to be transmitted to Data Path
      */
     private LinkedBlockingQueue<RawPacket> txQueue = new LinkedBlockingQueue<RawPacket>(
     /**
      * Queue for packets that need to be transmitted to Data Path
      */
     private LinkedBlockingQueue<RawPacket> txQueue = new LinkedBlockingQueue<RawPacket>(
-            RXMAXQUEUESIZE);
+            TXMAXQUEUESIZE);
     /**
      * Transmission thread
      */
     private Thread txThread = new Thread(new TxLoop(),
             "DataPacketService TX thread");
     /**
      * Transmission thread
      */
     private Thread txThread = new Thread(new TxLoop(),
             "DataPacketService TX thread");
-    /**
-     * Receiving thread
-     */
-    private Thread rxThread = new Thread(new RxLoop(),
-            "DataPacketService RX thread");
 
     /**
      * Representation of a Data Packet Listener including of its
      * properties
 
     /**
      * Representation of a Data Packet Listener including of its
      * properties
-     *
      */
     private class DataPacketListener {
         // Key fields
      */
     private class DataPacketListener {
         // Key fields
@@ -152,49 +141,30 @@ public class DataPacketService implements IPluginOutDataPacketService,
 
     /**
      * Loop for processing Received packets
 
     /**
      * Loop for processing Received packets
-     *
      */
      */
-    private class RxLoop implements Runnable {
-        public void run() {
-            RawPacket pkt;
-            try {
-                for (pkt = rxQueue.take(); pkt != null; pkt = rxQueue.take()) {
-                    for (List<DataPacketListener> serialListeners : listenDataPacket) {
-                        int i = 0;
-                        for (i = 0; i < serialListeners.size(); i++) {
-                            RawPacket copyPkt = null;
-                            try {
-                                copyPkt = new RawPacket(pkt);
-                            } catch (ConstructionException cex) {
-                                logger.debug("Error while cloning the packet");
-                            }
-                            if (copyPkt == null) {
-                                increaseStat("RXPacketCopyFailed");
-                                continue;
-                            }
-                            DataPacketListener l = serialListeners.get(i);
-                            IListenDataPacket s = (l == null ? null
-                                    : l.listener);
-                            if (s != null) {
-                                try {
-                                    // TODO Make sure to filter based
-                                    // on the match too, later on
-                                    PacketResult res = s
-                                            .receiveDataPacket(copyPkt);
-                                    increaseStat("RXPacketSuccess");
-                                    if (res.equals(PacketResult.CONSUME)) {
-                                        increaseStat("RXPacketSerialExit");
-                                        break;
-                                    }
-                                } catch (Exception e) {
-                                    increaseStat("RXPacketFailedForException");
-                                }
-                            }
+    private void dispatchPacket(RawPacket pkt) {
+
+        // for now we treat all listeners as serial listeners
+        for (List<DataPacketListener> serialListeners : listenDataPacket) {
+            for (DataPacketListener l : serialListeners) {
+
+                // TODO: possibly deal with read-only and read-write packet
+                // copies
+                IListenDataPacket s = (l == null ? null : l.listener);
+                if (s != null) {
+                    try {
+                        // TODO Make sure to filter based on the match too,
+                        // later on
+                        PacketResult res = s.receiveDataPacket(pkt);
+                        increaseStat("RXPacketSuccess");
+                        if (res.equals(PacketResult.CONSUME)) {
+                            increaseStat("RXPacketSerialExit");
+                            break;
                         }
                         }
+                    } catch (Exception e) {
+                        increaseStat("RXPacketFailedForException");
                     }
                 }
                     }
                 }
-            } catch (InterruptedException e) {
-                // Not a big deal
             }
         }
     }
             }
         }
     }
@@ -431,7 +401,6 @@ public class DataPacketService implements IPluginOutDataPacketService,
      */
     void init() {
         this.txThread.start();
      */
     void init() {
         this.txThread.start();
-        this.rxThread.start();
     }
 
     /**
     }
 
     /**
@@ -447,14 +416,11 @@ public class DataPacketService implements IPluginOutDataPacketService,
         this.indexDataPacket.clear();
         this.pluginInDataService.clear();
         this.statistics.clear();
         this.indexDataPacket.clear();
         this.pluginInDataService.clear();
         this.statistics.clear();
-        this.rxQueue.clear();
         this.txQueue.clear();
         this.txThread.interrupt();
         this.txQueue.clear();
         this.txThread.interrupt();
-        this.rxThread.interrupt();
         // Wait for them to be done
         try {
             this.txThread.join();
         // Wait for them to be done
         try {
             this.txThread.join();
-            this.rxThread.join();
         } catch (InterruptedException ex) {
             // Not a big deal
         }
         } catch (InterruptedException ex) {
             // Not a big deal
         }
@@ -484,12 +450,8 @@ public class DataPacketService implements IPluginOutDataPacketService,
             return PacketResult.IGNORED;
         }
 
             return PacketResult.IGNORED;
         }
 
-        // If the queue was full don't wait, rather increase a counter
-        // for it
-        if (!this.rxQueue.offer(inPkt)) {
-            increaseStat("fullRXQueue");
-            return PacketResult.IGNORED;
-        }
+        // send the packet off to be processed by listeners
+        this.dispatchPacket(inPkt);
 
         // Walk the chain of listener going first throw all the
         // parallel ones and for each parallel in serial
 
         // Walk the chain of listener going first throw all the
         // parallel ones and for each parallel in serial