-
/*
* Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
*
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.util.Comparator;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.osgi.framework.console.CommandInterpreter;
import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitchStateListener;
+import org.opendaylight.controller.sal.connection.ConnectionConstants;
+import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
+import org.opendaylight.controller.sal.core.Node;
+import org.opendaylight.controller.sal.utils.Status;
+import org.opendaylight.controller.sal.utils.StatusCode;
import org.openflow.protocol.OFMessage;
import org.openflow.protocol.OFType;
import org.openflow.util.HexString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class Controller implements IController, CommandProvider {
+public class Controller implements IController, CommandProvider, IPluginInConnectionService {
private static final Logger logger = LoggerFactory
.getLogger(Controller.class);
private ControllerIO controllerIO;
private Thread switchEventThread;
private ConcurrentHashMap<Long, ISwitch> switches;
- private BlockingQueue<SwitchEvent> switchEvents;
+ private PriorityBlockingQueue<SwitchEvent> switchEvents;
// only 1 message listener per OFType
private ConcurrentMap<OFType, IMessageListener> messageListeners;
// only 1 switch state listener
private ISwitchStateListener switchStateListener;
private AtomicInteger switchInstanceNumber;
+ private int MAXQUEUESIZE = 50000;
+
+ private static enum SwitchEventPriority { LOW, NORMAL, HIGH }
/*
- * this thread monitors the switchEvents queue for new incoming events from switch
+ * this thread monitors the switchEvents queue for new incoming events from
+ * switch
*/
private class EventHandler implements Runnable {
@Override
SwitchEvent ev = switchEvents.take();
SwitchEvent.SwitchEventType eType = ev.getEventType();
ISwitch sw = ev.getSwitch();
- if (eType != SwitchEvent.SwitchEventType.SWITCH_MESSAGE) {
- //logger.debug("Received " + ev.toString() + " from " + sw.toString());
- }
switch (eType) {
case SWITCH_ADD:
Long sid = sw.getId();
ISwitch existingSwitch = switches.get(sid);
if (existingSwitch != null) {
- logger.info(" Replacing existing "
- + existingSwitch.toString() + " with New "
- + sw.toString());
+ logger.info("Replacing existing {} with New {}",
+ existingSwitch, sw);
disconnectSwitch(existingSwitch);
}
switches.put(sid, sw);
}
break;
default:
- logger.error("unknow switch event " + eType.ordinal());
+ logger.error("Unknown switch event {}", eType.ordinal());
}
} catch (InterruptedException e) {
switchEvents.clear();
*
*/
public void init() {
- logger.debug("OpenFlowCore init");
+ logger.debug("Initializing!");
this.switches = new ConcurrentHashMap<Long, ISwitch>();
- this.switchEvents = new LinkedBlockingQueue<SwitchEvent>();
+ this.switchEvents = new PriorityBlockingQueue<SwitchEvent>(MAXQUEUESIZE, new Comparator<SwitchEvent>() {
+ @Override
+ public int compare(SwitchEvent p1, SwitchEvent p2) {
+ return p2.getPriority() - p1.getPriority();
+ }
+ });
this.messageListeners = new ConcurrentHashMap<OFType, IMessageListener>();
this.switchStateListener = null;
this.switchInstanceNumber = new AtomicInteger(0);
}
/**
- * Function called by dependency manager after "init ()" is called
- * and after the services provided by the class are registered in
- * the service registry
+ * Function called by dependency manager after "init ()" is called and after
+ * the services provided by the class are registered in the service registry
*
*/
public void start() {
- logger.debug("OpenFlowCore start() is called");
+ logger.debug("Starting!");
/*
* start a thread to handle event coming from the switch
*/
try {
controllerIO.start();
} catch (IOException ex) {
- logger.error("Caught exception: " + ex + " during start");
+ logger.error("Caught exception while starting:", ex);
}
}
-
+
/**
- * Function called by the dependency manager before the services
- * exported by the component are unregistered, this will be
- * followed by a "destroy ()" calls
+ * Function called by the dependency manager before the services exported by
+ * the component are unregistered, this will be followed by a "destroy ()"
+ * calls
*
*/
public void stop() {
}
switchEventThread.interrupt();
try {
- controllerIO.shutDown();
+ controllerIO.shutDown();
} catch (IOException ex) {
- logger.error("Caught exception: " + ex + " during stop");
+ logger.error("Caught exception while stopping:", ex);
}
}
/**
- * Function called by the dependency manager when at least one
- * dependency become unsatisfied or when the component is shutting
- * down because for example bundle is being stopped.
+ * Function called by the dependency manager when at least one dependency
+ * become unsatisfied or when the component is shutting down because for
+ * example bundle is being stopped.
*
*/
public void destroy() {
public void addMessageListener(OFType type, IMessageListener listener) {
IMessageListener currentListener = this.messageListeners.get(type);
if (currentListener != null) {
- logger.warn(type.toString() + " already listened by "
- + currentListener.toString());
+ logger.warn("{} is already listened by {}", type,
+ currentListener);
}
this.messageListeners.put(type, listener);
- logger.debug(type.toString() + " is now listened by "
- + listener.toString());
+ logger.debug("{} is now listened by {}", type, listener);
}
@Override
public void removeMessageListener(OFType type, IMessageListener listener) {
IMessageListener currentListener = this.messageListeners.get(type);
if ((currentListener != null) && (currentListener == listener)) {
+ logger.debug("{} listener {} is Removed", type, listener);
this.messageListeners.remove(type);
}
}
@Override
public void addSwitchStateListener(ISwitchStateListener listener) {
if (this.switchStateListener != null) {
- logger.warn(this.switchStateListener.toString()
- + "already listened to switch events");
+ logger.warn("Switch events are already listened by {}",
+ this.switchStateListener);
}
this.switchStateListener = listener;
- logger.debug(listener.toString() + " now listens to switch events");
+ logger.debug("Switch events are now listened by {}", listener);
}
@Override
public void removeSwitchStateListener(ISwitchStateListener listener) {
if ((this.switchStateListener != null)
&& (this.switchStateListener == listener)) {
+ logger.debug("SwitchStateListener {} is Removed", listener);
this.switchStateListener = null;
}
}
// create new switch
int i = this.switchInstanceNumber.addAndGet(1);
String instanceName = "SwitchHandler-" + i;
- SwitchHandler switchHandler = new SwitchHandler(this, sc,
- instanceName);
+ SwitchHandler switchHandler = new SwitchHandler(this, sc, instanceName);
switchHandler.start();
- logger.info(instanceName + " connected: " + sc.toString());
+ if (sc.isConnected()) {
+ logger.info("Switch:{} is connected to the Controller",
+ sc.socket().getRemoteSocketAddress()
+ .toString().split("/")[1]);
+ }
+
} catch (IOException e) {
return;
}
if (((SwitchHandler) sw).isOperational()) {
Long sid = sw.getId();
if (this.switches.remove(sid, sw)) {
- logger.warn(sw.toString() + " is disconnected");
+ logger.info("{} is removed", sw);
notifySwitchDeleted(sw);
- } else {
- //logger.warn(sw.toString() + " has been replaced by " +
- // this.switches.get(sid));
}
}
((SwitchHandler) sw).stop();
}
private synchronized void addSwitchEvent(SwitchEvent event) {
- try {
- this.switchEvents.put(event);
- } catch (InterruptedException e) {
- logger.debug("SwitchEvent caught Interrupt Exception");
- }
+ this.switchEvents.put(event);
}
- public void takeSwtichEventAdd(ISwitch sw) {
- SwitchEvent ev = new SwitchEvent(
- SwitchEvent.SwitchEventType.SWITCH_ADD, sw, null);
+ public void takeSwitchEventAdd(ISwitch sw) {
+ SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ADD, sw, null,
+ SwitchEventPriority.HIGH.ordinal());
addSwitchEvent(ev);
}
public void takeSwitchEventDelete(ISwitch sw) {
- SwitchEvent ev = new SwitchEvent(
- SwitchEvent.SwitchEventType.SWITCH_DELETE, sw, null);
+ SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_DELETE, sw, null,
+ SwitchEventPriority.HIGH.ordinal());
addSwitchEvent(ev);
}
public void takeSwitchEventError(ISwitch sw) {
- SwitchEvent ev = new SwitchEvent(
- SwitchEvent.SwitchEventType.SWITCH_ERROR, sw, null);
+ SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ERROR, sw, null,
+ SwitchEventPriority.NORMAL.ordinal());
addSwitchEvent(ev);
}
public void takeSwitchEventMsg(ISwitch sw, OFMessage msg) {
if (messageListeners.get(msg.getType()) != null) {
- SwitchEvent ev = new SwitchEvent(
- SwitchEvent.SwitchEventType.SWITCH_MESSAGE, sw, msg);
+ SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_MESSAGE, sw, msg,
+ SwitchEventPriority.LOW.ordinal());
addSwitchEvent(ev);
}
}
return this.switches.get(switchId);
}
+ public void _controllerShowQueueSize(CommandInterpreter ci) {
+ ci.print("switchEvents queue size: " + switchEvents.size() + "\n");
+ }
+
public void _controllerShowSwitches(CommandInterpreter ci) {
Set<Long> sids = switches.keySet();
StringBuffer s = new StringBuffer();
while (iter.hasNext()) {
Long sid = iter.next();
Date date = switches.get(sid).getConnectedDate();
- String switchInstanceName = ((SwitchHandler) switches.get(sid)).getInstanceName();
+ String switchInstanceName = ((SwitchHandler) switches.get(sid))
+ .getInstanceName();
s.append(switchInstanceName + "/" + HexString.toHexString(sid)
+ " connected since " + date.toString() + "\n");
}
String keyStoreFile = System.getProperty("controllerKeyStore");
String trustStoreFile = System.getProperty("controllerTrustStore");
if ((keyStoreFile == null) || keyStoreFile.trim().isEmpty()) {
- ci.print("controllerKeyStore not specified in ./configuration/config.ini\n");
+ ci.print("controllerKeyStore not specified\n");
} else {
ci.print("controllerKeyStore=" + keyStoreFile + "\n");
}
if ((trustStoreFile == null) || trustStoreFile.trim().isEmpty()) {
- ci.print("controllerTrustStore not specified in ./configuration/config.ini\n");
+ ci.print("controllerTrustStore not specified\n");
} else {
ci.print("controllerTrustStore=" + trustStoreFile + "\n");
}
@Override
public String getHelp() {
StringBuffer help = new StringBuffer();
- help.append("-- Open Flow Controller --\n");
+ help.append("---Open Flow Controller---\n");
help.append("\t controllerShowSwitches\n");
help.append("\t controllerReset\n");
help.append("\t controllerShowConnConfig\n");
+ help.append("\t controllerShowQueueSize\n");
return help.toString();
}
+
+ @Override
+ public Status disconnect(Node node) {
+ ISwitch sw = getSwitch((Long) node.getID());
+ if (sw != null) disconnectSwitch(sw);
+ return new Status(StatusCode.SUCCESS);
+ }
+
+ @Override
+ public Node connect(String connectionIdentifier, Map<ConnectionConstants, String> params) {
+ return null;
+ }
+
+ /**
+ * View Change notification
+ */
+ public void notifyClusterViewChanged() {
+ for (ISwitch sw : switches.values()) {
+ notifySwitchAdded(sw);
+ }
+ }
+
+ /**
+ * Node Disconnected from the node's master controller.
+ */
+ @Override
+ public void notifyNodeDisconnectFromMaster(Node node) {
+ ISwitch sw = switches.get((Long)node.getID());
+ if (sw != null) notifySwitchAdded(sw);
+ }
}