X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Farphandler%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Farphandler%2Finternal%2FArpHandler.java;h=f3b22c75d8102a76beeb78229e7faabb89e60bd5;hp=0ff1cd9bd6a542dc8694b09933bfe2c1835d3bbd;hb=4ca5ea31c027f33cd4c8c84adece356c354c1c3a;hpb=67a8679ab2eb3ad8d7914756e844243eac3cbd32 diff --git a/opendaylight/arphandler/src/main/java/org/opendaylight/controller/arphandler/internal/ArpHandler.java b/opendaylight/arphandler/src/main/java/org/opendaylight/controller/arphandler/internal/ArpHandler.java index 0ff1cd9bd6..f3b22c75d8 100644 --- a/opendaylight/arphandler/src/main/java/org/opendaylight/controller/arphandler/internal/ArpHandler.java +++ b/opendaylight/arphandler/src/main/java/org/opendaylight/controller/arphandler/internal/ArpHandler.java @@ -21,10 +21,13 @@ import java.util.HashSet; import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.LinkedBlockingQueue; +import org.opendaylight.controller.arphandler.ARPCacheEvent; import org.opendaylight.controller.arphandler.ARPEvent; import org.opendaylight.controller.arphandler.ARPReply; import org.opendaylight.controller.arphandler.ARPRequest; @@ -49,6 +52,7 @@ import org.opendaylight.controller.sal.packet.IPv4; import org.opendaylight.controller.sal.packet.Packet; import org.opendaylight.controller.sal.packet.PacketResult; import org.opendaylight.controller.sal.packet.RawPacket; +import org.opendaylight.controller.sal.topology.TopoEdgeUpdate; import org.opendaylight.controller.sal.utils.EtherTypes; import org.opendaylight.controller.sal.utils.HexEncode; import org.opendaylight.controller.sal.utils.NetUtils; @@ -71,6 +75,8 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA private ConcurrentMap> arpRequestors; private ConcurrentMap countDownTimers; private Timer periodicTimer; + private BlockingQueue ARPCacheEvents = new LinkedBlockingQueue(); + Thread cacheEventHandler; /* * A cluster allocated cache. Used for synchronizing ARP request/reply * events across all cluster controllers. To raise an event, we put() a specific @@ -504,6 +510,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA void init() { arpRequestors = new ConcurrentHashMap>(); countDownTimers = new ConcurrentHashMap(); + cacheEventHandler = new Thread(new ARPCacheEventHandler(), "ARPCacheEventHandler Thread"); allocateCaches(); retrieveCaches(); @@ -536,7 +543,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA try{ clusterContainerService.createCache(ARP_EVENT_CACHE_NAME, - EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); } catch (CacheConfigException e){ log.error("ARPHandler cache configuration invalid!"); } catch (CacheExistException e){ @@ -565,6 +572,8 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA */ void start() { startPeriodicTimer(); + cacheEventHandler.start(); + } /** @@ -687,31 +696,7 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA @Override public void entryUpdated(ARPEvent key, Boolean new_value, String cacheName, boolean originLocal) { - if (key instanceof ARPRequest) { - ARPRequest req = (ARPRequest) key; - // If broadcast request - if (req.getHost() == null) { - sendBcastARPRequest(req.getTargetIP(), req.getSubnet()); - - //If unicast and local, send reply - } else if (connectionManager.isLocal(req.getHost().getnodeconnectorNode())) { - sendUcastARPRequest(req.getHost(), req.getSubnet()); - } - } else if (key instanceof ARPReply) { - ARPReply rep = (ARPReply) key; - // New reply received by controller, notify all awaiting requestors across the cluster - if (new_value) { - generateAndSendReply(rep.getTargetIP(), rep.getTargetMac()); - - // Otherwise, a specific reply. If local, send out. - } else if (connectionManager.isLocal(rep.getPort().getNode())) { - sendARPReply(rep.getPort(), - rep.getSourceMac(), - rep.getSourceIP(), - rep.getTargetMac(), - rep.getTargetIP()); - } - } + enqueueARPCacheEvent(key, new_value); } @Override @@ -722,4 +707,58 @@ public class ArpHandler implements IHostFinder, IListenDataPacket, ICacheUpdateA public void entryDeleted(ARPEvent key, String cacheName, boolean originLocal) { // nothing to do } + + private void enqueueARPCacheEvent (ARPEvent event, boolean new_value) { + try { + ARPCacheEvent cacheEvent = new ARPCacheEvent(event, new_value); + if (!ARPCacheEvents.contains(cacheEvent)) { + this.ARPCacheEvents.add(cacheEvent); + } + } catch (Exception e) { + log.debug("enqueueARPCacheEvent caught Interrupt Exception for event {}", event); + } + } + + /* + * this thread monitors the connectionEvent queue for new incoming events from + */ + private class ARPCacheEventHandler implements Runnable { + @Override + public void run() { + while (true) { + try { + ARPCacheEvent ev = ARPCacheEvents.take(); + ARPEvent event = ev.getEvent(); + if (event instanceof ARPRequest) { + ARPRequest req = (ARPRequest) event; + // If broadcast request + if (req.getHost() == null) { + sendBcastARPRequest(req.getTargetIP(), req.getSubnet()); + + //If unicast and local, send reply + } else if (connectionManager.isLocal(req.getHost().getnodeconnectorNode())) { + sendUcastARPRequest(req.getHost(), req.getSubnet()); + } + } else if (event instanceof ARPReply) { + ARPReply rep = (ARPReply) event; + // New reply received by controller, notify all awaiting requestors across the cluster + if (ev.isNewReply()) { + generateAndSendReply(rep.getTargetIP(), rep.getTargetMac()); + + // Otherwise, a specific reply. If local, send out. + } else if (connectionManager.isLocal(rep.getPort().getNode())) { + sendARPReply(rep.getPort(), + rep.getSourceMac(), + rep.getSourceIP(), + rep.getTargetMac(), + rep.getTargetIP()); + } + } + } catch (InterruptedException e) { + ARPCacheEvents.clear(); + return; + } + } + } + } }