# entries, including switches' Certification Authority (CA) certificates. For example,
# secureChannelEnabled=true
# controllerKeyStore=./configuration/ctlKeyStore
-# controllerKeyStorePassword=xxxxx (this password should match the password used for KeyStore generation)
+# controllerKeyStorePassword=xxxxxxxx (this password should match the password used for KeyStore generation and at least 6 characters)
# controllerTrustStore=./configuration/ctlTrustStore
-# controllerTrustStorePassword=xxxxx (this password should match the password used for TrustStore generation)
+# controllerTrustStorePassword=xxxxxxxx (this password should match the password used for TrustStore generation and at least 6 characters)
secureChannelEnabled=false
controllerKeyStore=
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.util.Comparator;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.osgi.framework.console.CommandInterpreter;
import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitchStateListener;
import org.opendaylight.controller.sal.connection.ConnectionConstants;
import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
-import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
private ControllerIO controllerIO;
private Thread switchEventThread;
private ConcurrentHashMap<Long, ISwitch> switches;
- private BlockingQueue<SwitchEvent> switchEvents;
+ private PriorityBlockingQueue<SwitchEvent> switchEvents;
// only 1 message listener per OFType
private ConcurrentMap<OFType, IMessageListener> messageListeners;
// only 1 switch state listener
private AtomicInteger switchInstanceNumber;
private int MAXQUEUESIZE = 50000;
+ private static enum SwitchEventPriority { LOW, NORMAL, HIGH }
+
/*
* this thread monitors the switchEvents queue for new incoming events from
* switch
public void init() {
logger.debug("Initializing!");
this.switches = new ConcurrentHashMap<Long, ISwitch>();
- this.switchEvents = new LinkedBlockingQueue<SwitchEvent>(MAXQUEUESIZE);
+ this.switchEvents = new PriorityBlockingQueue<SwitchEvent>(MAXQUEUESIZE, new Comparator<SwitchEvent>() {
+ @Override
+ public int compare(SwitchEvent p1, SwitchEvent p2) {
+ return p2.getPriority() - p1.getPriority();
+ }
+ });
this.messageListeners = new ConcurrentHashMap<OFType, IMessageListener>();
this.switchStateListener = null;
this.switchInstanceNumber = new AtomicInteger(0);
if (((SwitchHandler) sw).isOperational()) {
Long sid = sw.getId();
if (this.switches.remove(sid, sw)) {
- logger.warn("{} is Disconnected", sw);
+ logger.info("{} is removed", sw);
notifySwitchDeleted(sw);
}
}
}
private synchronized void addSwitchEvent(SwitchEvent event) {
- try {
- this.switchEvents.put(event);
- } catch (InterruptedException e) {
- logger.debug("SwitchEvent caught Interrupt Exception");
- }
+ this.switchEvents.put(event);
}
public void takeSwitchEventAdd(ISwitch sw) {
- SwitchEvent ev = new SwitchEvent(
- SwitchEvent.SwitchEventType.SWITCH_ADD, sw, null);
+ SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ADD, sw, null,
+ SwitchEventPriority.HIGH.ordinal());
addSwitchEvent(ev);
}
public void takeSwitchEventDelete(ISwitch sw) {
- SwitchEvent ev = new SwitchEvent(
- SwitchEvent.SwitchEventType.SWITCH_DELETE, sw, null);
+ SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_DELETE, sw, null,
+ SwitchEventPriority.HIGH.ordinal());
addSwitchEvent(ev);
}
public void takeSwitchEventError(ISwitch sw) {
- SwitchEvent ev = new SwitchEvent(
- SwitchEvent.SwitchEventType.SWITCH_ERROR, sw, null);
+ SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ERROR, sw, null,
+ SwitchEventPriority.NORMAL.ordinal());
addSwitchEvent(ev);
}
public void takeSwitchEventMsg(ISwitch sw, OFMessage msg) {
if (messageListeners.get(msg.getType()) != null) {
- SwitchEvent ev = new SwitchEvent(
- SwitchEvent.SwitchEventType.SWITCH_MESSAGE, sw, msg);
+ SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_MESSAGE, sw, msg,
+ SwitchEventPriority.LOW.ordinal());
addSwitchEvent(ev);
}
}
return this.switches.get(switchId);
}
+ public void _controllerShowQueueSize(CommandInterpreter ci) {
+ ci.print("switchEvents queue size: " + switchEvents.size() + "\n");
+ }
+
public void _controllerShowSwitches(CommandInterpreter ci) {
Set<Long> sids = switches.keySet();
StringBuffer s = new StringBuffer();
help.append("\t controllerShowSwitches\n");
help.append("\t controllerReset\n");
help.append("\t controllerShowConnConfig\n");
+ help.append("\t controllerShowQueueSize\n");
return help.toString();
}
* @throws Exception
*/
@Override
- public void asyncSend(OFMessage msg) throws IOException {
+ public void asyncSend(OFMessage msg) throws Exception {
synchronized (outBuffer) {
int msgLen = msg.getLengthU();
if (outBuffer.remaining() < msgLen) {
* @throws Exception
*/
@Override
- public void resumeSend() throws IOException {
+ public void resumeSend() throws Exception {
synchronized (outBuffer) {
if (!socket.isOpen()) {
return;
* @throws Exception
*/
@Override
- public List<OFMessage> readMessages() throws IOException {
+ public List<OFMessage> readMessages() throws Exception {
if (!socket.isOpen()) {
return null;
}
.getLogger(SecureMessageReadWriteService.class);
private Selector selector;
- private SelectionKey clientSelectionKey;
private SocketChannel socket;
private BasicFactory factory;
sslEngine = sslContext.createSSLEngine();
sslEngine.setUseClientMode(false);
sslEngine.setNeedClientAuth(true);
+ sslEngine.setEnabledCipherSuites(new String[] {
+ "SSL_RSA_WITH_RC4_128_MD5",
+ "SSL_RSA_WITH_RC4_128_SHA",
+ "TLS_RSA_WITH_AES_128_CBC_SHA",
+ "TLS_DHE_RSA_WITH_AES_128_CBC_SHA",
+ "TLS_DHE_DSS_WITH_AES_128_CBC_SHA",
+ "SSL_RSA_WITH_3DES_EDE_CBC_SHA",
+ "SSL_DHE_RSA_WITH_3DES_EDE_CBC_SHA",
+ "SSL_DHE_DSS_WITH_3DES_EDE_CBC_SHA",
+ "SSL_RSA_WITH_DES_CBC_SHA",
+ "SSL_DHE_RSA_WITH_DES_CBC_SHA",
+ "SSL_DHE_DSS_WITH_DES_CBC_SHA",
+ "SSL_RSA_EXPORT_WITH_RC4_40_MD5",
+ "SSL_RSA_EXPORT_WITH_DES40_CBC_SHA",
+ "SSL_DHE_RSA_EXPORT_WITH_DES40_CBC_SHA",
+ "SSL_DHE_DSS_EXPORT_WITH_DES40_CBC_SHA",
+ "TLS_EMPTY_RENEGOTIATION_INFO_SCSV"});
// Do initial handshake
doHandshake(socket, sslEngine);
- this.clientSelectionKey = this.socket.register(this.selector,
- SelectionKey.OP_READ);
+ this.socket.register(this.selector, SelectionKey.OP_READ);
}
/**
if (myAppData.hasRemaining()) {
myAppData.compact();
- this.clientSelectionKey = this.socket.register(this.selector,
- SelectionKey.OP_WRITE, this);
+ this.socket.register(this.selector, SelectionKey.OP_WRITE, this);
} else {
myAppData.clear();
- this.clientSelectionKey = this.socket.register(this.selector,
- SelectionKey.OP_READ, this);
+ this.socket.register(this.selector, SelectionKey.OP_READ, this);
}
logger.trace("Message sent: {}", msg);
if (myAppData.hasRemaining()) {
myAppData.compact();
- this.clientSelectionKey = this.socket.register(this.selector,
- SelectionKey.OP_WRITE, this);
+ this.socket.register(this.selector, SelectionKey.OP_WRITE, this);
} else {
myAppData.clear();
- this.clientSelectionKey = this.socket.register(this.selector,
- SelectionKey.OP_READ, this);
+ this.socket.register(this.selector, SelectionKey.OP_READ, this);
}
}
}
peerAppData.clear();
}
- this.clientSelectionKey = this.socket.register(this.selector,
- SelectionKey.OP_READ, this);
+ this.socket.register(this.selector, SelectionKey.OP_READ, this);
return msgs;
}
private SwitchEventType eventType;
private ISwitch sw;
private OFMessage msg;
+ private int priority;
- public SwitchEvent(SwitchEventType type, ISwitch sw, OFMessage msg) {
+ public SwitchEvent(SwitchEventType type, ISwitch sw, OFMessage msg, int priority) {
this.eventType = type;
this.sw = sw;
this.msg = msg;
+ this.priority = priority;
}
public SwitchEventType getEventType() {
return this.msg;
}
+ public int getPriority() {
+ return priority;
+ }
+
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
@Override
public String toString() {
String s;
package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
-import java.io.IOException;
-import java.net.SocketException;
-import java.nio.channels.AsynchronousCloseException;
-import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.LoggerFactory;
public class SwitchHandler implements ISwitch {
- private static final Logger logger = LoggerFactory
- .getLogger(SwitchHandler.class);
+ private static final Logger logger = LoggerFactory.getLogger(SwitchHandler.class);
private static final int SWITCH_LIVENESS_TIMER = 5000;
private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
private final int MESSAGE_RESPONSE_TIMER = 2000;
private Thread transmitThread;
private enum SwitchState {
- NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
- 3);
+ NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(3);
private int value;
try {
responseTimerValue = Integer.decode(rTimer);
} catch (NumberFormatException e) {
- logger.warn(
- "Invalid of.messageResponseTimer: {} use default({})",
- rTimer, MESSAGE_RESPONSE_TIMER);
+ logger.warn("Invalid of.messageResponseTimer: {} use default({})", rTimer, MESSAGE_RESPONSE_TIMER);
}
}
}
try {
// wait for an incoming connection
selector.select(0);
- Iterator<SelectionKey> selectedKeys = selector
- .selectedKeys().iterator();
+ Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey skey = selectedKeys.next();
selectedKeys.remove();
switchHandlerThread.start();
}
- public void stop() {
+ private void stopInternal() {
+ logger.debug("{} receives stop signal",
+ (isOperational() ? HexString.toHexString(sid) : "unknown"));
running = false;
cancelSwitchTimer();
try {
msgReadWriteService.stop();
} catch (Exception e) {
}
- executor.shutdown();
+ logger.debug("executor shutdown now");
+ executor.shutdownNow();
msgReadWriteService = null;
+ }
+
+ public void stop() {
+ stopInternal();
if (switchHandlerThread != null) {
switchHandlerThread.interrupt();
*/
private void asyncSendNow(OFMessage msg) {
if (msgReadWriteService == null) {
- logger.warn(
- "asyncSendNow: {} is not sent because Message ReadWrite Service is not available.",
- msg);
+ logger.warn("asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", msg);
return;
}
}
if (msgs == null) {
- logger.debug("{} is down", this);
- // the connection is down, inform core
+ logger.info("{} is down", this);
reportSwitchStateChange(false);
return;
}
switch (type) {
case HELLO:
// send feature request
- OFMessage featureRequest = factory
- .getMessage(OFType.FEATURES_REQUEST);
+ OFMessage featureRequest = factory.getMessage(OFType.FEATURES_REQUEST);
asyncFastSend(featureRequest);
// delete all pre-existing flows
OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
- OFFlowMod flowMod = (OFFlowMod) factory
- .getMessage(OFType.FLOW_MOD);
- flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
- .setOutPort(OFPort.OFPP_NONE)
+ OFFlowMod flowMod = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
+ flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE).setOutPort(OFPort.OFPP_NONE)
.setLength((short) OFFlowMod.MINIMUM_LENGTH);
asyncFastSend(flowMod);
this.state = SwitchState.WAIT_FEATURES_REPLY;
startSwitchTimer();
break;
case ECHO_REQUEST:
- OFEchoReply echoReply = (OFEchoReply) factory
- .getMessage(OFType.ECHO_REPLY);
+ OFEchoReply echoReply = (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
// respond immediately
asyncSendNow(echoReply, msg.getXid());
break;
updatePhysicalPort(port);
} else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
updatePhysicalPort(port);
- } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
- .ordinal()) {
+ } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE.ordinal()) {
deletePhysicalPort(port);
}
if (probeSent) {
// switch failed to respond to our probe, consider
// it down
- logger.warn("{} is idle for too long, disconnect",
- toString());
+ logger.warn("{} sid {} is idle for too long, disconnect", socket.socket()
+ .getRemoteSocketAddress().toString().split("/")[1], (sid == 0) ? "unknown"
+ : HexString.toHexString(sid));
reportSwitchStateChange(false);
} else {
// send a probe to see if the switch is still alive
- logger.debug(
- "Send idle probe (Echo Request) to {}",
- this);
+ logger.debug("Send idle probe (Echo Request) to {}", this);
probeSent = true;
- OFMessage echo = factory
- .getMessage(OFType.ECHO_REQUEST);
+ OFMessage echo = factory.getMessage(OFType.ECHO_REQUEST);
asyncFastSend(echo);
}
} else {
if (state == SwitchState.WAIT_FEATURES_REPLY) {
// send another features request
- OFMessage request = factory
- .getMessage(OFType.FEATURES_REQUEST);
+ OFMessage request = factory.getMessage(OFType.FEATURES_REQUEST);
asyncFastSend(request);
} else {
if (state == SwitchState.WAIT_CONFIG_REPLY) {
// send another config request
- OFSetConfig config = (OFSetConfig) factory
- .getMessage(OFType.SET_CONFIG);
- config.setMissSendLength((short) 0xffff)
- .setLengthU(OFSetConfig.MINIMUM_LENGTH);
+ OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
+ config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
asyncFastSend(config);
- OFMessage getConfig = factory
- .getMessage(OFType.GET_CONFIG_REQUEST);
+ OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
asyncFastSend(getConfig);
}
}
}
private void reportError(Exception e) {
- if (e instanceof AsynchronousCloseException
- || e instanceof InterruptedException
- || e instanceof SocketException || e instanceof IOException
- || e instanceof ClosedSelectorException) {
- if (logger.isDebugEnabled()) {
- logger.debug("Caught exception {}", e.getMessage());
- }
- } else {
- logger.warn("Caught exception ", e);
+ if (!running) {
+ logger.debug("Caught exception {} while switch {} is shutting down. Skip", e.getMessage(),
+ (isOperational() ? HexString.toHexString(sid) : "unknown"));
+ return;
}
+ logger.debug("Caught exception: ", e);
+
// notify core of this error event and disconnect the switch
((Controller) core).takeSwitchEventError(this);
+
+ // clean up some internal states immediately
+ stopInternal();
}
private void reportSwitchStateChange(boolean added) {
updatePhysicalPort(port);
}
// config the switch to send full data packet
- OFSetConfig config = (OFSetConfig) factory
- .getMessage(OFType.SET_CONFIG);
- config.setMissSendLength((short) 0xffff).setLengthU(
- OFSetConfig.MINIMUM_LENGTH);
+ OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
+ config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
asyncFastSend(config);
// send config request to make sure the switch can handle the set
// config
portBandwidth
.put(portNumber,
port.getCurrentFeatures()
- & (OFPortFeatures.OFPPF_10MB_FD.getValue()
- | OFPortFeatures.OFPPF_10MB_HD
- .getValue()
- | OFPortFeatures.OFPPF_100MB_FD
- .getValue()
- | OFPortFeatures.OFPPF_100MB_HD
- .getValue()
- | OFPortFeatures.OFPPF_1GB_FD
- .getValue()
- | OFPortFeatures.OFPPF_1GB_HD
- .getValue() | OFPortFeatures.OFPPF_10GB_FD
+ & (OFPortFeatures.OFPPF_10MB_FD.getValue() | OFPortFeatures.OFPPF_10MB_HD.getValue()
+ | OFPortFeatures.OFPPF_100MB_FD.getValue()
+ | OFPortFeatures.OFPPF_100MB_HD.getValue()
+ | OFPortFeatures.OFPPF_1GB_FD.getValue()
+ | OFPortFeatures.OFPPF_1GB_HD.getValue() | OFPortFeatures.OFPPF_10GB_FD
.getValue()));
}
@Override
public String toString() {
try {
- return ("Switch:"
- + socket.socket().getRemoteSocketAddress().toString().split("/")[1]
- + " SWID:" + (isOperational() ? HexString
+ return ("Switch:" + socket.socket().getRemoteSocketAddress().toString().split("/")[1] + " SWID:" + (isOperational() ? HexString
.toHexString(this.sid) : "unknown"));
} catch (Exception e) {
- return (isOperational() ? HexString.toHexString(this.sid)
- : "unknown");
+ return (isOperational() ? HexString.toHexString(this.sid) : "unknown");
}
}
int xid = getNextXid();
StatisticsCollector worker = new StatisticsCollector(this, xid, req);
messageWaitingDone.put(xid, worker);
- Future<Object> submit = executor.submit(worker);
+ Future<Object> submit;
Object result = null;
+ try {
+ submit = executor.submit(worker);
+ } catch (RejectedExecutionException re) {
+ messageWaitingDone.remove(xid);
+ return result;
+ }
try {
result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
return result;
} catch (Exception e) {
- logger.warn("Timeout while waiting for {} replies", req.getType());
+ logger.warn("Timeout while waiting for {} replies from {}",
+ req.getType(), (isOperational() ? HexString.toHexString(sid) : "unknown"));
result = null; // to indicate timeout has occurred
worker.wakeup();
return result;
@Override
public Object syncSend(OFMessage msg) {
+ if (!running) {
+ logger.debug("Switch is going down, ignore syncSend");
+ return null;
+ }
int xid = getNextXid();
return syncSend(msg, xid);
}
*/
private void processBarrierReply(OFBarrierReply msg) {
Integer xid = msg.getXid();
- SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
- .remove(xid);
+ SynchronousMessage worker = (SynchronousMessage) messageWaitingDone.remove(xid);
if (worker == null) {
return;
}
private void processStatsReply(OFStatisticsReply reply) {
Integer xid = reply.getXid();
- StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
- .get(xid);
+ StatisticsCollector worker = (StatisticsCollector) messageWaitingDone.get(xid);
if (worker == null) {
return;
}
if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
return false;
}
- if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
- .getValue()) {
+ if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK.getValue()) {
return false;
}
return true;
syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
}
} catch (InterruptedException ie) {
- reportError(new InterruptedException(
- "PriorityMessageTransmit thread interrupted"));
+ reportError(new InterruptedException("PriorityMessageTransmit thread interrupted"));
} catch (Exception e) {
reportError(e);
}
* Setup and start the transmit thread
*/
private void startTransmitThread() {
- this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
- new Comparator<PriorityMessage>() {
- @Override
- public int compare(PriorityMessage p1, PriorityMessage p2) {
- if (p2.priority != p1.priority) {
- return p2.priority - p1.priority;
- } else {
- return (p2.seqNum < p1.seqNum) ? 1 : -1;
- }
- }
- });
+ this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, new Comparator<PriorityMessage>() {
+ @Override
+ public int compare(PriorityMessage p1, PriorityMessage p2) {
+ if (p2.priority != p1.priority) {
+ return p2.priority - p1.priority;
+ } else {
+ return (p2.seqNum < p1.seqNum) ? 1 : -1;
+ }
+ }
+ });
this.transmitThread = new Thread(new PriorityMessageTransmit());
this.transmitThread.start();
}
private IMessageReadWrite getMessageReadWriteService() throws Exception {
String str = System.getProperty("secureChannelEnabled");
- return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(
- socket, selector) : new MessageReadWriteService(socket,
- selector);
+ return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(socket,
+ selector) : new MessageReadWriteService(socket, selector);
}
/**
messageWaitingDone.put(xid, worker);
Object result = null;
Boolean status = false;
- Future<Object> submit = executor.submit(worker);
+ Future<Object> submit;
+ try {
+ submit = executor.submit(worker);
+ } catch (RejectedExecutionException re) {
+ messageWaitingDone.remove(xid);
+ return result;
+ }
try {
result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
messageWaitingDone.remove(xid);
// this message
// the result if OFError already
if (logger.isDebugEnabled()) {
- logger.debug("Send {} failed --> {}", msg.getType(),
- (result));
+ logger.debug("Send {} failed --> {}", msg.getType(), (result));
}
}
return result;
} catch (Exception e) {
- logger.warn("Timeout while waiting for {} reply", msg.getType()
- .toString());
+ logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
// convert the result into a Boolean with value false
status = false;
result = status;
import org.opendaylight.controller.sal.utils.NetUtils;
import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
import org.opendaylight.controller.sal.utils.NodeCreator;
-import org.opendaylight.controller.sal.utils.Status;
-import org.opendaylight.controller.sal.utils.StatusCode;
/**
* The class describes neighbor discovery service for an OpenFlow network.
readyListHi.add(nodeConnector);
}
+ private void removeNodeConnector(NodeConnector nodeConnector) {
+ readyListLo.remove(nodeConnector);
+ readyListHi.remove(nodeConnector);
+ stagingList.remove(nodeConnector);
+ holdTime.remove(nodeConnector);
+ elapsedTime.remove(nodeConnector);
+ }
+
private Set<NodeConnector> getRemoveSet(Collection<NodeConnector> c, Node node) {
Set<NodeConnector> removeSet = new HashSet<NodeConnector>();
if (c == null) {
}
for (NodeConnector nodeConnector : c) {
if (node.equals(nodeConnector.getNode())) {
- Edge edge1 = edgeMap.get(nodeConnector);
- if (edge1 != null) {
- removeSet.add(nodeConnector);
-
- // check reverse direction
- Edge edge2 = edgeMap.get(edge1.getTailNodeConnector());
- if ((edge2 != null) && node.equals(edge2.getTailNodeConnector().getNode())) {
- removeSet.add(edge2.getHeadNodeConnector());
- }
- }
+ removeSet.add(nodeConnector);
}
}
return removeSet;
private void removeDiscovery(Node node) {
Set<NodeConnector> removeSet;
+ removeSet = getRemoveSet(edgeMap.keySet(), node);
+ NodeConnector peerConnector;
+ Edge edge1, edge2;
+ for (NodeConnector nodeConnector : removeSet) {
+ // get the peer for fast removal of the edge in reverse direction
+ peerConnector = null;
+ edge1 = edgeMap.get(nodeConnector);
+ if (edge1 != null) {
+ edge2 = edgeMap.get(edge1.getTailNodeConnector());
+ if ((edge2 != null) && node.equals(edge2.getTailNodeConnector().getNode())) {
+ peerConnector = edge2.getHeadNodeConnector();
+ }
+ }
+
+ removeEdge(nodeConnector, false);
+ removeEdge(peerConnector, isEnabled(peerConnector));
+ }
+
+ removeSet = getRemoveSet(prodMap.keySet(), node);
+ for (NodeConnector nodeConnector : removeSet) {
+ removeProdEdge(nodeConnector);
+ }
+
removeSet = getRemoveSet(readyListHi, node);
readyListHi.removeAll(removeSet);
holdTime.remove(nodeConnector);
}
- removeSet = getRemoveSet(edgeMap.keySet(), node);
+ removeSet = getRemoveSet(elapsedTime.keySet(), node);
for (NodeConnector nodeConnector : removeSet) {
- removeEdge(nodeConnector, false);
- }
-
- removeSet = getRemoveSet(prodMap.keySet(), node);
- for (NodeConnector nodeConnector : removeSet) {
- removeProdEdge(nodeConnector);
+ elapsedTime.remove(nodeConnector);
}
}
private void removeDiscovery(NodeConnector nodeConnector) {
- readyListHi.remove(nodeConnector);
- readyListLo.remove(nodeConnector);
- stagingList.remove(nodeConnector);
- holdTime.remove(nodeConnector);
+ removeNodeConnector(nodeConnector);
removeEdge(nodeConnector, false);
removeProdEdge(nodeConnector);
}
for (NodeConnector nodeConnector : retrySet) {
// Allow one more retry
- readyListLo.add(nodeConnector);
elapsedTime.remove(nodeConnector);
if (connectionOutService.isLocal(nodeConnector.getNode())) {
transmitQ.add(nodeConnector);
}
elapsedTime.remove(src);
+ // fast discovery of the edge in reverse direction
+ if (!edgeMap.containsKey(dst) && !readyListHi.contains(dst) && !elapsedTime.keySet().contains(dst)) {
+ moveToReadyListHi(dst);
+ }
+
// notify
updateEdge(edge, UpdateType.ADDED, props);
logger.trace("Add edge {}", edge);
* Remove OpenFlow edge
*/
private void removeEdge(NodeConnector nodeConnector, boolean stillEnabled) {
- holdTime.remove(nodeConnector);
- readyListLo.remove(nodeConnector);
- readyListHi.remove(nodeConnector);
+ if (nodeConnector == null) {
+ return;
+ }
+
+ removeNodeConnector(nodeConnector);
if (stillEnabled) {
// keep discovering
- if (!stagingList.contains(nodeConnector)) {
- stagingList.add(nodeConnector);
- }
- } else {
- // stop it
- stagingList.remove(nodeConnector);
+ stagingList.add(nodeConnector);
}
Edge edge = null;
if (val != null) {
try {
- int ticks = Integer.parseInt(val);
+ int ticks;
+ Set<NodeConnector> monitorSet = holdTime.keySet();
+ if (monitorSet != null) {
+ for (NodeConnector nodeConnector : monitorSet) {
+ holdTime.put(nodeConnector, 0);
+ }
+ }
+
+ ticks = Integer.parseInt(val);
DiscoveryPeriod.INTERVAL.setTick(ticks);
discoveryBatchRestartTicks = getDiscoveryInterval();
discoveryBatchPauseTicks = getDiscoveryPauseInterval();
public void updateNodeConnector(NodeConnector nodeConnector,
UpdateType type, Set<Property> props) {
Map<String, Property> propMap = new HashMap<String, Property>();
+ boolean update = true;
log.debug("updateNodeConnector: {} type {} props {} for container {}",
new Object[] { nodeConnector, type, props, containerName });
switch (type) {
case ADDED:
- case CHANGED:
if (props != null) {
for (Property prop : props) {
addNodeConnectorProp(nodeConnector, prop);
addSpanPort(nodeConnector);
break;
+ case CHANGED:
+ if (!nodeConnectorProps.containsKey(nodeConnector) || (props == null)) {
+ update = false;
+ } else {
+ for (Property prop : props) {
+ addNodeConnectorProp(nodeConnector, prop);
+ propMap.put(prop.getName(), prop);
+ }
+ }
+ break;
case REMOVED:
+ if (!nodeConnectorProps.containsKey(nodeConnector)) {
+ update = false;
+ }
removeNodeConnectorAllProps(nodeConnector);
// clean up span config
removeSpanPort(nodeConnector);
break;
default:
+ update = false;
break;
}
- notifyNodeConnector(nodeConnector, type, propMap);
+ if (update) {
+ notifyNodeConnector(nodeConnector, type, propMap);
+ }
}
@Override