import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.opendaylight.controller.arphandler.ARPCacheEvent;
import org.opendaylight.controller.arphandler.ARPEvent;
import org.opendaylight.controller.arphandler.ARPReply;
import org.opendaylight.controller.arphandler.ARPRequest;
import org.opendaylight.controller.sal.packet.Packet;
import org.opendaylight.controller.sal.packet.PacketResult;
import org.opendaylight.controller.sal.packet.RawPacket;
+import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
import org.opendaylight.controller.sal.utils.EtherTypes;
import org.opendaylight.controller.sal.utils.HexEncode;
import org.opendaylight.controller.sal.utils.NetUtils;
private ConcurrentMap<InetAddress, Set<HostNodeConnector>> arpRequestors;
private ConcurrentMap<InetAddress, Short> countDownTimers;
private Timer periodicTimer;
+ private BlockingQueue<ARPCacheEvent> ARPCacheEvents = new LinkedBlockingQueue<ARPCacheEvent>();
+ Thread cacheEventHandler;
/*
* A cluster allocated cache. Used for synchronizing ARP request/reply
* events across all cluster controllers. To raise an event, we put() a specific
void init() {
arpRequestors = new ConcurrentHashMap<InetAddress, Set<HostNodeConnector>>();
countDownTimers = new ConcurrentHashMap<InetAddress, Short>();
+ cacheEventHandler = new Thread(new ARPCacheEventHandler(), "ARPCacheEventHandler Thread");
allocateCaches();
retrieveCaches();
try{
clusterContainerService.createCache(ARP_EVENT_CACHE_NAME,
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
} catch (CacheConfigException e){
log.error("ARPHandler cache configuration invalid!");
} catch (CacheExistException e){
*/
void start() {
startPeriodicTimer();
+ cacheEventHandler.start();
+
}
/**
@Override
public void entryUpdated(ARPEvent key, Boolean new_value, String cacheName, boolean originLocal) {
- if (key instanceof ARPRequest) {
- ARPRequest req = (ARPRequest) key;
- // If broadcast request
- if (req.getHost() == null) {
- sendBcastARPRequest(req.getTargetIP(), req.getSubnet());
-
- //If unicast and local, send reply
- } else if (connectionManager.isLocal(req.getHost().getnodeconnectorNode())) {
- sendUcastARPRequest(req.getHost(), req.getSubnet());
- }
- } else if (key instanceof ARPReply) {
- ARPReply rep = (ARPReply) key;
- // New reply received by controller, notify all awaiting requestors across the cluster
- if (new_value) {
- generateAndSendReply(rep.getTargetIP(), rep.getTargetMac());
-
- // Otherwise, a specific reply. If local, send out.
- } else if (connectionManager.isLocal(rep.getPort().getNode())) {
- sendARPReply(rep.getPort(),
- rep.getSourceMac(),
- rep.getSourceIP(),
- rep.getTargetMac(),
- rep.getTargetIP());
- }
- }
+ enqueueARPCacheEvent(key, new_value);
}
@Override
public void entryDeleted(ARPEvent key, String cacheName, boolean originLocal) {
// nothing to do
}
+
+ private void enqueueARPCacheEvent (ARPEvent event, boolean new_value) {
+ try {
+ ARPCacheEvent cacheEvent = new ARPCacheEvent(event, new_value);
+ if (!ARPCacheEvents.contains(cacheEvent)) {
+ this.ARPCacheEvents.add(cacheEvent);
+ }
+ } catch (Exception e) {
+ log.debug("enqueueARPCacheEvent caught Interrupt Exception for event {}", event);
+ }
+ }
+
+ /*
+ * this thread monitors the connectionEvent queue for new incoming events from
+ */
+ private class ARPCacheEventHandler implements Runnable {
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ ARPCacheEvent ev = ARPCacheEvents.take();
+ ARPEvent event = ev.getEvent();
+ if (event instanceof ARPRequest) {
+ ARPRequest req = (ARPRequest) event;
+ // If broadcast request
+ if (req.getHost() == null) {
+ sendBcastARPRequest(req.getTargetIP(), req.getSubnet());
+
+ //If unicast and local, send reply
+ } else if (connectionManager.isLocal(req.getHost().getnodeconnectorNode())) {
+ sendUcastARPRequest(req.getHost(), req.getSubnet());
+ }
+ } else if (event instanceof ARPReply) {
+ ARPReply rep = (ARPReply) event;
+ // New reply received by controller, notify all awaiting requestors across the cluster
+ if (ev.isNewReply()) {
+ generateAndSendReply(rep.getTargetIP(), rep.getTargetMac());
+
+ // Otherwise, a specific reply. If local, send out.
+ } else if (connectionManager.isLocal(rep.getPort().getNode())) {
+ sendARPReply(rep.getPort(),
+ rep.getSourceMac(),
+ rep.getSourceIP(),
+ rep.getTargetMac(),
+ rep.getTargetIP());
+ }
+ }
+ } catch (InterruptedException e) {
+ ARPCacheEvents.clear();
+ return;
+ }
+ }
+ }
+ }
}