Migrating caches to TRANSACTIONAL Caches and enabled use1PcForAutoCommitTransactions.
[controller.git] / opendaylight / arphandler / src / main / java / org / opendaylight / controller / arphandler / internal / ArpHandler.java
index 0ff1cd9bd6a542dc8694b09933bfe2c1835d3bbd..f3b22c75d8102a76beeb78229e7faabb89e60bd5 100644 (file)
@@ -21,10 +21,13 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
 
+import org.opendaylight.controller.arphandler.ARPCacheEvent;
 import org.opendaylight.controller.arphandler.ARPEvent;
 import org.opendaylight.controller.arphandler.ARPReply;
 import org.opendaylight.controller.arphandler.ARPRequest;
@@ -49,6 +52,7 @@ import org.opendaylight.controller.sal.packet.IPv4;
 import org.opendaylight.controller.sal.packet.Packet;
 import org.opendaylight.controller.sal.packet.PacketResult;
 import org.opendaylight.controller.sal.packet.RawPacket;
+import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
 import org.opendaylight.controller.sal.utils.EtherTypes;
 import org.opendaylight.controller.sal.utils.HexEncode;
 import org.opendaylight.controller.sal.utils.NetUtils;
@@ -71,6 +75,8 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
     private ConcurrentMap<InetAddress, Set<HostNodeConnector>> arpRequestors;
     private ConcurrentMap<InetAddress, Short> countDownTimers;
     private Timer periodicTimer;
+    private BlockingQueue<ARPCacheEvent> ARPCacheEvents = new LinkedBlockingQueue<ARPCacheEvent>();
+    Thread cacheEventHandler;
     /*
      * A cluster allocated cache. Used for synchronizing ARP request/reply
      * events across all cluster controllers. To raise an event, we put() a specific
@@ -504,6 +510,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
     void init() {
         arpRequestors = new ConcurrentHashMap<InetAddress, Set<HostNodeConnector>>();
         countDownTimers = new ConcurrentHashMap<InetAddress, Short>();
+        cacheEventHandler = new Thread(new ARPCacheEventHandler(), "ARPCacheEventHandler Thread");
 
         allocateCaches();
         retrieveCaches();
@@ -536,7 +543,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
 
         try{
             clusterContainerService.createCache(ARP_EVENT_CACHE_NAME,
-                    EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
         } catch (CacheConfigException e){
             log.error("ARPHandler cache configuration invalid!");
         } catch (CacheExistException e){
@@ -565,6 +572,8 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
      */
     void start() {
         startPeriodicTimer();
+        cacheEventHandler.start();
+
     }
 
     /**
@@ -687,31 +696,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
 
     @Override
     public void entryUpdated(ARPEvent key, Boolean new_value, String cacheName, boolean originLocal) {
-        if (key instanceof ARPRequest) {
-            ARPRequest req = (ARPRequest) key;
-            // If broadcast request
-            if (req.getHost() == null) {
-                sendBcastARPRequest(req.getTargetIP(), req.getSubnet());
-
-            //If unicast and local, send reply
-            } else if (connectionManager.isLocal(req.getHost().getnodeconnectorNode())) {
-                sendUcastARPRequest(req.getHost(), req.getSubnet());
-            }
-        } else if (key instanceof ARPReply) {
-            ARPReply rep = (ARPReply) key;
-            // New reply received by controller, notify all awaiting requestors across the cluster
-            if (new_value) {
-                generateAndSendReply(rep.getTargetIP(), rep.getTargetMac());
-
-            // Otherwise, a specific reply. If local, send out.
-            } else if (connectionManager.isLocal(rep.getPort().getNode())) {
-                sendARPReply(rep.getPort(),
-                        rep.getSourceMac(),
-                        rep.getSourceIP(),
-                        rep.getTargetMac(),
-                        rep.getTargetIP());
-            }
-        }
+        enqueueARPCacheEvent(key, new_value);
     }
 
     @Override
@@ -722,4 +707,58 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA
     public void entryDeleted(ARPEvent key, String cacheName, boolean originLocal) {
         // nothing to do
     }
+
+    private void enqueueARPCacheEvent (ARPEvent event, boolean new_value) {
+        try {
+            ARPCacheEvent cacheEvent = new ARPCacheEvent(event, new_value);
+            if (!ARPCacheEvents.contains(cacheEvent)) {
+                this.ARPCacheEvents.add(cacheEvent);
+            }
+        } catch (Exception e) {
+            log.debug("enqueueARPCacheEvent caught Interrupt Exception for event {}", event);
+        }
+    }
+
+    /*
+     * this thread monitors the connectionEvent queue for new incoming events from
+     */
+    private class ARPCacheEventHandler implements Runnable {
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    ARPCacheEvent ev = ARPCacheEvents.take();
+                    ARPEvent event = ev.getEvent();
+                    if (event instanceof ARPRequest) {
+                        ARPRequest req = (ARPRequest) event;
+                        // If broadcast request
+                        if (req.getHost() == null) {
+                            sendBcastARPRequest(req.getTargetIP(), req.getSubnet());
+
+                        //If unicast and local, send reply
+                        } else if (connectionManager.isLocal(req.getHost().getnodeconnectorNode())) {
+                            sendUcastARPRequest(req.getHost(), req.getSubnet());
+                        }
+                    } else if (event instanceof ARPReply) {
+                        ARPReply rep = (ARPReply) event;
+                        // New reply received by controller, notify all awaiting requestors across the cluster
+                        if (ev.isNewReply()) {
+                            generateAndSendReply(rep.getTargetIP(), rep.getTargetMac());
+
+                        // Otherwise, a specific reply. If local, send out.
+                        } else if (connectionManager.isLocal(rep.getPort().getNode())) {
+                            sendARPReply(rep.getPort(),
+                                    rep.getSourceMac(),
+                                    rep.getSourceIP(),
+                                    rep.getTargetMac(),
+                                    rep.getTargetIP());
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    ARPCacheEvents.clear();
+                    return;
+                }
+            }
+        }
+    }
 }