2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
10 import com.google.common.collect.Iterables;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.ArrayList;
15 import java.util.Collections;
16 import java.util.List;
18 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
19 import org.opendaylight.openflowplugin.applications.frsync.util.CrudCounts;
20 import org.opendaylight.openflowplugin.applications.frsync.util.FxChainUtil;
21 import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
22 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
23 import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
24 import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput;
46 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
47 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
48 import org.opendaylight.yangtools.yang.common.ErrorType;
49 import org.opendaylight.yangtools.yang.common.RpcResult;
50 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
55 * Execute CRUD API for flow + group + meter involving one-by-one (incremental) strategy.
57 public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy {
58 private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyIncrementalImpl.class);
60 private FlowForwarder flowForwarder;
61 private MeterForwarder meterForwarder;
62 private GroupForwarder groupForwarder;
63 private FlowCapableTransactionService transactionService;
66 public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
67 final SynchronizationDiffInput diffInput,
68 final SyncCrudCounters counters) {
69 final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
70 final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
72 /* Tables - have to be pushed before groups */
73 // TODO enable table-update when ready
74 //resultVehicle = updateTableFeatures(nodeIdent, configTree);
76 resultVehicle = Futures.transformAsync(resultVehicle,
77 input -> addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters),
78 MoreExecutors.directExecutor());
79 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingGroups"),
80 MoreExecutors.directExecutor());
81 resultVehicle = Futures.transformAsync(resultVehicle,
82 input -> addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters),
83 MoreExecutors.directExecutor());
84 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingMeters"),
85 MoreExecutors.directExecutor());
86 resultVehicle = Futures.transformAsync(resultVehicle,
87 input -> addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters),
88 MoreExecutors.directExecutor());
89 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingFlows"),
90 MoreExecutors.directExecutor());
92 resultVehicle = Futures.transformAsync(resultVehicle,
93 input -> removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters),
94 MoreExecutors.directExecutor());
95 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantFlows"),
96 MoreExecutors.directExecutor());
97 resultVehicle = Futures.transformAsync(resultVehicle,
98 input -> removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters),
99 MoreExecutors.directExecutor());
100 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantMeters"),
101 MoreExecutors.directExecutor());
102 resultVehicle = Futures.transformAsync(resultVehicle,
103 input -> removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters),
104 MoreExecutors.directExecutor());
105 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"),
106 MoreExecutors.directExecutor());
107 return resultVehicle;
111 ListenableFuture<RpcResult<Void>> addMissingFlows(final NodeId nodeId,
112 final InstanceIdentifier<FlowCapableNode> nodeIdent,
113 final Map<TableKey, ItemSyncBox<Flow>> flowsInTablesSyncBox,
114 final SyncCrudCounters counters) {
115 if (flowsInTablesSyncBox.isEmpty()) {
116 LOG.trace("no tables in config for node: {} -> SKIPPING", nodeId.getValue());
117 return RpcResultBuilder.<Void>success().buildFuture();
120 final List<ListenableFuture<RpcResult<AddFlowOutput>>> allResults = new ArrayList<>();
121 final List<ListenableFuture<RpcResult<UpdateFlowOutput>>> allUpdateResults = new ArrayList<>();
122 final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
124 for (Map.Entry<TableKey, ItemSyncBox<Flow>> flowsInTableBoxEntry : flowsInTablesSyncBox.entrySet()) {
125 final TableKey tableKey = flowsInTableBoxEntry.getKey();
126 final ItemSyncBox<Flow> flowSyncBox = flowsInTableBoxEntry.getValue();
128 final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdent.child(Table.class, tableKey);
130 for (final Flow flow : flowSyncBox.getItemsToPush()) {
131 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, flow.key());
133 LOG.trace("adding flow {} in table {} - absent on device {} match{}",
134 flow.getId(), tableKey, nodeId, flow.getMatch());
136 allResults.add(flowForwarder.add(flowIdent, flow, nodeIdent));
137 flowCrudCounts.incAdded();
140 for (final ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowSyncBox.getItemsToUpdate()) {
141 final Flow existingFlow = flowUpdate.getOriginal();
142 final Flow updatedFlow = flowUpdate.getUpdated();
144 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class,
146 LOG.trace("flow {} in table {} - needs update on device {} match{}",
147 updatedFlow.getId(), tableKey, nodeId, updatedFlow.getMatch());
149 allUpdateResults.add(flowForwarder.update(flowIdent, existingFlow, updatedFlow, nodeIdent));
150 flowCrudCounts.incUpdated();
154 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
155 Futures.allAsList(allResults),
156 ReconcileUtil.createRpcResultCondenser("flow adding"),
157 MoreExecutors.directExecutor());
159 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
160 Futures.allAsList(allUpdateResults),
161 ReconcileUtil.createRpcResultCondenser("flow updating"),
162 MoreExecutors.directExecutor());
164 return Futures.transform(Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
165 ReconcileUtil.createRpcResultCondenser("flow add/update"),
166 MoreExecutors.directExecutor());
169 ListenableFuture<RpcResult<Void>> removeRedundantFlows(final NodeId nodeId,
170 final InstanceIdentifier<FlowCapableNode> nodeIdent,
171 final Map<TableKey, ItemSyncBox<Flow>> removalPlan,
172 final SyncCrudCounters counters) {
173 if (removalPlan.isEmpty()) {
174 LOG.trace("no tables in operational for node: {} -> SKIPPING", nodeId.getValue());
175 return RpcResultBuilder.<Void>success().buildFuture();
178 final List<ListenableFuture<RpcResult<RemoveFlowOutput>>> allResults = new ArrayList<>();
179 final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
181 for (final Map.Entry<TableKey, ItemSyncBox<Flow>> flowsPerTable : removalPlan.entrySet()) {
182 final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
183 nodeIdent.child(Table.class, flowsPerTable.getKey());
185 // loop flows on device and check if the are configured
186 for (final Flow flow : flowsPerTable.getValue().getItemsToPush()) {
187 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
188 tableIdent.child(Flow.class, flow.key());
189 allResults.add(flowForwarder.remove(flowIdent, flow, nodeIdent));
190 flowCrudCounts.incRemoved();
194 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
195 Futures.allAsList(allResults),
196 ReconcileUtil.createRpcResultCondenser("flow remove"),
197 MoreExecutors.directExecutor());
199 return Futures.transformAsync(singleVoidResult,
200 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
201 MoreExecutors.directExecutor());
205 ListenableFuture<RpcResult<Void>> removeRedundantMeters(final NodeId nodeId,
206 final InstanceIdentifier<FlowCapableNode> nodeIdent,
207 final ItemSyncBox<Meter> meterRemovalPlan,
208 final SyncCrudCounters counters) {
209 if (meterRemovalPlan.isEmpty()) {
210 LOG.trace("no meters on device for node: {} -> SKIPPING", nodeId.getValue());
211 return RpcResultBuilder.<Void>success().buildFuture();
214 final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
216 final List<ListenableFuture<RpcResult<RemoveMeterOutput>>> allResults = new ArrayList<>();
217 for (Meter meter : meterRemovalPlan.getItemsToPush()) {
218 LOG.trace("removing meter {} - absent in config {}",
219 meter.getMeterId(), nodeId);
220 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
221 nodeIdent.child(Meter.class, meter.key());
222 allResults.add(meterForwarder.remove(meterIdent, meter, nodeIdent));
223 meterCrudCounts.incRemoved();
226 return Futures.transform(Futures.allAsList(allResults),
227 ReconcileUtil.createRpcResultCondenser("meter remove"),
228 MoreExecutors.directExecutor());
231 ListenableFuture<RpcResult<Void>> removeRedundantGroups(final NodeId nodeId,
232 final InstanceIdentifier<FlowCapableNode> nodeIdent,
233 final List<ItemSyncBox<Group>> groupsRemovalPlan,
234 final SyncCrudCounters counters) {
235 if (groupsRemovalPlan.isEmpty()) {
236 LOG.trace("no groups on device for node: {} -> SKIPPING", nodeId.getValue());
237 return RpcResultBuilder.<Void>success().buildFuture();
240 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
242 ListenableFuture<RpcResult<Void>> chainedResult = RpcResultBuilder.<Void>success().buildFuture();
244 groupCrudCounts.setRemoved(ReconcileUtil.countTotalPushed(groupsRemovalPlan));
245 if (LOG.isDebugEnabled()) {
246 LOG.debug("removing groups: planSteps={}, toRemoveTotal={}",
247 groupsRemovalPlan.size(), groupCrudCounts.getRemoved());
249 Collections.reverse(groupsRemovalPlan);
250 for (final ItemSyncBox<Group> groupsPortion : groupsRemovalPlan) {
251 chainedResult = Futures.transformAsync(chainedResult, input -> {
252 final ListenableFuture<RpcResult<Void>> result;
253 if (input.isSuccessful()) {
254 result = flushRemoveGroupPortionAndBarrier(nodeIdent, groupsPortion);
256 // pass through original unsuccessful rpcResult
257 result = Futures.immediateFuture(input);
261 }, MoreExecutors.directExecutor());
263 } catch (IllegalStateException e) {
264 chainedResult = RpcResultBuilder.<Void>failed()
265 .withError(ErrorType.APPLICATION, "failed to add missing groups", e)
269 return chainedResult;
272 private ListenableFuture<RpcResult<Void>> flushRemoveGroupPortionAndBarrier(
273 final InstanceIdentifier<FlowCapableNode> nodeIdent,
274 final ItemSyncBox<Group> groupsPortion) {
275 List<ListenableFuture<RpcResult<RemoveGroupOutput>>> allResults = new ArrayList<>();
276 for (Group group : groupsPortion.getItemsToPush()) {
277 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
278 allResults.add(groupForwarder.remove(groupIdent, group, nodeIdent));
281 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
282 Futures.allAsList(allResults),
283 ReconcileUtil.createRpcResultCondenser("group remove"),
284 MoreExecutors.directExecutor());
286 return Futures.transformAsync(singleVoidResult,
287 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
288 MoreExecutors.directExecutor());
291 ListenableFuture<RpcResult<Void>> updateTableFeatures(final InstanceIdentifier<FlowCapableNode> nodeIdent,
292 final FlowCapableNode flowCapableNodeConfigured) {
293 // CHECK if while pushing the update, updateTableInput can be null to emulate a table add
294 //final List<Table> tableList = ReconcileUtil.safeTables(flowCapableNodeConfigured);
296 final List<ListenableFuture<RpcResult<UpdateTableOutput>>> allResults = new ArrayList<>();
297 // for (Table table : tableList) {
298 // List<TableFeatures> tableFeatures = flowCapableNodeConfigured.getTableFeatures();
299 // if (tableFeatures != null) {
300 // for (TableFeatures tableFeaturesItem : tableFeatures) {
301 // // TODO uncomment java.lang.NullPointerException
303 // // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer
304 // // .serializeHeader(AbstractOxmMatchEntrySerializer.java:31
305 // // allResults.add(JdkFutureAdapters.listenInPoolThread(
306 // // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent)));
311 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
312 Futures.allAsList(allResults),
313 ReconcileUtil.createRpcResultCondenser("table update"),
314 MoreExecutors.directExecutor());
316 return Futures.transformAsync(singleVoidResult,
317 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
318 MoreExecutors.directExecutor());
321 private ListenableFuture<RpcResult<Void>> flushAddGroupPortionAndBarrier(
322 final InstanceIdentifier<FlowCapableNode> nodeIdent,
323 final ItemSyncBox<Group> groupsPortion) {
324 final List<ListenableFuture<RpcResult<AddGroupOutput>>> allResults = new ArrayList<>();
325 final List<ListenableFuture<RpcResult<UpdateGroupOutput>>> allUpdateResults = new ArrayList<>();
327 for (Group group : groupsPortion.getItemsToPush()) {
328 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
329 allResults.add(groupForwarder.add(groupIdent, group, nodeIdent));
333 for (ItemSyncBox.ItemUpdateTuple<Group> groupTuple : groupsPortion.getItemsToUpdate()) {
334 final Group existingGroup = groupTuple.getOriginal();
335 final Group group = groupTuple.getUpdated();
337 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
338 allUpdateResults.add(groupForwarder.update(groupIdent, existingGroup, group, nodeIdent));
341 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
342 Futures.allAsList(allResults),
343 ReconcileUtil.createRpcResultCondenser("group add"),
344 MoreExecutors.directExecutor());
346 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
347 Futures.allAsList(allUpdateResults),
348 ReconcileUtil.createRpcResultCondenser("group update"),
349 MoreExecutors.directExecutor());
351 final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
352 Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
353 ReconcileUtil.createRpcResultCondenser("group add/update"),
354 MoreExecutors.directExecutor());
357 return Futures.transformAsync(summaryResult, ReconcileUtil.chainBarrierFlush(
358 PathUtil.digNodePath(nodeIdent), transactionService), MoreExecutors.directExecutor());
361 ListenableFuture<RpcResult<Void>> addMissingMeters(final NodeId nodeId,
362 final InstanceIdentifier<FlowCapableNode> nodeIdent,
363 final ItemSyncBox<Meter> syncBox,
364 final SyncCrudCounters counters) {
365 if (syncBox.isEmpty()) {
366 LOG.trace("no meters configured for node: {} -> SKIPPING", nodeId.getValue());
367 return RpcResultBuilder.<Void>success().buildFuture();
370 final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
372 final List<ListenableFuture<RpcResult<AddMeterOutput>>> allResults = new ArrayList<>();
373 final List<ListenableFuture<RpcResult<UpdateMeterOutput>>> allUpdateResults = new ArrayList<>();
374 for (Meter meter : syncBox.getItemsToPush()) {
375 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, meter.key());
376 LOG.debug("adding meter {} - absent on device {}",
377 meter.getMeterId(), nodeId);
378 allResults.add(meterForwarder.add(meterIdent, meter, nodeIdent));
379 meterCrudCounts.incAdded();
382 for (ItemSyncBox.ItemUpdateTuple<Meter> meterTuple : syncBox.getItemsToUpdate()) {
383 final Meter existingMeter = meterTuple.getOriginal();
384 final Meter updated = meterTuple.getUpdated();
385 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, updated.key());
386 LOG.trace("meter {} - needs update on device {}", updated.getMeterId(), nodeId);
387 allUpdateResults.add(meterForwarder.update(meterIdent, existingMeter, updated, nodeIdent));
388 meterCrudCounts.incUpdated();
391 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
392 Futures.allAsList(allResults),
393 ReconcileUtil.createRpcResultCondenser("meter add"),
394 MoreExecutors.directExecutor());
396 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
397 Futures.allAsList(allUpdateResults),
398 ReconcileUtil.createRpcResultCondenser("meter update"),
399 MoreExecutors.directExecutor());
401 return Futures.transform(Futures.allAsList(singleVoidUpdateResult, singleVoidAddResult),
402 ReconcileUtil.createRpcResultCondenser("meter add/update"),
403 MoreExecutors.directExecutor());
406 ListenableFuture<RpcResult<Void>> addMissingGroups(final NodeId nodeId,
407 final InstanceIdentifier<FlowCapableNode> nodeIdent,
408 final List<ItemSyncBox<Group>> groupsAddPlan,
409 final SyncCrudCounters counters) {
410 if (groupsAddPlan.isEmpty()) {
411 LOG.trace("no groups configured for node: {} -> SKIPPING", nodeId.getValue());
412 return RpcResultBuilder.<Void>success().buildFuture();
415 ListenableFuture<RpcResult<Void>> chainedResult;
417 if (!groupsAddPlan.isEmpty()) {
418 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
419 groupCrudCounts.setAdded(ReconcileUtil.countTotalPushed(groupsAddPlan));
420 groupCrudCounts.setUpdated(ReconcileUtil.countTotalUpdated(groupsAddPlan));
422 if (LOG.isDebugEnabled()) {
423 LOG.debug("adding groups: planSteps={}, toAddTotal={}, toUpdateTotal={}",
424 groupsAddPlan.size(),
425 groupCrudCounts.getAdded(),
426 groupCrudCounts.getUpdated());
429 chainedResult = flushAddGroupPortionAndBarrier(nodeIdent, groupsAddPlan.get(0));
430 for (final ItemSyncBox<Group> groupsPortion : Iterables.skip(groupsAddPlan, 1)) {
432 Futures.transformAsync(chainedResult, input -> {
433 final ListenableFuture<RpcResult<Void>> result;
434 if (input.isSuccessful()) {
435 result = flushAddGroupPortionAndBarrier(nodeIdent, groupsPortion);
437 // pass through original unsuccessful rpcResult
438 result = Futures.immediateFuture(input);
442 }, MoreExecutors.directExecutor());
445 chainedResult = RpcResultBuilder.<Void>success().buildFuture();
447 } catch (IllegalStateException e) {
448 chainedResult = RpcResultBuilder.<Void>failed()
449 .withError(ErrorType.APPLICATION, "failed to add missing groups", e)
453 return chainedResult;
457 public SyncPlanPushStrategyIncrementalImpl setFlowForwarder(final FlowForwarder flowForwarder) {
458 this.flowForwarder = flowForwarder;
462 @Deprecated(since = "0.17.2", forRemoval = true)
463 public SyncPlanPushStrategyIncrementalImpl setTableForwarder(final TableForwarder tableForwarder) {
467 public SyncPlanPushStrategyIncrementalImpl setMeterForwarder(final MeterForwarder meterForwarder) {
468 this.meterForwarder = meterForwarder;
472 public SyncPlanPushStrategyIncrementalImpl setGroupForwarder(final GroupForwarder groupForwarder) {
473 this.groupForwarder = groupForwarder;
477 public SyncPlanPushStrategyIncrementalImpl setTransactionService(
478 final FlowCapableTransactionService transactionService) {
479 this.transactionService = transactionService;