FlowService - standalone request context for multiflow input 35/17335/2
authorJozef Gloncak <jgloncak@cisco.com>
Mon, 30 Mar 2015 12:13:44 +0000 (14:13 +0200)
committermichal rehak <mirehak@cisco.com>
Tue, 31 Mar 2015 11:11:09 +0000 (11:11 +0000)
If input flow is devided to more (2 maybe more) flows then result
Future isn't chained but for every flow is created standalone
request context with unique Xid. Methods addFlow and updateFlow
return future object which will contains global result of all
partial futures (if one fail then global fail, if all success then
global success)

Change-Id: I0406888a0568d6ba3905113cde69e2a96aa78141
Signed-off-by: Jozef Gloncak <jgloncak@cisco.com>
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java

index fcf0dd43d3a2b2d08fa4c4e1e69f8e39bc322c40..eb97f74159e23b51b7c86bb1dffda16794f4a312 100644 (file)
@@ -7,19 +7,22 @@
  */
 package org.opendaylight.openflowplugin.impl.services;
 
-import com.google.common.base.Function;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 
-import com.google.common.util.concurrent.AsyncFunction;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import com.google.common.base.Function;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.math.BigInteger;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Future;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
-import org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskFactory;
 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor;
 import org.opendaylight.openflowplugin.openflow.md.util.FlowCreatorUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
@@ -45,17 +48,53 @@ public class SalFlowServiceImpl extends CommonService implements SalFlowService
         super(rpcContext);
     }
 
+    private class DataCrate {
+        final BigInteger iDConnection;
+        final FlowModInputBuilder flowModInputBuilder;
+
+        public DataCrate(final BigInteger iDConnection, final FlowModInputBuilder flowModInputBuilder) {
+            this.iDConnection = iDConnection;
+            this.flowModInputBuilder = flowModInputBuilder;
+        }
+
+        /**
+         * @return the flowModInputBuilder
+         */
+        public FlowModInputBuilder getFlowModInputBuilder() {
+            return flowModInputBuilder;
+        }
+
+        /**
+         * @return the iDConnection
+         */
+        public BigInteger getiDConnection() {
+            return iDConnection;
+        }
+    }
+
+    <T extends DataObject, F> ListenableFuture<RpcResult<T>> handleServiceCall(final DataCrate dataCrate,
+            final Function<DataCrate, Future<RpcResult<F>>> function) {
+        LOG.debug("Calling the FlowMod RPC method on MessageDispatchService");
+
+        final RequestContext<T> requestContext = rpcContext.createRequestContext();
+        final SettableFuture<RpcResult<T>> result = rpcContext.storeOrFail(requestContext);
+
+        if (!result.isDone()) {
+            final Future<RpcResult<F>> resultFromOFLib = function.apply(dataCrate);
+
+            final RpcResultConvertor<T> rpcResultConvertor = new RpcResultConvertor<>(requestContext, deviceContext);
+            rpcResultConvertor.processResultFromOfJava(resultFromOFLib);
+
+        } else {
+            RequestContextUtil.closeRequstContext(requestContext);
+        }
+        return result;
+    }
+
     @Override
     public Future<RpcResult<AddFlowOutput>> addFlow(final AddFlowInput input) {
-        return this.<AddFlowOutput, Void> handleServiceCall(PRIMARY_CONNECTION,
-                new Function<BigInteger, Future<RpcResult<Void>>>() {
-                    @Override
-                    public ListenableFuture<RpcResult<Void>> apply(final BigInteger IDConnection) {
-                        final List<FlowModInputBuilder> ofFlowModInputs = FlowConvertor.toFlowModInputs(input, version,
-                                datapathId);
-                        return chainFlowMods(ofFlowModInputs, 0, IDConnection);
-                    }
-                });
+        final List<FlowModInputBuilder> ofFlowModInputs = FlowConvertor.toFlowModInputs(input, version, datapathId);
+        return processFlowModInputBuilders(ofFlowModInputs);
     }
 
     @Override
@@ -96,54 +135,50 @@ public class SalFlowServiceImpl extends CommonService implements SalFlowService
         }
 
         allFlowMods.addAll(ofFlowModInputs);
-        LOG.debug("Number of flows to push to switch: {}", allFlowMods.size());
-        Collections.<String> emptyList();
-        return this.<UpdateFlowOutput, Void> handleServiceCall(PRIMARY_CONNECTION,
-                new Function<BigInteger, Future<RpcResult<Void>>>() {
-                    @Override
-                    public Future<RpcResult<Void>> apply(final BigInteger cookie) {
-                        return chainFlowMods(allFlowMods, 0, cookie);
-                    }
-                });
+        return processFlowModInputBuilders(allFlowMods);
     }
 
-    /**
-     * Recursive helper method for
-     * {@link OFRpcTaskFactory#chainFlowMods(java.util.List, int, org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskContext, org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher)}
-     * {@link OFRpcTaskFactory#createUpdateFlowTask()} to chain results of multiple flowmods. The next flowmod gets
-     * executed if the earlier one is successful. All the flowmods should have the same xid, in-order to cross-reference
-     * the notification
-     */
-    protected ListenableFuture<RpcResult<Void>> chainFlowMods(final List<FlowModInputBuilder> ofFlowModInputs,
-            final int index, final BigInteger cookie) {
-
-        final Future<RpcResult<Void>> resultFromOFLib = createResultForFlowMod(ofFlowModInputs.get(index), cookie);
-
-        final ListenableFuture<RpcResult<Void>> result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
-
-        if (ofFlowModInputs.size() > index + 1) {
-            // there are more flowmods to chain
-            return Futures.transform(result, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
+    private <T extends DataObject> Future<RpcResult<T>> processFlowModInputBuilders(final List<FlowModInputBuilder> ofFlowModInputs) {
+        final List<ListenableFuture<RpcResult<T>>> partialFutures = new ArrayList<>();
+        for (FlowModInputBuilder flowModInputBuilder : ofFlowModInputs) {
+            ListenableFuture<RpcResult<T>> partialFuture = handleServiceCall(new DataCrate(
+                    PRIMARY_CONNECTION, flowModInputBuilder), new Function<DataCrate, Future<RpcResult<Void>>>() {
                 @Override
-                public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
-                    if (input.isSuccessful()) {
-                        return chainFlowMods(ofFlowModInputs, index + 1, cookie);
-                    } else {
-                        LOG.warn("Flowmod failed. Any chained flowmods are ignored. xid:{}", ofFlowModInputs.get(index)
-                                .getXid());
-                        return Futures.immediateFuture(input);
-                    }
+                public ListenableFuture<RpcResult<Void>> apply(final DataCrate dataCrate) {
+                    return createResultForFlowMod(dataCrate.getFlowModInputBuilder(), dataCrate.getiDConnection());
                 }
             });
-        } else {
-            return result;
+            partialFutures.add(partialFuture);
         }
+
+        ListenableFuture<List<RpcResult<T>>> allFutures = Futures.allAsList(partialFutures);
+        final SettableFuture<RpcResult<T>> finalFuture = SettableFuture.create();
+        Futures.addCallback(allFutures, new FutureCallback<List<RpcResult<T>>>() {
+            @Override
+            public void onSuccess(List<RpcResult<T>> result) {
+                for (RpcResult<T> rpcResult : result) {
+                    if (rpcResult.isSuccessful()) {
+                        //TODO: AddFlowOutput has getTransactionId() - shouldn't it have some value?
+                        finalFuture.set(RpcResultBuilder.<T> success().build());
+                    }
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                finalFuture.set(RpcResultBuilder.<T> failed()
+                        .withError(ErrorType.APPLICATION, "", t.getMessage()).build());
+            }
+        });
+
+        return finalFuture;
     }
 
-    protected Future<RpcResult<Void>> createResultForFlowMod(final FlowModInputBuilder flowModInput,
+    protected ListenableFuture<RpcResult<Void>> createResultForFlowMod(final FlowModInputBuilder flowModInput,
             final BigInteger cookie) {
         flowModInput.setXid(deviceContext.getNextXid().getValue());
-        return provideConnectionAdapter(cookie).flowMod(flowModInput.build());
+        Future<RpcResult<Void>> flowModResult = provideConnectionAdapter(cookie).flowMod(flowModInput.build());
+        return JdkFutureAdapters.listenInPoolThread(flowModResult);
     }
 
 }