From 78ef04c45c5a7fbee9bbb9ae77ecb1882add8623 Mon Sep 17 00:00:00 2001 From: Jason Ye Date: Wed, 21 Aug 2013 13:44:44 -0700 Subject: [PATCH] 1. Controller switchEvents queue should be priority based. The queue holds switch up, down, error and message events. Assign higher priority to switch up/down events. 2. When switch is going to disconnect, SwitchHandler can clean up some internal states to speed up the process in order to avoid unnecessary messages backing up in the queue. 3. In SwitchManager, don't send additional port down notifications after node down event is received. 4. Fixed an issue where links are not recovered when the switch is reconnected to the controller. 5. Fixed TLS exception@java.security.ProviderException. Change-Id: I4ba190a59f8d046df999401572d3c6822b15429b Signed-off-by: Jason Ye --- .../main/resources/configuration/config.ini | 4 +- .../openflow/core/internal/Controller.java | 45 +++-- .../internal/MessageReadWriteService.java | 6 +- .../SecureMessageReadWriteService.java | 36 ++-- .../openflow/core/internal/SwitchEvent.java | 12 +- .../openflow/core/internal/SwitchHandler.java | 189 ++++++++---------- .../openflow/internal/DiscoveryService.java | 89 +++++---- .../switchmanager/internal/SwitchManager.java | 20 +- 8 files changed, 224 insertions(+), 177 deletions(-) diff --git a/opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini b/opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini index b99b4bfdca..f9899d7d8b 100644 --- a/opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini +++ b/opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini @@ -77,9 +77,9 @@ org.eclipse.gemini.web.tomcat.config.path=configuration/tomcat-server.xml # entries, including switches' Certification Authority (CA) certificates. For example, # secureChannelEnabled=true # controllerKeyStore=./configuration/ctlKeyStore -# controllerKeyStorePassword=xxxxx (this password should match the password used for KeyStore generation) +# controllerKeyStorePassword=xxxxxxxx (this password should match the password used for KeyStore generation and at least 6 characters) # controllerTrustStore=./configuration/ctlTrustStore -# controllerTrustStorePassword=xxxxx (this password should match the password used for TrustStore generation) +# controllerTrustStorePassword=xxxxxxxx (this password should match the password used for TrustStore generation and at least 6 characters) secureChannelEnabled=false controllerKeyStore= diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/Controller.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/Controller.java index c7c6c8924d..172ec98780 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/Controller.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/Controller.java @@ -13,15 +13,15 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.util.Comparator; import java.util.Date; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.osgi.framework.console.CommandInterpreter; @@ -32,7 +32,6 @@ import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch; import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitchStateListener; import org.opendaylight.controller.sal.connection.ConnectionConstants; import org.opendaylight.controller.sal.connection.IPluginInConnectionService; -import org.opendaylight.controller.sal.connection.IPluginOutConnectionService; import org.opendaylight.controller.sal.core.Node; import org.opendaylight.controller.sal.utils.Status; import org.opendaylight.controller.sal.utils.StatusCode; @@ -50,7 +49,7 @@ public class Controller implements IController, CommandProvider, IPluginInConnec private ControllerIO controllerIO; private Thread switchEventThread; private ConcurrentHashMap switches; - private BlockingQueue switchEvents; + private PriorityBlockingQueue switchEvents; // only 1 message listener per OFType private ConcurrentMap messageListeners; // only 1 switch state listener @@ -58,6 +57,8 @@ public class Controller implements IController, CommandProvider, IPluginInConnec private AtomicInteger switchInstanceNumber; private int MAXQUEUESIZE = 50000; + private static enum SwitchEventPriority { LOW, NORMAL, HIGH } + /* * this thread monitors the switchEvents queue for new incoming events from * switch @@ -119,7 +120,12 @@ public class Controller implements IController, CommandProvider, IPluginInConnec public void init() { logger.debug("Initializing!"); this.switches = new ConcurrentHashMap(); - this.switchEvents = new LinkedBlockingQueue(MAXQUEUESIZE); + this.switchEvents = new PriorityBlockingQueue(MAXQUEUESIZE, new Comparator() { + @Override + public int compare(SwitchEvent p1, SwitchEvent p2) { + return p2.getPriority() - p1.getPriority(); + } + }); this.messageListeners = new ConcurrentHashMap(); this.switchStateListener = null; this.switchInstanceNumber = new AtomicInteger(0); @@ -244,7 +250,7 @@ public class Controller implements IController, CommandProvider, IPluginInConnec if (((SwitchHandler) sw).isOperational()) { Long sid = sw.getId(); if (this.switches.remove(sid, sw)) { - logger.warn("{} is Disconnected", sw); + logger.info("{} is removed", sw); notifySwitchDeleted(sw); } } @@ -265,35 +271,31 @@ public class Controller implements IController, CommandProvider, IPluginInConnec } private synchronized void addSwitchEvent(SwitchEvent event) { - try { - this.switchEvents.put(event); - } catch (InterruptedException e) { - logger.debug("SwitchEvent caught Interrupt Exception"); - } + this.switchEvents.put(event); } public void takeSwitchEventAdd(ISwitch sw) { - SwitchEvent ev = new SwitchEvent( - SwitchEvent.SwitchEventType.SWITCH_ADD, sw, null); + SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ADD, sw, null, + SwitchEventPriority.HIGH.ordinal()); addSwitchEvent(ev); } public void takeSwitchEventDelete(ISwitch sw) { - SwitchEvent ev = new SwitchEvent( - SwitchEvent.SwitchEventType.SWITCH_DELETE, sw, null); + SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_DELETE, sw, null, + SwitchEventPriority.HIGH.ordinal()); addSwitchEvent(ev); } public void takeSwitchEventError(ISwitch sw) { - SwitchEvent ev = new SwitchEvent( - SwitchEvent.SwitchEventType.SWITCH_ERROR, sw, null); + SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ERROR, sw, null, + SwitchEventPriority.NORMAL.ordinal()); addSwitchEvent(ev); } public void takeSwitchEventMsg(ISwitch sw, OFMessage msg) { if (messageListeners.get(msg.getType()) != null) { - SwitchEvent ev = new SwitchEvent( - SwitchEvent.SwitchEventType.SWITCH_MESSAGE, sw, msg); + SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_MESSAGE, sw, msg, + SwitchEventPriority.LOW.ordinal()); addSwitchEvent(ev); } } @@ -308,6 +310,10 @@ public class Controller implements IController, CommandProvider, IPluginInConnec return this.switches.get(switchId); } + public void _controllerShowQueueSize(CommandInterpreter ci) { + ci.print("switchEvents queue size: " + switchEvents.size() + "\n"); + } + public void _controllerShowSwitches(CommandInterpreter ci) { Set sids = switches.keySet(); StringBuffer s = new StringBuffer(); @@ -378,6 +384,7 @@ public class Controller implements IController, CommandProvider, IPluginInConnec help.append("\t controllerShowSwitches\n"); help.append("\t controllerReset\n"); help.append("\t controllerShowConnConfig\n"); + help.append("\t controllerShowQueueSize\n"); return help.toString(); } diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/MessageReadWriteService.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/MessageReadWriteService.java index fc2e0ee324..dfbecfeb06 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/MessageReadWriteService.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/MessageReadWriteService.java @@ -58,7 +58,7 @@ public class MessageReadWriteService implements IMessageReadWrite { * @throws Exception */ @Override - public void asyncSend(OFMessage msg) throws IOException { + public void asyncSend(OFMessage msg) throws Exception { synchronized (outBuffer) { int msgLen = msg.getLengthU(); if (outBuffer.remaining() < msgLen) { @@ -94,7 +94,7 @@ public class MessageReadWriteService implements IMessageReadWrite { * @throws Exception */ @Override - public void resumeSend() throws IOException { + public void resumeSend() throws Exception { synchronized (outBuffer) { if (!socket.isOpen()) { return; @@ -121,7 +121,7 @@ public class MessageReadWriteService implements IMessageReadWrite { * @throws Exception */ @Override - public List readMessages() throws IOException { + public List readMessages() throws Exception { if (!socket.isOpen()) { return null; } diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SecureMessageReadWriteService.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SecureMessageReadWriteService.java index 64031fd012..bb4defceca 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SecureMessageReadWriteService.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SecureMessageReadWriteService.java @@ -41,7 +41,6 @@ public class SecureMessageReadWriteService implements IMessageReadWrite { .getLogger(SecureMessageReadWriteService.class); private Selector selector; - private SelectionKey clientSelectionKey; private SocketChannel socket; private BasicFactory factory; @@ -132,12 +131,28 @@ public class SecureMessageReadWriteService implements IMessageReadWrite { sslEngine = sslContext.createSSLEngine(); sslEngine.setUseClientMode(false); sslEngine.setNeedClientAuth(true); + sslEngine.setEnabledCipherSuites(new String[] { + "SSL_RSA_WITH_RC4_128_MD5", + "SSL_RSA_WITH_RC4_128_SHA", + "TLS_RSA_WITH_AES_128_CBC_SHA", + "TLS_DHE_RSA_WITH_AES_128_CBC_SHA", + "TLS_DHE_DSS_WITH_AES_128_CBC_SHA", + "SSL_RSA_WITH_3DES_EDE_CBC_SHA", + "SSL_DHE_RSA_WITH_3DES_EDE_CBC_SHA", + "SSL_DHE_DSS_WITH_3DES_EDE_CBC_SHA", + "SSL_RSA_WITH_DES_CBC_SHA", + "SSL_DHE_RSA_WITH_DES_CBC_SHA", + "SSL_DHE_DSS_WITH_DES_CBC_SHA", + "SSL_RSA_EXPORT_WITH_RC4_40_MD5", + "SSL_RSA_EXPORT_WITH_DES40_CBC_SHA", + "SSL_DHE_RSA_EXPORT_WITH_DES40_CBC_SHA", + "SSL_DHE_DSS_EXPORT_WITH_DES40_CBC_SHA", + "TLS_EMPTY_RENEGOTIATION_INFO_SCSV"}); // Do initial handshake doHandshake(socket, sslEngine); - this.clientSelectionKey = this.socket.register(this.selector, - SelectionKey.OP_READ); + this.socket.register(this.selector, SelectionKey.OP_READ); } /** @@ -182,12 +197,10 @@ public class SecureMessageReadWriteService implements IMessageReadWrite { if (myAppData.hasRemaining()) { myAppData.compact(); - this.clientSelectionKey = this.socket.register(this.selector, - SelectionKey.OP_WRITE, this); + this.socket.register(this.selector, SelectionKey.OP_WRITE, this); } else { myAppData.clear(); - this.clientSelectionKey = this.socket.register(this.selector, - SelectionKey.OP_READ, this); + this.socket.register(this.selector, SelectionKey.OP_READ, this); } logger.trace("Message sent: {}", msg); @@ -221,12 +234,10 @@ public class SecureMessageReadWriteService implements IMessageReadWrite { if (myAppData.hasRemaining()) { myAppData.compact(); - this.clientSelectionKey = this.socket.register(this.selector, - SelectionKey.OP_WRITE, this); + this.socket.register(this.selector, SelectionKey.OP_WRITE, this); } else { myAppData.clear(); - this.clientSelectionKey = this.socket.register(this.selector, - SelectionKey.OP_READ, this); + this.socket.register(this.selector, SelectionKey.OP_READ, this); } } } @@ -280,8 +291,7 @@ public class SecureMessageReadWriteService implements IMessageReadWrite { peerAppData.clear(); } - this.clientSelectionKey = this.socket.register(this.selector, - SelectionKey.OP_READ, this); + this.socket.register(this.selector, SelectionKey.OP_READ, this); return msgs; } diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchEvent.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchEvent.java index 87e30d70c6..785be9be09 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchEvent.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchEvent.java @@ -21,11 +21,13 @@ public class SwitchEvent { private SwitchEventType eventType; private ISwitch sw; private OFMessage msg; + private int priority; - public SwitchEvent(SwitchEventType type, ISwitch sw, OFMessage msg) { + public SwitchEvent(SwitchEventType type, ISwitch sw, OFMessage msg, int priority) { this.eventType = type; this.sw = sw; this.msg = msg; + this.priority = priority; } public SwitchEventType getEventType() { @@ -40,6 +42,14 @@ public class SwitchEvent { return this.msg; } + public int getPriority() { + return priority; + } + + public void setPriority(int priority) { + this.priority = priority; + } + @Override public String toString() { String s; diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java index 52ea7fd575..91909d20f5 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java @@ -8,10 +8,6 @@ package org.opendaylight.controller.protocol_plugin.openflow.core.internal; -import java.io.IOException; -import java.net.SocketException; -import java.nio.channels.AsynchronousCloseException; -import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; @@ -32,6 +28,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -64,8 +61,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SwitchHandler implements ISwitch { - private static final Logger logger = LoggerFactory - .getLogger(SwitchHandler.class); + private static final Logger logger = LoggerFactory.getLogger(SwitchHandler.class); private static final int SWITCH_LIVENESS_TIMER = 5000; private static final int switchLivenessTimeout = getSwitchLivenessTimeout(); private final int MESSAGE_RESPONSE_TIMER = 2000; @@ -99,8 +95,7 @@ public class SwitchHandler implements ISwitch { private Thread transmitThread; private enum SwitchState { - NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL( - 3); + NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(3); private int value; @@ -141,9 +136,7 @@ public class SwitchHandler implements ISwitch { try { responseTimerValue = Integer.decode(rTimer); } catch (NumberFormatException e) { - logger.warn( - "Invalid of.messageResponseTimer: {} use default({})", - rTimer, MESSAGE_RESPONSE_TIMER); + logger.warn("Invalid of.messageResponseTimer: {} use default({})", rTimer, MESSAGE_RESPONSE_TIMER); } } } @@ -168,8 +161,7 @@ public class SwitchHandler implements ISwitch { try { // wait for an incoming connection selector.select(0); - Iterator selectedKeys = selector - .selectedKeys().iterator(); + Iterator selectedKeys = selector.selectedKeys().iterator(); while (selectedKeys.hasNext()) { SelectionKey skey = selectedKeys.next(); selectedKeys.remove(); @@ -189,7 +181,9 @@ public class SwitchHandler implements ISwitch { switchHandlerThread.start(); } - public void stop() { + private void stopInternal() { + logger.debug("{} receives stop signal", + (isOperational() ? HexString.toHexString(sid) : "unknown")); running = false; cancelSwitchTimer(); try { @@ -205,9 +199,14 @@ public class SwitchHandler implements ISwitch { msgReadWriteService.stop(); } catch (Exception e) { } - executor.shutdown(); + logger.debug("executor shutdown now"); + executor.shutdownNow(); msgReadWriteService = null; + } + + public void stop() { + stopInternal(); if (switchHandlerThread != null) { switchHandlerThread.interrupt(); @@ -337,9 +336,7 @@ public class SwitchHandler implements ISwitch { */ private void asyncSendNow(OFMessage msg) { if (msgReadWriteService == null) { - logger.warn( - "asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", - msg); + logger.warn("asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", msg); return; } @@ -362,8 +359,7 @@ public class SwitchHandler implements ISwitch { } if (msgs == null) { - logger.debug("{} is down", this); - // the connection is down, inform core + logger.info("{} is down", this); reportSwitchStateChange(false); return; } @@ -374,23 +370,19 @@ public class SwitchHandler implements ISwitch { switch (type) { case HELLO: // send feature request - OFMessage featureRequest = factory - .getMessage(OFType.FEATURES_REQUEST); + OFMessage featureRequest = factory.getMessage(OFType.FEATURES_REQUEST); asyncFastSend(featureRequest); // delete all pre-existing flows OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL); - OFFlowMod flowMod = (OFFlowMod) factory - .getMessage(OFType.FLOW_MOD); - flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE) - .setOutPort(OFPort.OFPP_NONE) + OFFlowMod flowMod = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD); + flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE).setOutPort(OFPort.OFPP_NONE) .setLength((short) OFFlowMod.MINIMUM_LENGTH); asyncFastSend(flowMod); this.state = SwitchState.WAIT_FEATURES_REPLY; startSwitchTimer(); break; case ECHO_REQUEST: - OFEchoReply echoReply = (OFEchoReply) factory - .getMessage(OFType.ECHO_REPLY); + OFEchoReply echoReply = (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY); // respond immediately asyncSendNow(echoReply, msg.getXid()); break; @@ -436,8 +428,7 @@ public class SwitchHandler implements ISwitch { updatePhysicalPort(port); } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) { updatePhysicalPort(port); - } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE - .ordinal()) { + } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE.ordinal()) { deletePhysicalPort(port); } @@ -454,35 +445,29 @@ public class SwitchHandler implements ISwitch { if (probeSent) { // switch failed to respond to our probe, consider // it down - logger.warn("{} is idle for too long, disconnect", - toString()); + logger.warn("{} sid {} is idle for too long, disconnect", socket.socket() + .getRemoteSocketAddress().toString().split("/")[1], (sid == 0) ? "unknown" + : HexString.toHexString(sid)); reportSwitchStateChange(false); } else { // send a probe to see if the switch is still alive - logger.debug( - "Send idle probe (Echo Request) to {}", - this); + logger.debug("Send idle probe (Echo Request) to {}", this); probeSent = true; - OFMessage echo = factory - .getMessage(OFType.ECHO_REQUEST); + OFMessage echo = factory.getMessage(OFType.ECHO_REQUEST); asyncFastSend(echo); } } else { if (state == SwitchState.WAIT_FEATURES_REPLY) { // send another features request - OFMessage request = factory - .getMessage(OFType.FEATURES_REQUEST); + OFMessage request = factory.getMessage(OFType.FEATURES_REQUEST); asyncFastSend(request); } else { if (state == SwitchState.WAIT_CONFIG_REPLY) { // send another config request - OFSetConfig config = (OFSetConfig) factory - .getMessage(OFType.SET_CONFIG); - config.setMissSendLength((short) 0xffff) - .setLengthU(OFSetConfig.MINIMUM_LENGTH); + OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG); + config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH); asyncFastSend(config); - OFMessage getConfig = factory - .getMessage(OFType.GET_CONFIG_REQUEST); + OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST); asyncFastSend(getConfig); } } @@ -501,18 +486,18 @@ public class SwitchHandler implements ISwitch { } private void reportError(Exception e) { - if (e instanceof AsynchronousCloseException - || e instanceof InterruptedException - || e instanceof SocketException || e instanceof IOException - || e instanceof ClosedSelectorException) { - if (logger.isDebugEnabled()) { - logger.debug("Caught exception {}", e.getMessage()); - } - } else { - logger.warn("Caught exception ", e); + if (!running) { + logger.debug("Caught exception {} while switch {} is shutting down. Skip", e.getMessage(), + (isOperational() ? HexString.toHexString(sid) : "unknown")); + return; } + logger.debug("Caught exception: ", e); + // notify core of this error event and disconnect the switch ((Controller) core).takeSwitchEventError(this); + + // clean up some internal states immediately + stopInternal(); } private void reportSwitchStateChange(boolean added) { @@ -540,10 +525,8 @@ public class SwitchHandler implements ISwitch { updatePhysicalPort(port); } // config the switch to send full data packet - OFSetConfig config = (OFSetConfig) factory - .getMessage(OFType.SET_CONFIG); - config.setMissSendLength((short) 0xffff).setLengthU( - OFSetConfig.MINIMUM_LENGTH); + OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG); + config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH); asyncFastSend(config); // send config request to make sure the switch can handle the set // config @@ -561,17 +544,11 @@ public class SwitchHandler implements ISwitch { portBandwidth .put(portNumber, port.getCurrentFeatures() - & (OFPortFeatures.OFPPF_10MB_FD.getValue() - | OFPortFeatures.OFPPF_10MB_HD - .getValue() - | OFPortFeatures.OFPPF_100MB_FD - .getValue() - | OFPortFeatures.OFPPF_100MB_HD - .getValue() - | OFPortFeatures.OFPPF_1GB_FD - .getValue() - | OFPortFeatures.OFPPF_1GB_HD - .getValue() | OFPortFeatures.OFPPF_10GB_FD + & (OFPortFeatures.OFPPF_10MB_FD.getValue() | OFPortFeatures.OFPPF_10MB_HD.getValue() + | OFPortFeatures.OFPPF_100MB_FD.getValue() + | OFPortFeatures.OFPPF_100MB_HD.getValue() + | OFPortFeatures.OFPPF_1GB_FD.getValue() + | OFPortFeatures.OFPPF_1GB_HD.getValue() | OFPortFeatures.OFPPF_10GB_FD .getValue())); } @@ -589,13 +566,10 @@ public class SwitchHandler implements ISwitch { @Override public String toString() { try { - return ("Switch:" - + socket.socket().getRemoteSocketAddress().toString().split("/")[1] - + " SWID:" + (isOperational() ? HexString + return ("Switch:" + socket.socket().getRemoteSocketAddress().toString().split("/")[1] + " SWID:" + (isOperational() ? HexString .toHexString(this.sid) : "unknown")); } catch (Exception e) { - return (isOperational() ? HexString.toHexString(this.sid) - : "unknown"); + return (isOperational() ? HexString.toHexString(this.sid) : "unknown"); } } @@ -614,13 +588,20 @@ public class SwitchHandler implements ISwitch { int xid = getNextXid(); StatisticsCollector worker = new StatisticsCollector(this, xid, req); messageWaitingDone.put(xid, worker); - Future submit = executor.submit(worker); + Future submit; Object result = null; + try { + submit = executor.submit(worker); + } catch (RejectedExecutionException re) { + messageWaitingDone.remove(xid); + return result; + } try { result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS); return result; } catch (Exception e) { - logger.warn("Timeout while waiting for {} replies", req.getType()); + logger.warn("Timeout while waiting for {} replies from {}", + req.getType(), (isOperational() ? HexString.toHexString(sid) : "unknown")); result = null; // to indicate timeout has occurred worker.wakeup(); return result; @@ -629,6 +610,10 @@ public class SwitchHandler implements ISwitch { @Override public Object syncSend(OFMessage msg) { + if (!running) { + logger.debug("Switch is going down, ignore syncSend"); + return null; + } int xid = getNextXid(); return syncSend(msg, xid); } @@ -639,8 +624,7 @@ public class SwitchHandler implements ISwitch { */ private void processBarrierReply(OFBarrierReply msg) { Integer xid = msg.getXid(); - SynchronousMessage worker = (SynchronousMessage) messageWaitingDone - .remove(xid); + SynchronousMessage worker = (SynchronousMessage) messageWaitingDone.remove(xid); if (worker == null) { return; } @@ -672,8 +656,7 @@ public class SwitchHandler implements ISwitch { private void processStatsReply(OFStatisticsReply reply) { Integer xid = reply.getXid(); - StatisticsCollector worker = (StatisticsCollector) messageWaitingDone - .get(xid); + StatisticsCollector worker = (StatisticsCollector) messageWaitingDone.get(xid); if (worker == null) { return; } @@ -743,8 +726,7 @@ public class SwitchHandler implements ISwitch { if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) { return false; } - if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK - .getValue()) { + if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK.getValue()) { return false; } return true; @@ -782,8 +764,7 @@ public class SwitchHandler implements ISwitch { syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false); } } catch (InterruptedException ie) { - reportError(new InterruptedException( - "PriorityMessageTransmit thread interrupted")); + reportError(new InterruptedException("PriorityMessageTransmit thread interrupted")); } catch (Exception e) { reportError(e); } @@ -796,17 +777,16 @@ public class SwitchHandler implements ISwitch { * Setup and start the transmit thread */ private void startTransmitThread() { - this.transmitQ = new PriorityBlockingQueue(11, - new Comparator() { - @Override - public int compare(PriorityMessage p1, PriorityMessage p2) { - if (p2.priority != p1.priority) { - return p2.priority - p1.priority; - } else { - return (p2.seqNum < p1.seqNum) ? 1 : -1; - } - } - }); + this.transmitQ = new PriorityBlockingQueue(11, new Comparator() { + @Override + public int compare(PriorityMessage p1, PriorityMessage p2) { + if (p2.priority != p1.priority) { + return p2.priority - p1.priority; + } else { + return (p2.seqNum < p1.seqNum) ? 1 : -1; + } + } + }); this.transmitThread = new Thread(new PriorityMessageTransmit()); this.transmitThread.start(); } @@ -832,9 +812,8 @@ public class SwitchHandler implements ISwitch { private IMessageReadWrite getMessageReadWriteService() throws Exception { String str = System.getProperty("secureChannelEnabled"); - return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService( - socket, selector) : new MessageReadWriteService(socket, - selector); + return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(socket, + selector) : new MessageReadWriteService(socket, selector); } /** @@ -910,7 +889,13 @@ public class SwitchHandler implements ISwitch { messageWaitingDone.put(xid, worker); Object result = null; Boolean status = false; - Future submit = executor.submit(worker); + Future submit; + try { + submit = executor.submit(worker); + } catch (RejectedExecutionException re) { + messageWaitingDone.remove(xid); + return result; + } try { result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS); messageWaitingDone.remove(xid); @@ -927,14 +912,12 @@ public class SwitchHandler implements ISwitch { // this message // the result if OFError already if (logger.isDebugEnabled()) { - logger.debug("Send {} failed --> {}", msg.getType(), - (result)); + logger.debug("Send {} failed --> {}", msg.getType(), (result)); } } return result; } catch (Exception e) { - logger.warn("Timeout while waiting for {} reply", msg.getType() - .toString()); + logger.warn("Timeout while waiting for {} reply", msg.getType().toString()); // convert the result into a Boolean with value false status = false; result = status; diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/DiscoveryService.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/DiscoveryService.java index bb303e3651..1938cb1ae6 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/DiscoveryService.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/DiscoveryService.java @@ -61,8 +61,6 @@ import org.opendaylight.controller.sal.utils.HexEncode; import org.opendaylight.controller.sal.utils.NetUtils; import org.opendaylight.controller.sal.utils.NodeConnectorCreator; import org.opendaylight.controller.sal.utils.NodeCreator; -import org.opendaylight.controller.sal.utils.Status; -import org.opendaylight.controller.sal.utils.StatusCode; /** * The class describes neighbor discovery service for an OpenFlow network. @@ -579,6 +577,14 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa readyListHi.add(nodeConnector); } + private void removeNodeConnector(NodeConnector nodeConnector) { + readyListLo.remove(nodeConnector); + readyListHi.remove(nodeConnector); + stagingList.remove(nodeConnector); + holdTime.remove(nodeConnector); + elapsedTime.remove(nodeConnector); + } + private Set getRemoveSet(Collection c, Node node) { Set removeSet = new HashSet(); if (c == null) { @@ -586,16 +592,7 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa } for (NodeConnector nodeConnector : c) { if (node.equals(nodeConnector.getNode())) { - Edge edge1 = edgeMap.get(nodeConnector); - if (edge1 != null) { - removeSet.add(nodeConnector); - - // check reverse direction - Edge edge2 = edgeMap.get(edge1.getTailNodeConnector()); - if ((edge2 != null) && node.equals(edge2.getTailNodeConnector().getNode())) { - removeSet.add(edge2.getHeadNodeConnector()); - } - } + removeSet.add(nodeConnector); } } return removeSet; @@ -604,6 +601,29 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa private void removeDiscovery(Node node) { Set removeSet; + removeSet = getRemoveSet(edgeMap.keySet(), node); + NodeConnector peerConnector; + Edge edge1, edge2; + for (NodeConnector nodeConnector : removeSet) { + // get the peer for fast removal of the edge in reverse direction + peerConnector = null; + edge1 = edgeMap.get(nodeConnector); + if (edge1 != null) { + edge2 = edgeMap.get(edge1.getTailNodeConnector()); + if ((edge2 != null) && node.equals(edge2.getTailNodeConnector().getNode())) { + peerConnector = edge2.getHeadNodeConnector(); + } + } + + removeEdge(nodeConnector, false); + removeEdge(peerConnector, isEnabled(peerConnector)); + } + + removeSet = getRemoveSet(prodMap.keySet(), node); + for (NodeConnector nodeConnector : removeSet) { + removeProdEdge(nodeConnector); + } + removeSet = getRemoveSet(readyListHi, node); readyListHi.removeAll(removeSet); @@ -618,22 +638,14 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa holdTime.remove(nodeConnector); } - removeSet = getRemoveSet(edgeMap.keySet(), node); + removeSet = getRemoveSet(elapsedTime.keySet(), node); for (NodeConnector nodeConnector : removeSet) { - removeEdge(nodeConnector, false); - } - - removeSet = getRemoveSet(prodMap.keySet(), node); - for (NodeConnector nodeConnector : removeSet) { - removeProdEdge(nodeConnector); + elapsedTime.remove(nodeConnector); } } private void removeDiscovery(NodeConnector nodeConnector) { - readyListHi.remove(nodeConnector); - readyListLo.remove(nodeConnector); - stagingList.remove(nodeConnector); - holdTime.remove(nodeConnector); + removeNodeConnector(nodeConnector); removeEdge(nodeConnector, false); removeProdEdge(nodeConnector); } @@ -672,7 +684,6 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa for (NodeConnector nodeConnector : retrySet) { // Allow one more retry - readyListLo.add(nodeConnector); elapsedTime.remove(nodeConnector); if (connectionOutService.isLocal(nodeConnector.getNode())) { transmitQ.add(nodeConnector); @@ -805,6 +816,11 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa } elapsedTime.remove(src); + // fast discovery of the edge in reverse direction + if (!edgeMap.containsKey(dst) && !readyListHi.contains(dst) && !elapsedTime.keySet().contains(dst)) { + moveToReadyListHi(dst); + } + // notify updateEdge(edge, UpdateType.ADDED, props); logger.trace("Add edge {}", edge); @@ -871,18 +887,15 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa * Remove OpenFlow edge */ private void removeEdge(NodeConnector nodeConnector, boolean stillEnabled) { - holdTime.remove(nodeConnector); - readyListLo.remove(nodeConnector); - readyListHi.remove(nodeConnector); + if (nodeConnector == null) { + return; + } + + removeNodeConnector(nodeConnector); if (stillEnabled) { // keep discovering - if (!stagingList.contains(nodeConnector)) { - stagingList.add(nodeConnector); - } - } else { - // stop it - stagingList.remove(nodeConnector); + stagingList.add(nodeConnector); } Edge edge = null; @@ -1240,7 +1253,15 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa if (val != null) { try { - int ticks = Integer.parseInt(val); + int ticks; + Set monitorSet = holdTime.keySet(); + if (monitorSet != null) { + for (NodeConnector nodeConnector : monitorSet) { + holdTime.put(nodeConnector, 0); + } + } + + ticks = Integer.parseInt(val); DiscoveryPeriod.INTERVAL.setTick(ticks); discoveryBatchRestartTicks = getDiscoveryInterval(); discoveryBatchPauseTicks = getDiscoveryPauseInterval(); diff --git a/opendaylight/switchmanager/implementation/src/main/java/org/opendaylight/controller/switchmanager/internal/SwitchManager.java b/opendaylight/switchmanager/implementation/src/main/java/org/opendaylight/controller/switchmanager/internal/SwitchManager.java index 78c78f5542..d3d41be19f 100644 --- a/opendaylight/switchmanager/implementation/src/main/java/org/opendaylight/controller/switchmanager/internal/SwitchManager.java +++ b/opendaylight/switchmanager/implementation/src/main/java/org/opendaylight/controller/switchmanager/internal/SwitchManager.java @@ -1042,6 +1042,7 @@ public class SwitchManager implements ISwitchManager, IConfigurationContainerAwa public void updateNodeConnector(NodeConnector nodeConnector, UpdateType type, Set props) { Map propMap = new HashMap(); + boolean update = true; log.debug("updateNodeConnector: {} type {} props {} for container {}", new Object[] { nodeConnector, type, props, containerName }); @@ -1052,7 +1053,6 @@ public class SwitchManager implements ISwitchManager, IConfigurationContainerAwa switch (type) { case ADDED: - case CHANGED: if (props != null) { for (Property prop : props) { addNodeConnectorProp(nodeConnector, prop); @@ -1064,17 +1064,33 @@ public class SwitchManager implements ISwitchManager, IConfigurationContainerAwa addSpanPort(nodeConnector); break; + case CHANGED: + if (!nodeConnectorProps.containsKey(nodeConnector) || (props == null)) { + update = false; + } else { + for (Property prop : props) { + addNodeConnectorProp(nodeConnector, prop); + propMap.put(prop.getName(), prop); + } + } + break; case REMOVED: + if (!nodeConnectorProps.containsKey(nodeConnector)) { + update = false; + } removeNodeConnectorAllProps(nodeConnector); // clean up span config removeSpanPort(nodeConnector); break; default: + update = false; break; } - notifyNodeConnector(nodeConnector, type, propMap); + if (update) { + notifyNodeConnector(nodeConnector, type, propMap); + } } @Override -- 2.36.6