+
+ /*
+ * Transmit thread polls the message out of the priority queue and invokes
+ * messaging service to transmit it over the socket channel
+ */
+ class PriorityMessageTransmit implements Runnable {
+ @Override
+ public void run() {
+ running = true;
+ while (running) {
+ try {
+ PriorityMessage pmsg = transmitQ.take();
+ msgReadWriteService.asyncSend(pmsg.msg);
+ /*
+ * If syncReply is set to true, wait for the response back.
+ */
+ if (pmsg.syncReply) {
+ syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
+ }
+ } catch (InterruptedException ie) {
+ reportError(new InterruptedException("PriorityMessageTransmit thread interrupted"));
+ } catch (Exception e) {
+ reportError(e);
+ }
+ }
+ transmitQ = null;
+ }
+ }
+
+ /*
+ * Setup and start the transmit thread
+ */
+ private void startTransmitThread() {
+ this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, new Comparator<PriorityMessage>() {
+ @Override
+ public int compare(PriorityMessage p1, PriorityMessage p2) {
+ if (p2.priority != p1.priority) {
+ return p2.priority - p1.priority;
+ } else {
+ return (p2.seqNum < p1.seqNum) ? 1 : -1;
+ }
+ }
+ });
+ this.transmitThread = new Thread(new PriorityMessageTransmit());
+ this.transmitThread.start();
+ }
+
+ /*
+ * Setup communication services
+ */
+ private void setupCommChannel() throws Exception {
+ this.selector = SelectorProvider.provider().openSelector();
+ this.socket.configureBlocking(false);
+ this.socket.socket().setTcpNoDelay(true);
+ this.msgReadWriteService = getMessageReadWriteService();
+ }
+
+ private void sendFirstHello() {
+ try {
+ OFMessage msg = factory.getMessage(OFType.HELLO);
+ asyncFastSend(msg);
+ } catch (Exception e) {
+ reportError(e);
+ }
+ }
+
+ private IMessageReadWrite getMessageReadWriteService() throws Exception {
+ String str = System.getProperty("secureChannelEnabled");
+ return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(socket,
+ selector) : new MessageReadWriteService(socket, selector);
+ }
+
+ /**
+ * Send Barrier message synchronously. The caller will be blocked until the
+ * Barrier reply is received.
+ */
+ @Override
+ public Object syncSendBarrierMessage() {
+ OFBarrierRequest barrierMsg = new OFBarrierRequest();
+ return syncSend(barrierMsg);
+ }
+
+ /**
+ * Send Barrier message asynchronously. The caller is not blocked. The
+ * Barrier message will be sent in a transmit thread which will be blocked
+ * until the Barrier reply is received.
+ */
+ @Override
+ public Object asyncSendBarrierMessage() {
+ if (transmitQ == null) {
+ return Boolean.FALSE;
+ }
+
+ OFBarrierRequest barrierMsg = new OFBarrierRequest();
+ int xid = getNextXid();
+
+ barrierMsg.setXid(xid);
+ transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
+
+ return Boolean.TRUE;
+ }
+
+ /**
+ * This method returns the switch liveness timeout value. If controller did
+ * not receive any message from the switch for such a long period,
+ * controller will tear down the connection to the switch.
+ *
+ * @return The timeout value
+ */
+ private static int getSwitchLivenessTimeout() {
+ String timeout = System.getProperty("of.switchLivenessTimeout");
+ int rv = 60500;
+
+ try {
+ if (timeout != null) {
+ rv = Integer.parseInt(timeout);
+ }
+ } catch (Exception e) {
+ }
+
+ return rv;
+ }
+
+ /**
+ * This method performs synchronous operations for a given message. If
+ * syncRequest is set to true, the message will be sent out followed by a
+ * Barrier request message. Then it's blocked until the Barrier rely arrives
+ * or timeout. If syncRequest is false, it simply skips the message send and
+ * just waits for the response back.
+ *
+ * @param msg
+ * Message to be sent
+ * @param xid
+ * Message XID
+ * @param request
+ * If set to true, the message the message will be sent out
+ * followed by a Barrier request message. If set to false, it
+ * simply skips the sending and just waits for the Barrier reply.
+ * @return the result
+ */
+ private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
+ SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
+ messageWaitingDone.put(xid, worker);
+ Object result = null;
+ Boolean status = false;
+ Future<Object> submit;
+ try {
+ submit = executor.submit(worker);
+ } catch (RejectedExecutionException re) {
+ messageWaitingDone.remove(xid);
+ return result;
+ }
+ try {
+ result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
+ messageWaitingDone.remove(xid);
+ if (result == null) {
+ // if result is null, then it means the switch can handle this
+ // message successfully
+ // convert the result into a Boolean with value true
+ status = true;
+ // logger.debug("Successfully send " +
+ // msg.getType().toString());
+ result = status;
+ } else {
+ // if result is not null, this means the switch can't handle
+ // this message
+ // the result if OFError already
+ if (logger.isDebugEnabled()) {
+ logger.debug("Send {} failed --> {}", msg.getType(), (result));
+ }
+ }
+ return result;
+ } catch (Exception e) {
+ logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
+ // convert the result into a Boolean with value false
+ status = false;
+ result = status;
+ worker.wakeup();
+ return result;
+ }
+ }
+
+ @Override
+ public void deleteAllFlows() {
+ logger.trace("deleteAllFlows on switch {}", HexString.toHexString(this.sid));
+ OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
+ OFFlowMod flowMod = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
+ flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE).setOutPort(OFPort.OFPP_NONE)
+ .setLength((short) OFFlowMod.MINIMUM_LENGTH);
+ asyncFastSend(flowMod);
+ }