import java.util.concurrent.atomic.AtomicInteger;
import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
-import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
+import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
import org.openflow.protocol.OFBarrierReply;
import org.openflow.protocol.OFBarrierRequest;
import org.openflow.protocol.OFEchoReply;
.getLogger(SwitchHandler.class);
private static final int SWITCH_LIVENESS_TIMER = 5000;
private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
- private int MESSAGE_RESPONSE_TIMER = 2000;
+ private final int MESSAGE_RESPONSE_TIMER = 2000;
- private String instanceName;
- private ISwitch thisISwitch;
- private IController core;
+ private final String instanceName;
+ private final ISwitch thisISwitch;
+ private final IController core;
private Long sid;
private Integer buffers;
private Integer capabilities;
private Byte tables;
private Integer actions;
private Selector selector;
- private SocketChannel socket;
- private BasicFactory factory;
- private AtomicInteger xid;
+ private final SocketChannel socket;
+ private final BasicFactory factory;
+ private final AtomicInteger xid;
private SwitchState state;
private Timer periodicTimer;
- private Map<Short, OFPhysicalPort> physicalPorts;
- private Map<Short, Integer> portBandwidth;
- private Date connectedDate;
+ private final Map<Short, OFPhysicalPort> physicalPorts;
+ private final Map<Short, Integer> portBandwidth;
+ private final Date connectedDate;
private Long lastMsgReceivedTimeStamp;
private Boolean probeSent;
- private ExecutorService executor;
- private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
+ private final ExecutorService executor;
+ private final ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
private boolean running;
private IMessageReadWrite msgReadWriteService;
private Thread switchHandlerThread;
} catch (Exception e) {
logger.warn("Timeout while waiting for {} replies", req.getType());
result = null; // to indicate timeout has occurred
+ worker.wakeup();
return result;
}
}
* messaging service to transmit it over the socket channel
*/
class PriorityMessageTransmit implements Runnable {
+ @Override
public void run() {
running = true;
while (running) {
try {
- while (!transmitQ.isEmpty()) {
- PriorityMessage pmsg = transmitQ.poll();
- msgReadWriteService.asyncSend(pmsg.msg);
- logger.trace("Message sent: {}", pmsg);
- /*
- * If syncReply is set to true, wait for the response
- * back.
- */
- if (pmsg.syncReply) {
- syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
- }
+ PriorityMessage pmsg = transmitQ.take();
+ msgReadWriteService.asyncSend(pmsg.msg);
+ /*
+ * If syncReply is set to true, wait for the response back.
+ */
+ if (pmsg.syncReply) {
+ syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
}
- Thread.sleep(10);
} catch (InterruptedException ie) {
reportError(new InterruptedException(
"PriorityMessageTransmit thread interrupted"));
private void startTransmitThread() {
this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
new Comparator<PriorityMessage>() {
+ @Override
public int compare(PriorityMessage p1, PriorityMessage p2) {
if (p2.priority != p1.priority) {
return p2.priority - p1.priority;
// the result if OFError already
if (logger.isDebugEnabled()) {
logger.debug("Send {} failed --> {}", msg.getType(),
- ((OFError) result));
+ (result));
}
}
return result;
// convert the result into a Boolean with value false
status = false;
result = status;
+ worker.wakeup();
return result;
}
}