private ConcurrentMap<InetAddress, Short> countDownTimers;
private Timer periodicTimer;
private BlockingQueue<ARPCacheEvent> ARPCacheEvents = new LinkedBlockingQueue<ARPCacheEvent>();
- Thread cacheEventHandler;
+ private Thread cacheEventHandler;
+ private boolean stopping = false;
/*
* A cluster allocated cache. Used for synchronizing ARP request/reply
* events across all cluster controllers. To raise an event, we put() a specific
try {
requestor = new HostNodeConnector(sourceMAC, sourceIP, p, subnet.getVlan());
} catch (ConstructionException e) {
- log.debug("Received ARP packet with invalid MAC: {}", sourceMAC);
+ log.debug("Received ARP packet with invalid MAC: {}", HexEncode.bytesToHexString(sourceMAC));
return;
}
/*
&& (NetUtils.isBroadcastMACAddr(targetMAC) || Arrays.equals(targetMAC, getControllerMAC()))) {
if (connectionManager.isLocal(p.getNode())){
if (log.isTraceEnabled()){
- log.trace("Received local ARP req. for default gateway. Replying with controller MAC: {}", getControllerMAC());
+ log.trace("Received local ARP req. for default gateway. Replying with controller MAC: {}",
+ HexEncode.bytesToHexString(getControllerMAC()));
}
sendARPReply(p, getControllerMAC(), targetIP, pkt.getSenderHardwareAddress(), sourceIP);
} else {
//Raise a bcast request event, all controllers need to send one
log.trace("Sending a bcast ARP request for {}", targetIP);
arpRequestReplyEvent.put(new ARPRequest(targetIP, subnet), false);
-
} else {
/*
* Target host known (across the cluster), send ARP REPLY make sure that targetMAC
* the targetIP as the target Network Address
*/
protected void sendBcastARPRequest(InetAddress targetIP, Subnet subnet) {
+ log.trace("sendBcatARPRequest targetIP:{} subnet:{}", targetIP, subnet);
Set<NodeConnector> nodeConnectors;
if (subnet.isFlatLayer2()) {
nodeConnectors = new HashSet<NodeConnector>();
}
for (NodeConnector p : nodeConnectors) {
-
//fiter out any non-local or internal ports
if (! connectionManager.isLocal(p.getNode()) || topologyManager.isInternal(p)) {
continue;
}
+ log.trace("Sending toward nodeConnector:{}", p);
ARP arp = new ARP();
byte[] senderIP = subnet.getNetworkAddress().getAddress();
byte[] targetIPByte = targetIP.getAddress();
* The sender MAC is the controller's MAC
*/
protected void sendUcastARPRequest(HostNodeConnector host, Subnet subnet) {
-
+ log.trace("sendUcastARPRequest host:{} subnet:{}", host, subnet);
NodeConnector outPort = host.getnodeConnector();
if (outPort == null) {
log.error("Failed sending UcastARP because cannot extract output port from Host: {}", host);
this.dataPacketService.transmitDataPacket(destPkt);
}
+ @Override
public void find(InetAddress networkAddress) {
log.trace("Received find IP {}", networkAddress);
/*
* Probe the host by sending a unicast ARP Request to the host
*/
+ @Override
public void probe(HostNodeConnector host) {
log.trace("Received probe host {}", host);
*
*/
void destroy() {
+ cacheEventHandler.interrupt();
}
/**
*
*/
void start() {
+ stopping = false;
startPeriodicTimer();
cacheEventHandler.start();
-
}
/**
}
void stopping() {
+ stopping = true;
cancelPeriodicTimer();
}
}
private void generateAndSendReply(InetAddress sourceIP, byte[] sourceMAC) {
+ if (log.isTraceEnabled()) {
+ log.trace("generateAndSendReply called with params sourceIP:{} sourceMAC:{}", sourceIP,
+ HexEncode.bytesToHexString(sourceMAC));
+ }
Set<HostNodeConnector> hosts = arpRequestors.remove(sourceIP);
if ((hosts == null) || hosts.isEmpty()) {
+ log.trace("Bailing out no requestors Hosts");
return;
}
countDownTimers.remove(sourceIP);
for (HostNodeConnector host : hosts) {
- log.trace("Sending ARP Reply with src {}/{}, target {}/{}",
- new Object[] { sourceMAC, sourceIP, host.getDataLayerAddressBytes(), host.getNetworkAddress() });
-
+ if (log.isTraceEnabled()) {
+ log.trace("Sending ARP Reply with src {}/{}, target {}/{}",
+ new Object[] {
+ HexEncode.bytesToHexString(sourceMAC),
+ sourceIP,
+ HexEncode.bytesToHexString(host.getDataLayerAddressBytes()),
+ host.getNetworkAddress() });
+ }
if (connectionManager.isLocal(host.getnodeconnectorNode())){
sendARPReply(host.getnodeConnector(),
sourceMAC,
host.getDataLayerAddressBytes(),
host.getNetworkAddress());
} else {
+ /*
+ * In the remote event a requestor moved to another
+ * controller it may turn out it now we need to send
+ * the ARP reply from a different controller, this
+ * cover the case
+ */
arpRequestReplyEvent.put(
new ARPReply(
host.getnodeConnector(),
@Override
public void entryUpdated(ARPEvent key, Boolean new_value, String cacheName, boolean originLocal) {
+ log.trace("Got and entryUpdated for cacheName {} key {} isNew {}", cacheName, key, new_value);
enqueueARPCacheEvent(key, new_value);
}
ARPCacheEvent cacheEvent = new ARPCacheEvent(event, new_value);
if (!ARPCacheEvents.contains(cacheEvent)) {
this.ARPCacheEvents.add(cacheEvent);
+ log.trace("Enqueued {}", event);
}
} catch (Exception e) {
log.debug("enqueueARPCacheEvent caught Interrupt Exception for event {}", event);
private class ARPCacheEventHandler implements Runnable {
@Override
public void run() {
- while (true) {
+ while (!stopping) {
try {
ARPCacheEvent ev = ARPCacheEvents.take();
ARPEvent event = ev.getEvent();
ARPRequest req = (ARPRequest) event;
// If broadcast request
if (req.getHost() == null) {
+ log.trace("Trigger and ARP Broadcast Request upon receipt of {}", req);
sendBcastARPRequest(req.getTargetIP(), req.getSubnet());
//If unicast and local, send reply
} else if (connectionManager.isLocal(req.getHost().getnodeconnectorNode())) {
+ log.trace("ARPCacheEventHandler - sendUcatARPRequest upon receipt of {}", req);
sendUcastARPRequest(req.getHost(), req.getSubnet());
}
} else if (event instanceof ARPReply) {
ARPReply rep = (ARPReply) event;
// New reply received by controller, notify all awaiting requestors across the cluster
if (ev.isNewReply()) {
+ log.trace("Trigger a generateAndSendReply in response to {}", rep);
generateAndSendReply(rep.getTargetIP(), rep.getTargetMac());
-
// Otherwise, a specific reply. If local, send out.
} else if (connectionManager.isLocal(rep.getPort().getNode())) {
+ log.trace("ARPCacheEventHandler - sendUcatARPReply locally in response to {}", rep);
sendARPReply(rep.getPort(),
rep.getSourceMac(),
rep.getSourceIP(),
basedir=`dirname ${fullpath}`
function usage {
- echo "Usage: $0 [-debug] [-debugsuspend] [-debugport <num>] [-start [<console port>]] [-stop] [-status] [-console] [-help] [<other args will automatically be used for the JVM>]"
+ echo "Usage: $0 [-jmx] [-jmxport <num>] [-debug] [-debugsuspend] [-debugport <num>] [-start [<console port>]] [-stop] [-status] [-console] [-help] [<other args will automatically be used for the JVM>]"
exit 1
}
startdaemon=0
daemonport=2400
daemonportread=""
+jmxport=1088
+jmxportread=""
+startjmx=0
stopdaemon=0
statusdaemon=0
consolestart=1
while true ; do
case "$1" in
-debug) debug=1; shift ;;
+ -jmx) startjmx=1; shift ;;
-debugsuspend) debugsuspend=1; shift ;;
-debugport) shift; debugportread="$1"; if [[ "${debugportread}" =~ ^[0-9]+$ ]] ; then debugport=${debugportread}; shift; else echo "-debugport expects a number but was not found"; exit -1; fi;;
+ -jmxport) shift; jmxportread="$1"; if [[ "${jmxportread}" =~ ^[0-9]+$ ]] ; then jmxport=${jmxportread}; shift; else echo "-jmxport expects a number but was not found"; exit -1; fi;;
-start) startdaemon=1; shift; daemonportread="$1"; if [[ "${daemonportread}" =~ ^[0-9]+$ ]] ; then daemonport=${daemonportread}; shift; fi;;
-stop) stopdaemon=1; shift ;;
-status) statusdaemon=1; shift ;;
exit -1
fi
+# Validate jmx port
+if [[ "${jmxport}" -lt 1024 ]] || [[ "${jmxport}" -gt 65535 ]]; then
+ echo "JMX Port not in the range [1024,65535] value is ${jmxport}"
+ exit -1
+fi
+
# Debug options
if [ "${debugsuspend}" -eq 1 ]; then
extraJVMOpts="${extraJVMOpts} -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=${debugport}"
extraJVMOpts="${extraJVMOpts} -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=${debugport}"
fi
+# Add JMX support
+if [ "${startjmx}" -eq 1 ]; then
+ extraJVMOpts="${extraJVMOpts} -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=${jmxport} -Dcom.sun.management.jmxremote"
+fi
+
########################################
# Now add to classpath the OSGi JAR
########################################