preparing QueueKeeper and message translation
[openflowplugin.git] / openflowplugin / src / test / java / org / opendaylight / openflowplugin / openflow / md / core / plan / ConnectionAdapterStackImpl.java
index fe5c29ead3fa68e773ef774b6059b3c66399dcc8..476eab74179f69c339982931b1c4026bdb737909 100644 (file)
@@ -14,6 +14,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Stack;
 import java.util.concurrent.Future;
 
@@ -58,6 +59,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEventBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
 import org.opendaylight.yangtools.yang.binding.Notification;
 import org.opendaylight.yangtools.yang.common.RpcError;
@@ -67,6 +69,7 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.SettableFuture;
 
@@ -104,155 +107,140 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     }
 
     @Override
-    public Future<RpcResult<BarrierOutput>> barrier(BarrierInput arg0) {
-        checkRpc(arg0, "barrier");
+    public synchronized Future<RpcResult<BarrierOutput>> barrier(BarrierInput arg0) {
+        checkRpcAndNext(arg0, "barrier");
         SettableFuture<RpcResult<BarrierOutput>> result = createAndRegisterRpcResult(arg0);
-        next();
         return result;
     }
 
     @Override
-    public Future<RpcResult<EchoOutput>> echo(EchoInput arg0) {
-        checkRpc(arg0, "echo");
+    public synchronized Future<RpcResult<EchoOutput>> echo(EchoInput arg0) {
+        checkRpcAndNext(arg0, "echo");
         Future<RpcResult<EchoOutput>> result = createAndRegisterRpcResult(arg0);
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> echoReply(EchoReplyInput arg0) {
-        checkRpc(arg0, "echoReply");
+        checkRpcAndNext(arg0, "echoReply");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> experimenter(ExperimenterInput arg0) {
-        checkRpc(arg0, "experimenter");
+        checkRpcAndNext(arg0, "experimenter");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> flowMod(FlowModInput arg0) {
-        checkRpc(arg0, "flowMod");
+        checkRpcAndNext(arg0, "flowMod");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
-    public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput arg0) {
-        checkRpc(arg0, "echo");
+    public synchronized Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput arg0) {
+        checkRpcAndNext(arg0, "echo");
         Future<RpcResult<GetAsyncOutput>> result = createAndRegisterRpcResult(arg0);
-        next();
         return result;
     }
 
     @Override
-    public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput arg0) {
-        checkRpc(arg0, "echo");
+    public synchronized Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput arg0) {
+        checkRpcAndNext(arg0, "echo");
         Future<RpcResult<GetConfigOutput>> result = createAndRegisterRpcResult(arg0);
-        next();
         return result;
     }
 
     @Override
-    public Future<RpcResult<GetFeaturesOutput>> getFeatures(
+    public synchronized Future<RpcResult<GetFeaturesOutput>> getFeatures(
             GetFeaturesInput arg0) {
-        checkRpc(arg0, "getFeatures");
+        checkRpcAndNext(arg0, "getFeatures");
         Future<RpcResult<GetFeaturesOutput>> result = createAndRegisterRpcResult(arg0);
-        next();
         return result;
     }
 
     @Override
-    public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
+    public synchronized Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
             GetQueueConfigInput arg0) {
-        checkRpc(arg0, "echo");
+        checkRpcAndNext(arg0, "echo");
         Future<RpcResult<GetQueueConfigOutput>> result = createAndRegisterRpcResult(arg0);
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> groupMod(GroupModInput arg0) {
-        checkRpc(arg0, "groupMod");
+        checkRpcAndNext(arg0, "groupMod");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> hello(HelloInput arg0) {
-        checkRpc(arg0, "helloReply");
+        checkRpcAndNext(arg0, "helloReply");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> meterMod(MeterModInput arg0) {
-        checkRpc(arg0, "meterMod");
+        checkRpcAndNext(arg0, "meterMod");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> packetOut(PacketOutInput arg0) {
-        checkRpc(arg0, "packetOut");
+        checkRpcAndNext(arg0, "packetOut");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> portMod(PortModInput arg0) {
-        checkRpc(arg0, "portMod");
+        checkRpcAndNext(arg0, "portMod");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
-    public Future<RpcResult<RoleRequestOutput>> roleRequest(
+    public synchronized Future<RpcResult<RoleRequestOutput>> roleRequest(
             RoleRequestInput arg0) {
-        checkRpc(arg0, "echo");
+        checkRpcAndNext(arg0, "echo");
         Future<RpcResult<RoleRequestOutput>> result = createAndRegisterRpcResult(arg0);
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> setAsync(SetAsyncInput arg0) {
-        checkRpc(arg0, "setAsync");
+        checkRpcAndNext(arg0, "setAsync");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> setConfig(SetConfigInput arg0) {
-        checkRpc(arg0, "setConfig");
+        checkRpcAndNext(arg0, "setConfig");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> tableMod(TableModInput arg0) {
-        checkRpc(arg0, "tableMod");
+        checkRpcAndNext(arg0, "tableMod");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
-        next();
         return result;
     }
 
     @Override
     public Future<Boolean> disconnect() {
-        // TODO Auto-generated method stub
+        LOG.info("adapter is told to disconnect");
+        DisconnectEventBuilder disconnectEventBuilder = new DisconnectEventBuilder();
+        disconnectEventBuilder.setInfo("disconnected by plugin");
+        systemListener.onDisconnectEvent(disconnectEventBuilder.build());
         return null;
     }
 
@@ -286,10 +274,17 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
      * @param rpcName
      *            rpc yang name
      */
-    private synchronized void checkRpc(OfHeader rpcInput, String rpcName) {
+    private boolean checkRpc(OfHeader rpcInput, String rpcName) {
         String msg = null;
-        LOG.debug("checking rpc: " + rpcName);
-        if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)) {
+        boolean finished = true;
+        
+        if (eventPlan.isEmpty()) {
+            throw new IllegalStateException("eventPlan already depleted");
+        }
+        
+        LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName, rpcInput.getVersion(), rpcInput.getXid());
+        if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent) 
+                && !(eventPlan.peek() instanceof SwitchTestWaitForAllEvent)) {
             if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
                 SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan.peek());
                 msg = "expected [notification: " +notifEvent.getPlannedNotification()+ "], got [" + rpcInput.getClass().getSimpleName()
@@ -300,32 +295,99 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
                         + "]";
             }
         } else {
-            SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
-                    .peek();
-            if (!rpcName.equals(switchTestRpcEvent.getRpcName())) {
-                msg = "expected rpc name [" + switchTestRpcEvent.getRpcName()
-                        + "], got [" + rpcName + "]";
-            } else if (!rpcInput.getXid().equals(switchTestRpcEvent.getXid())) {
-                msg = "expected xid [" + switchTestRpcEvent.getXid()
-                        + "], got [" + rpcInput.getXid() + "]";
+            if (eventPlan.peek() instanceof SwitchTestWaitForAllEvent) {
+                SwitchTestWaitForAllEvent switchTestWaitForAll = (SwitchTestWaitForAllEvent) eventPlan
+                        .peek();
+                Set<SwitchTestWaitForRpcEvent> eventBag = switchTestWaitForAll.getWaitEventBag();
+                List<String> msgLot = new ArrayList<>();
+                
+                if (eventBag == null || eventBag.isEmpty()) {
+                    msg = "no wait events in bag";
+                } else {
+                    finished = false;
+                    for (SwitchTestWaitForRpcEvent switchTestWaitForRpc : eventBag) {
+                        String msgPart = checkSingleRpcContent(rpcInput, rpcName, switchTestWaitForRpc); 
+                        
+                        if (msgPart != null) {
+                            msgLot.add(msgPart);
+                        } else {
+                            LOG.debug("wait event matched: {}", rpcName);
+                            eventBag.remove(switchTestWaitForRpc);
+                            if (eventBag.isEmpty()) {
+                                finished = true;
+                            }
+                            msgLot.clear();
+                            break;
+                        }
+                    }
+                }
+                
+                if (!msgLot.isEmpty()) {
+                    msg = Joiner.on(" | ").join(msgLot);
+                }
+            } else if (eventPlan.peek() instanceof SwitchTestWaitForRpcEvent) {
+                SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
+                        .peek();
+                msg = checkSingleRpcContent(rpcInput, rpcName, switchTestRpcEvent); 
             }
         }
 
         if (msg != null) {
             LOG.debug("rpc check .. FAILED: " + msg);
-            occuredExceptions.add(new IllegalArgumentException(msg));
+            occuredExceptions.add(new IllegalArgumentException("step:"+planItemCounter+" | "+msg));
+        } else {
+            LOG.debug("rpc check .. OK");
+        }
+        return finished;
+    }
+
+    /**
+     * @param rpcInput
+     * @param rpcName
+     * @param msgTmp
+     * @param switchTestWaitForRpc
+     * @return 
+     */
+    private static String checkSingleRpcContent(OfHeader rpcInput, String rpcName,
+            SwitchTestWaitForRpcEvent switchTestWaitForRpc) {
+        String failureMsg = null;
+        if (!rpcName.equals(switchTestWaitForRpc.getRpcName())) {
+            failureMsg = "expected rpc name [" + switchTestWaitForRpc.getRpcName()
+                    + "], got [" + rpcName + "]";
+        } else if (!rpcInput.getXid().equals(switchTestWaitForRpc.getXid())) {
+            failureMsg = "expected "+rpcName+".xid [" + switchTestWaitForRpc.getXid()
+                    + "], got [" + rpcInput.getXid() + "]";
+        } 
+        
+        return failureMsg;
+    }
+    
+    /**
+     * @param rpcInput
+     *            rpc call parameter
+     * @param rpcName
+     *            rpc yang name
+     */
+    private synchronized void checkRpcAndNext(OfHeader rpcInput, String rpcName) {
+        boolean finished = checkRpc(rpcInput, rpcName);
+        if (finished) {
+            next();
         }
-        LOG.debug("rpc check .. OK");
     }
 
     /**
      * discard current event, execute next, if possible
      */
-    private synchronized void next() {
+    private void next() {
         LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})", planItemCounter, eventPlan.peek());
         eventPlan.pop();
         planItemCounter ++;
         planTouched = true;
+        try {
+            Thread.sleep(JOB_DELAY);
+        } catch (InterruptedException e) {
+            LOG.error(e.getMessage(), e);
+        }
         notify();
     }
 
@@ -345,6 +407,15 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
                     .peek();
             processRpcResponse(rpcResponse);
             processed = true;
+        } else if (eventPlan.peek() instanceof SwitchTestCallbackEvent) {
+            SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan.peek();
+            try {
+                callbackEvent.getCallback().call();
+            } catch (Exception e) {
+                LOG.error(e.getMessage(), e);
+               occuredExceptions.add(e);
+            }
+            processed = true;
         }
 
         if (processed) {
@@ -368,7 +439,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
             proceed();
             if (!planTouched) {
                 occuredExceptions.add(new IllegalStateException(
-                        "eventPlan STALLED"));
+                        "eventPlan STALLED, planItemCounter="+planItemCounter));
+                break;
             }
         }
 
@@ -383,7 +455,7 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     /**
      * @param notificationEvent
      */
-    private void processNotification(
+    private synchronized void processNotification(
             final SwitchTestNotificationEvent notificationEvent) {
 
         Notification notification = notificationEvent
@@ -425,7 +497,7 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
         }
         // default
         else {
-            occuredExceptions.add(new IllegalStateException(
+            occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " +
                     "message listening not supported for type: "
                             + notification.getClass()));
         }
@@ -436,7 +508,7 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     /**
      * @param rpcResponse
      */
-    private void processRpcResponse(final SwitchTestRcpResponseEvent rpcResponse) {
+    private synchronized void processRpcResponse(final SwitchTestRcpResponseEvent rpcResponse) {
         OfHeader plannedRpcResponseValue = rpcResponse
                 .getPlannedRpcResponse();
         LOG.debug("rpc-responding to OF_LISTENER: " + rpcResponse.getXid());
@@ -474,7 +546,7 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
                     + plannedRpcResponseValue.getClass()
                     .getSimpleName();
             LOG.error(msg);
-            occuredExceptions.add(new IllegalStateException(msg));
+            occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " + msg));
         }
 
         LOG.debug("rpc ["+rpcResponse.getXid()+"] .. done");
@@ -495,7 +567,7 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     /**
      * @return rpc future result
      */
-    private static SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
+    private synchronized SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
         SettableFuture<RpcResult<Void>> result = SettableFuture.create();
         List<RpcError> errors = Collections.emptyList();
         result.set(Rpcs.getRpcResult(true, (Void) null, errors));