X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fprotocol_plugins%2Fopenflow%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fprotocol_plugin%2Fopenflow%2Fcore%2Finternal%2FSwitchHandler.java;h=52ea7fd575a76ae4f2053c5b1c96c7a10e622c0e;hb=cba872bacd7963de38208fde9f37747820340d00;hp=81729c3dec029b39502562fdb0dc2c0cfd35ce8f;hpb=b8bb7db7c6133e00046e85ead70426eb1e05184d;p=controller.git diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java index 81729c3dec..52ea7fd575 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java @@ -36,8 +36,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.opendaylight.controller.protocol_plugin.openflow.core.IController; -import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch; import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite; +import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch; import org.openflow.protocol.OFBarrierReply; import org.openflow.protocol.OFBarrierRequest; import org.openflow.protocol.OFEchoReply; @@ -68,29 +68,29 @@ public class SwitchHandler implements ISwitch { .getLogger(SwitchHandler.class); private static final int SWITCH_LIVENESS_TIMER = 5000; private static final int switchLivenessTimeout = getSwitchLivenessTimeout(); - private int MESSAGE_RESPONSE_TIMER = 2000; + private final int MESSAGE_RESPONSE_TIMER = 2000; - private String instanceName; - private ISwitch thisISwitch; - private IController core; + private final String instanceName; + private final ISwitch thisISwitch; + private final IController core; private Long sid; private Integer buffers; private Integer capabilities; private Byte tables; private Integer actions; private Selector selector; - private SocketChannel socket; - private BasicFactory factory; - private AtomicInteger xid; + private final SocketChannel socket; + private final BasicFactory factory; + private final AtomicInteger xid; private SwitchState state; private Timer periodicTimer; - private Map physicalPorts; - private Map portBandwidth; - private Date connectedDate; + private final Map physicalPorts; + private final Map portBandwidth; + private final Date connectedDate; private Long lastMsgReceivedTimeStamp; private Boolean probeSent; - private ExecutorService executor; - private ConcurrentHashMap> messageWaitingDone; + private final ExecutorService executor; + private final ConcurrentHashMap> messageWaitingDone; private boolean running; private IMessageReadWrite msgReadWriteService; private Thread switchHandlerThread; @@ -622,6 +622,7 @@ public class SwitchHandler implements ISwitch { } catch (Exception e) { logger.warn("Timeout while waiting for {} replies", req.getType()); result = null; // to indicate timeout has occurred + worker.wakeup(); return result; } } @@ -767,23 +768,19 @@ public class SwitchHandler implements ISwitch { * messaging service to transmit it over the socket channel */ class PriorityMessageTransmit implements Runnable { + @Override public void run() { running = true; while (running) { try { - while (!transmitQ.isEmpty()) { - PriorityMessage pmsg = transmitQ.poll(); - msgReadWriteService.asyncSend(pmsg.msg); - logger.trace("Message sent: {}", pmsg); - /* - * If syncReply is set to true, wait for the response - * back. - */ - if (pmsg.syncReply) { - syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false); - } + PriorityMessage pmsg = transmitQ.take(); + msgReadWriteService.asyncSend(pmsg.msg); + /* + * If syncReply is set to true, wait for the response back. + */ + if (pmsg.syncReply) { + syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false); } - Thread.sleep(10); } catch (InterruptedException ie) { reportError(new InterruptedException( "PriorityMessageTransmit thread interrupted")); @@ -801,6 +798,7 @@ public class SwitchHandler implements ISwitch { private void startTransmitThread() { this.transmitQ = new PriorityBlockingQueue(11, new Comparator() { + @Override public int compare(PriorityMessage p1, PriorityMessage p2) { if (p2.priority != p1.priority) { return p2.priority - p1.priority; @@ -930,7 +928,7 @@ public class SwitchHandler implements ISwitch { // the result if OFError already if (logger.isDebugEnabled()) { logger.debug("Send {} failed --> {}", msg.getType(), - ((OFError) result)); + (result)); } } return result; @@ -940,6 +938,7 @@ public class SwitchHandler implements ISwitch { // convert the result into a Boolean with value false status = false; result = status; + worker.wakeup(); return result; } }