Merge "1. Controller switchEvents queue should be priority based. The queue holds...
authorChi-Vien Ly <chivly@cisco.com>
Thu, 22 Aug 2013 03:53:25 +0000 (03:53 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 22 Aug 2013 03:53:25 +0000 (03:53 +0000)
opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/Controller.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/MessageReadWriteService.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SecureMessageReadWriteService.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchEvent.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/DiscoveryService.java
opendaylight/switchmanager/implementation/src/main/java/org/opendaylight/controller/switchmanager/internal/SwitchManager.java

index b99b4bfdcaf40aefc2aae1daa0c340f97e96d218..f9899d7d8b3c5ca5821a2971c336d8d07f43bdac 100644 (file)
@@ -77,9 +77,9 @@ org.eclipse.gemini.web.tomcat.config.path=configuration/tomcat-server.xml
 # entries, including switches' Certification Authority (CA) certificates. For example,
 # secureChannelEnabled=true
 # controllerKeyStore=./configuration/ctlKeyStore
-# controllerKeyStorePassword=xxxxx (this password should match the password used for KeyStore generation)
+# controllerKeyStorePassword=xxxxxxxx (this password should match the password used for KeyStore generation and at least 6 characters)
 # controllerTrustStore=./configuration/ctlTrustStore
-# controllerTrustStorePassword=xxxxx (this password should match the password used for TrustStore generation)
+# controllerTrustStorePassword=xxxxxxxx (this password should match the password used for TrustStore generation and at least 6 characters)
 
 secureChannelEnabled=false
 controllerKeyStore=
index c7c6c8924d29807da61aa5896c4b2b1e1b79cc59..172ec98780ce7bd90a5b4fe19ded1ca462161ec4 100644 (file)
@@ -13,15 +13,15 @@ import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.eclipse.osgi.framework.console.CommandInterpreter;
@@ -32,7 +32,6 @@ import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitchStateListener;
 import org.opendaylight.controller.sal.connection.ConnectionConstants;
 import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
-import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.utils.Status;
 import org.opendaylight.controller.sal.utils.StatusCode;
@@ -50,7 +49,7 @@ public class Controller implements IController, CommandProvider, IPluginInConnec
     private ControllerIO controllerIO;
     private Thread switchEventThread;
     private ConcurrentHashMap<Long, ISwitch> switches;
-    private BlockingQueue<SwitchEvent> switchEvents;
+    private PriorityBlockingQueue<SwitchEvent> switchEvents;
     // only 1 message listener per OFType
     private ConcurrentMap<OFType, IMessageListener> messageListeners;
     // only 1 switch state listener
@@ -58,6 +57,8 @@ public class Controller implements IController, CommandProvider, IPluginInConnec
     private AtomicInteger switchInstanceNumber;
     private int MAXQUEUESIZE = 50000;
 
+    private static enum SwitchEventPriority { LOW, NORMAL, HIGH }
+
     /*
      * this thread monitors the switchEvents queue for new incoming events from
      * switch
@@ -119,7 +120,12 @@ public class Controller implements IController, CommandProvider, IPluginInConnec
     public void init() {
         logger.debug("Initializing!");
         this.switches = new ConcurrentHashMap<Long, ISwitch>();
-        this.switchEvents = new LinkedBlockingQueue<SwitchEvent>(MAXQUEUESIZE);
+        this.switchEvents = new PriorityBlockingQueue<SwitchEvent>(MAXQUEUESIZE, new Comparator<SwitchEvent>() {
+            @Override
+            public int compare(SwitchEvent p1, SwitchEvent p2) {
+                return p2.getPriority() - p1.getPriority();
+            }
+        });
         this.messageListeners = new ConcurrentHashMap<OFType, IMessageListener>();
         this.switchStateListener = null;
         this.switchInstanceNumber = new AtomicInteger(0);
@@ -244,7 +250,7 @@ public class Controller implements IController, CommandProvider, IPluginInConnec
         if (((SwitchHandler) sw).isOperational()) {
             Long sid = sw.getId();
             if (this.switches.remove(sid, sw)) {
-                logger.warn("{} is Disconnected", sw);
+                logger.info("{} is removed", sw);
                 notifySwitchDeleted(sw);
             }
         }
@@ -265,35 +271,31 @@ public class Controller implements IController, CommandProvider, IPluginInConnec
     }
 
     private synchronized void addSwitchEvent(SwitchEvent event) {
-        try {
-            this.switchEvents.put(event);
-        } catch (InterruptedException e) {
-            logger.debug("SwitchEvent caught Interrupt Exception");
-        }
+        this.switchEvents.put(event);
     }
 
     public void takeSwitchEventAdd(ISwitch sw) {
-        SwitchEvent ev = new SwitchEvent(
-                SwitchEvent.SwitchEventType.SWITCH_ADD, sw, null);
+        SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ADD, sw, null,
+                SwitchEventPriority.HIGH.ordinal());
         addSwitchEvent(ev);
     }
 
     public void takeSwitchEventDelete(ISwitch sw) {
-        SwitchEvent ev = new SwitchEvent(
-                SwitchEvent.SwitchEventType.SWITCH_DELETE, sw, null);
+        SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_DELETE, sw, null,
+                SwitchEventPriority.HIGH.ordinal());
         addSwitchEvent(ev);
     }
 
     public void takeSwitchEventError(ISwitch sw) {
-        SwitchEvent ev = new SwitchEvent(
-                SwitchEvent.SwitchEventType.SWITCH_ERROR, sw, null);
+        SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ERROR, sw, null,
+                SwitchEventPriority.NORMAL.ordinal());
         addSwitchEvent(ev);
     }
 
     public void takeSwitchEventMsg(ISwitch sw, OFMessage msg) {
         if (messageListeners.get(msg.getType()) != null) {
-            SwitchEvent ev = new SwitchEvent(
-                    SwitchEvent.SwitchEventType.SWITCH_MESSAGE, sw, msg);
+            SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_MESSAGE, sw, msg,
+                    SwitchEventPriority.LOW.ordinal());
             addSwitchEvent(ev);
         }
     }
@@ -308,6 +310,10 @@ public class Controller implements IController, CommandProvider, IPluginInConnec
         return this.switches.get(switchId);
     }
 
+    public void _controllerShowQueueSize(CommandInterpreter ci) {
+        ci.print("switchEvents queue size: " + switchEvents.size() + "\n");
+    }
+
     public void _controllerShowSwitches(CommandInterpreter ci) {
         Set<Long> sids = switches.keySet();
         StringBuffer s = new StringBuffer();
@@ -378,6 +384,7 @@ public class Controller implements IController, CommandProvider, IPluginInConnec
         help.append("\t controllerShowSwitches\n");
         help.append("\t controllerReset\n");
         help.append("\t controllerShowConnConfig\n");
+        help.append("\t controllerShowQueueSize\n");
         return help.toString();
     }
 
index fc2e0ee324b2cc4934a1f4181988cf674d6bebc1..dfbecfeb0682ad41e7e7140767427d13b9170498 100644 (file)
@@ -58,7 +58,7 @@ public class MessageReadWriteService implements IMessageReadWrite {
      * @throws Exception
      */
     @Override
-    public void asyncSend(OFMessage msg) throws IOException {
+    public void asyncSend(OFMessage msg) throws Exception {
         synchronized (outBuffer) {
             int msgLen = msg.getLengthU();
             if (outBuffer.remaining() < msgLen) {
@@ -94,7 +94,7 @@ public class MessageReadWriteService implements IMessageReadWrite {
      * @throws Exception
      */
     @Override
-    public void resumeSend() throws IOException {
+    public void resumeSend() throws Exception {
         synchronized (outBuffer) {
             if (!socket.isOpen()) {
                 return;
@@ -121,7 +121,7 @@ public class MessageReadWriteService implements IMessageReadWrite {
      * @throws Exception
      */
     @Override
-    public List<OFMessage> readMessages() throws IOException {
+    public List<OFMessage> readMessages() throws Exception {
         if (!socket.isOpen()) {
             return null;
         }
index 64031fd01212cceeaaed4ae95b65a0db3904f3da..bb4defcecab81bd3f3bdb737b231adfa786981ef 100644 (file)
@@ -41,7 +41,6 @@ public class SecureMessageReadWriteService implements IMessageReadWrite {
             .getLogger(SecureMessageReadWriteService.class);
 
     private Selector selector;
-    private SelectionKey clientSelectionKey;
     private SocketChannel socket;
     private BasicFactory factory;
 
@@ -132,12 +131,28 @@ public class SecureMessageReadWriteService implements IMessageReadWrite {
         sslEngine = sslContext.createSSLEngine();
         sslEngine.setUseClientMode(false);
         sslEngine.setNeedClientAuth(true);
+        sslEngine.setEnabledCipherSuites(new String[] {
+                "SSL_RSA_WITH_RC4_128_MD5",
+                "SSL_RSA_WITH_RC4_128_SHA",
+                "TLS_RSA_WITH_AES_128_CBC_SHA",
+                "TLS_DHE_RSA_WITH_AES_128_CBC_SHA",
+                "TLS_DHE_DSS_WITH_AES_128_CBC_SHA",
+                "SSL_RSA_WITH_3DES_EDE_CBC_SHA",
+                "SSL_DHE_RSA_WITH_3DES_EDE_CBC_SHA",
+                "SSL_DHE_DSS_WITH_3DES_EDE_CBC_SHA",
+                "SSL_RSA_WITH_DES_CBC_SHA",
+                "SSL_DHE_RSA_WITH_DES_CBC_SHA",
+                "SSL_DHE_DSS_WITH_DES_CBC_SHA",
+                "SSL_RSA_EXPORT_WITH_RC4_40_MD5",
+                "SSL_RSA_EXPORT_WITH_DES40_CBC_SHA",
+                "SSL_DHE_RSA_EXPORT_WITH_DES40_CBC_SHA",
+                "SSL_DHE_DSS_EXPORT_WITH_DES40_CBC_SHA",
+                "TLS_EMPTY_RENEGOTIATION_INFO_SCSV"});
 
         // Do initial handshake
         doHandshake(socket, sslEngine);
 
-        this.clientSelectionKey = this.socket.register(this.selector,
-                SelectionKey.OP_READ);
+        this.socket.register(this.selector, SelectionKey.OP_READ);
     }
 
     /**
@@ -182,12 +197,10 @@ public class SecureMessageReadWriteService implements IMessageReadWrite {
 
             if (myAppData.hasRemaining()) {
                 myAppData.compact();
-                this.clientSelectionKey = this.socket.register(this.selector,
-                        SelectionKey.OP_WRITE, this);
+                this.socket.register(this.selector, SelectionKey.OP_WRITE, this);
             } else {
                 myAppData.clear();
-                this.clientSelectionKey = this.socket.register(this.selector,
-                        SelectionKey.OP_READ, this);
+                this.socket.register(this.selector, SelectionKey.OP_READ, this);
             }
 
             logger.trace("Message sent: {}", msg);
@@ -221,12 +234,10 @@ public class SecureMessageReadWriteService implements IMessageReadWrite {
 
             if (myAppData.hasRemaining()) {
                 myAppData.compact();
-                this.clientSelectionKey = this.socket.register(this.selector,
-                        SelectionKey.OP_WRITE, this);
+                this.socket.register(this.selector, SelectionKey.OP_WRITE, this);
             } else {
                 myAppData.clear();
-                this.clientSelectionKey = this.socket.register(this.selector,
-                        SelectionKey.OP_READ, this);
+                this.socket.register(this.selector, SelectionKey.OP_READ, this);
             }
         }
     }
@@ -280,8 +291,7 @@ public class SecureMessageReadWriteService implements IMessageReadWrite {
             peerAppData.clear();
         }
 
-        this.clientSelectionKey = this.socket.register(this.selector,
-                SelectionKey.OP_READ, this);
+        this.socket.register(this.selector, SelectionKey.OP_READ, this);
 
         return msgs;
     }
index 87e30d70c61ebcc2e1fffe0c376a1b146d8ab561..785be9be092258ef8e36e10795597f8006d64080 100644 (file)
@@ -21,11 +21,13 @@ public class SwitchEvent {
     private SwitchEventType eventType;
     private ISwitch sw;
     private OFMessage msg;
+    private int priority;
 
-    public SwitchEvent(SwitchEventType type, ISwitch sw, OFMessage msg) {
+    public SwitchEvent(SwitchEventType type, ISwitch sw, OFMessage msg, int priority) {
         this.eventType = type;
         this.sw = sw;
         this.msg = msg;
+        this.priority = priority;
     }
 
     public SwitchEventType getEventType() {
@@ -40,6 +42,14 @@ public class SwitchEvent {
         return this.msg;
     }
 
+    public int getPriority() {
+        return priority;
+    }
+
+    public void setPriority(int priority) {
+        this.priority = priority;
+    }
+
     @Override
     public String toString() {
         String s;
index 52ea7fd575a76ae4f2053c5b1c96c7a10e622c0e..91909d20f53a5bde5adea2b7020c38a78406f080 100644 (file)
@@ -8,10 +8,6 @@
 
 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
 
-import java.io.IOException;
-import java.net.SocketException;
-import java.nio.channels.AsynchronousCloseException;
-import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
@@ -32,6 +28,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -64,8 +61,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SwitchHandler implements ISwitch {
-    private static final Logger logger = LoggerFactory
-            .getLogger(SwitchHandler.class);
+    private static final Logger logger = LoggerFactory.getLogger(SwitchHandler.class);
     private static final int SWITCH_LIVENESS_TIMER = 5000;
     private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
     private final int MESSAGE_RESPONSE_TIMER = 2000;
@@ -99,8 +95,7 @@ public class SwitchHandler implements ISwitch {
     private Thread transmitThread;
 
     private enum SwitchState {
-        NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
-                3);
+        NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(3);
 
         private int value;
 
@@ -141,9 +136,7 @@ public class SwitchHandler implements ISwitch {
             try {
                 responseTimerValue = Integer.decode(rTimer);
             } catch (NumberFormatException e) {
-                logger.warn(
-                        "Invalid of.messageResponseTimer: {} use default({})",
-                        rTimer, MESSAGE_RESPONSE_TIMER);
+                logger.warn("Invalid of.messageResponseTimer: {} use default({})", rTimer, MESSAGE_RESPONSE_TIMER);
             }
         }
     }
@@ -168,8 +161,7 @@ public class SwitchHandler implements ISwitch {
                     try {
                         // wait for an incoming connection
                         selector.select(0);
-                        Iterator<SelectionKey> selectedKeys = selector
-                                .selectedKeys().iterator();
+                        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
                         while (selectedKeys.hasNext()) {
                             SelectionKey skey = selectedKeys.next();
                             selectedKeys.remove();
@@ -189,7 +181,9 @@ public class SwitchHandler implements ISwitch {
         switchHandlerThread.start();
     }
 
-    public void stop() {
+    private void stopInternal() {
+        logger.debug("{} receives stop signal",
+                (isOperational() ? HexString.toHexString(sid) : "unknown"));
         running = false;
         cancelSwitchTimer();
         try {
@@ -205,9 +199,14 @@ public class SwitchHandler implements ISwitch {
             msgReadWriteService.stop();
         } catch (Exception e) {
         }
-        executor.shutdown();
+        logger.debug("executor shutdown now");
+        executor.shutdownNow();
 
         msgReadWriteService = null;
+    }
+
+    public void stop() {
+        stopInternal();
 
         if (switchHandlerThread != null) {
             switchHandlerThread.interrupt();
@@ -337,9 +336,7 @@ public class SwitchHandler implements ISwitch {
      */
     private void asyncSendNow(OFMessage msg) {
         if (msgReadWriteService == null) {
-            logger.warn(
-                    "asyncSendNow: {} is not sent because Message ReadWrite Service is not available.",
-                    msg);
+            logger.warn("asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", msg);
             return;
         }
 
@@ -362,8 +359,7 @@ public class SwitchHandler implements ISwitch {
         }
 
         if (msgs == null) {
-            logger.debug("{} is down", this);
-            // the connection is down, inform core
+            logger.info("{} is down", this);
             reportSwitchStateChange(false);
             return;
         }
@@ -374,23 +370,19 @@ public class SwitchHandler implements ISwitch {
             switch (type) {
             case HELLO:
                 // send feature request
-                OFMessage featureRequest = factory
-                        .getMessage(OFType.FEATURES_REQUEST);
+                OFMessage featureRequest = factory.getMessage(OFType.FEATURES_REQUEST);
                 asyncFastSend(featureRequest);
                 // delete all pre-existing flows
                 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
-                OFFlowMod flowMod = (OFFlowMod) factory
-                        .getMessage(OFType.FLOW_MOD);
-                flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
-                        .setOutPort(OFPort.OFPP_NONE)
+                OFFlowMod flowMod = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
+                flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE).setOutPort(OFPort.OFPP_NONE)
                         .setLength((short) OFFlowMod.MINIMUM_LENGTH);
                 asyncFastSend(flowMod);
                 this.state = SwitchState.WAIT_FEATURES_REPLY;
                 startSwitchTimer();
                 break;
             case ECHO_REQUEST:
-                OFEchoReply echoReply = (OFEchoReply) factory
-                        .getMessage(OFType.ECHO_REPLY);
+                OFEchoReply echoReply = (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
                 // respond immediately
                 asyncSendNow(echoReply, msg.getXid());
                 break;
@@ -436,8 +428,7 @@ public class SwitchHandler implements ISwitch {
             updatePhysicalPort(port);
         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
             updatePhysicalPort(port);
-        } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
-                .ordinal()) {
+        } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE.ordinal()) {
             deletePhysicalPort(port);
         }
 
@@ -454,35 +445,29 @@ public class SwitchHandler implements ISwitch {
                         if (probeSent) {
                             // switch failed to respond to our probe, consider
                             // it down
-                            logger.warn("{} is idle for too long, disconnect",
-                                    toString());
+                            logger.warn("{} sid {} is idle for too long, disconnect", socket.socket()
+                                    .getRemoteSocketAddress().toString().split("/")[1], (sid == 0) ? "unknown"
+                                    : HexString.toHexString(sid));
                             reportSwitchStateChange(false);
                         } else {
                             // send a probe to see if the switch is still alive
-                            logger.debug(
-                                    "Send idle probe (Echo Request) to {}",
-                                    this);
+                            logger.debug("Send idle probe (Echo Request) to {}", this);
                             probeSent = true;
-                            OFMessage echo = factory
-                                    .getMessage(OFType.ECHO_REQUEST);
+                            OFMessage echo = factory.getMessage(OFType.ECHO_REQUEST);
                             asyncFastSend(echo);
                         }
                     } else {
                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
                             // send another features request
-                            OFMessage request = factory
-                                    .getMessage(OFType.FEATURES_REQUEST);
+                            OFMessage request = factory.getMessage(OFType.FEATURES_REQUEST);
                             asyncFastSend(request);
                         } else {
                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
                                 // send another config request
-                                OFSetConfig config = (OFSetConfig) factory
-                                        .getMessage(OFType.SET_CONFIG);
-                                config.setMissSendLength((short) 0xffff)
-                                        .setLengthU(OFSetConfig.MINIMUM_LENGTH);
+                                OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
+                                config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
                                 asyncFastSend(config);
-                                OFMessage getConfig = factory
-                                        .getMessage(OFType.GET_CONFIG_REQUEST);
+                                OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
                                 asyncFastSend(getConfig);
                             }
                         }
@@ -501,18 +486,18 @@ public class SwitchHandler implements ISwitch {
     }
 
     private void reportError(Exception e) {
-        if (e instanceof AsynchronousCloseException
-                || e instanceof InterruptedException
-                || e instanceof SocketException || e instanceof IOException
-                || e instanceof ClosedSelectorException) {
-            if (logger.isDebugEnabled()) {
-              logger.debug("Caught exception {}", e.getMessage());
-            }
-        } else {
-            logger.warn("Caught exception ", e);
+        if (!running) {
+            logger.debug("Caught exception {} while switch {} is shutting down. Skip", e.getMessage(),
+                    (isOperational() ? HexString.toHexString(sid) : "unknown"));
+            return;
         }
+        logger.debug("Caught exception: ", e);
+
         // notify core of this error event and disconnect the switch
         ((Controller) core).takeSwitchEventError(this);
+
+        // clean up some internal states immediately
+        stopInternal();
     }
 
     private void reportSwitchStateChange(boolean added) {
@@ -540,10 +525,8 @@ public class SwitchHandler implements ISwitch {
                 updatePhysicalPort(port);
             }
             // config the switch to send full data packet
-            OFSetConfig config = (OFSetConfig) factory
-                    .getMessage(OFType.SET_CONFIG);
-            config.setMissSendLength((short) 0xffff).setLengthU(
-                    OFSetConfig.MINIMUM_LENGTH);
+            OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
+            config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
             asyncFastSend(config);
             // send config request to make sure the switch can handle the set
             // config
@@ -561,17 +544,11 @@ public class SwitchHandler implements ISwitch {
         portBandwidth
                 .put(portNumber,
                         port.getCurrentFeatures()
-                                & (OFPortFeatures.OFPPF_10MB_FD.getValue()
-                                        | OFPortFeatures.OFPPF_10MB_HD
-                                                .getValue()
-                                        | OFPortFeatures.OFPPF_100MB_FD
-                                                .getValue()
-                                        | OFPortFeatures.OFPPF_100MB_HD
-                                                .getValue()
-                                        | OFPortFeatures.OFPPF_1GB_FD
-                                                .getValue()
-                                        | OFPortFeatures.OFPPF_1GB_HD
-                                                .getValue() | OFPortFeatures.OFPPF_10GB_FD
+                                & (OFPortFeatures.OFPPF_10MB_FD.getValue() | OFPortFeatures.OFPPF_10MB_HD.getValue()
+                                        | OFPortFeatures.OFPPF_100MB_FD.getValue()
+                                        | OFPortFeatures.OFPPF_100MB_HD.getValue()
+                                        | OFPortFeatures.OFPPF_1GB_FD.getValue()
+                                        | OFPortFeatures.OFPPF_1GB_HD.getValue() | OFPortFeatures.OFPPF_10GB_FD
                                             .getValue()));
     }
 
@@ -589,13 +566,10 @@ public class SwitchHandler implements ISwitch {
     @Override
     public String toString() {
         try {
-            return ("Switch:"
-                    + socket.socket().getRemoteSocketAddress().toString().split("/")[1]
-                    + " SWID:" + (isOperational() ? HexString
+            return ("Switch:" + socket.socket().getRemoteSocketAddress().toString().split("/")[1] + " SWID:" + (isOperational() ? HexString
                     .toHexString(this.sid) : "unknown"));
         } catch (Exception e) {
-            return (isOperational() ? HexString.toHexString(this.sid)
-                    : "unknown");
+            return (isOperational() ? HexString.toHexString(this.sid) : "unknown");
         }
 
     }
@@ -614,13 +588,20 @@ public class SwitchHandler implements ISwitch {
         int xid = getNextXid();
         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
         messageWaitingDone.put(xid, worker);
-        Future<Object> submit = executor.submit(worker);
+        Future<Object> submit;
         Object result = null;
+        try {
+            submit = executor.submit(worker);
+        } catch (RejectedExecutionException re) {
+            messageWaitingDone.remove(xid);
+            return result;
+        }
         try {
             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
             return result;
         } catch (Exception e) {
-            logger.warn("Timeout while waiting for {} replies", req.getType());
+            logger.warn("Timeout while waiting for {} replies from {}",
+                    req.getType(), (isOperational() ? HexString.toHexString(sid) : "unknown"));
             result = null; // to indicate timeout has occurred
             worker.wakeup();
             return result;
@@ -629,6 +610,10 @@ public class SwitchHandler implements ISwitch {
 
     @Override
     public Object syncSend(OFMessage msg) {
+        if (!running) {
+            logger.debug("Switch is going down, ignore syncSend");
+            return null;
+        }
         int xid = getNextXid();
         return syncSend(msg, xid);
     }
@@ -639,8 +624,7 @@ public class SwitchHandler implements ISwitch {
      */
     private void processBarrierReply(OFBarrierReply msg) {
         Integer xid = msg.getXid();
-        SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
-                .remove(xid);
+        SynchronousMessage worker = (SynchronousMessage) messageWaitingDone.remove(xid);
         if (worker == null) {
             return;
         }
@@ -672,8 +656,7 @@ public class SwitchHandler implements ISwitch {
 
     private void processStatsReply(OFStatisticsReply reply) {
         Integer xid = reply.getXid();
-        StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
-                .get(xid);
+        StatisticsCollector worker = (StatisticsCollector) messageWaitingDone.get(xid);
         if (worker == null) {
             return;
         }
@@ -743,8 +726,7 @@ public class SwitchHandler implements ISwitch {
         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
             return false;
         }
-        if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
-                .getValue()) {
+        if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK.getValue()) {
             return false;
         }
         return true;
@@ -782,8 +764,7 @@ public class SwitchHandler implements ISwitch {
                         syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
                     }
                 } catch (InterruptedException ie) {
-                    reportError(new InterruptedException(
-                            "PriorityMessageTransmit thread interrupted"));
+                    reportError(new InterruptedException("PriorityMessageTransmit thread interrupted"));
                 } catch (Exception e) {
                     reportError(e);
                 }
@@ -796,17 +777,16 @@ public class SwitchHandler implements ISwitch {
      * Setup and start the transmit thread
      */
     private void startTransmitThread() {
-        this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
-                new Comparator<PriorityMessage>() {
-                    @Override
-                    public int compare(PriorityMessage p1, PriorityMessage p2) {
-                        if (p2.priority != p1.priority) {
-                            return p2.priority - p1.priority;
-                        } else {
-                            return (p2.seqNum < p1.seqNum) ? 1 : -1;
-                        }
-                    }
-                });
+        this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, new Comparator<PriorityMessage>() {
+            @Override
+            public int compare(PriorityMessage p1, PriorityMessage p2) {
+                if (p2.priority != p1.priority) {
+                    return p2.priority - p1.priority;
+                } else {
+                    return (p2.seqNum < p1.seqNum) ? 1 : -1;
+                }
+            }
+        });
         this.transmitThread = new Thread(new PriorityMessageTransmit());
         this.transmitThread.start();
     }
@@ -832,9 +812,8 @@ public class SwitchHandler implements ISwitch {
 
     private IMessageReadWrite getMessageReadWriteService() throws Exception {
         String str = System.getProperty("secureChannelEnabled");
-        return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(
-                socket, selector) : new MessageReadWriteService(socket,
-                selector);
+        return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(socket,
+                selector) : new MessageReadWriteService(socket, selector);
     }
 
     /**
@@ -910,7 +889,13 @@ public class SwitchHandler implements ISwitch {
         messageWaitingDone.put(xid, worker);
         Object result = null;
         Boolean status = false;
-        Future<Object> submit = executor.submit(worker);
+        Future<Object> submit;
+        try {
+           submit = executor.submit(worker);
+        } catch (RejectedExecutionException re) {
+            messageWaitingDone.remove(xid);
+            return result;
+        }
         try {
             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
             messageWaitingDone.remove(xid);
@@ -927,14 +912,12 @@ public class SwitchHandler implements ISwitch {
                 // this message
                 // the result if OFError already
                 if (logger.isDebugEnabled()) {
-                  logger.debug("Send {} failed --> {}", msg.getType(),
-                               (result));
+                    logger.debug("Send {} failed --> {}", msg.getType(), (result));
                 }
             }
             return result;
         } catch (Exception e) {
-            logger.warn("Timeout while waiting for {} reply", msg.getType()
-                    .toString());
+            logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
             // convert the result into a Boolean with value false
             status = false;
             result = status;
index bb303e3651de7e3a95659641cf001da4b0153bb4..1938cb1ae68e66efe8bb68b12f42e3fcf1373703 100644 (file)
@@ -61,8 +61,6 @@ import org.opendaylight.controller.sal.utils.HexEncode;
 import org.opendaylight.controller.sal.utils.NetUtils;
 import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
 import org.opendaylight.controller.sal.utils.NodeCreator;
-import org.opendaylight.controller.sal.utils.Status;
-import org.opendaylight.controller.sal.utils.StatusCode;
 
 /**
  * The class describes neighbor discovery service for an OpenFlow network.
@@ -579,6 +577,14 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa
         readyListHi.add(nodeConnector);
     }
 
+    private void removeNodeConnector(NodeConnector nodeConnector) {
+        readyListLo.remove(nodeConnector);
+        readyListHi.remove(nodeConnector);
+        stagingList.remove(nodeConnector);
+        holdTime.remove(nodeConnector);
+        elapsedTime.remove(nodeConnector);
+    }
+
     private Set<NodeConnector> getRemoveSet(Collection<NodeConnector> c, Node node) {
         Set<NodeConnector> removeSet = new HashSet<NodeConnector>();
         if (c == null) {
@@ -586,16 +592,7 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa
         }
         for (NodeConnector nodeConnector : c) {
             if (node.equals(nodeConnector.getNode())) {
-                Edge edge1 = edgeMap.get(nodeConnector);
-                if (edge1 != null) {
-                    removeSet.add(nodeConnector);
-
-                    // check reverse direction
-                    Edge edge2 = edgeMap.get(edge1.getTailNodeConnector());
-                    if ((edge2 != null) && node.equals(edge2.getTailNodeConnector().getNode())) {
-                        removeSet.add(edge2.getHeadNodeConnector());
-                    }
-                }
+                removeSet.add(nodeConnector);
             }
         }
         return removeSet;
@@ -604,6 +601,29 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa
     private void removeDiscovery(Node node) {
         Set<NodeConnector> removeSet;
 
+        removeSet = getRemoveSet(edgeMap.keySet(), node);
+        NodeConnector peerConnector;
+        Edge edge1, edge2;
+        for (NodeConnector nodeConnector : removeSet) {
+            // get the peer for fast removal of the edge in reverse direction
+            peerConnector = null;
+            edge1 = edgeMap.get(nodeConnector);
+            if (edge1 != null) {
+                edge2 = edgeMap.get(edge1.getTailNodeConnector());
+                if ((edge2 != null) && node.equals(edge2.getTailNodeConnector().getNode())) {
+                    peerConnector = edge2.getHeadNodeConnector();
+                }
+            }
+
+            removeEdge(nodeConnector, false);
+            removeEdge(peerConnector, isEnabled(peerConnector));
+        }
+
+        removeSet = getRemoveSet(prodMap.keySet(), node);
+        for (NodeConnector nodeConnector : removeSet) {
+            removeProdEdge(nodeConnector);
+        }
+
         removeSet = getRemoveSet(readyListHi, node);
         readyListHi.removeAll(removeSet);
 
@@ -618,22 +638,14 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa
             holdTime.remove(nodeConnector);
         }
 
-        removeSet = getRemoveSet(edgeMap.keySet(), node);
+        removeSet = getRemoveSet(elapsedTime.keySet(), node);
         for (NodeConnector nodeConnector : removeSet) {
-            removeEdge(nodeConnector, false);
-        }
-
-        removeSet = getRemoveSet(prodMap.keySet(), node);
-        for (NodeConnector nodeConnector : removeSet) {
-            removeProdEdge(nodeConnector);
+            elapsedTime.remove(nodeConnector);
         }
     }
 
     private void removeDiscovery(NodeConnector nodeConnector) {
-        readyListHi.remove(nodeConnector);
-        readyListLo.remove(nodeConnector);
-        stagingList.remove(nodeConnector);
-        holdTime.remove(nodeConnector);
+        removeNodeConnector(nodeConnector);
         removeEdge(nodeConnector, false);
         removeProdEdge(nodeConnector);
     }
@@ -672,7 +684,6 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa
 
             for (NodeConnector nodeConnector : retrySet) {
                 // Allow one more retry
-                readyListLo.add(nodeConnector);
                 elapsedTime.remove(nodeConnector);
                 if (connectionOutService.isLocal(nodeConnector.getNode())) {
                     transmitQ.add(nodeConnector);
@@ -805,6 +816,11 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa
         }
         elapsedTime.remove(src);
 
+        // fast discovery of the edge in reverse direction
+        if (!edgeMap.containsKey(dst) && !readyListHi.contains(dst) && !elapsedTime.keySet().contains(dst)) {
+            moveToReadyListHi(dst);
+        }
+
         // notify
         updateEdge(edge, UpdateType.ADDED, props);
         logger.trace("Add edge {}", edge);
@@ -871,18 +887,15 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa
      * Remove OpenFlow edge
      */
     private void removeEdge(NodeConnector nodeConnector, boolean stillEnabled) {
-        holdTime.remove(nodeConnector);
-        readyListLo.remove(nodeConnector);
-        readyListHi.remove(nodeConnector);
+        if (nodeConnector == null) {
+            return;
+        }
+
+        removeNodeConnector(nodeConnector);
 
         if (stillEnabled) {
             // keep discovering
-            if (!stagingList.contains(nodeConnector)) {
-                stagingList.add(nodeConnector);
-            }
-        } else {
-            // stop it
-            stagingList.remove(nodeConnector);
+            stagingList.add(nodeConnector);
         }
 
         Edge edge = null;
@@ -1240,7 +1253,15 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa
 
         if (val != null) {
             try {
-                int ticks = Integer.parseInt(val);
+                int ticks;
+                Set<NodeConnector> monitorSet = holdTime.keySet();
+                if (monitorSet != null) {
+                    for (NodeConnector nodeConnector : monitorSet) {
+                        holdTime.put(nodeConnector, 0);
+                    }
+                }
+
+                ticks = Integer.parseInt(val);
                 DiscoveryPeriod.INTERVAL.setTick(ticks);
                 discoveryBatchRestartTicks = getDiscoveryInterval();
                 discoveryBatchPauseTicks = getDiscoveryPauseInterval();
index 78c78f55429980466e42d5074e07e133c9cfa0bb..d3d41be19f8d6061d2f5f9502c0678fe71cf332e 100644 (file)
@@ -1042,6 +1042,7 @@ public class SwitchManager implements ISwitchManager, IConfigurationContainerAwa
     public void updateNodeConnector(NodeConnector nodeConnector,
             UpdateType type, Set<Property> props) {
         Map<String, Property> propMap = new HashMap<String, Property>();
+        boolean update = true;
 
         log.debug("updateNodeConnector: {} type {} props {} for container {}",
                 new Object[] { nodeConnector, type, props, containerName });
@@ -1052,7 +1053,6 @@ public class SwitchManager implements ISwitchManager, IConfigurationContainerAwa
 
         switch (type) {
         case ADDED:
-        case CHANGED:
             if (props != null) {
                 for (Property prop : props) {
                     addNodeConnectorProp(nodeConnector, prop);
@@ -1064,17 +1064,33 @@ public class SwitchManager implements ISwitchManager, IConfigurationContainerAwa
 
             addSpanPort(nodeConnector);
             break;
+        case CHANGED:
+            if (!nodeConnectorProps.containsKey(nodeConnector) || (props == null)) {
+                update = false;
+            } else {
+                for (Property prop : props) {
+                    addNodeConnectorProp(nodeConnector, prop);
+                    propMap.put(prop.getName(), prop);
+                }
+            }
+            break;
         case REMOVED:
+            if (!nodeConnectorProps.containsKey(nodeConnector)) {
+                update = false;
+            }
             removeNodeConnectorAllProps(nodeConnector);
 
             // clean up span config
             removeSpanPort(nodeConnector);
             break;
         default:
+            update = false;
             break;
         }
 
-        notifyNodeConnector(nodeConnector, type, propMap);
+        if (update) {
+            notifyNodeConnector(nodeConnector, type, propMap);
+        }
     }
 
     @Override