Merge "Get some basic unit testing in place for the RaftActor class"
[controller.git] / opendaylight / sal / implementation / src / main / java / org / opendaylight / controller / sal / implementation / internal / DataPacketService.java
index 3f36beaa27b7772c441b43ac417015b62ec5f1b9..0c9ebab8c3c126a2442a0b8ef87ab30577602a10 100644 (file)
@@ -1,6 +1,5 @@
-
 /*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2013-2014 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -28,10 +27,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.opendaylight.controller.sal.core.ConstructionException;
-import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.NodeConnector;
 import org.opendaylight.controller.sal.match.Match;
 import org.opendaylight.controller.sal.packet.Ethernet;
@@ -49,7 +45,6 @@ import org.slf4j.LoggerFactory;
 
 public class DataPacketService implements IPluginOutDataPacketService,
         IDataPacketService {
-    private int RXMAXQUEUESIZE = 1000;
     private int TXMAXQUEUESIZE = 1000;
     protected static final Logger logger = LoggerFactory
             .getLogger(DataPacketService.class);
@@ -62,35 +57,25 @@ public class DataPacketService implements IPluginOutDataPacketService,
      * adding a new service, removing a service, going through all of
      * them maybe different.
      */
-    private ConcurrentHashMap<String, IPluginInDataPacketService>
+    private ConcurrentHashMap<String, ProtocolService<IPluginInDataPacketService>>
         pluginInDataService =
-        new ConcurrentHashMap<String, IPluginInDataPacketService>();
+        new ConcurrentHashMap<String, ProtocolService<IPluginInDataPacketService>>();
     private Map<String, AtomicInteger> statistics = new HashMap<String, AtomicInteger>();
-    /**
-     * Queue for packets received from Data Path
-     */
-    private LinkedBlockingQueue<RawPacket> rxQueue = new LinkedBlockingQueue<RawPacket>(
-            RXMAXQUEUESIZE);
+
     /**
      * Queue for packets that need to be transmitted to Data Path
      */
     private LinkedBlockingQueue<RawPacket> txQueue = new LinkedBlockingQueue<RawPacket>(
-            RXMAXQUEUESIZE);
+            TXMAXQUEUESIZE);
     /**
      * Transmission thread
      */
     private Thread txThread = new Thread(new TxLoop(),
             "DataPacketService TX thread");
-    /**
-     * Receiving thread
-     */
-    private Thread rxThread = new Thread(new RxLoop(),
-            "DataPacketService RX thread");
 
     /**
      * Representation of a Data Packet Listener including of its
      * properties
-     *
      */
     private class DataPacketListener {
         // Key fields
@@ -110,24 +95,35 @@ public class DataPacketService implements IPluginOutDataPacketService,
 
         @Override
         public boolean equals(Object obj) {
-            if (obj == null) {
-                return false;
-            }
-            if (obj == this) {
+            if (this == obj)
                 return true;
-            }
-            if (obj.getClass() != getClass()) {
+            if (obj == null)
                 return false;
-            }
-            DataPacketListener rhs = (DataPacketListener) obj;
-            return new EqualsBuilder().append(this.listenerName,
-                    rhs.listenerName).isEquals();
+            if (getClass() != obj.getClass())
+                return false;
+            DataPacketListener other = (DataPacketListener) obj;
+            if (!getOuterType().equals(other.getOuterType()))
+                return false;
+            if (listenerName == null) {
+                if (other.listenerName != null)
+                    return false;
+            } else if (!listenerName.equals(other.listenerName))
+                return false;
+            return true;
         }
 
         @Override
         public int hashCode() {
-            return new HashCodeBuilder(13, 31).append(listenerName)
-                    .toHashCode();
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + getOuterType().hashCode();
+            result = prime * result
+                    + ((listenerName == null) ? 0 : listenerName.hashCode());
+            return result;
+        }
+
+        private DataPacketService getOuterType() {
+            return DataPacketService.this;
         }
     }
 
@@ -144,49 +140,30 @@ public class DataPacketService implements IPluginOutDataPacketService,
 
     /**
      * Loop for processing Received packets
-     *
      */
-    private class RxLoop implements Runnable {
-        public void run() {
-            RawPacket pkt;
-            try {
-                for (pkt = rxQueue.take(); pkt != null; pkt = rxQueue.take()) {
-                    for (List<DataPacketListener> serialListeners : listenDataPacket) {
-                        int i = 0;
-                        for (i = 0; i < serialListeners.size(); i++) {
-                            RawPacket copyPkt = null;
-                            try {
-                                copyPkt = new RawPacket(pkt);
-                            } catch (ConstructionException cex) {
-                                logger.debug("Error while cloning the packet");
-                            }
-                            if (copyPkt == null) {
-                                increaseStat("RXPacketCopyFailed");
-                                continue;
-                            }
-                            DataPacketListener l = serialListeners.get(i);
-                            IListenDataPacket s = (l == null ? null
-                                    : l.listener);
-                            if (s != null) {
-                                try {
-                                    // TODO Make sure to filter based
-                                    // on the match too, later on
-                                    PacketResult res = s
-                                            .receiveDataPacket(copyPkt);
-                                    increaseStat("RXPacketSuccess");
-                                    if (res.equals(PacketResult.CONSUME)) {
-                                        increaseStat("RXPacketSerialExit");
-                                        break;
-                                    }
-                                } catch (Exception e) {
-                                    increaseStat("RXPacketFailedForException");
-                                }
-                            }
+    private void dispatchPacket(RawPacket pkt) {
+
+        // for now we treat all listeners as serial listeners
+        for (List<DataPacketListener> serialListeners : listenDataPacket) {
+            for (DataPacketListener l : serialListeners) {
+
+                // TODO: possibly deal with read-only and read-write packet
+                // copies
+                IListenDataPacket s = (l == null ? null : l.listener);
+                if (s != null) {
+                    try {
+                        // TODO Make sure to filter based on the match too,
+                        // later on
+                        PacketResult res = s.receiveDataPacket(pkt);
+                        increaseStat("RXPacketSuccess");
+                        if (res.equals(PacketResult.CONSUME)) {
+                            increaseStat("RXPacketSerialExit");
+                            break;
                         }
+                    } catch (Exception e) {
+                        increaseStat("RXPacketFailedForException");
                     }
                 }
-            } catch (InterruptedException e) {
-                // Not a big deal
             }
         }
     }
@@ -207,11 +184,11 @@ public class DataPacketService implements IPluginOutDataPacketService,
                         String t = p.getNode()
                                 .getType();
                         // Now locate the TX dispatcher
-                        IPluginInDataPacketService s = pluginInDataService
-                                .get(t);
-                        if (s != null) {
+                        ProtocolService<IPluginInDataPacketService> service =
+                            pluginInDataService.get(t);
+                        if (service != null) {
                             try {
-                                s.transmitDataPacket(pkt);
+                                service.getService().transmitDataPacket(pkt);
                                 increaseStat("TXPacketSuccess");
                             } catch (Exception e) {
                                 increaseStat("TXPacketFailedForException");
@@ -228,54 +205,11 @@ public class DataPacketService implements IPluginOutDataPacketService,
     }
 
     void setPluginInDataService(Map props, IPluginInDataPacketService s) {
-        if (this.pluginInDataService == null) {
-            logger.error("pluginInDataService store null");
-            return;
-        }
-        String type = null;
-        logger.trace("Received setPluginInDataService request");
-        for (Object e : props.entrySet()) {
-            Map.Entry entry = (Map.Entry) e;
-            logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
-        }
-
-        Object value = props.get("protocolPluginType");
-        if (value instanceof String) {
-            type = (String) value;
-        }
-        if (type == null) {
-            logger.error("Received a PluginInDataService without any "
-                    + "protocolPluginType provided");
-        } else {
-            this.pluginInDataService.put(type, s);
-            logger.debug("Stored the PluginInDataService for type: {}", type);
-        }
+        ProtocolService.set(this.pluginInDataService, props, s, logger);
     }
 
     void unsetPluginInDataService(Map props, IPluginInDataPacketService s) {
-        if (this.pluginInDataService == null) {
-            logger.error("pluginInDataService store null");
-            return;
-        }
-
-        String type = null;
-        logger.trace("Received unsetPluginInDataService request");
-        for (Object e : props.entrySet()) {
-            Map.Entry entry = (Map.Entry) e;
-            logger.trace("Prop key:({}) value:({})",entry.getKey(), entry.getValue());
-        }
-
-        Object value = props.get("protocoloPluginType");
-        if (value instanceof String) {
-            type = (String) value;
-        }
-        if (type == null) {
-            logger.error("Received a PluginInDataService without any "
-                    + "protocolPluginType provided");
-        } else if (this.pluginInDataService.get(type).equals(s)) {
-            this.pluginInDataService.remove(type);
-            logger.debug("Removed the PluginInDataService for type: {}", type);
-        }
+        ProtocolService.unset(this.pluginInDataService, props, s, logger);
     }
 
     void setListenDataPacket(Map props, IListenDataPacket s) {
@@ -345,7 +279,6 @@ public class DataPacketService implements IPluginOutDataPacketService,
                 // find it lets just add our dependency at the end of
                 // the list.
                 for (List<DataPacketListener> serialListeners : this.listenDataPacket) {
-                    int i = 0;
                     boolean done = false;
                     if (serialListeners.contains(lDependency)) {
                         serialListeners.add(l);
@@ -423,7 +356,6 @@ public class DataPacketService implements IPluginOutDataPacketService,
      */
     void init() {
         this.txThread.start();
-        this.rxThread.start();
     }
 
     /**
@@ -439,14 +371,11 @@ public class DataPacketService implements IPluginOutDataPacketService,
         this.indexDataPacket.clear();
         this.pluginInDataService.clear();
         this.statistics.clear();
-        this.rxQueue.clear();
         this.txQueue.clear();
         this.txThread.interrupt();
-        this.rxThread.interrupt();
         // Wait for them to be done
         try {
             this.txThread.join();
-            this.rxThread.join();
         } catch (InterruptedException ex) {
             // Not a big deal
         }
@@ -476,12 +405,8 @@ public class DataPacketService implements IPluginOutDataPacketService,
             return PacketResult.IGNORED;
         }
 
-        // If the queue was full don't wait, rather increase a counter
-        // for it
-        if (!this.rxQueue.offer(inPkt)) {
-            increaseStat("fullRXQueue");
-            return PacketResult.IGNORED;
-        }
+        // send the packet off to be processed by listeners
+        this.dispatchPacket(inPkt);
 
         // Walk the chain of listener going first throw all the
         // parallel ones and for each parallel in serial
@@ -516,7 +441,7 @@ public class DataPacketService implements IPluginOutDataPacketService,
             try {
                 res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
             } catch (Exception e) {
-                logger.warn("", e);
+                logger.warn("Failed to decode packet: {}", e.getMessage());
             }
             return res;
         }