bug 2446 - High priority (control) queue stop reading from channel if is full
[openflowplugin.git] / openflowplugin / src / test / java / org / opendaylight / openflowplugin / openflow / md / core / plan / ConnectionAdapterStackImpl.java
index 296bdeccaa9daf073788c85c3c97def83b712b51..9af46b4379ddceea8ec1d33fd8b6b23144b560bf 100644 (file)
@@ -100,6 +100,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
 
     private int planItemCounter;
 
+    private boolean autoRead = true;
+
     /**
      * default ctor
      */
@@ -108,7 +110,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     }
 
     @Override
-    public synchronized Future<RpcResult<BarrierOutput>> barrier(BarrierInput arg0) {
+    public synchronized Future<RpcResult<BarrierOutput>> barrier(
+            BarrierInput arg0) {
         checkRpcAndNext(arg0, "barrier");
         SettableFuture<RpcResult<BarrierOutput>> result = createAndRegisterRpcResult(arg0);
         return result;
@@ -143,14 +146,16 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     }
 
     @Override
-    public synchronized Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput arg0) {
+    public synchronized Future<RpcResult<GetAsyncOutput>> getAsync(
+            GetAsyncInput arg0) {
         checkRpcAndNext(arg0, "echo");
         Future<RpcResult<GetAsyncOutput>> result = createAndRegisterRpcResult(arg0);
         return result;
     }
 
     @Override
-    public synchronized Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput arg0) {
+    public synchronized Future<RpcResult<GetConfigOutput>> getConfig(
+            GetConfigInput arg0) {
         checkRpcAndNext(arg0, "echo");
         Future<RpcResult<GetConfigOutput>> result = createAndRegisterRpcResult(arg0);
         return result;
@@ -258,7 +263,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
 
     @Override
     public void checkListeners() {
-        if (ofListener == null || systemListener == null || connectionReadyListener == null) {
+        if (ofListener == null || systemListener == null
+                || connectionReadyListener == null) {
             occuredExceptions
                     .add(new IllegalStateException("missing listeners"));
         }
@@ -283,23 +289,29 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
             throw new IllegalStateException("eventPlan already depleted");
         }
 
-        LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName, rpcInput.getVersion(), rpcInput.getXid());
+        LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName,
+                rpcInput.getVersion(), rpcInput.getXid());
         if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)
                 && !(eventPlan.peek() instanceof SwitchTestWaitForAllEvent)) {
             if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
-                SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan.peek());
-                msg = "expected [notification: " +notifEvent.getPlannedNotification()+ "], got [" + rpcInput.getClass().getSimpleName()
-                        + "]";
+                SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan
+                        .peek());
+                msg = "expected [notification: "
+                        + notifEvent.getPlannedNotification() + "], got ["
+                        + rpcInput.getClass().getSimpleName() + "]";
             } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
-                SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan.peek());
-                msg = "expected [rpc: " +rpcEvent.getPlannedRpcResponse()+ "], got [" + rpcInput.getClass().getSimpleName()
+                SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan
+                        .peek());
+                msg = "expected [rpc: " + rpcEvent.getPlannedRpcResponse()
+                        + "], got [" + rpcInput.getClass().getSimpleName()
                         + "]";
             }
         } else {
             if (eventPlan.peek() instanceof SwitchTestWaitForAllEvent) {
                 SwitchTestWaitForAllEvent switchTestWaitForAll = (SwitchTestWaitForAllEvent) eventPlan
                         .peek();
-                Set<SwitchTestWaitForRpcEvent> eventBag = switchTestWaitForAll.getWaitEventBag();
+                Set<SwitchTestWaitForRpcEvent> eventBag = switchTestWaitForAll
+                        .getWaitEventBag();
                 List<String> msgLot = new ArrayList<>();
 
                 if (eventBag == null || eventBag.isEmpty()) {
@@ -307,7 +319,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
                 } else {
                     finished = false;
                     for (SwitchTestWaitForRpcEvent switchTestWaitForRpc : eventBag) {
-                        String msgPart = checkSingleRpcContent(rpcInput, rpcName, switchTestWaitForRpc);
+                        String msgPart = checkSingleRpcContent(rpcInput,
+                                rpcName, switchTestWaitForRpc);
 
                         if (msgPart != null) {
                             msgLot.add(msgPart);
@@ -329,13 +342,15 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
             } else if (eventPlan.peek() instanceof SwitchTestWaitForRpcEvent) {
                 SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
                         .peek();
-                msg = checkSingleRpcContent(rpcInput, rpcName, switchTestRpcEvent);
+                msg = checkSingleRpcContent(rpcInput, rpcName,
+                        switchTestRpcEvent);
             }
         }
 
         if (msg != null) {
             LOG.debug("rpc check .. FAILED: " + msg);
-            occuredExceptions.add(new IllegalArgumentException("step:"+planItemCounter+" | "+msg));
+            occuredExceptions.add(new IllegalArgumentException("step:"
+                    + planItemCounter + " | " + msg));
         } else {
             LOG.debug("rpc check .. OK");
         }
@@ -349,15 +364,17 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
      * @param switchTestWaitForRpc
      * @return
      */
-    private static String checkSingleRpcContent(OfHeader rpcInput, String rpcName,
-            SwitchTestWaitForRpcEvent switchTestWaitForRpc) {
+    private static String checkSingleRpcContent(OfHeader rpcInput,
+            String rpcName, SwitchTestWaitForRpcEvent switchTestWaitForRpc) {
         String failureMsg = null;
         if (!rpcName.equals(switchTestWaitForRpc.getRpcName())) {
-            failureMsg = "expected rpc name [" + switchTestWaitForRpc.getRpcName()
-                    + "], got [" + rpcName + "]";
+            failureMsg = "expected rpc name ["
+                    + switchTestWaitForRpc.getRpcName() + "], got [" + rpcName
+                    + "]";
         } else if (!rpcInput.getXid().equals(switchTestWaitForRpc.getXid())) {
-            failureMsg = "expected "+rpcName+".xid [" + switchTestWaitForRpc.getXid()
-                    + "], got [" + rpcInput.getXid() + "]";
+            failureMsg = "expected " + rpcName + ".xid ["
+                    + switchTestWaitForRpc.getXid() + "], got ["
+                    + rpcInput.getXid() + "]";
         }
 
         return failureMsg;
@@ -380,9 +397,10 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
      * discard current event, execute next, if possible
      */
     private void next() {
-        LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})", planItemCounter, eventPlan.peek());
+        LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})",
+                planItemCounter, eventPlan.peek());
         eventPlan.pop();
-        planItemCounter ++;
+        planItemCounter++;
         planTouched = true;
         try {
             Thread.sleep(JOB_DELAY);
@@ -397,7 +415,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
      */
     private synchronized void proceed() {
         boolean processed = false;
-        LOG.debug("proceeding plan item[{}]: {}", planItemCounter, eventPlan.peek());
+        LOG.debug("proceeding plan item[{}]: {}", planItemCounter,
+                eventPlan.peek());
         if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
             SwitchTestNotificationEvent notification = (SwitchTestNotificationEvent) eventPlan
                     .peek();
@@ -409,12 +428,13 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
             processRpcResponse(rpcResponse);
             processed = true;
         } else if (eventPlan.peek() instanceof SwitchTestCallbackEvent) {
-            SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan.peek();
+            SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan
+                    .peek();
             try {
                 callbackEvent.getCallback().call();
             } catch (Exception e) {
                 LOG.error(e.getMessage(), e);
-               occuredExceptions.add(e);
+                occuredExceptions.add(e);
             }
             processed = true;
         }
@@ -439,8 +459,10 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
             planTouched = false;
             proceed();
             if (!planTouched) {
-                occuredExceptions.add(new IllegalStateException(
-                        "eventPlan STALLED, planItemCounter="+planItemCounter));
+                occuredExceptions
+                        .add(new IllegalStateException(
+                                "eventPlan STALLED, planItemCounter="
+                                        + planItemCounter));
                 break;
             }
         }
@@ -459,61 +481,57 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     private synchronized void processNotification(
             final SwitchTestNotificationEvent notificationEvent) {
 
-        Notification notification = notificationEvent
-                .getPlannedNotification();
+        Notification notification = notificationEvent.getPlannedNotification();
         LOG.debug("notificating OF_LISTENER: "
                 + notification.getClass().getSimpleName());
 
         // system events
         if (notification instanceof DisconnectEvent) {
-            systemListener
-            .onDisconnectEvent((DisconnectEvent) notification);
+            systemListener.onDisconnectEvent((DisconnectEvent) notification);
         }
         // of notifications
         else if (notification instanceof EchoRequestMessage) {
-            ofListener
-            .onEchoRequestMessage((EchoRequestMessage) notification);
+            ofListener.onEchoRequestMessage((EchoRequestMessage) notification);
         } else if (notification instanceof ErrorMessage) {
             ofListener.onErrorMessage((ErrorMessage) notification);
         } else if (notification instanceof ExperimenterMessage) {
             ofListener
-            .onExperimenterMessage((ExperimenterMessage) notification);
+                    .onExperimenterMessage((ExperimenterMessage) notification);
         } else if (notification instanceof FlowRemovedMessage) {
-            ofListener
-            .onFlowRemovedMessage((FlowRemovedMessage) notification);
+            ofListener.onFlowRemovedMessage((FlowRemovedMessage) notification);
         } else if (notification instanceof HelloMessage) {
             ofListener.onHelloMessage((HelloMessage) notification);
         } else if (notification instanceof MultipartReplyMessage) {
             ofListener
-            .onMultipartReplyMessage((MultipartReplyMessage) notification);
+                    .onMultipartReplyMessage((MultipartReplyMessage) notification);
         } else if (notification instanceof PacketInMessage) {
-            ofListener
-            .onPacketInMessage((PacketInMessage) notification);
+            ofListener.onPacketInMessage((PacketInMessage) notification);
         } else if (notification instanceof PortStatusMessage) {
-            ofListener
-            .onPortStatusMessage((PortStatusMessage) notification);
+            ofListener.onPortStatusMessage((PortStatusMessage) notification);
         }
         // default
         else {
-            occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " +
-                    "message listening not supported for type: "
-                            + notification.getClass()));
+            occuredExceptions.add(new IllegalStateException("step:"
+                    + planItemCounter + " | "
+                    + "message listening not supported for type: "
+                    + notification.getClass()));
         }
 
-        LOG.debug("notification ["+notification.getClass().getSimpleName()+"] .. done");
+        LOG.debug("notification [" + notification.getClass().getSimpleName()
+                + "] .. done");
     }
 
     /**
      * @param rpcResponse
      */
-    private synchronized void processRpcResponse(final SwitchTestRcpResponseEvent rpcResponse) {
-        OfHeader plannedRpcResponseValue = rpcResponse
-                .getPlannedRpcResponse();
+    private synchronized void processRpcResponse(
+            final SwitchTestRcpResponseEvent rpcResponse) {
+        OfHeader plannedRpcResponseValue = rpcResponse.getPlannedRpcResponse();
         LOG.debug("rpc-responding to OF_LISTENER: " + rpcResponse.getXid());
 
         @SuppressWarnings("unchecked")
         SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
-        .get(rpcResponse.getXid());
+                .get(rpcResponse.getXid());
 
         if (response != null) {
             boolean successful = plannedRpcResponseValue != null;
@@ -521,33 +539,25 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
             if (successful) {
                 errors = Collections.emptyList();
             } else {
-                errors = Lists
-                        .newArrayList(RpcErrors
-                                .getRpcError(
-                                        "unit",
-                                        "unit",
-                                        "not requested",
-                                        ErrorSeverity.ERROR,
-                                        "planned response to RPC.id = "
-                                                + rpcResponse.getXid(),
-                                                ErrorType.RPC,
-                                                new Exception(
-                                                        "rpc response failed (planned behavior)")));
+                errors = Lists.newArrayList(RpcErrors.getRpcError("unit",
+                        "unit", "not requested", ErrorSeverity.ERROR,
+                        "planned response to RPC.id = " + rpcResponse.getXid(),
+                        ErrorType.RPC, new Exception(
+                                "rpc response failed (planned behavior)")));
             }
             RpcResult<?> result = Rpcs.getRpcResult(successful,
                     plannedRpcResponseValue, errors);
             response.set(result);
         } else {
             String msg = "RpcResponse not expected: xid="
-                    + rpcResponse.getXid()
-                    + ", "
-                    + plannedRpcResponseValue.getClass()
-                    .getSimpleName();
+                    + rpcResponse.getXid() + ", "
+                    + plannedRpcResponseValue.getClass().getSimpleName();
             LOG.error(msg);
-            occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " + msg));
+            occuredExceptions.add(new IllegalStateException("step:"
+                    + planItemCounter + " | " + msg));
         }
 
-        LOG.debug("rpc ["+rpcResponse.getXid()+"] .. done");
+        LOG.debug("rpc [" + rpcResponse.getXid() + "] .. done");
     }
 
     /**
@@ -597,7 +607,7 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
 
     @Override
     public void fireConnectionReadyNotification() {
-            connectionReadyListener.onConnectionReady();
+        connectionReadyListener.onConnectionReady();
     }
 
     @Override
@@ -607,8 +617,7 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     }
 
     @Override
-    public Future<RpcResult<Void>> multipartRequest(
-            MultipartRequestInput arg0) {
+    public Future<RpcResult<Void>> multipartRequest(MultipartRequestInput arg0) {
         checkRpcAndNext(arg0, "multipartRequestInput");
         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
         return result;
@@ -620,4 +629,14 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
         return null;
     }
 
+    @Override
+    public boolean isAutoRead() {
+        return autoRead;
+    }
+
+    @Override
+    public void setAutoRead(boolean autoRead) {
+        this.autoRead = autoRead;
+    }
+
 }