* Execute CRUD API for flow + group + meter involving one-by-one (incremental) strategy.
*/
public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy {
-
private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyIncrementalImpl.class);
private FlowForwarder flowForwarder;
private MeterForwarder meterForwarder;
private GroupForwarder groupForwarder;
- private TableForwarder tableForwarder;
private FlowCapableTransactionService transactionService;
@Override
// TODO enable table-update when ready
//resultVehicle = updateTableFeatures(nodeIdent, configTree);
- resultVehicle = Futures.transformAsync(resultVehicle, input -> {
- // if (!input.isSuccessful()) {
- //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
- //final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
- // Futures.asList Arrays.asList(input, output),
- // ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("TODO"));
- // }
- return addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters);
- }, MoreExecutors.directExecutor());
+ resultVehicle = Futures.transformAsync(resultVehicle,
+ input -> addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters),
+ MoreExecutors.directExecutor());
Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingGroups"),
- MoreExecutors.directExecutor());
- resultVehicle = Futures.transformAsync(resultVehicle, input -> {
- // if (!input.isSuccessful()) {
- //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
- // }
- return addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters);
- }, MoreExecutors.directExecutor());
+ MoreExecutors.directExecutor());
+ resultVehicle = Futures.transformAsync(resultVehicle,
+ input -> addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters),
+ MoreExecutors.directExecutor());
Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingMeters"),
- MoreExecutors.directExecutor());
- resultVehicle = Futures.transformAsync(resultVehicle, input -> {
- // if (!input.isSuccessful()) {
- //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
- // }
- return addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters);
- }, MoreExecutors.directExecutor());
+ MoreExecutors.directExecutor());
+ resultVehicle = Futures.transformAsync(resultVehicle,
+ input -> addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters),
+ MoreExecutors.directExecutor());
Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingFlows"),
- MoreExecutors.directExecutor());
-
+ MoreExecutors.directExecutor());
- resultVehicle = Futures.transformAsync(resultVehicle, input -> {
- // if (!input.isSuccessful()) {
- //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
- // }
- return removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters);
- }, MoreExecutors.directExecutor());
+ resultVehicle = Futures.transformAsync(resultVehicle,
+ input -> removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters),
+ MoreExecutors.directExecutor());
Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantFlows"),
- MoreExecutors.directExecutor());
- resultVehicle = Futures.transformAsync(resultVehicle, input -> {
- // if (!input.isSuccessful()) {
- //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
- // }
- return removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters);
- }, MoreExecutors.directExecutor());
+ MoreExecutors.directExecutor());
+ resultVehicle = Futures.transformAsync(resultVehicle,
+ input -> removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters),
+ MoreExecutors.directExecutor());
Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantMeters"),
- MoreExecutors.directExecutor());
- resultVehicle = Futures.transformAsync(resultVehicle, input -> {
- // if (!input.isSuccessful()) {
- //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
- // }
- return removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters);
- }, MoreExecutors.directExecutor());
+ MoreExecutors.directExecutor());
+ resultVehicle = Futures.transformAsync(resultVehicle,
+ input -> removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters),
+ MoreExecutors.directExecutor());
Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"),
- MoreExecutors.directExecutor());
+ MoreExecutors.directExecutor());
return resultVehicle;
}
return this;
}
+ @Deprecated(since = "0.17.2", forRemoval = true)
public SyncPlanPushStrategyIncrementalImpl setTableForwarder(final TableForwarder tableForwarder) {
- this.tableForwarder = tableForwarder;
return this;
}
this.transactionService = transactionService;
return this;
}
-
}