BUG-1997: moving barrier after message
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / sal / OFRpcTaskUtil.java
index c44d16db32071695f0380768097fbdbde600f295..da5bbe947e7ffc39e9331edfd3be5fc4e24464ac 100644 (file)
@@ -7,8 +7,10 @@
  */
 package org.opendaylight.openflowplugin.openflow.md.core.sal;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Future;
 
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
@@ -19,6 +21,7 @@ import org.opendaylight.openflowplugin.api.statistics.MessageSpy;
 import org.opendaylight.openflowplugin.openflow.md.core.MessageFactory;
 import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
+import org.opendaylight.openflowplugin.openflow.md.util.RpcInputOutputTuple;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
@@ -29,10 +32,13 @@ import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 
+import com.google.common.base.Function;
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
@@ -51,7 +57,9 @@ public abstract class OFRpcTaskUtil {
             SwitchConnectionDistinguisher cookie) {
         Collection<RpcError> errors = null;
         if (Objects.firstNonNull(isBarrier, Boolean.FALSE)) {
-            Future<RpcResult<BarrierOutput>> barrierFuture = sendBarrier(taskContext.getSession(), cookie, taskContext.getMessageService());
+            RpcInputOutputTuple<BarrierInput, ListenableFuture<RpcResult<BarrierOutput>>> sendBarrierRpc = 
+                    sendBarrier(taskContext.getSession(), cookie, taskContext.getMessageService());
+            Future<RpcResult<BarrierOutput>> barrierFuture = sendBarrierRpc.getOutput();
             try {
                 RpcResult<BarrierOutput> barrierResult = barrierFuture.get(
                         taskContext.getMaxTimeout(), taskContext.getMaxTimeoutUnit());
@@ -83,11 +91,14 @@ public abstract class OFRpcTaskUtil {
      * @param messageService
      * @return barrier response
      */
-    private static Future<RpcResult<BarrierOutput>> sendBarrier(SessionContext session, 
+    protected static RpcInputOutputTuple<BarrierInput, ListenableFuture<RpcResult<BarrierOutput>>> sendBarrier(SessionContext session, 
             SwitchConnectionDistinguisher cookie, IMessageDispatchService messageService) {
         BarrierInput barrierInput = MessageFactory.createBarrier(
                 session.getFeatures().getVersion(), session.getNextXid());
-        return messageService.barrier(barrierInput, cookie);
+        Future<RpcResult<BarrierOutput>> barrierResult = messageService.barrier(barrierInput, cookie);
+        ListenableFuture<RpcResult<BarrierOutput>> output = JdkFutureAdapters.listenInPoolThread(barrierResult);
+        
+        return new RpcInputOutputTuple<>(barrierInput, output);
     }
 
     /**
@@ -123,10 +134,83 @@ public abstract class OFRpcTaskUtil {
             
             @Override
             public void onFailure(Throwable t) {
+                //TODO: good place to notify MD-SAL about errors
                 task.getTaskContext().getMessageSpy().spyMessage(
                         task.getInput(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_FAILURE);
             }
         });
     }
+    
+    /**
+     * @param task of rpc
+     * @param originalResult
+     * @param notificationProviderService
+     * @param notificationComposer lazy notification composer
+     * @return chained result with barrier
+     */
+    public static <TX extends TransactionAware, INPUT extends DataContainer>
+    ListenableFuture<RpcResult<TX>> chainFutureBarrier(
+            final OFRpcTask<INPUT, RpcResult<TX>> task,
+            final ListenableFuture<RpcResult<TX>> originalResult) {
+        
+        ListenableFuture<RpcResult<TX>> chainResult = originalResult;
+        if (Objects.firstNonNull(task.isBarrier(), Boolean.FALSE)) { 
+            
+            chainResult = Futures.transform(originalResult, new AsyncFunction<RpcResult<TX>, RpcResult<TX>>() {
 
+                @Override
+                public ListenableFuture<RpcResult<TX>> apply(final RpcResult<TX> input) throws Exception {
+                    if (input.isSuccessful()) {
+                        RpcInputOutputTuple<BarrierInput, ListenableFuture<RpcResult<BarrierOutput>>> sendBarrierRpc = sendBarrier(
+                                task.getSession(), task.getCookie(), task.getMessageService());
+                        ListenableFuture<RpcResult<TX>> barrierTxResult = Futures.transform(
+                                sendBarrierRpc.getOutput(),
+                                transformBarrierToTransactionAware(input, sendBarrierRpc.getInput()));
+                        return barrierTxResult;
+                    } else {
+                        return Futures.immediateFuture(input);
+                    }
+                }
+                
+            });
+        }
+        
+        return chainResult;
+    }
+        
+    /**
+     * @param originalInput
+     * @return
+     */
+    protected static <TX extends TransactionAware> Function<RpcResult<BarrierOutput>, RpcResult<TX>> transformBarrierToTransactionAware(
+            final RpcResult<TX> originalInput, final BarrierInput barrierInput) {
+        return new Function<RpcResult<BarrierOutput>, RpcResult<TX>>() {
+            
+            @Override
+            public RpcResult<TX> apply(final RpcResult<BarrierOutput> barrierResult) {
+                RpcResultBuilder<TX> rpcBuilder = null;
+                if (barrierResult.isSuccessful()) {
+                    rpcBuilder = RpcResultBuilder.<TX>success();
+                } else {
+                    rpcBuilder = RpcResultBuilder.<TX>failed();
+                    RpcError rpcError = RpcResultBuilder.newWarning(
+                            ErrorType.RPC, 
+                            OFConstants.ERROR_TAG_TIMEOUT, 
+                            "barrier sending failed", 
+                            OFConstants.APPLICATION_TAG, 
+                            "switch failed to respond on barrier request, barrier.xid = "+barrierInput.getXid(), 
+                            null);
+                    List<RpcError> chainedErrors = new ArrayList<>();
+                    chainedErrors.add(rpcError);
+                    chainedErrors.addAll(barrierResult.getErrors());
+                    rpcBuilder.withRpcErrors(chainedErrors);
+                }
+                
+                rpcBuilder.withResult(originalInput.getResult());
+                
+                return rpcBuilder.build();
+            }
+            
+        };
+    }
 }