Ordered message execution ensured 66/2166/4
authorMichal Rehak <mirehak@cisco.com>
Fri, 25 Oct 2013 13:49:35 +0000 (15:49 +0200)
committerMichal Rehak <mirehak@cisco.com>
Tue, 5 Nov 2013 10:24:16 +0000 (11:24 +0100)
Removed threadpool
IdleStateHandler - added javadoc
IntegrationTest - sleepEvents' sleep time reduced
SendEvent no longer uses threads to send message

Added notification of connection ready status
Fixed SimpleClient (msg length)

Change-Id: If517c0e31123b5609511dc3ed90b44d8427b08d6
Signed-off-by: Michal Polkorab <michal.polkorab@pantheon.sk>
Signed-off-by: Michal Rehak <mirehak@cisco.com>
12 files changed:
openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionAdapter.java
openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionReadyListener.java [new file with mode: 0644]
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterFactory.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/IdleHandler.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/PublishingChannelInitializer.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandler.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TlsDetector.java
openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/impl/integration/IntegrationTest.java
openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/impl/integration/MockPlugin.java
simple-client/src/main/java/org/opendaylight/openflowjava/protocol/impl/clients/SendEvent.java
simple-client/src/main/java/org/opendaylight/openflowjava/protocol/impl/clients/SimpleClientHandler.java

index 3fba6303dd1c98cd644d806d80a4ade1c4024cd0..74a9bdccf204393e185d68239da65fc97b7ba319 100644 (file)
@@ -40,4 +40,15 @@ public interface ConnectionAdapter extends OpenflowProtocolService {
      */
     public void checkListeners();
 
+    /**
+     * notify listener about connection ready-to-use event
+     */
+    public void fireConnectionReadyNotification();
+
+    /**
+     * set listener for connection became ready-to-use event  
+     * @param connectionReadyListener
+     */
+    public void setConnectionReadyListener(ConnectionReadyListener connectionReadyListener);
+
 }
diff --git a/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionReadyListener.java b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionReadyListener.java
new file mode 100644 (file)
index 0000000..44afca0
--- /dev/null
@@ -0,0 +1,20 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.api.connection;
+
+/**
+ * @author mirehak
+ *
+ */
+public interface ConnectionReadyListener {
+
+    /**
+     * fired when connection becomes ready-to-use
+     */
+    public void onConnectionReady();
+}
index dcdb15a2d1666349719214ec6a7c821918d576de..754a2d9d5eb584975f2bf3b2ab39961eefdf94be 100644 (file)
@@ -14,7 +14,7 @@ public abstract class ConnectionAdapterFactory {
      * @param ch
      * @return connection adapter tcp-implementation
      */
-    public static ConnectionFacade createConnectionAdapter(SocketChannel ch) {
+    public static ConnectionFacade createConnectionFacade(SocketChannel ch) {
         ConnectionAdapterImpl connectionAdapter = new ConnectionAdapterImpl();
         connectionAdapter.setChannel(ch);
         return connectionAdapter;
index fe90483daf46ea60a22f1e20c7308e031ccb0286..d2c8f1be4278590ea18de8b3b1c9d0d33e174e7e 100644 (file)
@@ -9,13 +9,12 @@ import io.netty.util.concurrent.GenericFutureListener;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.opendaylight.controller.sal.common.util.RpcErrors;
 import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
@@ -55,6 +54,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.Notification;
 import org.opendaylight.yangtools.yang.common.RpcError;
@@ -91,8 +91,9 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
     protected Cache<RpcResponseKey, SettableFuture<?>> responseCache;
     private SystemNotificationsListener systemListener;
     private boolean disconnectOccured = false;
-    private ExecutorService threadPool;
-    
+
+    protected ConnectionReadyListener connectionReadyListener;
+
     /**
      * default ctor 
      */
@@ -101,7 +102,6 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
                 .concurrencyLevel(1)
                 .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
                 .removalListener(new ResponseRemovalListener()).build();
-        threadPool = Executors.newCachedThreadPool();
         LOG.info("ConnectionAdapter created");
     }
     
@@ -235,17 +235,8 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
     }
     
     @Override
-    public void consume(final DataObject message) {
-        threadPool.execute(new Runnable() {
-            @Override
-            public void run() {
-                consumeIntern(message);
-            }
-        });
-    }
-
-    protected void consumeIntern(final DataObject message) {
-        LOG.debug("Consume msg");
+    public void consume(DataObject message) {
+        LOG.debug("ConsumeIntern msg");
         if (disconnectOccured ) {
             return;
         }
@@ -398,8 +389,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
      * @param errorMessage
      * @param input
      * @param responseClazz
-     * @param key TODO
-     * @param future TODO
+     * @param key of rpcResponse
      * @return
      */
     private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> handleRpcChannelFutureWithResponse(
@@ -519,14 +509,16 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
         if (messageListener == null) {
             buffer.append("MessageListener ");
         }
+        if (connectionReadyListener == null) {
+            buffer.append("ConnectionReadyListener ");
+        }
         
         if (buffer.length() > 0) {
             throw new IllegalStateException("Missing listeners: " + buffer.toString());
         }
     }
-    
-    static class ResponseRemovalListener implements RemovalListener<RpcResponseKey, SettableFuture<?>> {
 
+    static class ResponseRemovalListener implements RemovalListener<RpcResponseKey, SettableFuture<?>> {
         @Override
         public void onRemoval(
                 RemovalNotification<RpcResponseKey, SettableFuture<?>> notification) {
@@ -538,4 +530,32 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
         }
     }
 
+    /**
+     * Class is used ONLY for exiting msgQueue processing thread
+     * @author michal.polkorab
+     */
+    static class ExitingDataObject implements DataObject {
+        @Override
+        public Class<? extends DataContainer> getImplementedInterface() {
+            return null;
+        }
+    }
+    
+    @Override
+    public void fireConnectionReadyNotification() {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                connectionReadyListener.onConnectionReady();
+            }
+        }).start();
+    }
+    
+    
+    @Override
+    public void setConnectionReadyListener(
+            ConnectionReadyListener connectionReadyListener) {
+        this.connectionReadyListener = connectionReadyListener;
+    }
+    
 }
index f187b72511eb7d70578d9310a03f3d7a168953b9..b31a021dd1b57081c86db68f22e8afad3e9b9720 100644 (file)
@@ -12,10 +12,22 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.S
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * 
+ * @author michal.polkorab
+ *
+ */
 public class IdleHandler extends IdleStateHandler{
-    
+
     private static final Logger LOGGER = LoggerFactory.getLogger(IdleHandler.class);
 
+    /**
+     * 
+     * @param readerIdleTime
+     * @param writerIdleTime
+     * @param allIdleTime
+     * @param unit
+     */
     public IdleHandler(long readerIdleTime, long writerIdleTime,
             long allIdleTime, TimeUnit unit) {
         super(readerIdleTime, writerIdleTime, allIdleTime, unit);
index 646a997f5b9a97beb70df85c1962f44b8582e331..930688305573998bfa3cd57f07cd7f9c7449731e 100644 (file)
@@ -47,19 +47,23 @@ public class PublishingChannelInitializer extends ChannelInitializer<SocketChann
         }
         LOGGER.info("Incoming connection accepted - building pipeline");
         allChannels.add(ch);
-        ConnectionFacade connectionAdapter = null;
-        connectionAdapter = ConnectionAdapterFactory.createConnectionAdapter(ch);
+        ConnectionFacade connectionFacade = null;
+        connectionFacade = ConnectionAdapterFactory.createConnectionFacade(ch);
         try {
             LOGGER.debug("calling plugin: "+switchConnectionHandler);
-            switchConnectionHandler.onSwitchConnected(connectionAdapter);
-            connectionAdapter.checkListeners();
+            switchConnectionHandler.onSwitchConnected(connectionFacade);
+            connectionFacade.checkListeners();
+
+            TlsDetector tlsDetector = new TlsDetector();
+            tlsDetector.setConnectionFacade(connectionFacade);
+            
             ch.pipeline().addLast(COMPONENT_NAMES.IDLE_HANDLER.name(), new IdleHandler(switchIdleTimeout, 0, 0, TimeUnit.MILLISECONDS));
-            ch.pipeline().addLast(COMPONENT_NAMES.TLS_DETECTOR.name(), new TlsDetector());
+            ch.pipeline().addLast(COMPONENT_NAMES.TLS_DETECTOR.name(), tlsDetector);
             ch.pipeline().addLast(COMPONENT_NAMES.OF_FRAME_DECODER.name(), new OFFrameDecoder());
             ch.pipeline().addLast(COMPONENT_NAMES.OF_VERSION_DETECTOR.name(), new OFVersionDetector());
             ch.pipeline().addLast(COMPONENT_NAMES.OF_DECODER.name(), new OF13Decoder());
             ch.pipeline().addLast(COMPONENT_NAMES.OF_ENCODER.name(), new OF13Encoder());
-            ch.pipeline().addLast(COMPONENT_NAMES.DELEGATING_INBOUND_HANDLER.name(), new DelegatingInboundHandler(connectionAdapter));
+            ch.pipeline().addLast(COMPONENT_NAMES.DELEGATING_INBOUND_HANDLER.name(), new DelegatingInboundHandler(connectionFacade));
         } catch (Exception e) {
             LOGGER.error(e.getMessage(), e);
             ch.close();
index 943bfeec7f1f782691579b7fa648807e768e893d..0ae503a8a91fe661ddd0d633583a9560b3c7c47b 100644 (file)
@@ -153,7 +153,9 @@ public class TcpHandler implements ServerFacade {
             public void operationComplete(
                     io.netty.util.concurrent.Future<Object> downResult) throws Exception {
                 result.set(downResult.isSuccess());
-                result.setException(downResult.cause());
+                if (downResult.cause() != null) {
+                    result.setException(downResult.cause());
+                }
             }
             
         });
index 6c4dbebb9030f3a742310d2dd5a96ade753bd9b6..fa8793de79e2f0ddb2c8fc20a74bdacc937087e6 100644 (file)
@@ -13,6 +13,7 @@ import javax.net.ssl.SSLEngine;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.opendaylight.openflowjava.protocol.impl.connection.ConnectionFacade;
 import org.opendaylight.openflowjava.protocol.impl.core.TcpHandler.COMPONENT_NAMES;
 import org.opendaylight.openflowjava.protocol.impl.util.ByteBufUtils;
 
@@ -28,6 +29,8 @@ public class TlsDetector extends ByteToMessageDecoder {
     private boolean detectSsl;
     private static final Logger LOGGER = LoggerFactory
             .getLogger(TlsDetector.class);
+    
+    private ConnectionFacade connectionFacade;
 
     /**
      * Constructor of class
@@ -79,6 +82,19 @@ public class TlsDetector extends ByteToMessageDecoder {
         } else {
             LOGGER.info("Connection is not encrypted");
         }
+        
+        if (connectionFacade != null) {
+            LOGGER.debug("Firing onConnectionReady notification");
+            connectionFacade.fireConnectionReadyNotification();
+        }
+        
         ctx.pipeline().remove(COMPONENT_NAMES.TLS_DETECTOR.name());
     }
+    
+    /**
+     * @param connectionFacade the connectionFacade to set
+     */
+    public void setConnectionFacade(ConnectionFacade connectionFacade) {
+        this.connectionFacade = connectionFacade;
+    }
 }
index 31c9c6354018a56e1fb3083586f59b144d0eed03..d7d3ccf17f623111488894b80cfcacb953a97373 100644 (file)
@@ -93,9 +93,9 @@ public class IntegrationTest {
     public void testHandshakeAndEcho() throws Exception {\r
         int amountOfCLients = 1;\r
         Stack<ClientEvent> scenario = ScenarioFactory.createHandshakeScenario();\r
-        scenario.add(0, new SleepEvent(1500));\r
+        scenario.add(0, new SleepEvent(100));\r
         scenario.add(0, new SendEvent(ByteBufUtils.hexStringToBytes("04 02 00 08 00 00 00 04")));\r
-        scenario.add(0, new SleepEvent(1500));\r
+        scenario.add(0, new SleepEvent(100));\r
         scenario.add(0, new WaitForMessageEvent(ByteBufUtils.hexStringToBytes("04 03 00 08 00 00 00 04")));\r
         ScenarioHandler handler = new ScenarioHandler(scenario);\r
         List<SimpleClient> clients = createAndStartClient(amountOfCLients, handler);\r
index 73d88fa5e9de8f7722a57364250563b81d13024c..340cba23f2aa05730f59730d3b31d9c11a73c4e6 100644 (file)
@@ -9,6 +9,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;\r
 \r
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;\r
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;\r
 import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;\r
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;\r
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;\r
@@ -41,10 +42,11 @@ import com.google.common.util.concurrent.SettableFuture;
  * @author michal.polkorab\r
  *\r
  */\r
-public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHandler, SystemNotificationsListener {\r
+public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHandler, \r
+        SystemNotificationsListener, ConnectionReadyListener {\r
 \r
-    private static final Logger LOGGER = LoggerFactory.getLogger(MockPlugin.class);\r
-    private ConnectionAdapter adapter;\r
+    protected static final Logger LOGGER = LoggerFactory.getLogger(MockPlugin.class);\r
+    protected ConnectionAdapter adapter;\r
     private SettableFuture<Void> finishedFuture;\r
     private int idleCounter = 0;\r
 \r
@@ -61,6 +63,7 @@ public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHan
         this.adapter = connection;\r
         connection.setMessageListener(this);\r
         connection.setSystemListener(this);\r
+        connection.setConnectionReadyListener(this);\r
     }\r
 \r
     @Override\r
@@ -70,18 +73,20 @@ public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHan
     }\r
 \r
     @Override\r
-    public void onEchoRequestMessage(EchoRequestMessage notification) {\r
-        LOGGER.debug("EchoRequest message received");\r
-        LOGGER.debug("Building EchoReplyInput");\r
-        EchoReplyInputBuilder replyBuilder = new EchoReplyInputBuilder();\r
-        replyBuilder.setVersion((short) 4);\r
-        replyBuilder.setXid(notification.getXid());\r
-        EchoReplyInput echoReplyInput = replyBuilder.build();\r
-        LOGGER.debug("EchoReplyInput built");\r
-        LOGGER.debug("Going to send EchoReplyInput");\r
-        adapter.echoReply(echoReplyInput);\r
-        LOGGER.debug("EchoReplyInput sent");\r
-        LOGGER.debug("adapter: "+adapter);\r
+    public void onEchoRequestMessage(final EchoRequestMessage notification) {\r
+        new Thread(new Runnable() {\r
+            @Override\r
+            public void run() {\r
+                LOGGER.debug("EchoRequest message received");\r
+                EchoReplyInputBuilder replyBuilder = new EchoReplyInputBuilder();\r
+                replyBuilder.setVersion((short) 4);\r
+                replyBuilder.setXid(notification.getXid());\r
+                EchoReplyInput echoReplyInput = replyBuilder.build();\r
+                adapter.echoReply(echoReplyInput);\r
+                LOGGER.debug("EchoReplyInput sent");\r
+                LOGGER.debug("adapter: "+adapter);\r
+            }\r
+        }).start();\r
     }\r
 \r
     @Override\r
@@ -104,17 +109,31 @@ public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHan
 \r
     @Override\r
     public void onHelloMessage(HelloMessage notification) {\r
-        LOGGER.debug("adapter: "+adapter);\r
-        LOGGER.debug("Hello message received");\r
-        HelloInputBuilder hib = new HelloInputBuilder();\r
+        new Thread(new Runnable() {\r
+            @Override\r
+            public void run() {\r
+                LOGGER.debug("Hello message received");\r
+                HelloInputBuilder hib = new HelloInputBuilder();\r
+                hib.setVersion((short) 4);\r
+                hib.setXid(2L);\r
+                HelloInput hi = hib.build();\r
+                adapter.hello(hi);\r
+                LOGGER.debug("hello msg sent");\r
+                new Thread(new Runnable() {\r
+                    @Override\r
+                    public void run() {\r
+                        sendFeaturesReply();\r
+                    }\r
+                }).start();\r
+                LOGGER.debug("adapter: "+adapter);\r
+            }\r
+        }).start();\r
+    }\r
+    \r
+    protected void sendFeaturesReply() {\r
         GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();\r
-        hib.setVersion((short) 4);\r
-        hib.setXid(2L);\r
         featuresBuilder.setVersion((short) 4);\r
         featuresBuilder.setXid(3L);\r
-        HelloInput hi = hib.build();\r
-        adapter.hello(hi);\r
-        LOGGER.debug("hello msg sent");\r
         GetFeaturesInput featuresInput = featuresBuilder.build();\r
         try {\r
             LOGGER.debug("Going to send featuresRequest");\r
@@ -131,10 +150,8 @@ public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHan
             }\r
         } catch (InterruptedException | ExecutionException | TimeoutException e) {\r
             LOGGER.error(e.getMessage(), e);\r
-            // TODO - Collect exceptions and check for existence in tests\r
         }\r
         LOGGER.info("After FeaturesReply message");\r
-        LOGGER.debug("adapter: "+adapter);\r
     }\r
 \r
     protected void shutdown() {\r
@@ -146,7 +163,7 @@ public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHan
                 Future<Boolean> disconnect = adapter.disconnect();\r
                 disconnect.get();\r
                 LOGGER.info("Disconnected");\r
-            }\r
+            } \r
         } catch (Exception e) {\r
             LOGGER.error(e.getMessage(), e);\r
         }\r
@@ -202,6 +219,11 @@ public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHan
     public int getIdleCounter() {\r
         return idleCounter;\r
     }\r
+    \r
+    @Override\r
+    public void onConnectionReady() {\r
+        LOGGER.debug("connection ready notification arrived");\r
+    }\r
 \r
 \r
 }\r
index d0ec8b9f560769905c00b3fed8c6150c8e1f98b7..e59bce0775eb5d6a3734ce53e2e5bd3b80983887 100644 (file)
@@ -29,22 +29,11 @@ public class SendEvent implements ClientEvent {
     @Override
     public boolean eventExecuted() {
         LOGGER.debug("sending message");
-        Thread thread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                LOGGER.debug("start of run");
-                ByteBuf buffer = ctx.alloc().buffer();
-                buffer.writeBytes(msgToSend);
-                ctx.writeAndFlush(buffer);
-                LOGGER.debug(">> " + ByteBufUtils.bytesToHexString(msgToSend));
-            }
-        });
-        thread.start();
-        try {
-            thread.join();
-        } catch (InterruptedException e) {
-            LOGGER.error(e.getMessage(), e);
-        }
+        LOGGER.debug("start of run");
+        ByteBuf buffer = ctx.alloc().buffer();
+        buffer.writeBytes(msgToSend);
+        ctx.writeAndFlush(buffer);
+        LOGGER.debug(">> " + ByteBufUtils.bytesToHexString(msgToSend));
         LOGGER.debug("message sent");
         return true;
     }
index 110570cdd842c0cd372be4c48a05a2a8522e1ff9..04147af06470a639d3cc75ba2d19ee6217e27f7e 100644 (file)
@@ -19,6 +19,7 @@ import com.google.common.util.concurrent.SettableFuture;
 public class SimpleClientHandler extends ChannelInboundHandlerAdapter {\r
 \r
     protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleClientHandler.class);\r
+    private static final int OFP_HEADER_LENGTH = 8;\r
     private SettableFuture<Boolean> isOnlineFuture;\r
     protected ScenarioHandler scenarioHandler;\r
 \r
@@ -38,10 +39,15 @@ public class SimpleClientHandler extends ChannelInboundHandlerAdapter {
         if (LOGGER.isDebugEnabled()) {\r
             LOGGER.debug("<< " + ByteBufUtils.byteBufToHexString(bb));\r
         }\r
-        byte[] message = new byte[8];\r
+        \r
+        if (bb.readableBytes() < OFP_HEADER_LENGTH) {\r
+            LOGGER.debug("too few bytes received: "+bb.readableBytes()+" - wait for next data portion");\r
+            return;\r
+        }\r
+        int msgSize = bb.getUnsignedShort(2);\r
+        byte[] message = new byte[msgSize];\r
         bb.readBytes(message);\r
         scenarioHandler.addOfMsg(message);\r
-        skipMsg(bb);\r
         LOGGER.info("end of read");\r
     }\r
 \r
@@ -57,12 +63,6 @@ public class SimpleClientHandler extends ChannelInboundHandlerAdapter {
         \r
     }\r
 \r
-    private static void skipMsg(ByteBuf bb) {\r
-        if (bb.readableBytes() > 0) {\r
-            bb.skipBytes(bb.getShort(2));\r
-        }\r
-    }\r
-\r
     /**\r
      * @param scenarioHandler handler of scenario events\r
      */\r