X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fprotocol_plugins%2Fopenflow%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fprotocol_plugin%2Fopenflow%2Fcore%2Finternal%2FController.java;h=172ec98780ce7bd90a5b4fe19ded1ca462161ec4;hb=78ef04c45c5a7fbee9bbb9ae77ecb1882add8623;hp=8e6a30fceed5e0961e9c52660cfd77bb5c9ea71f;hpb=d4526d295217d6d6d60434e179a2a78c1b2eb52b;p=controller.git diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/Controller.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/Controller.java index 8e6a30fcee..172ec98780 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/Controller.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/Controller.java @@ -13,15 +13,15 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.util.Comparator; import java.util.Date; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.osgi.framework.console.CommandInterpreter; @@ -30,6 +30,11 @@ import org.opendaylight.controller.protocol_plugin.openflow.core.IController; import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener; import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch; import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitchStateListener; +import org.opendaylight.controller.sal.connection.ConnectionConstants; +import org.opendaylight.controller.sal.connection.IPluginInConnectionService; +import org.opendaylight.controller.sal.core.Node; +import org.opendaylight.controller.sal.utils.Status; +import org.opendaylight.controller.sal.utils.StatusCode; import org.openflow.protocol.OFMessage; import org.openflow.protocol.OFType; import org.openflow.util.HexString; @@ -38,18 +43,21 @@ import org.osgi.framework.FrameworkUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class Controller implements IController, CommandProvider { +public class Controller implements IController, CommandProvider, IPluginInConnectionService { private static final Logger logger = LoggerFactory .getLogger(Controller.class); private ControllerIO controllerIO; private Thread switchEventThread; private ConcurrentHashMap switches; - private BlockingQueue switchEvents; + private PriorityBlockingQueue switchEvents; // only 1 message listener per OFType private ConcurrentMap messageListeners; // only 1 switch state listener private ISwitchStateListener switchStateListener; private AtomicInteger switchInstanceNumber; + private int MAXQUEUESIZE = 50000; + + private static enum SwitchEventPriority { LOW, NORMAL, HIGH } /* * this thread monitors the switchEvents queue for new incoming events from @@ -70,7 +78,7 @@ public class Controller implements IController, CommandProvider { ISwitch existingSwitch = switches.get(sid); if (existingSwitch != null) { logger.info("Replacing existing {} with New {}", - existingSwitch.toString(), sw.toString()); + existingSwitch, sw); disconnectSwitch(existingSwitch); } switches.put(sid, sw); @@ -107,12 +115,17 @@ public class Controller implements IController, CommandProvider { /** * Function called by the dependency manager when all the required * dependencies are satisfied - * + * */ public void init() { logger.debug("Initializing!"); this.switches = new ConcurrentHashMap(); - this.switchEvents = new LinkedBlockingQueue(); + this.switchEvents = new PriorityBlockingQueue(MAXQUEUESIZE, new Comparator() { + @Override + public int compare(SwitchEvent p1, SwitchEvent p2) { + return p2.getPriority() - p1.getPriority(); + } + }); this.messageListeners = new ConcurrentHashMap(); this.switchStateListener = null; this.switchInstanceNumber = new AtomicInteger(0); @@ -122,7 +135,7 @@ public class Controller implements IController, CommandProvider { /** * Function called by dependency manager after "init ()" is called and after * the services provided by the class are registered in the service registry - * + * */ public void start() { logger.debug("Starting!"); @@ -145,7 +158,7 @@ public class Controller implements IController, CommandProvider { * Function called by the dependency manager before the services exported by * the component are unregistered, this will be followed by a "destroy ()" * calls - * + * */ public void stop() { for (Iterator> it = switches.entrySet().iterator(); it @@ -166,7 +179,7 @@ public class Controller implements IController, CommandProvider { * Function called by the dependency manager when at least one dependency * become unsatisfied or when the component is shutting down because for * example bundle is being stopped. - * + * */ public void destroy() { } @@ -175,20 +188,18 @@ public class Controller implements IController, CommandProvider { public void addMessageListener(OFType type, IMessageListener listener) { IMessageListener currentListener = this.messageListeners.get(type); if (currentListener != null) { - logger.warn("{} is already listened by {}", type.toString(), - currentListener.toString()); + logger.warn("{} is already listened by {}", type, + currentListener); } this.messageListeners.put(type, listener); - logger.debug("{} is now listened by {}", type.toString(), - listener.toString()); + logger.debug("{} is now listened by {}", type, listener); } @Override public void removeMessageListener(OFType type, IMessageListener listener) { IMessageListener currentListener = this.messageListeners.get(type); if ((currentListener != null) && (currentListener == listener)) { - logger.debug("{} listener {} is Removed", type.toString(), - listener.toString()); + logger.debug("{} listener {} is Removed", type, listener); this.messageListeners.remove(type); } } @@ -197,19 +208,17 @@ public class Controller implements IController, CommandProvider { public void addSwitchStateListener(ISwitchStateListener listener) { if (this.switchStateListener != null) { logger.warn("Switch events are already listened by {}", - this.switchStateListener.toString()); + this.switchStateListener); } this.switchStateListener = listener; - logger.debug("Switch events are now listened by {}", - listener.toString()); + logger.debug("Switch events are now listened by {}", listener); } @Override public void removeSwitchStateListener(ISwitchStateListener listener) { if ((this.switchStateListener != null) && (this.switchStateListener == listener)) { - logger.debug("SwitchStateListener {} is Removed", - listener.toString()); + logger.debug("SwitchStateListener {} is Removed", listener); this.switchStateListener = null; } } @@ -224,12 +233,12 @@ public class Controller implements IController, CommandProvider { // create new switch int i = this.switchInstanceNumber.addAndGet(1); String instanceName = "SwitchHandler-" + i; - SwitchHandler switchHandler = new SwitchHandler(this, sc, - instanceName); + SwitchHandler switchHandler = new SwitchHandler(this, sc, instanceName); switchHandler.start(); if (sc.isConnected()) { - logger.info("Switch:{} is connected to the Controller", sc - .getRemoteAddress().toString().split("/")[1]); + logger.info("Switch:{} is connected to the Controller", + sc.socket().getRemoteSocketAddress() + .toString().split("/")[1]); } } catch (IOException e) { @@ -241,7 +250,7 @@ public class Controller implements IController, CommandProvider { if (((SwitchHandler) sw).isOperational()) { Long sid = sw.getId(); if (this.switches.remove(sid, sw)) { - logger.warn("{} is Disconnected", sw.toString()); + logger.info("{} is removed", sw); notifySwitchDeleted(sw); } } @@ -262,35 +271,31 @@ public class Controller implements IController, CommandProvider { } private synchronized void addSwitchEvent(SwitchEvent event) { - try { - this.switchEvents.put(event); - } catch (InterruptedException e) { - logger.debug("SwitchEvent caught Interrupt Exception"); - } + this.switchEvents.put(event); } public void takeSwitchEventAdd(ISwitch sw) { - SwitchEvent ev = new SwitchEvent( - SwitchEvent.SwitchEventType.SWITCH_ADD, sw, null); + SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ADD, sw, null, + SwitchEventPriority.HIGH.ordinal()); addSwitchEvent(ev); } public void takeSwitchEventDelete(ISwitch sw) { - SwitchEvent ev = new SwitchEvent( - SwitchEvent.SwitchEventType.SWITCH_DELETE, sw, null); + SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_DELETE, sw, null, + SwitchEventPriority.HIGH.ordinal()); addSwitchEvent(ev); } public void takeSwitchEventError(ISwitch sw) { - SwitchEvent ev = new SwitchEvent( - SwitchEvent.SwitchEventType.SWITCH_ERROR, sw, null); + SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ERROR, sw, null, + SwitchEventPriority.NORMAL.ordinal()); addSwitchEvent(ev); } public void takeSwitchEventMsg(ISwitch sw, OFMessage msg) { if (messageListeners.get(msg.getType()) != null) { - SwitchEvent ev = new SwitchEvent( - SwitchEvent.SwitchEventType.SWITCH_MESSAGE, sw, msg); + SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_MESSAGE, sw, msg, + SwitchEventPriority.LOW.ordinal()); addSwitchEvent(ev); } } @@ -305,6 +310,10 @@ public class Controller implements IController, CommandProvider { return this.switches.get(switchId); } + public void _controllerShowQueueSize(CommandInterpreter ci) { + ci.print("switchEvents queue size: " + switchEvents.size() + "\n"); + } + public void _controllerShowSwitches(CommandInterpreter ci) { Set sids = switches.keySet(); StringBuffer s = new StringBuffer(); @@ -375,6 +384,37 @@ public class Controller implements IController, CommandProvider { help.append("\t controllerShowSwitches\n"); help.append("\t controllerReset\n"); help.append("\t controllerShowConnConfig\n"); + help.append("\t controllerShowQueueSize\n"); return help.toString(); } + + @Override + public Status disconnect(Node node) { + ISwitch sw = getSwitch((Long) node.getID()); + if (sw != null) disconnectSwitch(sw); + return new Status(StatusCode.SUCCESS); + } + + @Override + public Node connect(String connectionIdentifier, Map params) { + return null; + } + + /** + * View Change notification + */ + public void notifyClusterViewChanged() { + for (ISwitch sw : switches.values()) { + notifySwitchAdded(sw); + } + } + + /** + * Node Disconnected from the node's master controller. + */ + @Override + public void notifyNodeDisconnectFromMaster(Node node) { + ISwitch sw = switches.get((Long)node.getID()); + if (sw != null) notifySwitchAdded(sw); + } }