2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.collect.Iterables;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.List;
20 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
21 import org.opendaylight.openflowplugin.applications.frsync.util.CrudCounts;
22 import org.opendaylight.openflowplugin.applications.frsync.util.FxChainUtil;
23 import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
24 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
25 import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
26 import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrier;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput;
48 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
49 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.common.ErrorType;
51 import org.opendaylight.yangtools.yang.common.RpcResult;
52 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
57 * Execute CRUD API for flow + group + meter involving one-by-one (incremental) strategy.
59 public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy {
60 private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyIncrementalImpl.class);
62 private final FlowForwarder flowForwarder;
63 private final MeterForwarder meterForwarder;
64 private final GroupForwarder groupForwarder;
65 private final SendBarrier sendBarrier;
67 public SyncPlanPushStrategyIncrementalImpl(final FlowForwarder flowForwarder, final MeterForwarder meterForwarder,
68 final GroupForwarder groupForwarder, final SendBarrier sendBarrier) {
69 this.flowForwarder = requireNonNull(flowForwarder);
70 this.meterForwarder = requireNonNull(meterForwarder);
71 this.groupForwarder = requireNonNull(groupForwarder);
72 this.sendBarrier = requireNonNull(sendBarrier);
76 public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
77 final SynchronizationDiffInput diffInput, final SyncCrudCounters counters) {
78 final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
79 final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
81 /* Tables - have to be pushed before groups */
82 // TODO enable table-update when ready
83 //resultVehicle = updateTableFeatures(nodeIdent, configTree);
85 resultVehicle = Futures.transformAsync(resultVehicle,
86 input -> addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters),
87 MoreExecutors.directExecutor());
88 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingGroups"),
89 MoreExecutors.directExecutor());
90 resultVehicle = Futures.transformAsync(resultVehicle,
91 input -> addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters),
92 MoreExecutors.directExecutor());
93 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingMeters"),
94 MoreExecutors.directExecutor());
95 resultVehicle = Futures.transformAsync(resultVehicle,
96 input -> addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters),
97 MoreExecutors.directExecutor());
98 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingFlows"),
99 MoreExecutors.directExecutor());
101 resultVehicle = Futures.transformAsync(resultVehicle,
102 input -> removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters),
103 MoreExecutors.directExecutor());
104 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantFlows"),
105 MoreExecutors.directExecutor());
106 resultVehicle = Futures.transformAsync(resultVehicle,
107 input -> removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters),
108 MoreExecutors.directExecutor());
109 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantMeters"),
110 MoreExecutors.directExecutor());
111 resultVehicle = Futures.transformAsync(resultVehicle,
112 input -> removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters),
113 MoreExecutors.directExecutor());
114 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"),
115 MoreExecutors.directExecutor());
116 return resultVehicle;
120 ListenableFuture<RpcResult<Void>> addMissingFlows(final NodeId nodeId,
121 final InstanceIdentifier<FlowCapableNode> nodeIdent,
122 final Map<TableKey, ItemSyncBox<Flow>> flowsInTablesSyncBox, final SyncCrudCounters counters) {
123 if (flowsInTablesSyncBox.isEmpty()) {
124 LOG.trace("no tables in config for node: {} -> SKIPPING", nodeId.getValue());
125 return RpcResultBuilder.<Void>success().buildFuture();
128 final List<ListenableFuture<RpcResult<AddFlowOutput>>> allResults = new ArrayList<>();
129 final List<ListenableFuture<RpcResult<UpdateFlowOutput>>> allUpdateResults = new ArrayList<>();
130 final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
132 for (Map.Entry<TableKey, ItemSyncBox<Flow>> flowsInTableBoxEntry : flowsInTablesSyncBox.entrySet()) {
133 final TableKey tableKey = flowsInTableBoxEntry.getKey();
134 final ItemSyncBox<Flow> flowSyncBox = flowsInTableBoxEntry.getValue();
136 final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdent.child(Table.class, tableKey);
138 for (final Flow flow : flowSyncBox.getItemsToPush()) {
139 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, flow.key());
141 LOG.trace("adding flow {} in table {} - absent on device {} match{}",
142 flow.getId(), tableKey, nodeId, flow.getMatch());
144 allResults.add(flowForwarder.add(flowIdent, flow, nodeIdent));
145 flowCrudCounts.incAdded();
148 for (final ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowSyncBox.getItemsToUpdate()) {
149 final Flow existingFlow = flowUpdate.getOriginal();
150 final Flow updatedFlow = flowUpdate.getUpdated();
152 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class,
154 LOG.trace("flow {} in table {} - needs update on device {} match{}",
155 updatedFlow.getId(), tableKey, nodeId, updatedFlow.getMatch());
157 allUpdateResults.add(flowForwarder.update(flowIdent, existingFlow, updatedFlow, nodeIdent));
158 flowCrudCounts.incUpdated();
162 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
163 Futures.allAsList(allResults),
164 ReconcileUtil.createRpcResultCondenser("flow adding"),
165 MoreExecutors.directExecutor());
167 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
168 Futures.allAsList(allUpdateResults),
169 ReconcileUtil.createRpcResultCondenser("flow updating"),
170 MoreExecutors.directExecutor());
172 return Futures.transform(Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
173 ReconcileUtil.createRpcResultCondenser("flow add/update"),
174 MoreExecutors.directExecutor());
177 ListenableFuture<RpcResult<Void>> removeRedundantFlows(final NodeId nodeId,
178 final InstanceIdentifier<FlowCapableNode> nodeIdent,
179 final Map<TableKey, ItemSyncBox<Flow>> removalPlan,
180 final SyncCrudCounters counters) {
181 if (removalPlan.isEmpty()) {
182 LOG.trace("no tables in operational for node: {} -> SKIPPING", nodeId.getValue());
183 return RpcResultBuilder.<Void>success().buildFuture();
186 final List<ListenableFuture<RpcResult<RemoveFlowOutput>>> allResults = new ArrayList<>();
187 final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
189 for (final Map.Entry<TableKey, ItemSyncBox<Flow>> flowsPerTable : removalPlan.entrySet()) {
190 final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
191 nodeIdent.child(Table.class, flowsPerTable.getKey());
193 // loop flows on device and check if the are configured
194 for (final Flow flow : flowsPerTable.getValue().getItemsToPush()) {
195 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
196 tableIdent.child(Flow.class, flow.key());
197 allResults.add(flowForwarder.remove(flowIdent, flow, nodeIdent));
198 flowCrudCounts.incRemoved();
202 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
203 Futures.allAsList(allResults),
204 ReconcileUtil.createRpcResultCondenser("flow remove"),
205 MoreExecutors.directExecutor());
207 return Futures.transformAsync(singleVoidResult,
208 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), sendBarrier),
209 MoreExecutors.directExecutor());
213 ListenableFuture<RpcResult<Void>> removeRedundantMeters(final NodeId nodeId,
214 final InstanceIdentifier<FlowCapableNode> nodeIdent,
215 final ItemSyncBox<Meter> meterRemovalPlan,
216 final SyncCrudCounters counters) {
217 if (meterRemovalPlan.isEmpty()) {
218 LOG.trace("no meters on device for node: {} -> SKIPPING", nodeId.getValue());
219 return RpcResultBuilder.<Void>success().buildFuture();
222 final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
224 final List<ListenableFuture<RpcResult<RemoveMeterOutput>>> allResults = new ArrayList<>();
225 for (Meter meter : meterRemovalPlan.getItemsToPush()) {
226 LOG.trace("removing meter {} - absent in config {}",
227 meter.getMeterId(), nodeId);
228 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
229 nodeIdent.child(Meter.class, meter.key());
230 allResults.add(meterForwarder.remove(meterIdent, meter, nodeIdent));
231 meterCrudCounts.incRemoved();
234 return Futures.transform(Futures.allAsList(allResults),
235 ReconcileUtil.createRpcResultCondenser("meter remove"),
236 MoreExecutors.directExecutor());
239 ListenableFuture<RpcResult<Void>> removeRedundantGroups(final NodeId nodeId,
240 final InstanceIdentifier<FlowCapableNode> nodeIdent,
241 final List<ItemSyncBox<Group>> groupsRemovalPlan,
242 final SyncCrudCounters counters) {
243 if (groupsRemovalPlan.isEmpty()) {
244 LOG.trace("no groups on device for node: {} -> SKIPPING", nodeId.getValue());
245 return RpcResultBuilder.<Void>success().buildFuture();
248 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
250 ListenableFuture<RpcResult<Void>> chainedResult = RpcResultBuilder.<Void>success().buildFuture();
252 groupCrudCounts.setRemoved(ReconcileUtil.countTotalPushed(groupsRemovalPlan));
253 if (LOG.isDebugEnabled()) {
254 LOG.debug("removing groups: planSteps={}, toRemoveTotal={}",
255 groupsRemovalPlan.size(), groupCrudCounts.getRemoved());
257 Collections.reverse(groupsRemovalPlan);
258 for (final ItemSyncBox<Group> groupsPortion : groupsRemovalPlan) {
259 chainedResult = Futures.transformAsync(chainedResult, input -> {
260 final ListenableFuture<RpcResult<Void>> result;
261 if (input.isSuccessful()) {
262 result = flushRemoveGroupPortionAndBarrier(nodeIdent, groupsPortion);
264 // pass through original unsuccessful rpcResult
265 result = Futures.immediateFuture(input);
269 }, MoreExecutors.directExecutor());
271 } catch (IllegalStateException e) {
272 chainedResult = RpcResultBuilder.<Void>failed()
273 .withError(ErrorType.APPLICATION, "failed to add missing groups", e)
277 return chainedResult;
280 private ListenableFuture<RpcResult<Void>> flushRemoveGroupPortionAndBarrier(
281 final InstanceIdentifier<FlowCapableNode> nodeIdent,
282 final ItemSyncBox<Group> groupsPortion) {
283 List<ListenableFuture<RpcResult<RemoveGroupOutput>>> allResults = new ArrayList<>();
284 for (Group group : groupsPortion.getItemsToPush()) {
285 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
286 allResults.add(groupForwarder.remove(groupIdent, group, nodeIdent));
289 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
290 Futures.allAsList(allResults),
291 ReconcileUtil.createRpcResultCondenser("group remove"),
292 MoreExecutors.directExecutor());
294 return Futures.transformAsync(singleVoidResult,
295 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), sendBarrier),
296 MoreExecutors.directExecutor());
299 ListenableFuture<RpcResult<Void>> updateTableFeatures(final InstanceIdentifier<FlowCapableNode> nodeIdent,
300 final FlowCapableNode flowCapableNodeConfigured) {
301 // CHECK if while pushing the update, updateTableInput can be null to emulate a table add
302 //final List<Table> tableList = ReconcileUtil.safeTables(flowCapableNodeConfigured);
304 final List<ListenableFuture<RpcResult<UpdateTableOutput>>> allResults = new ArrayList<>();
305 // for (Table table : tableList) {
306 // List<TableFeatures> tableFeatures = flowCapableNodeConfigured.getTableFeatures();
307 // if (tableFeatures != null) {
308 // for (TableFeatures tableFeaturesItem : tableFeatures) {
309 // // TODO uncomment java.lang.NullPointerException
311 // // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer
312 // // .serializeHeader(AbstractOxmMatchEntrySerializer.java:31
313 // // allResults.add(
314 // // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent));
319 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
320 Futures.allAsList(allResults),
321 ReconcileUtil.createRpcResultCondenser("table update"),
322 MoreExecutors.directExecutor());
324 return Futures.transformAsync(singleVoidResult,
325 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), sendBarrier),
326 MoreExecutors.directExecutor());
329 private ListenableFuture<RpcResult<Void>> flushAddGroupPortionAndBarrier(
330 final InstanceIdentifier<FlowCapableNode> nodeIdent,
331 final ItemSyncBox<Group> groupsPortion) {
332 final List<ListenableFuture<RpcResult<AddGroupOutput>>> allResults = new ArrayList<>();
333 final List<ListenableFuture<RpcResult<UpdateGroupOutput>>> allUpdateResults = new ArrayList<>();
335 for (Group group : groupsPortion.getItemsToPush()) {
336 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
337 allResults.add(groupForwarder.add(groupIdent, group, nodeIdent));
341 for (ItemSyncBox.ItemUpdateTuple<Group> groupTuple : groupsPortion.getItemsToUpdate()) {
342 final Group existingGroup = groupTuple.getOriginal();
343 final Group group = groupTuple.getUpdated();
345 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
346 allUpdateResults.add(groupForwarder.update(groupIdent, existingGroup, group, nodeIdent));
349 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
350 Futures.allAsList(allResults),
351 ReconcileUtil.createRpcResultCondenser("group add"),
352 MoreExecutors.directExecutor());
354 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
355 Futures.allAsList(allUpdateResults),
356 ReconcileUtil.createRpcResultCondenser("group update"),
357 MoreExecutors.directExecutor());
359 final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
360 Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
361 ReconcileUtil.createRpcResultCondenser("group add/update"),
362 MoreExecutors.directExecutor());
365 return Futures.transformAsync(summaryResult, ReconcileUtil.chainBarrierFlush(
366 PathUtil.digNodePath(nodeIdent), sendBarrier), MoreExecutors.directExecutor());
369 ListenableFuture<RpcResult<Void>> addMissingMeters(final NodeId nodeId,
370 final InstanceIdentifier<FlowCapableNode> nodeIdent,
371 final ItemSyncBox<Meter> syncBox,
372 final SyncCrudCounters counters) {
373 if (syncBox.isEmpty()) {
374 LOG.trace("no meters configured for node: {} -> SKIPPING", nodeId.getValue());
375 return RpcResultBuilder.<Void>success().buildFuture();
378 final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
380 final List<ListenableFuture<RpcResult<AddMeterOutput>>> allResults = new ArrayList<>();
381 final List<ListenableFuture<RpcResult<UpdateMeterOutput>>> allUpdateResults = new ArrayList<>();
382 for (Meter meter : syncBox.getItemsToPush()) {
383 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, meter.key());
384 LOG.debug("adding meter {} - absent on device {}",
385 meter.getMeterId(), nodeId);
386 allResults.add(meterForwarder.add(meterIdent, meter, nodeIdent));
387 meterCrudCounts.incAdded();
390 for (ItemSyncBox.ItemUpdateTuple<Meter> meterTuple : syncBox.getItemsToUpdate()) {
391 final Meter existingMeter = meterTuple.getOriginal();
392 final Meter updated = meterTuple.getUpdated();
393 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, updated.key());
394 LOG.trace("meter {} - needs update on device {}", updated.getMeterId(), nodeId);
395 allUpdateResults.add(meterForwarder.update(meterIdent, existingMeter, updated, nodeIdent));
396 meterCrudCounts.incUpdated();
399 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
400 Futures.allAsList(allResults),
401 ReconcileUtil.createRpcResultCondenser("meter add"),
402 MoreExecutors.directExecutor());
404 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
405 Futures.allAsList(allUpdateResults),
406 ReconcileUtil.createRpcResultCondenser("meter update"),
407 MoreExecutors.directExecutor());
409 return Futures.transform(Futures.allAsList(singleVoidUpdateResult, singleVoidAddResult),
410 ReconcileUtil.createRpcResultCondenser("meter add/update"),
411 MoreExecutors.directExecutor());
414 ListenableFuture<RpcResult<Void>> addMissingGroups(final NodeId nodeId,
415 final InstanceIdentifier<FlowCapableNode> nodeIdent,
416 final List<ItemSyncBox<Group>> groupsAddPlan,
417 final SyncCrudCounters counters) {
418 if (groupsAddPlan.isEmpty()) {
419 LOG.trace("no groups configured for node: {} -> SKIPPING", nodeId.getValue());
420 return RpcResultBuilder.<Void>success().buildFuture();
423 ListenableFuture<RpcResult<Void>> chainedResult;
425 if (!groupsAddPlan.isEmpty()) {
426 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
427 groupCrudCounts.setAdded(ReconcileUtil.countTotalPushed(groupsAddPlan));
428 groupCrudCounts.setUpdated(ReconcileUtil.countTotalUpdated(groupsAddPlan));
430 if (LOG.isDebugEnabled()) {
431 LOG.debug("adding groups: planSteps={}, toAddTotal={}, toUpdateTotal={}",
432 groupsAddPlan.size(),
433 groupCrudCounts.getAdded(),
434 groupCrudCounts.getUpdated());
437 chainedResult = flushAddGroupPortionAndBarrier(nodeIdent, groupsAddPlan.get(0));
438 for (final ItemSyncBox<Group> groupsPortion : Iterables.skip(groupsAddPlan, 1)) {
440 Futures.transformAsync(chainedResult, input -> {
441 final ListenableFuture<RpcResult<Void>> result;
442 if (input.isSuccessful()) {
443 result = flushAddGroupPortionAndBarrier(nodeIdent, groupsPortion);
445 // pass through original unsuccessful rpcResult
446 result = Futures.immediateFuture(input);
450 }, MoreExecutors.directExecutor());
453 chainedResult = RpcResultBuilder.<Void>success().buildFuture();
455 } catch (IllegalStateException e) {
456 chainedResult = RpcResultBuilder.<Void>failed()
457 .withError(ErrorType.APPLICATION, "failed to add missing groups", e)
461 return chainedResult;