2. When switch is going to disconnect, SwitchHandler can clean up some internal states to speed up the process in order to avoid unnecessary messages backing up in the queue.
3. In SwitchManager, don't send additional port down notifications after node down event is received.
4. Fixed an issue where links are not recovered when the switch is reconnected to the controller.
5. Fixed TLS exception@java.security.ProviderException.
Change-Id: I4ba190a59f8d046df999401572d3c6822b15429b
Signed-off-by: Jason Ye <yisye@cisco.com>
# entries, including switches' Certification Authority (CA) certificates. For example,
# secureChannelEnabled=true
# controllerKeyStore=./configuration/ctlKeyStore
-# controllerKeyStorePassword=xxxxx (this password should match the password used for KeyStore generation)
+# controllerKeyStorePassword=xxxxxxxx (this password should match the password used for KeyStore generation and at least 6 characters)
# controllerTrustStore=./configuration/ctlTrustStore
-# controllerTrustStorePassword=xxxxx (this password should match the password used for TrustStore generation)
+# controllerTrustStorePassword=xxxxxxxx (this password should match the password used for TrustStore generation and at least 6 characters)
secureChannelEnabled=false
controllerKeyStore=
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.util.Comparator;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.osgi.framework.console.CommandInterpreter;
import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitchStateListener;
import org.opendaylight.controller.sal.connection.ConnectionConstants;
import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
-import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
private ControllerIO controllerIO;
private Thread switchEventThread;
private ConcurrentHashMap<Long, ISwitch> switches;
- private BlockingQueue<SwitchEvent> switchEvents;
+ private PriorityBlockingQueue<SwitchEvent> switchEvents;
// only 1 message listener per OFType
private ConcurrentMap<OFType, IMessageListener> messageListeners;
// only 1 switch state listener
private AtomicInteger switchInstanceNumber;
private int MAXQUEUESIZE = 50000;
+ private static enum SwitchEventPriority { LOW, NORMAL, HIGH }
+
/*
* this thread monitors the switchEvents queue for new incoming events from
* switch
public void init() {
logger.debug("Initializing!");
this.switches = new ConcurrentHashMap<Long, ISwitch>();
- this.switchEvents = new LinkedBlockingQueue<SwitchEvent>(MAXQUEUESIZE);
+ this.switchEvents = new PriorityBlockingQueue<SwitchEvent>(MAXQUEUESIZE, new Comparator<SwitchEvent>() {
+ @Override
+ public int compare(SwitchEvent p1, SwitchEvent p2) {
+ return p2.getPriority() - p1.getPriority();
+ }
+ });
this.messageListeners = new ConcurrentHashMap<OFType, IMessageListener>();
this.switchStateListener = null;
this.switchInstanceNumber = new AtomicInteger(0);
if (((SwitchHandler) sw).isOperational()) {
Long sid = sw.getId();
if (this.switches.remove(sid, sw)) {
- logger.warn("{} is Disconnected", sw);
+ logger.info("{} is removed", sw);
notifySwitchDeleted(sw);
}
}
}
private synchronized void addSwitchEvent(SwitchEvent event) {
- try {
- this.switchEvents.put(event);
- } catch (InterruptedException e) {
- logger.debug("SwitchEvent caught Interrupt Exception");
- }
+ this.switchEvents.put(event);
}
public void takeSwitchEventAdd(ISwitch sw) {
- SwitchEvent ev = new SwitchEvent(
- SwitchEvent.SwitchEventType.SWITCH_ADD, sw, null);
+ SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ADD, sw, null,
+ SwitchEventPriority.HIGH.ordinal());
addSwitchEvent(ev);
}
public void takeSwitchEventDelete(ISwitch sw) {
- SwitchEvent ev = new SwitchEvent(
- SwitchEvent.SwitchEventType.SWITCH_DELETE, sw, null);
+ SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_DELETE, sw, null,
+ SwitchEventPriority.HIGH.ordinal());
addSwitchEvent(ev);
}
public void takeSwitchEventError(ISwitch sw) {
- SwitchEvent ev = new SwitchEvent(
- SwitchEvent.SwitchEventType.SWITCH_ERROR, sw, null);
+ SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ERROR, sw, null,
+ SwitchEventPriority.NORMAL.ordinal());
addSwitchEvent(ev);
}
public void takeSwitchEventMsg(ISwitch sw, OFMessage msg) {
if (messageListeners.get(msg.getType()) != null) {
- SwitchEvent ev = new SwitchEvent(
- SwitchEvent.SwitchEventType.SWITCH_MESSAGE, sw, msg);
+ SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_MESSAGE, sw, msg,
+ SwitchEventPriority.LOW.ordinal());
addSwitchEvent(ev);
}
}
return this.switches.get(switchId);
}
+ public void _controllerShowQueueSize(CommandInterpreter ci) {
+ ci.print("switchEvents queue size: " + switchEvents.size() + "\n");
+ }
+
public void _controllerShowSwitches(CommandInterpreter ci) {
Set<Long> sids = switches.keySet();
StringBuffer s = new StringBuffer();
help.append("\t controllerShowSwitches\n");
help.append("\t controllerReset\n");
help.append("\t controllerShowConnConfig\n");
+ help.append("\t controllerShowQueueSize\n");
return help.toString();
}
* @throws Exception
*/
@Override
- public void asyncSend(OFMessage msg) throws IOException {
+ public void asyncSend(OFMessage msg) throws Exception {
synchronized (outBuffer) {
int msgLen = msg.getLengthU();
if (outBuffer.remaining() < msgLen) {
* @throws Exception
*/
@Override
- public void resumeSend() throws IOException {
+ public void resumeSend() throws Exception {
synchronized (outBuffer) {
if (!socket.isOpen()) {
return;
* @throws Exception
*/
@Override
- public List<OFMessage> readMessages() throws IOException {
+ public List<OFMessage> readMessages() throws Exception {
if (!socket.isOpen()) {
return null;
}
.getLogger(SecureMessageReadWriteService.class);
private Selector selector;
- private SelectionKey clientSelectionKey;
private SocketChannel socket;
private BasicFactory factory;
sslEngine = sslContext.createSSLEngine();
sslEngine.setUseClientMode(false);
sslEngine.setNeedClientAuth(true);
+ sslEngine.setEnabledCipherSuites(new String[] {
+ "SSL_RSA_WITH_RC4_128_MD5",
+ "SSL_RSA_WITH_RC4_128_SHA",
+ "TLS_RSA_WITH_AES_128_CBC_SHA",
+ "TLS_DHE_RSA_WITH_AES_128_CBC_SHA",
+ "TLS_DHE_DSS_WITH_AES_128_CBC_SHA",
+ "SSL_RSA_WITH_3DES_EDE_CBC_SHA",
+ "SSL_DHE_RSA_WITH_3DES_EDE_CBC_SHA",
+ "SSL_DHE_DSS_WITH_3DES_EDE_CBC_SHA",
+ "SSL_RSA_WITH_DES_CBC_SHA",
+ "SSL_DHE_RSA_WITH_DES_CBC_SHA",
+ "SSL_DHE_DSS_WITH_DES_CBC_SHA",
+ "SSL_RSA_EXPORT_WITH_RC4_40_MD5",
+ "SSL_RSA_EXPORT_WITH_DES40_CBC_SHA",
+ "SSL_DHE_RSA_EXPORT_WITH_DES40_CBC_SHA",
+ "SSL_DHE_DSS_EXPORT_WITH_DES40_CBC_SHA",
+ "TLS_EMPTY_RENEGOTIATION_INFO_SCSV"});
// Do initial handshake
doHandshake(socket, sslEngine);
- this.clientSelectionKey = this.socket.register(this.selector,
- SelectionKey.OP_READ);
+ this.socket.register(this.selector, SelectionKey.OP_READ);
}
/**
if (myAppData.hasRemaining()) {
myAppData.compact();
- this.clientSelectionKey = this.socket.register(this.selector,
- SelectionKey.OP_WRITE, this);
+ this.socket.register(this.selector, SelectionKey.OP_WRITE, this);
} else {
myAppData.clear();
- this.clientSelectionKey = this.socket.register(this.selector,
- SelectionKey.OP_READ, this);
+ this.socket.register(this.selector, SelectionKey.OP_READ, this);
}
logger.trace("Message sent: {}", msg);
if (myAppData.hasRemaining()) {
myAppData.compact();
- this.clientSelectionKey = this.socket.register(this.selector,
- SelectionKey.OP_WRITE, this);
+ this.socket.register(this.selector, SelectionKey.OP_WRITE, this);
} else {
myAppData.clear();
- this.clientSelectionKey = this.socket.register(this.selector,
- SelectionKey.OP_READ, this);
+ this.socket.register(this.selector, SelectionKey.OP_READ, this);
}
}
}
peerAppData.clear();
}
- this.clientSelectionKey = this.socket.register(this.selector,
- SelectionKey.OP_READ, this);
+ this.socket.register(this.selector, SelectionKey.OP_READ, this);
return msgs;
}
private SwitchEventType eventType;
private ISwitch sw;
private OFMessage msg;
+ private int priority;
- public SwitchEvent(SwitchEventType type, ISwitch sw, OFMessage msg) {
+ public SwitchEvent(SwitchEventType type, ISwitch sw, OFMessage msg, int priority) {
this.eventType = type;
this.sw = sw;
this.msg = msg;
+ this.priority = priority;
}
public SwitchEventType getEventType() {
return this.msg;
}
+ public int getPriority() {
+ return priority;
+ }
+
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
@Override
public String toString() {
String s;
package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
-import java.io.IOException;
-import java.net.SocketException;
-import java.nio.channels.AsynchronousCloseException;
-import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.LoggerFactory;
public class SwitchHandler implements ISwitch {
- private static final Logger logger = LoggerFactory
- .getLogger(SwitchHandler.class);
+ private static final Logger logger = LoggerFactory.getLogger(SwitchHandler.class);
private static final int SWITCH_LIVENESS_TIMER = 5000;
private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
private final int MESSAGE_RESPONSE_TIMER = 2000;
private Thread transmitThread;
private enum SwitchState {
- NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
- 3);
+ NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(3);
private int value;
try {
responseTimerValue = Integer.decode(rTimer);
} catch (NumberFormatException e) {
- logger.warn(
- "Invalid of.messageResponseTimer: {} use default({})",
- rTimer, MESSAGE_RESPONSE_TIMER);
+ logger.warn("Invalid of.messageResponseTimer: {} use default({})", rTimer, MESSAGE_RESPONSE_TIMER);
}
}
}
try {
// wait for an incoming connection
selector.select(0);
- Iterator<SelectionKey> selectedKeys = selector
- .selectedKeys().iterator();
+ Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey skey = selectedKeys.next();
selectedKeys.remove();
switchHandlerThread.start();
}
- public void stop() {
+ private void stopInternal() {
+ logger.debug("{} receives stop signal",
+ (isOperational() ? HexString.toHexString(sid) : "unknown"));
running = false;
cancelSwitchTimer();
try {
msgReadWriteService.stop();
} catch (Exception e) {
}
- executor.shutdown();
+ logger.debug("executor shutdown now");
+ executor.shutdownNow();
msgReadWriteService = null;
+ }
+
+ public void stop() {
+ stopInternal();
if (switchHandlerThread != null) {
switchHandlerThread.interrupt();
*/
private void asyncSendNow(OFMessage msg) {
if (msgReadWriteService == null) {
- logger.warn(
- "asyncSendNow: {} is not sent because Message ReadWrite Service is not available.",
- msg);
+ logger.warn("asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", msg);
return;
}
}
if (msgs == null) {
- logger.debug("{} is down", this);
- // the connection is down, inform core
+ logger.info("{} is down", this);
reportSwitchStateChange(false);
return;
}
switch (type) {
case HELLO:
// send feature request
- OFMessage featureRequest = factory
- .getMessage(OFType.FEATURES_REQUEST);
+ OFMessage featureRequest = factory.getMessage(OFType.FEATURES_REQUEST);
asyncFastSend(featureRequest);
// delete all pre-existing flows
OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
- OFFlowMod flowMod = (OFFlowMod) factory
- .getMessage(OFType.FLOW_MOD);
- flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
- .setOutPort(OFPort.OFPP_NONE)
+ OFFlowMod flowMod = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
+ flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE).setOutPort(OFPort.OFPP_NONE)
.setLength((short) OFFlowMod.MINIMUM_LENGTH);
asyncFastSend(flowMod);
this.state = SwitchState.WAIT_FEATURES_REPLY;
startSwitchTimer();
break;
case ECHO_REQUEST:
- OFEchoReply echoReply = (OFEchoReply) factory
- .getMessage(OFType.ECHO_REPLY);
+ OFEchoReply echoReply = (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
// respond immediately
asyncSendNow(echoReply, msg.getXid());
break;
updatePhysicalPort(port);
} else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
updatePhysicalPort(port);
- } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
- .ordinal()) {
+ } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE.ordinal()) {
deletePhysicalPort(port);
}
if (probeSent) {
// switch failed to respond to our probe, consider
// it down
- logger.warn("{} is idle for too long, disconnect",
- toString());
+ logger.warn("{} sid {} is idle for too long, disconnect", socket.socket()
+ .getRemoteSocketAddress().toString().split("/")[1], (sid == 0) ? "unknown"
+ : HexString.toHexString(sid));
reportSwitchStateChange(false);
} else {
// send a probe to see if the switch is still alive
- logger.debug(
- "Send idle probe (Echo Request) to {}",
- this);
+ logger.debug("Send idle probe (Echo Request) to {}", this);
probeSent = true;
- OFMessage echo = factory
- .getMessage(OFType.ECHO_REQUEST);
+ OFMessage echo = factory.getMessage(OFType.ECHO_REQUEST);
asyncFastSend(echo);
}
} else {
if (state == SwitchState.WAIT_FEATURES_REPLY) {
// send another features request
- OFMessage request = factory
- .getMessage(OFType.FEATURES_REQUEST);
+ OFMessage request = factory.getMessage(OFType.FEATURES_REQUEST);
asyncFastSend(request);
} else {
if (state == SwitchState.WAIT_CONFIG_REPLY) {
// send another config request
- OFSetConfig config = (OFSetConfig) factory
- .getMessage(OFType.SET_CONFIG);
- config.setMissSendLength((short) 0xffff)
- .setLengthU(OFSetConfig.MINIMUM_LENGTH);
+ OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
+ config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
asyncFastSend(config);
- OFMessage getConfig = factory
- .getMessage(OFType.GET_CONFIG_REQUEST);
+ OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
asyncFastSend(getConfig);
}
}
}
private void reportError(Exception e) {
- if (e instanceof AsynchronousCloseException
- || e instanceof InterruptedException
- || e instanceof SocketException || e instanceof IOException
- || e instanceof ClosedSelectorException) {
- if (logger.isDebugEnabled()) {
- logger.debug("Caught exception {}", e.getMessage());
- }
- } else {
- logger.warn("Caught exception ", e);
+ if (!running) {
+ logger.debug("Caught exception {} while switch {} is shutting down. Skip", e.getMessage(),
+ (isOperational() ? HexString.toHexString(sid) : "unknown"));
+ return;
}
+ logger.debug("Caught exception: ", e);
+
// notify core of this error event and disconnect the switch
((Controller) core).takeSwitchEventError(this);
+
+ // clean up some internal states immediately
+ stopInternal();
}
private void reportSwitchStateChange(boolean added) {
updatePhysicalPort(port);
}
// config the switch to send full data packet
- OFSetConfig config = (OFSetConfig) factory
- .getMessage(OFType.SET_CONFIG);
- config.setMissSendLength((short) 0xffff).setLengthU(
- OFSetConfig.MINIMUM_LENGTH);
+ OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
+ config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
asyncFastSend(config);
// send config request to make sure the switch can handle the set
// config
portBandwidth
.put(portNumber,
port.getCurrentFeatures()
- & (OFPortFeatures.OFPPF_10MB_FD.getValue()
- | OFPortFeatures.OFPPF_10MB_HD
- .getValue()
- | OFPortFeatures.OFPPF_100MB_FD
- .getValue()
- | OFPortFeatures.OFPPF_100MB_HD
- .getValue()
- | OFPortFeatures.OFPPF_1GB_FD
- .getValue()
- | OFPortFeatures.OFPPF_1GB_HD
- .getValue() | OFPortFeatures.OFPPF_10GB_FD
+ & (OFPortFeatures.OFPPF_10MB_FD.getValue() | OFPortFeatures.OFPPF_10MB_HD.getValue()
+ | OFPortFeatures.OFPPF_100MB_FD.getValue()
+ | OFPortFeatures.OFPPF_100MB_HD.getValue()
+ | OFPortFeatures.OFPPF_1GB_FD.getValue()
+ | OFPortFeatures.OFPPF_1GB_HD.getValue() | OFPortFeatures.OFPPF_10GB_FD
.getValue()));
}
@Override
public String toString() {
try {
- return ("Switch:"
- + socket.socket().getRemoteSocketAddress().toString().split("/")[1]
- + " SWID:" + (isOperational() ? HexString
+ return ("Switch:" + socket.socket().getRemoteSocketAddress().toString().split("/")[1] + " SWID:" + (isOperational() ? HexString
.toHexString(this.sid) : "unknown"));
} catch (Exception e) {
- return (isOperational() ? HexString.toHexString(this.sid)
- : "unknown");
+ return (isOperational() ? HexString.toHexString(this.sid) : "unknown");
}
}
int xid = getNextXid();
StatisticsCollector worker = new StatisticsCollector(this, xid, req);
messageWaitingDone.put(xid, worker);
- Future<Object> submit = executor.submit(worker);
+ Future<Object> submit;
Object result = null;
+ try {
+ submit = executor.submit(worker);
+ } catch (RejectedExecutionException re) {
+ messageWaitingDone.remove(xid);
+ return result;
+ }
try {
result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
return result;
} catch (Exception e) {
- logger.warn("Timeout while waiting for {} replies", req.getType());
+ logger.warn("Timeout while waiting for {} replies from {}",
+ req.getType(), (isOperational() ? HexString.toHexString(sid) : "unknown"));
result = null; // to indicate timeout has occurred
worker.wakeup();
return result;
@Override
public Object syncSend(OFMessage msg) {
+ if (!running) {
+ logger.debug("Switch is going down, ignore syncSend");
+ return null;
+ }
int xid = getNextXid();
return syncSend(msg, xid);
}
*/
private void processBarrierReply(OFBarrierReply msg) {
Integer xid = msg.getXid();
- SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
- .remove(xid);
+ SynchronousMessage worker = (SynchronousMessage) messageWaitingDone.remove(xid);
if (worker == null) {
return;
}
private void processStatsReply(OFStatisticsReply reply) {
Integer xid = reply.getXid();
- StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
- .get(xid);
+ StatisticsCollector worker = (StatisticsCollector) messageWaitingDone.get(xid);
if (worker == null) {
return;
}
if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
return false;
}
- if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
- .getValue()) {
+ if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK.getValue()) {
return false;
}
return true;
syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
}
} catch (InterruptedException ie) {
- reportError(new InterruptedException(
- "PriorityMessageTransmit thread interrupted"));
+ reportError(new InterruptedException("PriorityMessageTransmit thread interrupted"));
} catch (Exception e) {
reportError(e);
}
* Setup and start the transmit thread
*/
private void startTransmitThread() {
- this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
- new Comparator<PriorityMessage>() {
- @Override
- public int compare(PriorityMessage p1, PriorityMessage p2) {
- if (p2.priority != p1.priority) {
- return p2.priority - p1.priority;
- } else {
- return (p2.seqNum < p1.seqNum) ? 1 : -1;
- }
- }
- });
+ this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, new Comparator<PriorityMessage>() {
+ @Override
+ public int compare(PriorityMessage p1, PriorityMessage p2) {
+ if (p2.priority != p1.priority) {
+ return p2.priority - p1.priority;
+ } else {
+ return (p2.seqNum < p1.seqNum) ? 1 : -1;
+ }
+ }
+ });
this.transmitThread = new Thread(new PriorityMessageTransmit());
this.transmitThread.start();
}
private IMessageReadWrite getMessageReadWriteService() throws Exception {
String str = System.getProperty("secureChannelEnabled");
- return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(
- socket, selector) : new MessageReadWriteService(socket,
- selector);
+ return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(socket,
+ selector) : new MessageReadWriteService(socket, selector);
}
/**
messageWaitingDone.put(xid, worker);
Object result = null;
Boolean status = false;
- Future<Object> submit = executor.submit(worker);
+ Future<Object> submit;
+ try {
+ submit = executor.submit(worker);
+ } catch (RejectedExecutionException re) {
+ messageWaitingDone.remove(xid);
+ return result;
+ }
try {
result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
messageWaitingDone.remove(xid);
// this message
// the result if OFError already
if (logger.isDebugEnabled()) {
- logger.debug("Send {} failed --> {}", msg.getType(),
- (result));
+ logger.debug("Send {} failed --> {}", msg.getType(), (result));
}
}
return result;
} catch (Exception e) {
- logger.warn("Timeout while waiting for {} reply", msg.getType()
- .toString());
+ logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
// convert the result into a Boolean with value false
status = false;
result = status;
import org.opendaylight.controller.sal.utils.NetUtils;
import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
import org.opendaylight.controller.sal.utils.NodeCreator;
-import org.opendaylight.controller.sal.utils.Status;
-import org.opendaylight.controller.sal.utils.StatusCode;
/**
* The class describes neighbor discovery service for an OpenFlow network.
readyListHi.add(nodeConnector);
}
+ private void removeNodeConnector(NodeConnector nodeConnector) {
+ readyListLo.remove(nodeConnector);
+ readyListHi.remove(nodeConnector);
+ stagingList.remove(nodeConnector);
+ holdTime.remove(nodeConnector);
+ elapsedTime.remove(nodeConnector);
+ }
+
private Set<NodeConnector> getRemoveSet(Collection<NodeConnector> c, Node node) {
Set<NodeConnector> removeSet = new HashSet<NodeConnector>();
if (c == null) {
}
for (NodeConnector nodeConnector : c) {
if (node.equals(nodeConnector.getNode())) {
- Edge edge1 = edgeMap.get(nodeConnector);
- if (edge1 != null) {
- removeSet.add(nodeConnector);
-
- // check reverse direction
- Edge edge2 = edgeMap.get(edge1.getTailNodeConnector());
- if ((edge2 != null) && node.equals(edge2.getTailNodeConnector().getNode())) {
- removeSet.add(edge2.getHeadNodeConnector());
- }
- }
+ removeSet.add(nodeConnector);
}
}
return removeSet;
private void removeDiscovery(Node node) {
Set<NodeConnector> removeSet;
+ removeSet = getRemoveSet(edgeMap.keySet(), node);
+ NodeConnector peerConnector;
+ Edge edge1, edge2;
+ for (NodeConnector nodeConnector : removeSet) {
+ // get the peer for fast removal of the edge in reverse direction
+ peerConnector = null;
+ edge1 = edgeMap.get(nodeConnector);
+ if (edge1 != null) {
+ edge2 = edgeMap.get(edge1.getTailNodeConnector());
+ if ((edge2 != null) && node.equals(edge2.getTailNodeConnector().getNode())) {
+ peerConnector = edge2.getHeadNodeConnector();
+ }
+ }
+
+ removeEdge(nodeConnector, false);
+ removeEdge(peerConnector, isEnabled(peerConnector));
+ }
+
+ removeSet = getRemoveSet(prodMap.keySet(), node);
+ for (NodeConnector nodeConnector : removeSet) {
+ removeProdEdge(nodeConnector);
+ }
+
removeSet = getRemoveSet(readyListHi, node);
readyListHi.removeAll(removeSet);
holdTime.remove(nodeConnector);
}
- removeSet = getRemoveSet(edgeMap.keySet(), node);
+ removeSet = getRemoveSet(elapsedTime.keySet(), node);
for (NodeConnector nodeConnector : removeSet) {
- removeEdge(nodeConnector, false);
- }
-
- removeSet = getRemoveSet(prodMap.keySet(), node);
- for (NodeConnector nodeConnector : removeSet) {
- removeProdEdge(nodeConnector);
+ elapsedTime.remove(nodeConnector);
}
}
private void removeDiscovery(NodeConnector nodeConnector) {
- readyListHi.remove(nodeConnector);
- readyListLo.remove(nodeConnector);
- stagingList.remove(nodeConnector);
- holdTime.remove(nodeConnector);
+ removeNodeConnector(nodeConnector);
removeEdge(nodeConnector, false);
removeProdEdge(nodeConnector);
}
for (NodeConnector nodeConnector : retrySet) {
// Allow one more retry
- readyListLo.add(nodeConnector);
elapsedTime.remove(nodeConnector);
if (connectionOutService.isLocal(nodeConnector.getNode())) {
transmitQ.add(nodeConnector);
}
elapsedTime.remove(src);
+ // fast discovery of the edge in reverse direction
+ if (!edgeMap.containsKey(dst) && !readyListHi.contains(dst) && !elapsedTime.keySet().contains(dst)) {
+ moveToReadyListHi(dst);
+ }
+
// notify
updateEdge(edge, UpdateType.ADDED, props);
logger.trace("Add edge {}", edge);
* Remove OpenFlow edge
*/
private void removeEdge(NodeConnector nodeConnector, boolean stillEnabled) {
- holdTime.remove(nodeConnector);
- readyListLo.remove(nodeConnector);
- readyListHi.remove(nodeConnector);
+ if (nodeConnector == null) {
+ return;
+ }
+
+ removeNodeConnector(nodeConnector);
if (stillEnabled) {
// keep discovering
- if (!stagingList.contains(nodeConnector)) {
- stagingList.add(nodeConnector);
- }
- } else {
- // stop it
- stagingList.remove(nodeConnector);
+ stagingList.add(nodeConnector);
}
Edge edge = null;
if (val != null) {
try {
- int ticks = Integer.parseInt(val);
+ int ticks;
+ Set<NodeConnector> monitorSet = holdTime.keySet();
+ if (monitorSet != null) {
+ for (NodeConnector nodeConnector : monitorSet) {
+ holdTime.put(nodeConnector, 0);
+ }
+ }
+
+ ticks = Integer.parseInt(val);
DiscoveryPeriod.INTERVAL.setTick(ticks);
discoveryBatchRestartTicks = getDiscoveryInterval();
discoveryBatchPauseTicks = getDiscoveryPauseInterval();
public void updateNodeConnector(NodeConnector nodeConnector,
UpdateType type, Set<Property> props) {
Map<String, Property> propMap = new HashMap<String, Property>();
+ boolean update = true;
log.debug("updateNodeConnector: {} type {} props {} for container {}",
new Object[] { nodeConnector, type, props, containerName });
switch (type) {
case ADDED:
- case CHANGED:
if (props != null) {
for (Property prop : props) {
addNodeConnectorProp(nodeConnector, prop);
addSpanPort(nodeConnector);
break;
+ case CHANGED:
+ if (!nodeConnectorProps.containsKey(nodeConnector) || (props == null)) {
+ update = false;
+ } else {
+ for (Property prop : props) {
+ addNodeConnectorProp(nodeConnector, prop);
+ propMap.put(prop.getName(), prop);
+ }
+ }
+ break;
case REMOVED:
+ if (!nodeConnectorProps.containsKey(nodeConnector)) {
+ update = false;
+ }
removeNodeConnectorAllProps(nodeConnector);
// clean up span config
removeSpanPort(nodeConnector);
break;
default:
+ update = false;
break;
}
- notifyNodeConnector(nodeConnector, type, propMap);
+ if (update) {
+ notifyNodeConnector(nodeConnector, type, propMap);
+ }
}
@Override