X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fprotocol_plugins%2Fopenflow%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fprotocol_plugin%2Fopenflow%2Fcore%2Finternal%2FSwitchHandler.java;h=52ea7fd575a76ae4f2053c5b1c96c7a10e622c0e;hb=cba872bacd7963de38208fde9f37747820340d00;hp=91606f4a41daaeecd7cce64e0566c54e028ced82;hpb=a640c5c549376e5d72038e033d49ef6f0df96c92;p=controller.git diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java index 91606f4a41..52ea7fd575 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java @@ -36,8 +36,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.opendaylight.controller.protocol_plugin.openflow.core.IController; -import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch; import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite; +import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch; import org.openflow.protocol.OFBarrierReply; import org.openflow.protocol.OFBarrierRequest; import org.openflow.protocol.OFEchoReply; @@ -68,29 +68,29 @@ public class SwitchHandler implements ISwitch { .getLogger(SwitchHandler.class); private static final int SWITCH_LIVENESS_TIMER = 5000; private static final int switchLivenessTimeout = getSwitchLivenessTimeout(); - private int MESSAGE_RESPONSE_TIMER = 2000; + private final int MESSAGE_RESPONSE_TIMER = 2000; - private String instanceName; - private ISwitch thisISwitch; - private IController core; + private final String instanceName; + private final ISwitch thisISwitch; + private final IController core; private Long sid; private Integer buffers; private Integer capabilities; private Byte tables; private Integer actions; private Selector selector; - private SocketChannel socket; - private BasicFactory factory; - private AtomicInteger xid; + private final SocketChannel socket; + private final BasicFactory factory; + private final AtomicInteger xid; private SwitchState state; private Timer periodicTimer; - private Map physicalPorts; - private Map portBandwidth; - private Date connectedDate; + private final Map physicalPorts; + private final Map portBandwidth; + private final Date connectedDate; private Long lastMsgReceivedTimeStamp; private Boolean probeSent; - private ExecutorService executor; - private ConcurrentHashMap> messageWaitingDone; + private final ExecutorService executor; + private final ConcurrentHashMap> messageWaitingDone; private boolean running; private IMessageReadWrite msgReadWriteService; private Thread switchHandlerThread; @@ -228,7 +228,7 @@ public class SwitchHandler implements ISwitch { * should be used for non-critical messages such as statistics request, * discovery packets, etc. An unique XID is generated automatically and * inserted into the message. - * + * * @param msg * The OF message to be sent * @return The XID used @@ -247,7 +247,7 @@ public class SwitchHandler implements ISwitch { * priority. It will be served after high priority messages. The method * should be used for non-critical messages such as statistics request, * discovery packets, etc. The specified XID is inserted into the message. - * + * * @param msg * The OF message to be Sent * @param xid @@ -269,7 +269,7 @@ public class SwitchHandler implements ISwitch { * method should be used for critical messages such as hello, echo reply * etc. An unique XID is generated automatically and inserted into the * message. - * + * * @param msg * The OF message to be sent * @return The XID used @@ -284,7 +284,7 @@ public class SwitchHandler implements ISwitch { * priority. It will be served first before normal priority messages. The * method should be used for critical messages such as hello, echo reply * etc. The specified XID is inserted into the message. - * + * * @param msg * The OF message to be sent * @return The XID used @@ -308,6 +308,48 @@ public class SwitchHandler implements ISwitch { } } + /** + * This method bypasses the transmit queue and sends the message over the + * socket directly. If the input xid is not null, the specified xid is + * inserted into the message. Otherwise, an unique xid is generated + * automatically and inserted into the message. + * + * @param msg + * Message to be sent + * @param xid + * Message xid + */ + private void asyncSendNow(OFMessage msg, Integer xid) { + if (xid == null) { + xid = getNextXid(); + } + msg.setXid(xid); + + asyncSendNow(msg); + } + + /** + * This method bypasses the transmit queue and sends the message over the + * socket directly. + * + * @param msg + * Message to be sent + */ + private void asyncSendNow(OFMessage msg) { + if (msgReadWriteService == null) { + logger.warn( + "asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", + msg); + return; + } + + try { + msgReadWriteService.asyncSend(msg); + } catch (Exception e) { + reportError(e); + } + } + public void handleMessages() { List msgs = null; @@ -349,7 +391,8 @@ public class SwitchHandler implements ISwitch { case ECHO_REQUEST: OFEchoReply echoReply = (OFEchoReply) factory .getMessage(OFType.ECHO_REPLY); - asyncFastSend(echoReply); + // respond immediately + asyncSendNow(echoReply, msg.getXid()); break; case ECHO_REPLY: this.probeSent = false; @@ -579,6 +622,7 @@ public class SwitchHandler implements ISwitch { } catch (Exception e) { logger.warn("Timeout while waiting for {} replies", req.getType()); result = null; // to indicate timeout has occurred + worker.wakeup(); return result; } } @@ -724,23 +768,19 @@ public class SwitchHandler implements ISwitch { * messaging service to transmit it over the socket channel */ class PriorityMessageTransmit implements Runnable { + @Override public void run() { running = true; while (running) { try { - if (!transmitQ.isEmpty()) { - PriorityMessage pmsg = transmitQ.poll(); - msgReadWriteService.asyncSend(pmsg.msg); - logger.trace("Message sent: {}", pmsg); - /* - * If syncReply is set to true, wait for the response - * back. - */ - if (pmsg.syncReply) { - syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false); - } + PriorityMessage pmsg = transmitQ.take(); + msgReadWriteService.asyncSend(pmsg.msg); + /* + * If syncReply is set to true, wait for the response back. + */ + if (pmsg.syncReply) { + syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false); } - Thread.sleep(10); } catch (InterruptedException ie) { reportError(new InterruptedException( "PriorityMessageTransmit thread interrupted")); @@ -758,6 +798,7 @@ public class SwitchHandler implements ISwitch { private void startTransmitThread() { this.transmitQ = new PriorityBlockingQueue(11, new Comparator() { + @Override public int compare(PriorityMessage p1, PriorityMessage p2) { if (p2.priority != p1.priority) { return p2.priority - p1.priority; @@ -822,7 +863,7 @@ public class SwitchHandler implements ISwitch { barrierMsg.setXid(xid); transmitQ.add(new PriorityMessage(barrierMsg, 0, true)); - + return Boolean.TRUE; } @@ -830,7 +871,7 @@ public class SwitchHandler implements ISwitch { * This method returns the switch liveness timeout value. If controller did * not receive any message from the switch for such a long period, * controller will tear down the connection to the switch. - * + * * @return The timeout value */ private static int getSwitchLivenessTimeout() { @@ -853,7 +894,7 @@ public class SwitchHandler implements ISwitch { * Barrier request message. Then it's blocked until the Barrier rely arrives * or timeout. If syncRequest is false, it simply skips the message send and * just waits for the response back. - * + * * @param msg * Message to be sent * @param xid @@ -887,7 +928,7 @@ public class SwitchHandler implements ISwitch { // the result if OFError already if (logger.isDebugEnabled()) { logger.debug("Send {} failed --> {}", msg.getType(), - ((OFError) result)); + (result)); } } return result; @@ -897,6 +938,7 @@ public class SwitchHandler implements ISwitch { // convert the result into a Boolean with value false status = false; result = status; + worker.wakeup(); return result; } }