bug 2446 - High priority (control) queue stop reading from channel if is full
[openflowplugin.git] / openflowplugin / src / test / java / org / opendaylight / openflowplugin / openflow / md / core / plan / ConnectionAdapterStackImpl.java
index eb9d411a944ffa5b0b98b2d11d86ef51fd4de01e..9af46b4379ddceea8ec1d33fd8b6b23144b560bf 100644 (file)
@@ -8,17 +8,16 @@
 
 package org.opendaylight.openflowplugin.openflow.md.core.plan;
 
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Stack;
-import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.opendaylight.controller.sal.common.util.RpcErrors;
 import org.opendaylight.controller.sal.common.util.Rpcs;
@@ -48,7 +47,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
@@ -61,6 +60,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEventBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
 import org.opendaylight.yangtools.yang.binding.Notification;
 import org.opendaylight.yangtools.yang.common.RpcError;
@@ -70,6 +70,7 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.SettableFuture;
 
@@ -89,8 +90,6 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     protected SystemNotificationsListener systemListener;
 
     protected Map<Long, SettableFuture<?>> rpcResults = new HashMap<>();
-    private ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
-            8);
     protected boolean planTouched = false;
 
     private long proceedTimeout;
@@ -99,6 +98,10 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
 
     private ConnectionReadyListener connectionReadyListener;
 
+    private int planItemCounter;
+
+    private boolean autoRead = true;
+
     /**
      * default ctor
      */
@@ -107,155 +110,143 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     }
 
     @Override
-    public Future<RpcResult<BarrierOutput>> barrier(BarrierInput arg0) {
-        checkRpc(arg0, "barrier");
+    public synchronized Future<RpcResult<BarrierOutput>> barrier(
+            BarrierInput arg0) {
+        checkRpcAndNext(arg0, "barrier");
         SettableFuture<RpcResult<BarrierOutput>> result = createAndRegisterRpcResult(arg0);
-        next();
         return result;
     }
 
     @Override
-    public Future<RpcResult<EchoOutput>> echo(EchoInput arg0) {
-        checkRpc(arg0, "echo");
+    public synchronized Future<RpcResult<EchoOutput>> echo(EchoInput arg0) {
+        checkRpcAndNext(arg0, "echo");
         Future<RpcResult<EchoOutput>> result = createAndRegisterRpcResult(arg0);
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> echoReply(EchoReplyInput arg0) {
-        checkRpc(arg0, "echoReply");
+        checkRpcAndNext(arg0, "echoReply");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> experimenter(ExperimenterInput arg0) {
-        checkRpc(arg0, "experimenter");
+        checkRpcAndNext(arg0, "experimenter");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> flowMod(FlowModInput arg0) {
-        checkRpc(arg0, "flowMod");
+        checkRpcAndNext(arg0, "flowMod");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
-    public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput arg0) {
-        checkRpc(arg0, "echo");
+    public synchronized Future<RpcResult<GetAsyncOutput>> getAsync(
+            GetAsyncInput arg0) {
+        checkRpcAndNext(arg0, "echo");
         Future<RpcResult<GetAsyncOutput>> result = createAndRegisterRpcResult(arg0);
-        next();
         return result;
     }
 
     @Override
-    public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput arg0) {
-        checkRpc(arg0, "echo");
+    public synchronized Future<RpcResult<GetConfigOutput>> getConfig(
+            GetConfigInput arg0) {
+        checkRpcAndNext(arg0, "echo");
         Future<RpcResult<GetConfigOutput>> result = createAndRegisterRpcResult(arg0);
-        next();
         return result;
     }
 
     @Override
-    public Future<RpcResult<GetFeaturesOutput>> getFeatures(
+    public synchronized Future<RpcResult<GetFeaturesOutput>> getFeatures(
             GetFeaturesInput arg0) {
-        checkRpc(arg0, "getFeatures");
+        checkRpcAndNext(arg0, "getFeatures");
         Future<RpcResult<GetFeaturesOutput>> result = createAndRegisterRpcResult(arg0);
-        next();
         return result;
     }
 
     @Override
-    public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
+    public synchronized Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
             GetQueueConfigInput arg0) {
-        checkRpc(arg0, "echo");
+        checkRpcAndNext(arg0, "echo");
         Future<RpcResult<GetQueueConfigOutput>> result = createAndRegisterRpcResult(arg0);
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> groupMod(GroupModInput arg0) {
-        checkRpc(arg0, "groupMod");
+        checkRpcAndNext(arg0, "groupMod");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> hello(HelloInput arg0) {
-        checkRpc(arg0, "helloReply");
+        checkRpcAndNext(arg0, "helloReply");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> meterMod(MeterModInput arg0) {
-        checkRpc(arg0, "meterMod");
+        checkRpcAndNext(arg0, "meterMod");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> packetOut(PacketOutInput arg0) {
-        checkRpc(arg0, "packetOut");
+        checkRpcAndNext(arg0, "packetOut");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> portMod(PortModInput arg0) {
-        checkRpc(arg0, "portMod");
+        checkRpcAndNext(arg0, "portMod");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
-    public Future<RpcResult<RoleRequestOutput>> roleRequest(
+    public synchronized Future<RpcResult<RoleRequestOutput>> roleRequest(
             RoleRequestInput arg0) {
-        checkRpc(arg0, "echo");
+        checkRpcAndNext(arg0, "echo");
         Future<RpcResult<RoleRequestOutput>> result = createAndRegisterRpcResult(arg0);
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> setAsync(SetAsyncInput arg0) {
-        checkRpc(arg0, "setAsync");
+        checkRpcAndNext(arg0, "setAsync");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> setConfig(SetConfigInput arg0) {
-        checkRpc(arg0, "setConfig");
+        checkRpcAndNext(arg0, "setConfig");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> tableMod(TableModInput arg0) {
-        checkRpc(arg0, "tableMod");
+        checkRpcAndNext(arg0, "tableMod");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
     public Future<Boolean> disconnect() {
-        // TODO Auto-generated method stub
+        LOG.debug("adapter is told to disconnect");
+        DisconnectEventBuilder disconnectEventBuilder = new DisconnectEventBuilder();
+        disconnectEventBuilder.setInfo("disconnected by plugin");
+        systemListener.onDisconnectEvent(disconnectEventBuilder.build());
         return null;
     }
 
@@ -272,7 +263,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
 
     @Override
     public void checkListeners() {
-        if (ofListener == null || systemListener == null || connectionReadyListener == null) {
+        if (ofListener == null || systemListener == null
+                || connectionReadyListener == null) {
             occuredExceptions
                     .add(new IllegalStateException("missing listeners"));
         }
@@ -289,38 +281,132 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
      * @param rpcName
      *            rpc yang name
      */
-    private synchronized void checkRpc(OfHeader rpcInput, String rpcName) {
+    private boolean checkRpc(OfHeader rpcInput, String rpcName) {
         String msg = null;
-        LOG.debug("checking rpc: " + rpcName);
-        if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)) {
-            msg = "expected [rpc], got [" + rpcInput.getClass().getSimpleName()
-                    + "]";
+        boolean finished = true;
+
+        if (eventPlan.isEmpty()) {
+            throw new IllegalStateException("eventPlan already depleted");
+        }
+
+        LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName,
+                rpcInput.getVersion(), rpcInput.getXid());
+        if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)
+                && !(eventPlan.peek() instanceof SwitchTestWaitForAllEvent)) {
+            if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
+                SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan
+                        .peek());
+                msg = "expected [notification: "
+                        + notifEvent.getPlannedNotification() + "], got ["
+                        + rpcInput.getClass().getSimpleName() + "]";
+            } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
+                SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan
+                        .peek());
+                msg = "expected [rpc: " + rpcEvent.getPlannedRpcResponse()
+                        + "], got [" + rpcInput.getClass().getSimpleName()
+                        + "]";
+            }
         } else {
-            SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
-                    .peek();
-            if (!rpcName.equals(switchTestRpcEvent.getRpcName())) {
-                msg = "expected rpc name [" + switchTestRpcEvent.getRpcName()
-                        + "], got [" + rpcName + "]";
-            } else if (!rpcInput.getXid().equals(switchTestRpcEvent.getXid())) {
-                msg = "expected xid [" + switchTestRpcEvent.getXid()
-                        + "], got [" + rpcInput.getXid() + "]";
+            if (eventPlan.peek() instanceof SwitchTestWaitForAllEvent) {
+                SwitchTestWaitForAllEvent switchTestWaitForAll = (SwitchTestWaitForAllEvent) eventPlan
+                        .peek();
+                Set<SwitchTestWaitForRpcEvent> eventBag = switchTestWaitForAll
+                        .getWaitEventBag();
+                List<String> msgLot = new ArrayList<>();
+
+                if (eventBag == null || eventBag.isEmpty()) {
+                    msg = "no wait events in bag";
+                } else {
+                    finished = false;
+                    for (SwitchTestWaitForRpcEvent switchTestWaitForRpc : eventBag) {
+                        String msgPart = checkSingleRpcContent(rpcInput,
+                                rpcName, switchTestWaitForRpc);
+
+                        if (msgPart != null) {
+                            msgLot.add(msgPart);
+                        } else {
+                            LOG.debug("wait event matched: {}", rpcName);
+                            eventBag.remove(switchTestWaitForRpc);
+                            if (eventBag.isEmpty()) {
+                                finished = true;
+                            }
+                            msgLot.clear();
+                            break;
+                        }
+                    }
+                }
+
+                if (!msgLot.isEmpty()) {
+                    msg = Joiner.on(" | ").join(msgLot);
+                }
+            } else if (eventPlan.peek() instanceof SwitchTestWaitForRpcEvent) {
+                SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
+                        .peek();
+                msg = checkSingleRpcContent(rpcInput, rpcName,
+                        switchTestRpcEvent);
             }
         }
 
         if (msg != null) {
-            LOG.debug("check .. FAILED: " + msg);
-            occuredExceptions.add(new IllegalArgumentException(msg));
+            LOG.debug("rpc check .. FAILED: " + msg);
+            occuredExceptions.add(new IllegalArgumentException("step:"
+                    + planItemCounter + " | " + msg));
+        } else {
+            LOG.debug("rpc check .. OK");
+        }
+        return finished;
+    }
+
+    /**
+     * @param rpcInput
+     * @param rpcName
+     * @param msgTmp
+     * @param switchTestWaitForRpc
+     * @return
+     */
+    private static String checkSingleRpcContent(OfHeader rpcInput,
+            String rpcName, SwitchTestWaitForRpcEvent switchTestWaitForRpc) {
+        String failureMsg = null;
+        if (!rpcName.equals(switchTestWaitForRpc.getRpcName())) {
+            failureMsg = "expected rpc name ["
+                    + switchTestWaitForRpc.getRpcName() + "], got [" + rpcName
+                    + "]";
+        } else if (!rpcInput.getXid().equals(switchTestWaitForRpc.getXid())) {
+            failureMsg = "expected " + rpcName + ".xid ["
+                    + switchTestWaitForRpc.getXid() + "], got ["
+                    + rpcInput.getXid() + "]";
+        }
+
+        return failureMsg;
+    }
+
+    /**
+     * @param rpcInput
+     *            rpc call parameter
+     * @param rpcName
+     *            rpc yang name
+     */
+    private synchronized void checkRpcAndNext(OfHeader rpcInput, String rpcName) {
+        boolean finished = checkRpc(rpcInput, rpcName);
+        if (finished) {
+            next();
         }
-        LOG.debug("check .. OK");
     }
 
     /**
      * discard current event, execute next, if possible
      */
-    private synchronized void next() {
-        LOG.debug("STEPPING TO NEXT event in plan");
+    private void next() {
+        LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})",
+                planItemCounter, eventPlan.peek());
         eventPlan.pop();
+        planItemCounter++;
         planTouched = true;
+        try {
+            Thread.sleep(JOB_DELAY);
+        } catch (InterruptedException e) {
+            LOG.error(e.getMessage(), e);
+        }
         notify();
     }
 
@@ -329,7 +415,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
      */
     private synchronized void proceed() {
         boolean processed = false;
-        LOG.debug("proceeding plan item: " + eventPlan.peek());
+        LOG.debug("proceeding plan item[{}]: {}", planItemCounter,
+                eventPlan.peek());
         if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
             SwitchTestNotificationEvent notification = (SwitchTestNotificationEvent) eventPlan
                     .peek();
@@ -340,13 +427,23 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
                     .peek();
             processRpcResponse(rpcResponse);
             processed = true;
+        } else if (eventPlan.peek() instanceof SwitchTestCallbackEvent) {
+            SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan
+                    .peek();
+            try {
+                callbackEvent.getCallback().call();
+            } catch (Exception e) {
+                LOG.error(e.getMessage(), e);
+                occuredExceptions.add(e);
+            }
+            processed = true;
         }
 
         if (processed) {
             next();
         } else {
             try {
-                LOG.debug("now waiting for HANDLER to act");
+                LOG.debug("now WAITING for OF_LISTENER to act ..");
                 wait(proceedTimeout);
             } catch (InterruptedException e) {
                 LOG.error(e.getMessage(), e);
@@ -356,140 +453,111 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
 
     @Override
     public void run() {
-        LOG.debug("evenPlan STARTING ..");
+        LOG.debug("|---> evenPlan STARTING ..");
+        planItemCounter = 0;
         while (!eventPlan.isEmpty()) {
             planTouched = false;
             proceed();
             if (!planTouched) {
-                occuredExceptions.add(new IllegalStateException(
-                        "eventPlan STALLED"));
+                occuredExceptions
+                        .add(new IllegalStateException(
+                                "eventPlan STALLED, planItemCounter="
+                                        + planItemCounter));
+                break;
             }
         }
 
         try {
-            pool.awaitTermination(10 * JOB_DELAY, TimeUnit.MILLISECONDS);
+            Thread.sleep(JOB_DELAY);
         } catch (InterruptedException e) {
             LOG.error(e.getMessage(), e);
         }
-        LOG.debug("eventPlan done");
+        LOG.debug("<---| eventPlan DONE");
     }
 
     /**
      * @param notificationEvent
      */
-    private void processNotification(
+    private synchronized void processNotification(
             final SwitchTestNotificationEvent notificationEvent) {
 
-        Callable<Void> notifyCmd = new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                Notification notification = notificationEvent
-                        .getPlannedNotification();
-                LOG.debug("notificating HANDLER: "
-                        + notification.getClass().getSimpleName());
-
-                // system events
-                if (notification instanceof DisconnectEvent) {
-                    systemListener
-                            .onDisconnectEvent((DisconnectEvent) notification);
-                }
-                // of notifications
-                else if (notification instanceof EchoRequestMessage) {
-                    ofListener
-                            .onEchoRequestMessage((EchoRequestMessage) notification);
-                } else if (notification instanceof ErrorMessage) {
-                    ofListener.onErrorMessage((ErrorMessage) notification);
-                } else if (notification instanceof ExperimenterMessage) {
-                    ofListener
-                            .onExperimenterMessage((ExperimenterMessage) notification);
-                } else if (notification instanceof FlowRemovedMessage) {
-                    ofListener
-                            .onFlowRemovedMessage((FlowRemovedMessage) notification);
-                } else if (notification instanceof HelloMessage) {
-                    ofListener.onHelloMessage((HelloMessage) notification);
-                } else if (notification instanceof MultipartReplyMessage) {
-                    ofListener
-                            .onMultipartReplyMessage((MultipartReplyMessage) notification);
-                } else if (notification instanceof MultipartRequestMessage) {
-                    ofListener
-                            .onMultipartRequestMessage((MultipartRequestMessage) notification);
-                } else if (notification instanceof PacketInMessage) {
-                    ofListener
-                            .onPacketInMessage((PacketInMessage) notification);
-                } else if (notification instanceof PortStatusMessage) {
-                    ofListener
-                            .onPortStatusMessage((PortStatusMessage) notification);
-                }
-                // default
-                else {
-                    occuredExceptions.add(new IllegalStateException(
-                            "message listening not supported for type: "
-                                    + notification.getClass()));
-                }
-
-                LOG.debug("thread finished");
-                return null;
-            }
+        Notification notification = notificationEvent.getPlannedNotification();
+        LOG.debug("notificating OF_LISTENER: "
+                + notification.getClass().getSimpleName());
 
-        };
+        // system events
+        if (notification instanceof DisconnectEvent) {
+            systemListener.onDisconnectEvent((DisconnectEvent) notification);
+        }
+        // of notifications
+        else if (notification instanceof EchoRequestMessage) {
+            ofListener.onEchoRequestMessage((EchoRequestMessage) notification);
+        } else if (notification instanceof ErrorMessage) {
+            ofListener.onErrorMessage((ErrorMessage) notification);
+        } else if (notification instanceof ExperimenterMessage) {
+            ofListener
+                    .onExperimenterMessage((ExperimenterMessage) notification);
+        } else if (notification instanceof FlowRemovedMessage) {
+            ofListener.onFlowRemovedMessage((FlowRemovedMessage) notification);
+        } else if (notification instanceof HelloMessage) {
+            ofListener.onHelloMessage((HelloMessage) notification);
+        } else if (notification instanceof MultipartReplyMessage) {
+            ofListener
+                    .onMultipartReplyMessage((MultipartReplyMessage) notification);
+        } else if (notification instanceof PacketInMessage) {
+            ofListener.onPacketInMessage((PacketInMessage) notification);
+        } else if (notification instanceof PortStatusMessage) {
+            ofListener.onPortStatusMessage((PortStatusMessage) notification);
+        }
+        // default
+        else {
+            occuredExceptions.add(new IllegalStateException("step:"
+                    + planItemCounter + " | "
+                    + "message listening not supported for type: "
+                    + notification.getClass()));
+        }
 
-        pool.schedule(notifyCmd, JOB_DELAY, TimeUnit.MILLISECONDS);
+        LOG.debug("notification [" + notification.getClass().getSimpleName()
+                + "] .. done");
     }
 
     /**
      * @param rpcResponse
      */
-    private void processRpcResponse(final SwitchTestRcpResponseEvent rpcResponse) {
-        Callable<Void> notifyCmd = new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-
-                OfHeader plannedRpcResponseValue = rpcResponse
-                        .getPlannedRpcResponse();
-                LOG.debug("rpc-responding to HANDLER: " + rpcResponse.getXid());
-
-                @SuppressWarnings("unchecked")
-                SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
-                        .get(rpcResponse.getXid());
-
-                if (response != null) {
-                    boolean successful = plannedRpcResponseValue != null;
-                    Collection<RpcError> errors;
-                    if (successful) {
-                        errors = Collections.emptyList();
-                    } else {
-                        errors = Lists
-                                .newArrayList(RpcErrors
-                                        .getRpcError(
-                                                "unit",
-                                                "unit",
-                                                "not requested",
-                                                ErrorSeverity.ERROR,
-                                                "planned response to RPC.id = "
-                                                        + rpcResponse.getXid(),
-                                                ErrorType.RPC,
-                                                new Exception(
-                                                        "rpc response failed (planned behavior)")));
-                    }
-                    RpcResult<?> result = Rpcs.getRpcResult(successful,
-                            plannedRpcResponseValue, errors);
-                    response.set(result);
-                } else {
-                    String msg = "RpcResponse not expected: xid="
-                            + rpcResponse.getXid()
-                            + ", "
-                            + plannedRpcResponseValue.getClass()
-                                    .getSimpleName();
-                    LOG.error(msg);
-                    occuredExceptions.add(new IllegalStateException(msg));
-                }
-
-                LOG.debug("thread finished");
-                return null;
+    private synchronized void processRpcResponse(
+            final SwitchTestRcpResponseEvent rpcResponse) {
+        OfHeader plannedRpcResponseValue = rpcResponse.getPlannedRpcResponse();
+        LOG.debug("rpc-responding to OF_LISTENER: " + rpcResponse.getXid());
+
+        @SuppressWarnings("unchecked")
+        SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
+                .get(rpcResponse.getXid());
+
+        if (response != null) {
+            boolean successful = plannedRpcResponseValue != null;
+            Collection<RpcError> errors;
+            if (successful) {
+                errors = Collections.emptyList();
+            } else {
+                errors = Lists.newArrayList(RpcErrors.getRpcError("unit",
+                        "unit", "not requested", ErrorSeverity.ERROR,
+                        "planned response to RPC.id = " + rpcResponse.getXid(),
+                        ErrorType.RPC, new Exception(
+                                "rpc response failed (planned behavior)")));
             }
-        };
+            RpcResult<?> result = Rpcs.getRpcResult(successful,
+                    plannedRpcResponseValue, errors);
+            response.set(result);
+        } else {
+            String msg = "RpcResponse not expected: xid="
+                    + rpcResponse.getXid() + ", "
+                    + plannedRpcResponseValue.getClass().getSimpleName();
+            LOG.error(msg);
+            occuredExceptions.add(new IllegalStateException("step:"
+                    + planItemCounter + " | " + msg));
+        }
 
-        pool.schedule(notifyCmd, JOB_DELAY, TimeUnit.MILLISECONDS);
+        LOG.debug("rpc [" + rpcResponse.getXid() + "] .. done");
     }
 
     /**
@@ -507,9 +575,10 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     /**
      * @return rpc future result
      */
-    private static SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
+    private synchronized SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
         SettableFuture<RpcResult<Void>> result = SettableFuture.create();
-        result.set(null);
+        List<RpcError> errors = Collections.emptyList();
+        result.set(Rpcs.getRpcResult(true, (Void) null, errors));
         return result;
     }
 
@@ -538,9 +607,7 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
 
     @Override
     public void fireConnectionReadyNotification() {
-        if (connectionReadyListener != null) {
-            connectionReadyListener.onConnectionReady();
-        }
+        connectionReadyListener.onConnectionReady();
     }
 
     @Override
@@ -548,4 +615,28 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
             ConnectionReadyListener connectionReadyListener) {
         this.connectionReadyListener = connectionReadyListener;
     }
+
+    @Override
+    public Future<RpcResult<Void>> multipartRequest(MultipartRequestInput arg0) {
+        checkRpcAndNext(arg0, "multipartRequestInput");
+        SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+        return result;
+    }
+
+    @Override
+    public InetSocketAddress getRemoteAddress() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public boolean isAutoRead() {
+        return autoRead;
+    }
+
+    @Override
+    public void setAutoRead(boolean autoRead) {
+        this.autoRead = autoRead;
+    }
+
 }