From: Michal Rehak Date: Fri, 25 Oct 2013 13:49:35 +0000 (+0200) Subject: Ordered message execution ensured X-Git-Tag: jenkins-openflowjava-bulk-release-prepare-only-1~79 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=156b1d174392438b48a76fd82ad3f84f618e8042;p=openflowjava.git Ordered message execution ensured Removed threadpool IdleStateHandler - added javadoc IntegrationTest - sleepEvents' sleep time reduced SendEvent no longer uses threads to send message Added notification of connection ready status Fixed SimpleClient (msg length) Change-Id: If517c0e31123b5609511dc3ed90b44d8427b08d6 Signed-off-by: Michal Polkorab Signed-off-by: Michal Rehak --- diff --git a/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionAdapter.java b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionAdapter.java index 3fba6303..74a9bdcc 100644 --- a/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionAdapter.java +++ b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionAdapter.java @@ -40,4 +40,15 @@ public interface ConnectionAdapter extends OpenflowProtocolService { */ public void checkListeners(); + /** + * notify listener about connection ready-to-use event + */ + public void fireConnectionReadyNotification(); + + /** + * set listener for connection became ready-to-use event + * @param connectionReadyListener + */ + public void setConnectionReadyListener(ConnectionReadyListener connectionReadyListener); + } diff --git a/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionReadyListener.java b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionReadyListener.java new file mode 100644 index 00000000..44afca0e --- /dev/null +++ b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionReadyListener.java @@ -0,0 +1,20 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowjava.protocol.api.connection; + +/** + * @author mirehak + * + */ +public interface ConnectionReadyListener { + + /** + * fired when connection becomes ready-to-use + */ + public void onConnectionReady(); +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterFactory.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterFactory.java index dcdb15a2..754a2d9d 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterFactory.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterFactory.java @@ -14,7 +14,7 @@ public abstract class ConnectionAdapterFactory { * @param ch * @return connection adapter tcp-implementation */ - public static ConnectionFacade createConnectionAdapter(SocketChannel ch) { + public static ConnectionFacade createConnectionFacade(SocketChannel ch) { ConnectionAdapterImpl connectionAdapter = new ConnectionAdapterImpl(); connectionAdapter.setChannel(ch); return connectionAdapter; diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java index fe90483d..d2c8f1be 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java @@ -9,13 +9,12 @@ import io.netty.util.concurrent.GenericFutureListener; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.sal.common.util.RpcErrors; import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput; @@ -55,6 +54,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener; +import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.Notification; import org.opendaylight.yangtools.yang.common.RpcError; @@ -91,8 +91,9 @@ public class ConnectionAdapterImpl implements ConnectionFacade { protected Cache> responseCache; private SystemNotificationsListener systemListener; private boolean disconnectOccured = false; - private ExecutorService threadPool; - + + protected ConnectionReadyListener connectionReadyListener; + /** * default ctor */ @@ -101,7 +102,6 @@ public class ConnectionAdapterImpl implements ConnectionFacade { .concurrencyLevel(1) .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES) .removalListener(new ResponseRemovalListener()).build(); - threadPool = Executors.newCachedThreadPool(); LOG.info("ConnectionAdapter created"); } @@ -235,17 +235,8 @@ public class ConnectionAdapterImpl implements ConnectionFacade { } @Override - public void consume(final DataObject message) { - threadPool.execute(new Runnable() { - @Override - public void run() { - consumeIntern(message); - } - }); - } - - protected void consumeIntern(final DataObject message) { - LOG.debug("Consume msg"); + public void consume(DataObject message) { + LOG.debug("ConsumeIntern msg"); if (disconnectOccured ) { return; } @@ -398,8 +389,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade { * @param errorMessage * @param input * @param responseClazz - * @param key TODO - * @param future TODO + * @param key of rpcResponse * @return */ private SettableFuture> handleRpcChannelFutureWithResponse( @@ -519,14 +509,16 @@ public class ConnectionAdapterImpl implements ConnectionFacade { if (messageListener == null) { buffer.append("MessageListener "); } + if (connectionReadyListener == null) { + buffer.append("ConnectionReadyListener "); + } if (buffer.length() > 0) { throw new IllegalStateException("Missing listeners: " + buffer.toString()); } } - - static class ResponseRemovalListener implements RemovalListener> { + static class ResponseRemovalListener implements RemovalListener> { @Override public void onRemoval( RemovalNotification> notification) { @@ -538,4 +530,32 @@ public class ConnectionAdapterImpl implements ConnectionFacade { } } + /** + * Class is used ONLY for exiting msgQueue processing thread + * @author michal.polkorab + */ + static class ExitingDataObject implements DataObject { + @Override + public Class getImplementedInterface() { + return null; + } + } + + @Override + public void fireConnectionReadyNotification() { + new Thread(new Runnable() { + @Override + public void run() { + connectionReadyListener.onConnectionReady(); + } + }).start(); + } + + + @Override + public void setConnectionReadyListener( + ConnectionReadyListener connectionReadyListener) { + this.connectionReadyListener = connectionReadyListener; + } + } diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/IdleHandler.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/IdleHandler.java index f187b725..b31a021d 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/IdleHandler.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/IdleHandler.java @@ -12,10 +12,22 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.S import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * + * @author michal.polkorab + * + */ public class IdleHandler extends IdleStateHandler{ - + private static final Logger LOGGER = LoggerFactory.getLogger(IdleHandler.class); + /** + * + * @param readerIdleTime + * @param writerIdleTime + * @param allIdleTime + * @param unit + */ public IdleHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { super(readerIdleTime, writerIdleTime, allIdleTime, unit); diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/PublishingChannelInitializer.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/PublishingChannelInitializer.java index 646a997f..93068830 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/PublishingChannelInitializer.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/PublishingChannelInitializer.java @@ -47,19 +47,23 @@ public class PublishingChannelInitializer extends ChannelInitializer downResult) throws Exception { result.set(downResult.isSuccess()); - result.setException(downResult.cause()); + if (downResult.cause() != null) { + result.setException(downResult.cause()); + } } }); diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TlsDetector.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TlsDetector.java index 6c4dbebb..fa8793de 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TlsDetector.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TlsDetector.java @@ -13,6 +13,7 @@ import javax.net.ssl.SSLEngine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.opendaylight.openflowjava.protocol.impl.connection.ConnectionFacade; import org.opendaylight.openflowjava.protocol.impl.core.TcpHandler.COMPONENT_NAMES; import org.opendaylight.openflowjava.protocol.impl.util.ByteBufUtils; @@ -28,6 +29,8 @@ public class TlsDetector extends ByteToMessageDecoder { private boolean detectSsl; private static final Logger LOGGER = LoggerFactory .getLogger(TlsDetector.class); + + private ConnectionFacade connectionFacade; /** * Constructor of class @@ -79,6 +82,19 @@ public class TlsDetector extends ByteToMessageDecoder { } else { LOGGER.info("Connection is not encrypted"); } + + if (connectionFacade != null) { + LOGGER.debug("Firing onConnectionReady notification"); + connectionFacade.fireConnectionReadyNotification(); + } + ctx.pipeline().remove(COMPONENT_NAMES.TLS_DETECTOR.name()); } + + /** + * @param connectionFacade the connectionFacade to set + */ + public void setConnectionFacade(ConnectionFacade connectionFacade) { + this.connectionFacade = connectionFacade; + } } diff --git a/openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/impl/integration/IntegrationTest.java b/openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/impl/integration/IntegrationTest.java index 31c9c635..d7d3ccf1 100644 --- a/openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/impl/integration/IntegrationTest.java +++ b/openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/impl/integration/IntegrationTest.java @@ -93,9 +93,9 @@ public class IntegrationTest { public void testHandshakeAndEcho() throws Exception { int amountOfCLients = 1; Stack scenario = ScenarioFactory.createHandshakeScenario(); - scenario.add(0, new SleepEvent(1500)); + scenario.add(0, new SleepEvent(100)); scenario.add(0, new SendEvent(ByteBufUtils.hexStringToBytes("04 02 00 08 00 00 00 04"))); - scenario.add(0, new SleepEvent(1500)); + scenario.add(0, new SleepEvent(100)); scenario.add(0, new WaitForMessageEvent(ByteBufUtils.hexStringToBytes("04 03 00 08 00 00 00 04"))); ScenarioHandler handler = new ScenarioHandler(scenario); List clients = createAndStartClient(amountOfCLients, handler); diff --git a/openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/impl/integration/MockPlugin.java b/openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/impl/integration/MockPlugin.java index 73d88fa5..340cba23 100644 --- a/openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/impl/integration/MockPlugin.java +++ b/openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/impl/integration/MockPlugin.java @@ -9,6 +9,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; +import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener; import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder; @@ -41,10 +42,11 @@ import com.google.common.util.concurrent.SettableFuture; * @author michal.polkorab * */ -public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHandler, SystemNotificationsListener { +public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHandler, + SystemNotificationsListener, ConnectionReadyListener { - private static final Logger LOGGER = LoggerFactory.getLogger(MockPlugin.class); - private ConnectionAdapter adapter; + protected static final Logger LOGGER = LoggerFactory.getLogger(MockPlugin.class); + protected ConnectionAdapter adapter; private SettableFuture finishedFuture; private int idleCounter = 0; @@ -61,6 +63,7 @@ public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHan this.adapter = connection; connection.setMessageListener(this); connection.setSystemListener(this); + connection.setConnectionReadyListener(this); } @Override @@ -70,18 +73,20 @@ public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHan } @Override - public void onEchoRequestMessage(EchoRequestMessage notification) { - LOGGER.debug("EchoRequest message received"); - LOGGER.debug("Building EchoReplyInput"); - EchoReplyInputBuilder replyBuilder = new EchoReplyInputBuilder(); - replyBuilder.setVersion((short) 4); - replyBuilder.setXid(notification.getXid()); - EchoReplyInput echoReplyInput = replyBuilder.build(); - LOGGER.debug("EchoReplyInput built"); - LOGGER.debug("Going to send EchoReplyInput"); - adapter.echoReply(echoReplyInput); - LOGGER.debug("EchoReplyInput sent"); - LOGGER.debug("adapter: "+adapter); + public void onEchoRequestMessage(final EchoRequestMessage notification) { + new Thread(new Runnable() { + @Override + public void run() { + LOGGER.debug("EchoRequest message received"); + EchoReplyInputBuilder replyBuilder = new EchoReplyInputBuilder(); + replyBuilder.setVersion((short) 4); + replyBuilder.setXid(notification.getXid()); + EchoReplyInput echoReplyInput = replyBuilder.build(); + adapter.echoReply(echoReplyInput); + LOGGER.debug("EchoReplyInput sent"); + LOGGER.debug("adapter: "+adapter); + } + }).start(); } @Override @@ -104,17 +109,31 @@ public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHan @Override public void onHelloMessage(HelloMessage notification) { - LOGGER.debug("adapter: "+adapter); - LOGGER.debug("Hello message received"); - HelloInputBuilder hib = new HelloInputBuilder(); + new Thread(new Runnable() { + @Override + public void run() { + LOGGER.debug("Hello message received"); + HelloInputBuilder hib = new HelloInputBuilder(); + hib.setVersion((short) 4); + hib.setXid(2L); + HelloInput hi = hib.build(); + adapter.hello(hi); + LOGGER.debug("hello msg sent"); + new Thread(new Runnable() { + @Override + public void run() { + sendFeaturesReply(); + } + }).start(); + LOGGER.debug("adapter: "+adapter); + } + }).start(); + } + + protected void sendFeaturesReply() { GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder(); - hib.setVersion((short) 4); - hib.setXid(2L); featuresBuilder.setVersion((short) 4); featuresBuilder.setXid(3L); - HelloInput hi = hib.build(); - adapter.hello(hi); - LOGGER.debug("hello msg sent"); GetFeaturesInput featuresInput = featuresBuilder.build(); try { LOGGER.debug("Going to send featuresRequest"); @@ -131,10 +150,8 @@ public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHan } } catch (InterruptedException | ExecutionException | TimeoutException e) { LOGGER.error(e.getMessage(), e); - // TODO - Collect exceptions and check for existence in tests } LOGGER.info("After FeaturesReply message"); - LOGGER.debug("adapter: "+adapter); } protected void shutdown() { @@ -146,7 +163,7 @@ public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHan Future disconnect = adapter.disconnect(); disconnect.get(); LOGGER.info("Disconnected"); - } + } } catch (Exception e) { LOGGER.error(e.getMessage(), e); } @@ -202,6 +219,11 @@ public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHan public int getIdleCounter() { return idleCounter; } + + @Override + public void onConnectionReady() { + LOGGER.debug("connection ready notification arrived"); + } } diff --git a/simple-client/src/main/java/org/opendaylight/openflowjava/protocol/impl/clients/SendEvent.java b/simple-client/src/main/java/org/opendaylight/openflowjava/protocol/impl/clients/SendEvent.java index d0ec8b9f..e59bce07 100644 --- a/simple-client/src/main/java/org/opendaylight/openflowjava/protocol/impl/clients/SendEvent.java +++ b/simple-client/src/main/java/org/opendaylight/openflowjava/protocol/impl/clients/SendEvent.java @@ -29,22 +29,11 @@ public class SendEvent implements ClientEvent { @Override public boolean eventExecuted() { LOGGER.debug("sending message"); - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - LOGGER.debug("start of run"); - ByteBuf buffer = ctx.alloc().buffer(); - buffer.writeBytes(msgToSend); - ctx.writeAndFlush(buffer); - LOGGER.debug(">> " + ByteBufUtils.bytesToHexString(msgToSend)); - } - }); - thread.start(); - try { - thread.join(); - } catch (InterruptedException e) { - LOGGER.error(e.getMessage(), e); - } + LOGGER.debug("start of run"); + ByteBuf buffer = ctx.alloc().buffer(); + buffer.writeBytes(msgToSend); + ctx.writeAndFlush(buffer); + LOGGER.debug(">> " + ByteBufUtils.bytesToHexString(msgToSend)); LOGGER.debug("message sent"); return true; } diff --git a/simple-client/src/main/java/org/opendaylight/openflowjava/protocol/impl/clients/SimpleClientHandler.java b/simple-client/src/main/java/org/opendaylight/openflowjava/protocol/impl/clients/SimpleClientHandler.java index 110570cd..04147af0 100644 --- a/simple-client/src/main/java/org/opendaylight/openflowjava/protocol/impl/clients/SimpleClientHandler.java +++ b/simple-client/src/main/java/org/opendaylight/openflowjava/protocol/impl/clients/SimpleClientHandler.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.SettableFuture; public class SimpleClientHandler extends ChannelInboundHandlerAdapter { protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleClientHandler.class); + private static final int OFP_HEADER_LENGTH = 8; private SettableFuture isOnlineFuture; protected ScenarioHandler scenarioHandler; @@ -38,10 +39,15 @@ public class SimpleClientHandler extends ChannelInboundHandlerAdapter { if (LOGGER.isDebugEnabled()) { LOGGER.debug("<< " + ByteBufUtils.byteBufToHexString(bb)); } - byte[] message = new byte[8]; + + if (bb.readableBytes() < OFP_HEADER_LENGTH) { + LOGGER.debug("too few bytes received: "+bb.readableBytes()+" - wait for next data portion"); + return; + } + int msgSize = bb.getUnsignedShort(2); + byte[] message = new byte[msgSize]; bb.readBytes(message); scenarioHandler.addOfMsg(message); - skipMsg(bb); LOGGER.info("end of read"); } @@ -57,12 +63,6 @@ public class SimpleClientHandler extends ChannelInboundHandlerAdapter { } - private static void skipMsg(ByteBuf bb) { - if (bb.readableBytes() > 0) { - bb.skipBytes(bb.getShort(2)); - } - } - /** * @param scenarioHandler handler of scenario events */