Do not use JdkFutureAdapters
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / strategy / SyncPlanPushStrategyIncrementalImpl.java
index c99d3a3ff715cd4ac338bc6ad41030580e3f325d..5e3ae5dea54894cdeb765f968fc0ac97e2999672 100644 (file)
@@ -1,16 +1,16 @@
-/**
+/*
  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
@@ -34,7 +34,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrier;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
@@ -47,7 +47,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.Upd
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
@@ -57,19 +57,24 @@ import org.slf4j.LoggerFactory;
  * Execute CRUD API for flow + group + meter involving one-by-one (incremental) strategy.
  */
 public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy {
-
     private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyIncrementalImpl.class);
 
-    private FlowForwarder flowForwarder;
-    private MeterForwarder meterForwarder;
-    private GroupForwarder groupForwarder;
-    private TableForwarder tableForwarder;
-    private FlowCapableTransactionService transactionService;
+    private final FlowForwarder flowForwarder;
+    private final MeterForwarder meterForwarder;
+    private final GroupForwarder groupForwarder;
+    private final SendBarrier sendBarrier;
+
+    public SyncPlanPushStrategyIncrementalImpl(final FlowForwarder flowForwarder, final MeterForwarder meterForwarder,
+            final GroupForwarder groupForwarder, final SendBarrier sendBarrier) {
+        this.flowForwarder = requireNonNull(flowForwarder);
+        this.meterForwarder = requireNonNull(meterForwarder);
+        this.groupForwarder = requireNonNull(groupForwarder);
+        this.sendBarrier = requireNonNull(sendBarrier);
+    }
 
     @Override
     public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
-                                                                 final SynchronizationDiffInput diffInput,
-                                                                 final SyncCrudCounters counters) {
+            final SynchronizationDiffInput diffInput, final SyncCrudCounters counters) {
         final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
         final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
 
@@ -77,67 +82,44 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
         // TODO enable table-update when ready
         //resultVehicle = updateTableFeatures(nodeIdent, configTree);
 
-        resultVehicle = Futures.transformAsync(resultVehicle, input -> {
-            // if (!input.isSuccessful()) {
-                //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
-                //final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
-                //        Futures.asList Arrays.asList(input, output),
-                //        ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("TODO"));
-            // }
-            return addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters);
-        }, MoreExecutors.directExecutor());
+        resultVehicle = Futures.transformAsync(resultVehicle,
+            input -> addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters),
+            MoreExecutors.directExecutor());
         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingGroups"),
-                MoreExecutors.directExecutor());
-        resultVehicle = Futures.transformAsync(resultVehicle, input -> {
-            // if (!input.isSuccessful()) {
-                //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
-            // }
-            return addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters);
-        }, MoreExecutors.directExecutor());
+            MoreExecutors.directExecutor());
+        resultVehicle = Futures.transformAsync(resultVehicle,
+            input -> addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters),
+            MoreExecutors.directExecutor());
         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingMeters"),
-                MoreExecutors.directExecutor());
-        resultVehicle = Futures.transformAsync(resultVehicle, input -> {
-            // if (!input.isSuccessful()) {
-                //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
-            // }
-            return addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters);
-        }, MoreExecutors.directExecutor());
+            MoreExecutors.directExecutor());
+        resultVehicle = Futures.transformAsync(resultVehicle,
+            input -> addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters),
+            MoreExecutors.directExecutor());
         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingFlows"),
-                MoreExecutors.directExecutor());
-
+            MoreExecutors.directExecutor());
 
-        resultVehicle = Futures.transformAsync(resultVehicle, input -> {
-            // if (!input.isSuccessful()) {
-                //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
-            // }
-            return removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters);
-        }, MoreExecutors.directExecutor());
+        resultVehicle = Futures.transformAsync(resultVehicle,
+            input -> removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters),
+            MoreExecutors.directExecutor());
         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantFlows"),
-                MoreExecutors.directExecutor());
-        resultVehicle = Futures.transformAsync(resultVehicle, input -> {
-            // if (!input.isSuccessful()) {
-                //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
-            // }
-            return removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters);
-        }, MoreExecutors.directExecutor());
+            MoreExecutors.directExecutor());
+        resultVehicle = Futures.transformAsync(resultVehicle,
+            input -> removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters),
+            MoreExecutors.directExecutor());
         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantMeters"),
-                MoreExecutors.directExecutor());
-        resultVehicle = Futures.transformAsync(resultVehicle, input -> {
-            // if (!input.isSuccessful()) {
-                //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
-            // }
-            return removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters);
-        }, MoreExecutors.directExecutor());
+            MoreExecutors.directExecutor());
+        resultVehicle = Futures.transformAsync(resultVehicle,
+            input -> removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters),
+            MoreExecutors.directExecutor());
         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"),
-                MoreExecutors.directExecutor());
+            MoreExecutors.directExecutor());
         return resultVehicle;
     }
 
 
     ListenableFuture<RpcResult<Void>> addMissingFlows(final NodeId nodeId,
-                                                      final InstanceIdentifier<FlowCapableNode> nodeIdent,
-                                                      final Map<TableKey, ItemSyncBox<Flow>> flowsInTablesSyncBox,
-                                                      final SyncCrudCounters counters) {
+            final InstanceIdentifier<FlowCapableNode> nodeIdent,
+            final Map<TableKey, ItemSyncBox<Flow>> flowsInTablesSyncBox, final SyncCrudCounters counters) {
         if (flowsInTablesSyncBox.isEmpty()) {
             LOG.trace("no tables in config for node: {} -> SKIPPING", nodeId.getValue());
             return RpcResultBuilder.<Void>success().buildFuture();
@@ -159,8 +141,7 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
                 LOG.trace("adding flow {} in table {} - absent on device {} match{}",
                         flow.getId(), tableKey, nodeId, flow.getMatch());
 
-                allResults.add(JdkFutureAdapters.listenInPoolThread(
-                        flowForwarder.add(flowIdent, flow, nodeIdent)));
+                allResults.add(flowForwarder.add(flowIdent, flow, nodeIdent));
                 flowCrudCounts.incAdded();
             }
 
@@ -173,24 +154,23 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
                 LOG.trace("flow {} in table {} - needs update on device {} match{}",
                         updatedFlow.getId(), tableKey, nodeId, updatedFlow.getMatch());
 
-                allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
-                        flowForwarder.update(flowIdent, existingFlow, updatedFlow, nodeIdent)));
+                allUpdateResults.add(flowForwarder.update(flowIdent, existingFlow, updatedFlow, nodeIdent));
                 flowCrudCounts.incUpdated();
             }
         }
 
         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
                 Futures.allAsList(allResults),
-                ReconcileUtil.<AddFlowOutput>createRpcResultCondenser("flow adding"),
+                ReconcileUtil.createRpcResultCondenser("flow adding"),
                 MoreExecutors.directExecutor());
 
         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
                 Futures.allAsList(allUpdateResults),
-                ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("flow updating"),
+                ReconcileUtil.createRpcResultCondenser("flow updating"),
                 MoreExecutors.directExecutor());
 
         return Futures.transform(Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
-                ReconcileUtil.<Void>createRpcResultCondenser("flow add/update"),
+                ReconcileUtil.createRpcResultCondenser("flow add/update"),
                 MoreExecutors.directExecutor());
     }
 
@@ -214,19 +194,18 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
             for (final Flow flow : flowsPerTable.getValue().getItemsToPush()) {
                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
                         tableIdent.child(Flow.class, flow.key());
-                allResults.add(JdkFutureAdapters.listenInPoolThread(
-                        flowForwarder.remove(flowIdent, flow, nodeIdent)));
+                allResults.add(flowForwarder.remove(flowIdent, flow, nodeIdent));
                 flowCrudCounts.incRemoved();
             }
         }
 
         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
                 Futures.allAsList(allResults),
-                ReconcileUtil.<RemoveFlowOutput>createRpcResultCondenser("flow remove"),
+                ReconcileUtil.createRpcResultCondenser("flow remove"),
                 MoreExecutors.directExecutor());
 
         return Futures.transformAsync(singleVoidResult,
-                ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
+                ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), sendBarrier),
                 MoreExecutors.directExecutor());
 
     }
@@ -248,13 +227,12 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
                     meter.getMeterId(), nodeId);
             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
                     nodeIdent.child(Meter.class, meter.key());
-            allResults.add(JdkFutureAdapters.listenInPoolThread(
-                    meterForwarder.remove(meterIdent, meter, nodeIdent)));
+            allResults.add(meterForwarder.remove(meterIdent, meter, nodeIdent));
             meterCrudCounts.incRemoved();
         }
 
         return Futures.transform(Futures.allAsList(allResults),
-                ReconcileUtil.<RemoveMeterOutput>createRpcResultCondenser("meter remove"),
+                ReconcileUtil.createRpcResultCondenser("meter remove"),
                 MoreExecutors.directExecutor());
     }
 
@@ -292,7 +270,7 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
             }
         } catch (IllegalStateException e) {
             chainedResult = RpcResultBuilder.<Void>failed()
-                    .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
+                    .withError(ErrorType.APPLICATION, "failed to add missing groups", e)
                     .buildFuture();
         }
 
@@ -305,16 +283,16 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
         List<ListenableFuture<RpcResult<RemoveGroupOutput>>> allResults = new ArrayList<>();
         for (Group group : groupsPortion.getItemsToPush()) {
             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
-            allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.remove(groupIdent, group, nodeIdent)));
+            allResults.add(groupForwarder.remove(groupIdent, group, nodeIdent));
         }
 
         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
                 Futures.allAsList(allResults),
-                ReconcileUtil.<RemoveGroupOutput>createRpcResultCondenser("group remove"),
+                ReconcileUtil.createRpcResultCondenser("group remove"),
                 MoreExecutors.directExecutor());
 
         return Futures.transformAsync(singleVoidResult,
-                ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
+                ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), sendBarrier),
                 MoreExecutors.directExecutor());
     }
 
@@ -332,19 +310,19 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
 //                    // at
 //                    // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer
 //                    //    .serializeHeader(AbstractOxmMatchEntrySerializer.java:31
-//                    // allResults.add(JdkFutureAdapters.listenInPoolThread(
-//                    // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent)));
+//                    // allResults.add(
+//                    // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent));
 //                }
 //            }
 //        }
 
         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
                 Futures.allAsList(allResults),
-                ReconcileUtil.<UpdateTableOutput>createRpcResultCondenser("table update"),
+                ReconcileUtil.createRpcResultCondenser("table update"),
                 MoreExecutors.directExecutor());
 
         return Futures.transformAsync(singleVoidResult,
-                ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
+                ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), sendBarrier),
                 MoreExecutors.directExecutor());
     }
 
@@ -356,7 +334,7 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
 
         for (Group group : groupsPortion.getItemsToPush()) {
             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
-            allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.add(groupIdent, group, nodeIdent)));
+            allResults.add(groupForwarder.add(groupIdent, group, nodeIdent));
 
         }
 
@@ -365,28 +343,27 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
             final Group group = groupTuple.getUpdated();
 
             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
-            allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
-                    groupForwarder.update(groupIdent, existingGroup, group, nodeIdent)));
+            allUpdateResults.add(groupForwarder.update(groupIdent, existingGroup, group, nodeIdent));
         }
 
         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
                 Futures.allAsList(allResults),
-                ReconcileUtil.<AddGroupOutput>createRpcResultCondenser("group add"),
+                ReconcileUtil.createRpcResultCondenser("group add"),
                 MoreExecutors.directExecutor());
 
         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
                 Futures.allAsList(allUpdateResults),
-                ReconcileUtil.<UpdateGroupOutput>createRpcResultCondenser("group update"),
+                ReconcileUtil.createRpcResultCondenser("group update"),
                 MoreExecutors.directExecutor());
 
         final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
                 Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
-                ReconcileUtil.<Void>createRpcResultCondenser("group add/update"),
+                ReconcileUtil.createRpcResultCondenser("group add/update"),
                 MoreExecutors.directExecutor());
 
 
         return Futures.transformAsync(summaryResult, ReconcileUtil.chainBarrierFlush(
-                PathUtil.digNodePath(nodeIdent), transactionService), MoreExecutors.directExecutor());
+                PathUtil.digNodePath(nodeIdent), sendBarrier), MoreExecutors.directExecutor());
     }
 
     ListenableFuture<RpcResult<Void>> addMissingMeters(final NodeId nodeId,
@@ -406,8 +383,7 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, meter.key());
             LOG.debug("adding meter {} - absent on device {}",
                     meter.getMeterId(), nodeId);
-            allResults.add(JdkFutureAdapters.listenInPoolThread(
-                    meterForwarder.add(meterIdent, meter, nodeIdent)));
+            allResults.add(meterForwarder.add(meterIdent, meter, nodeIdent));
             meterCrudCounts.incAdded();
         }
 
@@ -416,23 +392,22 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
             final Meter updated = meterTuple.getUpdated();
             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, updated.key());
             LOG.trace("meter {} - needs update on device {}", updated.getMeterId(), nodeId);
-            allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
-                    meterForwarder.update(meterIdent, existingMeter, updated, nodeIdent)));
+            allUpdateResults.add(meterForwarder.update(meterIdent, existingMeter, updated, nodeIdent));
             meterCrudCounts.incUpdated();
         }
 
         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
                 Futures.allAsList(allResults),
-                ReconcileUtil.<AddMeterOutput>createRpcResultCondenser("meter add"),
+                ReconcileUtil.createRpcResultCondenser("meter add"),
                 MoreExecutors.directExecutor());
 
         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
                 Futures.allAsList(allUpdateResults),
-                ReconcileUtil.<UpdateMeterOutput>createRpcResultCondenser("meter update"),
+                ReconcileUtil.createRpcResultCondenser("meter update"),
                 MoreExecutors.directExecutor());
 
         return Futures.transform(Futures.allAsList(singleVoidUpdateResult, singleVoidAddResult),
-                ReconcileUtil.<Void>createRpcResultCondenser("meter add/update"),
+                ReconcileUtil.createRpcResultCondenser("meter add/update"),
                 MoreExecutors.directExecutor());
     }
 
@@ -479,38 +454,10 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
             }
         } catch (IllegalStateException e) {
             chainedResult = RpcResultBuilder.<Void>failed()
-                    .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
+                    .withError(ErrorType.APPLICATION, "failed to add missing groups", e)
                     .buildFuture();
         }
 
         return chainedResult;
     }
-
-
-    public SyncPlanPushStrategyIncrementalImpl setFlowForwarder(final FlowForwarder flowForwarder) {
-        this.flowForwarder = flowForwarder;
-        return this;
-    }
-
-    public SyncPlanPushStrategyIncrementalImpl setTableForwarder(final TableForwarder tableForwarder) {
-        this.tableForwarder = tableForwarder;
-        return this;
-    }
-
-    public SyncPlanPushStrategyIncrementalImpl setMeterForwarder(final MeterForwarder meterForwarder) {
-        this.meterForwarder = meterForwarder;
-        return this;
-    }
-
-    public SyncPlanPushStrategyIncrementalImpl setGroupForwarder(final GroupForwarder groupForwarder) {
-        this.groupForwarder = groupForwarder;
-        return this;
-    }
-
-    public SyncPlanPushStrategyIncrementalImpl setTransactionService(
-            final FlowCapableTransactionService transactionService) {
-        this.transactionService = transactionService;
-        return this;
-    }
-
 }