+
+ /*
+ * Transmit thread polls the message out of the priority queue and invokes
+ * messaging service to transmit it over the socket channel
+ */
+ class PriorityMessageTransmit implements Runnable {
+ public void run() {
+ running = true;
+ while (running) {
+ try {
+ if (!transmitQ.isEmpty()) {
+ PriorityMessage pmsg = transmitQ.poll();
+ msgReadWriteService.asyncSend(pmsg.msg);
+ logger.trace("Message sent: {}", pmsg.toString());
+ }
+ Thread.sleep(10);
+ } catch (InterruptedException ie) {
+ reportError(new InterruptedException("PriorityMessageTransmit thread interrupted"));
+ } catch (Exception e) {
+ reportError(e);
+ }
+ }
+ transmitQ = null;
+ }
+ }
+
+ /*
+ * Setup and start the transmit thread
+ */
+ private void startTransmitThread() {
+ this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
+ new Comparator<PriorityMessage>() {
+ public int compare(PriorityMessage p1, PriorityMessage p2) {
+ return p2.priority - p1.priority;
+ }
+ });
+ this.transmitThread = new Thread(new PriorityMessageTransmit());
+ this.transmitThread.start();
+ }
+
+ /*
+ * Setup communication services
+ */
+ private void setupCommChannel() throws Exception {
+ this.selector = SelectorProvider.provider().openSelector();
+ this.socket.configureBlocking(false);
+ this.socket.socket().setTcpNoDelay(true);
+ this.msgReadWriteService = getMessageReadWriteService();
+ }
+
+ private void sendFirstHello() {
+ try {
+ OFMessage msg = factory.getMessage(OFType.HELLO);
+ asyncFastSend(msg);
+ } catch (Exception e) {
+ reportError(e);
+ }
+ }
+
+ private IMessageReadWrite getMessageReadWriteService() throws Exception {
+ String str = System.getProperty("secureChannelEnabled");
+ return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ?
+ new SecureMessageReadWriteService(socket, selector) :
+ new MessageReadWriteService(socket, selector);
+ }