import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.util.Comparator;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.osgi.framework.console.CommandInterpreter;
import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitchStateListener;
import org.opendaylight.controller.sal.connection.ConnectionConstants;
import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
-import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
.getLogger(Controller.class);
private ControllerIO controllerIO;
private Thread switchEventThread;
+ private volatile boolean shutdownSwitchEventThread;// default to false
private ConcurrentHashMap<Long, ISwitch> switches;
- private BlockingQueue<SwitchEvent> switchEvents;
+ private PriorityBlockingQueue<SwitchEvent> switchEvents;
// only 1 message listener per OFType
private ConcurrentMap<OFType, IMessageListener> messageListeners;
// only 1 switch state listener
private AtomicInteger switchInstanceNumber;
private int MAXQUEUESIZE = 50000;
+ private static enum SwitchEventPriority { LOW, NORMAL, HIGH }
+
/*
* this thread monitors the switchEvents queue for new incoming events from
* switch
while (true) {
try {
+ if(shutdownSwitchEventThread) {
+ // break out of the infinite loop
+ // if you are shutting down
+ logger.info("Switch Event Thread is shutting down");
+ break;
+ }
SwitchEvent ev = switchEvents.take();
SwitchEvent.SwitchEventType eType = ev.getEventType();
ISwitch sw = ev.getSwitch();
logger.error("Unknown switch event {}", eType.ordinal());
}
} catch (InterruptedException e) {
- switchEvents.clear();
- return;
+ // nothing to do except retry
+ } catch (Exception e) {
+ // log the exception and retry
+ logger.warn("Exception in Switch Event Thread is {}" ,e);
}
}
+ switchEvents.clear();
}
-
}
/**
public void init() {
logger.debug("Initializing!");
this.switches = new ConcurrentHashMap<Long, ISwitch>();
- this.switchEvents = new LinkedBlockingQueue<SwitchEvent>(MAXQUEUESIZE);
+ this.switchEvents = new PriorityBlockingQueue<SwitchEvent>(MAXQUEUESIZE, new Comparator<SwitchEvent>() {
+ @Override
+ public int compare(SwitchEvent p1, SwitchEvent p2) {
+ return p2.getPriority() - p1.getPriority();
+ }
+ });
this.messageListeners = new ConcurrentHashMap<OFType, IMessageListener>();
this.switchStateListener = null;
this.switchInstanceNumber = new AtomicInteger(0);
((SwitchHandler) entry.getValue()).stop();
it.remove();
}
+ shutdownSwitchEventThread = true;
switchEventThread.interrupt();
try {
controllerIO.shutDown();
if (((SwitchHandler) sw).isOperational()) {
Long sid = sw.getId();
if (this.switches.remove(sid, sw)) {
- logger.warn("{} is Disconnected", sw);
+ logger.info("{} is removed", sw);
notifySwitchDeleted(sw);
}
}
}
private synchronized void addSwitchEvent(SwitchEvent event) {
- try {
- this.switchEvents.put(event);
- } catch (InterruptedException e) {
- logger.debug("SwitchEvent caught Interrupt Exception");
- }
+ this.switchEvents.put(event);
}
public void takeSwitchEventAdd(ISwitch sw) {
- SwitchEvent ev = new SwitchEvent(
- SwitchEvent.SwitchEventType.SWITCH_ADD, sw, null);
+ SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ADD, sw, null,
+ SwitchEventPriority.HIGH.ordinal());
addSwitchEvent(ev);
}
public void takeSwitchEventDelete(ISwitch sw) {
- SwitchEvent ev = new SwitchEvent(
- SwitchEvent.SwitchEventType.SWITCH_DELETE, sw, null);
+ SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_DELETE, sw, null,
+ SwitchEventPriority.HIGH.ordinal());
addSwitchEvent(ev);
}
public void takeSwitchEventError(ISwitch sw) {
- SwitchEvent ev = new SwitchEvent(
- SwitchEvent.SwitchEventType.SWITCH_ERROR, sw, null);
+ SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ERROR, sw, null,
+ SwitchEventPriority.NORMAL.ordinal());
addSwitchEvent(ev);
}
public void takeSwitchEventMsg(ISwitch sw, OFMessage msg) {
if (messageListeners.get(msg.getType()) != null) {
- SwitchEvent ev = new SwitchEvent(
- SwitchEvent.SwitchEventType.SWITCH_MESSAGE, sw, msg);
+ SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_MESSAGE, sw, msg,
+ SwitchEventPriority.LOW.ordinal());
addSwitchEvent(ev);
}
}
return this.switches.get(switchId);
}
+ public void _controllerShowQueueSize(CommandInterpreter ci) {
+ ci.print("switchEvents queue size: " + switchEvents.size() + "\n");
+ }
+
public void _controllerShowSwitches(CommandInterpreter ci) {
Set<Long> sids = switches.keySet();
StringBuffer s = new StringBuffer();
String keyStoreFile = System.getProperty("controllerKeyStore");
String trustStoreFile = System.getProperty("controllerTrustStore");
if ((keyStoreFile == null) || keyStoreFile.trim().isEmpty()) {
- ci.print("controllerKeyStore not specified in ./configuration/config.ini\n");
+ ci.print("controllerKeyStore not specified\n");
} else {
ci.print("controllerKeyStore=" + keyStoreFile + "\n");
}
if ((trustStoreFile == null) || trustStoreFile.trim().isEmpty()) {
- ci.print("controllerTrustStore not specified in ./configuration/config.ini\n");
+ ci.print("controllerTrustStore not specified\n");
} else {
ci.print("controllerTrustStore=" + trustStoreFile + "\n");
}
help.append("\t controllerShowSwitches\n");
help.append("\t controllerReset\n");
help.append("\t controllerShowConnConfig\n");
+ help.append("\t controllerShowQueueSize\n");
return help.toString();
}