X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fprotocol_plugins%2Fopenflow%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fprotocol_plugin%2Fopenflow%2Fcore%2Finternal%2FSwitchHandler.java;h=5d51d26a988f553f0e79a3d2a29b408ddb486871;hp=57098ae3c6fe2f61429d72a7f10aac9cab5d8de6;hb=b5744fe10a9eef5b5699cd02a901dc51fcbbb7ad;hpb=00e48ee2adae2c5845bc655195e106d703767f64 diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java index 57098ae3c6..5d51d26a98 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.protocol_plugin.openflow.core.internal; import java.io.IOException; import java.net.SocketException; import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; @@ -206,7 +207,6 @@ public class SwitchHandler implements ISwitch { } executor.shutdown(); - selector = null; msgReadWriteService = null; if (switchHandlerThread != null) { @@ -228,7 +228,7 @@ public class SwitchHandler implements ISwitch { * should be used for non-critical messages such as statistics request, * discovery packets, etc. An unique XID is generated automatically and * inserted into the message. - * + * * @param msg * The OF message to be sent * @return The XID used @@ -239,38 +239,7 @@ public class SwitchHandler implements ISwitch { } private Object syncSend(OFMessage msg, int xid) { - SynchronousMessage worker = new SynchronousMessage(this, xid, msg); - messageWaitingDone.put(xid, worker); - Object result = null; - Boolean status = false; - Future submit = executor.submit(worker); - try { - result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS); - messageWaitingDone.remove(xid); - if (result == null) { - // if result is null, then it means the switch can handle this - // message successfully - // convert the result into a Boolean with value true - status = true; - // logger.debug("Successfully send " + - // msg.getType().toString()); - result = status; - } else { - // if result is not null, this means the switch can't handle - // this message - // the result if OFError already - logger.debug("Send {} failed --> {}", msg.getType().toString(), - ((OFError) result).toString()); - } - return result; - } catch (Exception e) { - logger.warn("Timeout while waiting for {} reply", msg.getType() - .toString()); - // convert the result into a Boolean with value false - status = false; - result = status; - return result; - } + return syncMessageInternal(msg, xid, true); } /** @@ -278,7 +247,7 @@ public class SwitchHandler implements ISwitch { * priority. It will be served after high priority messages. The method * should be used for non-critical messages such as statistics request, * discovery packets, etc. The specified XID is inserted into the message. - * + * * @param msg * The OF message to be Sent * @param xid @@ -300,7 +269,7 @@ public class SwitchHandler implements ISwitch { * method should be used for critical messages such as hello, echo reply * etc. An unique XID is generated automatically and inserted into the * message. - * + * * @param msg * The OF message to be sent * @return The XID used @@ -315,7 +284,7 @@ public class SwitchHandler implements ISwitch { * priority. It will be served first before normal priority messages. The * method should be used for critical messages such as hello, echo reply * etc. The specified XID is inserted into the message. - * + * * @param msg * The OF message to be sent * @return The XID used @@ -339,23 +308,67 @@ public class SwitchHandler implements ISwitch { } } + /** + * This method bypasses the transmit queue and sends the message over the + * socket directly. If the input xid is not null, the specified xid is + * inserted into the message. Otherwise, an unique xid is generated + * automatically and inserted into the message. + * + * @param msg + * Message to be sent + * @param xid + * Message xid + */ + private void asyncSendNow(OFMessage msg, Integer xid) { + if (xid == null) { + xid = getNextXid(); + } + msg.setXid(xid); + + asyncSendNow(msg); + } + + /** + * This method bypasses the transmit queue and sends the message over the + * socket directly. + * + * @param msg + * Message to be sent + */ + private void asyncSendNow(OFMessage msg) { + if (msgReadWriteService == null) { + logger.warn( + "asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", + msg); + return; + } + + try { + msgReadWriteService.asyncSend(msg); + } catch (Exception e) { + reportError(e); + } + } + public void handleMessages() { List msgs = null; try { - msgs = msgReadWriteService.readMessages(); + if (msgReadWriteService != null) { + msgs = msgReadWriteService.readMessages(); + } } catch (Exception e) { reportError(e); } if (msgs == null) { - logger.debug("{} is down", toString()); + logger.debug("{} is down", this); // the connection is down, inform core reportSwitchStateChange(false); return; } for (OFMessage msg : msgs) { - logger.trace("Message received: {}", msg.toString()); + logger.trace("Message received: {}", msg); this.lastMsgReceivedTimeStamp = System.currentTimeMillis(); OFType type = msg.getType(); switch (type) { @@ -378,7 +391,8 @@ public class SwitchHandler implements ISwitch { case ECHO_REQUEST: OFEchoReply echoReply = (OFEchoReply) factory .getMessage(OFType.ECHO_REPLY); - asyncFastSend(echoReply); + // respond immediately + asyncSendNow(echoReply, msg.getXid()); break; case ECHO_REPLY: this.probeSent = false; @@ -447,7 +461,7 @@ public class SwitchHandler implements ISwitch { // send a probe to see if the switch is still alive logger.debug( "Send idle probe (Echo Request) to {}", - toString()); + this); probeSent = true; OFMessage echo = factory .getMessage(OFType.ECHO_REQUEST); @@ -489,8 +503,11 @@ public class SwitchHandler implements ISwitch { private void reportError(Exception e) { if (e instanceof AsynchronousCloseException || e instanceof InterruptedException - || e instanceof SocketException || e instanceof IOException) { - logger.debug("Caught exception {}", e.getMessage()); + || e instanceof SocketException || e instanceof IOException + || e instanceof ClosedSelectorException) { + if (logger.isDebugEnabled()) { + logger.debug("Caught exception {}", e.getMessage()); + } } else { logger.warn("Caught exception ", e); } @@ -757,7 +774,14 @@ public class SwitchHandler implements ISwitch { if (!transmitQ.isEmpty()) { PriorityMessage pmsg = transmitQ.poll(); msgReadWriteService.asyncSend(pmsg.msg); - logger.trace("Message sent: {}", pmsg.toString()); + logger.trace("Message sent: {}", pmsg); + /* + * If syncReply is set to true, wait for the response + * back. + */ + if (pmsg.syncReply) { + syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false); + } } Thread.sleep(10); } catch (InterruptedException ie) { @@ -816,19 +840,40 @@ public class SwitchHandler implements ISwitch { } /** - * Sends synchronous Barrier message + * Send Barrier message synchronously. The caller will be blocked until the + * Barrier reply is received. */ @Override - public Object sendBarrierMessage() { + public Object syncSendBarrierMessage() { OFBarrierRequest barrierMsg = new OFBarrierRequest(); return syncSend(barrierMsg); } + /** + * Send Barrier message asynchronously. The caller is not blocked. The + * Barrier message will be sent in a transmit thread which will be blocked + * until the Barrier reply is received. + */ + @Override + public Object asyncSendBarrierMessage() { + if (transmitQ == null) { + return Boolean.FALSE; + } + + OFBarrierRequest barrierMsg = new OFBarrierRequest(); + int xid = getNextXid(); + + barrierMsg.setXid(xid); + transmitQ.add(new PriorityMessage(barrierMsg, 0, true)); + + return Boolean.TRUE; + } + /** * This method returns the switch liveness timeout value. If controller did * not receive any message from the switch for such a long period, * controller will tear down the connection to the switch. - * + * * @return The timeout value */ private static int getSwitchLivenessTimeout() { @@ -844,4 +889,58 @@ public class SwitchHandler implements ISwitch { return rv; } + + /** + * This method performs synchronous operations for a given message. If + * syncRequest is set to true, the message will be sent out followed by a + * Barrier request message. Then it's blocked until the Barrier rely arrives + * or timeout. If syncRequest is false, it simply skips the message send and + * just waits for the response back. + * + * @param msg + * Message to be sent + * @param xid + * Message XID + * @param request + * If set to true, the message the message will be sent out + * followed by a Barrier request message. If set to false, it + * simply skips the sending and just waits for the Barrier reply. + * @return the result + */ + private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) { + SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest); + messageWaitingDone.put(xid, worker); + Object result = null; + Boolean status = false; + Future submit = executor.submit(worker); + try { + result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS); + messageWaitingDone.remove(xid); + if (result == null) { + // if result is null, then it means the switch can handle this + // message successfully + // convert the result into a Boolean with value true + status = true; + // logger.debug("Successfully send " + + // msg.getType().toString()); + result = status; + } else { + // if result is not null, this means the switch can't handle + // this message + // the result if OFError already + if (logger.isDebugEnabled()) { + logger.debug("Send {} failed --> {}", msg.getType(), + ((OFError) result)); + } + } + return result; + } catch (Exception e) { + logger.warn("Timeout while waiting for {} reply", msg.getType() + .toString()); + // convert the result into a Boolean with value false + status = false; + result = status; + return result; + } + } }