2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
10 import com.google.common.collect.Iterables;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.ArrayList;
15 import java.util.Collections;
16 import java.util.List;
18 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
19 import org.opendaylight.openflowplugin.applications.frsync.util.CrudCounts;
20 import org.opendaylight.openflowplugin.applications.frsync.util.FxChainUtil;
21 import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
22 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
23 import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
24 import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput;
46 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
47 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
48 import org.opendaylight.yangtools.yang.common.ErrorType;
49 import org.opendaylight.yangtools.yang.common.RpcResult;
50 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
55 * Execute CRUD API for flow + group + meter involving one-by-one (incremental) strategy.
57 public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy {
59 private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyIncrementalImpl.class);
61 private FlowForwarder flowForwarder;
62 private MeterForwarder meterForwarder;
63 private GroupForwarder groupForwarder;
64 private TableForwarder tableForwarder;
65 private FlowCapableTransactionService transactionService;
68 public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
69 final SynchronizationDiffInput diffInput,
70 final SyncCrudCounters counters) {
71 final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
72 final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
74 /* Tables - have to be pushed before groups */
75 // TODO enable table-update when ready
76 //resultVehicle = updateTableFeatures(nodeIdent, configTree);
78 resultVehicle = Futures.transformAsync(resultVehicle, input -> {
79 // if (!input.isSuccessful()) {
80 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
81 //final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
82 // Futures.asList Arrays.asList(input, output),
83 // ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("TODO"));
85 return addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters);
86 }, MoreExecutors.directExecutor());
87 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingGroups"),
88 MoreExecutors.directExecutor());
89 resultVehicle = Futures.transformAsync(resultVehicle, input -> {
90 // if (!input.isSuccessful()) {
91 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
93 return addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters);
94 }, MoreExecutors.directExecutor());
95 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingMeters"),
96 MoreExecutors.directExecutor());
97 resultVehicle = Futures.transformAsync(resultVehicle, input -> {
98 // if (!input.isSuccessful()) {
99 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
101 return addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters);
102 }, MoreExecutors.directExecutor());
103 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingFlows"),
104 MoreExecutors.directExecutor());
107 resultVehicle = Futures.transformAsync(resultVehicle, input -> {
108 // if (!input.isSuccessful()) {
109 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
111 return removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters);
112 }, MoreExecutors.directExecutor());
113 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantFlows"),
114 MoreExecutors.directExecutor());
115 resultVehicle = Futures.transformAsync(resultVehicle, input -> {
116 // if (!input.isSuccessful()) {
117 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
119 return removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters);
120 }, MoreExecutors.directExecutor());
121 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantMeters"),
122 MoreExecutors.directExecutor());
123 resultVehicle = Futures.transformAsync(resultVehicle, input -> {
124 // if (!input.isSuccessful()) {
125 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
127 return removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters);
128 }, MoreExecutors.directExecutor());
129 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"),
130 MoreExecutors.directExecutor());
131 return resultVehicle;
135 ListenableFuture<RpcResult<Void>> addMissingFlows(final NodeId nodeId,
136 final InstanceIdentifier<FlowCapableNode> nodeIdent,
137 final Map<TableKey, ItemSyncBox<Flow>> flowsInTablesSyncBox,
138 final SyncCrudCounters counters) {
139 if (flowsInTablesSyncBox.isEmpty()) {
140 LOG.trace("no tables in config for node: {} -> SKIPPING", nodeId.getValue());
141 return RpcResultBuilder.<Void>success().buildFuture();
144 final List<ListenableFuture<RpcResult<AddFlowOutput>>> allResults = new ArrayList<>();
145 final List<ListenableFuture<RpcResult<UpdateFlowOutput>>> allUpdateResults = new ArrayList<>();
146 final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
148 for (Map.Entry<TableKey, ItemSyncBox<Flow>> flowsInTableBoxEntry : flowsInTablesSyncBox.entrySet()) {
149 final TableKey tableKey = flowsInTableBoxEntry.getKey();
150 final ItemSyncBox<Flow> flowSyncBox = flowsInTableBoxEntry.getValue();
152 final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdent.child(Table.class, tableKey);
154 for (final Flow flow : flowSyncBox.getItemsToPush()) {
155 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, flow.key());
157 LOG.trace("adding flow {} in table {} - absent on device {} match{}",
158 flow.getId(), tableKey, nodeId, flow.getMatch());
160 allResults.add(flowForwarder.add(flowIdent, flow, nodeIdent));
161 flowCrudCounts.incAdded();
164 for (final ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowSyncBox.getItemsToUpdate()) {
165 final Flow existingFlow = flowUpdate.getOriginal();
166 final Flow updatedFlow = flowUpdate.getUpdated();
168 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class,
170 LOG.trace("flow {} in table {} - needs update on device {} match{}",
171 updatedFlow.getId(), tableKey, nodeId, updatedFlow.getMatch());
173 allUpdateResults.add(flowForwarder.update(flowIdent, existingFlow, updatedFlow, nodeIdent));
174 flowCrudCounts.incUpdated();
178 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
179 Futures.allAsList(allResults),
180 ReconcileUtil.createRpcResultCondenser("flow adding"),
181 MoreExecutors.directExecutor());
183 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
184 Futures.allAsList(allUpdateResults),
185 ReconcileUtil.createRpcResultCondenser("flow updating"),
186 MoreExecutors.directExecutor());
188 return Futures.transform(Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
189 ReconcileUtil.createRpcResultCondenser("flow add/update"),
190 MoreExecutors.directExecutor());
193 ListenableFuture<RpcResult<Void>> removeRedundantFlows(final NodeId nodeId,
194 final InstanceIdentifier<FlowCapableNode> nodeIdent,
195 final Map<TableKey, ItemSyncBox<Flow>> removalPlan,
196 final SyncCrudCounters counters) {
197 if (removalPlan.isEmpty()) {
198 LOG.trace("no tables in operational for node: {} -> SKIPPING", nodeId.getValue());
199 return RpcResultBuilder.<Void>success().buildFuture();
202 final List<ListenableFuture<RpcResult<RemoveFlowOutput>>> allResults = new ArrayList<>();
203 final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
205 for (final Map.Entry<TableKey, ItemSyncBox<Flow>> flowsPerTable : removalPlan.entrySet()) {
206 final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
207 nodeIdent.child(Table.class, flowsPerTable.getKey());
209 // loop flows on device and check if the are configured
210 for (final Flow flow : flowsPerTable.getValue().getItemsToPush()) {
211 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
212 tableIdent.child(Flow.class, flow.key());
213 allResults.add(flowForwarder.remove(flowIdent, flow, nodeIdent));
214 flowCrudCounts.incRemoved();
218 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
219 Futures.allAsList(allResults),
220 ReconcileUtil.createRpcResultCondenser("flow remove"),
221 MoreExecutors.directExecutor());
223 return Futures.transformAsync(singleVoidResult,
224 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
225 MoreExecutors.directExecutor());
229 ListenableFuture<RpcResult<Void>> removeRedundantMeters(final NodeId nodeId,
230 final InstanceIdentifier<FlowCapableNode> nodeIdent,
231 final ItemSyncBox<Meter> meterRemovalPlan,
232 final SyncCrudCounters counters) {
233 if (meterRemovalPlan.isEmpty()) {
234 LOG.trace("no meters on device for node: {} -> SKIPPING", nodeId.getValue());
235 return RpcResultBuilder.<Void>success().buildFuture();
238 final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
240 final List<ListenableFuture<RpcResult<RemoveMeterOutput>>> allResults = new ArrayList<>();
241 for (Meter meter : meterRemovalPlan.getItemsToPush()) {
242 LOG.trace("removing meter {} - absent in config {}",
243 meter.getMeterId(), nodeId);
244 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
245 nodeIdent.child(Meter.class, meter.key());
246 allResults.add(meterForwarder.remove(meterIdent, meter, nodeIdent));
247 meterCrudCounts.incRemoved();
250 return Futures.transform(Futures.allAsList(allResults),
251 ReconcileUtil.createRpcResultCondenser("meter remove"),
252 MoreExecutors.directExecutor());
255 ListenableFuture<RpcResult<Void>> removeRedundantGroups(final NodeId nodeId,
256 final InstanceIdentifier<FlowCapableNode> nodeIdent,
257 final List<ItemSyncBox<Group>> groupsRemovalPlan,
258 final SyncCrudCounters counters) {
259 if (groupsRemovalPlan.isEmpty()) {
260 LOG.trace("no groups on device for node: {} -> SKIPPING", nodeId.getValue());
261 return RpcResultBuilder.<Void>success().buildFuture();
264 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
266 ListenableFuture<RpcResult<Void>> chainedResult = RpcResultBuilder.<Void>success().buildFuture();
268 groupCrudCounts.setRemoved(ReconcileUtil.countTotalPushed(groupsRemovalPlan));
269 if (LOG.isDebugEnabled()) {
270 LOG.debug("removing groups: planSteps={}, toRemoveTotal={}",
271 groupsRemovalPlan.size(), groupCrudCounts.getRemoved());
273 Collections.reverse(groupsRemovalPlan);
274 for (final ItemSyncBox<Group> groupsPortion : groupsRemovalPlan) {
275 chainedResult = Futures.transformAsync(chainedResult, input -> {
276 final ListenableFuture<RpcResult<Void>> result;
277 if (input.isSuccessful()) {
278 result = flushRemoveGroupPortionAndBarrier(nodeIdent, groupsPortion);
280 // pass through original unsuccessful rpcResult
281 result = Futures.immediateFuture(input);
285 }, MoreExecutors.directExecutor());
287 } catch (IllegalStateException e) {
288 chainedResult = RpcResultBuilder.<Void>failed()
289 .withError(ErrorType.APPLICATION, "failed to add missing groups", e)
293 return chainedResult;
296 private ListenableFuture<RpcResult<Void>> flushRemoveGroupPortionAndBarrier(
297 final InstanceIdentifier<FlowCapableNode> nodeIdent,
298 final ItemSyncBox<Group> groupsPortion) {
299 List<ListenableFuture<RpcResult<RemoveGroupOutput>>> allResults = new ArrayList<>();
300 for (Group group : groupsPortion.getItemsToPush()) {
301 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
302 allResults.add(groupForwarder.remove(groupIdent, group, nodeIdent));
305 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
306 Futures.allAsList(allResults),
307 ReconcileUtil.createRpcResultCondenser("group remove"),
308 MoreExecutors.directExecutor());
310 return Futures.transformAsync(singleVoidResult,
311 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
312 MoreExecutors.directExecutor());
315 ListenableFuture<RpcResult<Void>> updateTableFeatures(final InstanceIdentifier<FlowCapableNode> nodeIdent,
316 final FlowCapableNode flowCapableNodeConfigured) {
317 // CHECK if while pushing the update, updateTableInput can be null to emulate a table add
318 //final List<Table> tableList = ReconcileUtil.safeTables(flowCapableNodeConfigured);
320 final List<ListenableFuture<RpcResult<UpdateTableOutput>>> allResults = new ArrayList<>();
321 // for (Table table : tableList) {
322 // List<TableFeatures> tableFeatures = flowCapableNodeConfigured.getTableFeatures();
323 // if (tableFeatures != null) {
324 // for (TableFeatures tableFeaturesItem : tableFeatures) {
325 // // TODO uncomment java.lang.NullPointerException
327 // // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer
328 // // .serializeHeader(AbstractOxmMatchEntrySerializer.java:31
329 // // allResults.add(JdkFutureAdapters.listenInPoolThread(
330 // // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent)));
335 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
336 Futures.allAsList(allResults),
337 ReconcileUtil.createRpcResultCondenser("table update"),
338 MoreExecutors.directExecutor());
340 return Futures.transformAsync(singleVoidResult,
341 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
342 MoreExecutors.directExecutor());
345 private ListenableFuture<RpcResult<Void>> flushAddGroupPortionAndBarrier(
346 final InstanceIdentifier<FlowCapableNode> nodeIdent,
347 final ItemSyncBox<Group> groupsPortion) {
348 final List<ListenableFuture<RpcResult<AddGroupOutput>>> allResults = new ArrayList<>();
349 final List<ListenableFuture<RpcResult<UpdateGroupOutput>>> allUpdateResults = new ArrayList<>();
351 for (Group group : groupsPortion.getItemsToPush()) {
352 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
353 allResults.add(groupForwarder.add(groupIdent, group, nodeIdent));
357 for (ItemSyncBox.ItemUpdateTuple<Group> groupTuple : groupsPortion.getItemsToUpdate()) {
358 final Group existingGroup = groupTuple.getOriginal();
359 final Group group = groupTuple.getUpdated();
361 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
362 allUpdateResults.add(groupForwarder.update(groupIdent, existingGroup, group, nodeIdent));
365 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
366 Futures.allAsList(allResults),
367 ReconcileUtil.createRpcResultCondenser("group add"),
368 MoreExecutors.directExecutor());
370 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
371 Futures.allAsList(allUpdateResults),
372 ReconcileUtil.createRpcResultCondenser("group update"),
373 MoreExecutors.directExecutor());
375 final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
376 Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
377 ReconcileUtil.createRpcResultCondenser("group add/update"),
378 MoreExecutors.directExecutor());
381 return Futures.transformAsync(summaryResult, ReconcileUtil.chainBarrierFlush(
382 PathUtil.digNodePath(nodeIdent), transactionService), MoreExecutors.directExecutor());
385 ListenableFuture<RpcResult<Void>> addMissingMeters(final NodeId nodeId,
386 final InstanceIdentifier<FlowCapableNode> nodeIdent,
387 final ItemSyncBox<Meter> syncBox,
388 final SyncCrudCounters counters) {
389 if (syncBox.isEmpty()) {
390 LOG.trace("no meters configured for node: {} -> SKIPPING", nodeId.getValue());
391 return RpcResultBuilder.<Void>success().buildFuture();
394 final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
396 final List<ListenableFuture<RpcResult<AddMeterOutput>>> allResults = new ArrayList<>();
397 final List<ListenableFuture<RpcResult<UpdateMeterOutput>>> allUpdateResults = new ArrayList<>();
398 for (Meter meter : syncBox.getItemsToPush()) {
399 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, meter.key());
400 LOG.debug("adding meter {} - absent on device {}",
401 meter.getMeterId(), nodeId);
402 allResults.add(meterForwarder.add(meterIdent, meter, nodeIdent));
403 meterCrudCounts.incAdded();
406 for (ItemSyncBox.ItemUpdateTuple<Meter> meterTuple : syncBox.getItemsToUpdate()) {
407 final Meter existingMeter = meterTuple.getOriginal();
408 final Meter updated = meterTuple.getUpdated();
409 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, updated.key());
410 LOG.trace("meter {} - needs update on device {}", updated.getMeterId(), nodeId);
411 allUpdateResults.add(meterForwarder.update(meterIdent, existingMeter, updated, nodeIdent));
412 meterCrudCounts.incUpdated();
415 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
416 Futures.allAsList(allResults),
417 ReconcileUtil.createRpcResultCondenser("meter add"),
418 MoreExecutors.directExecutor());
420 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
421 Futures.allAsList(allUpdateResults),
422 ReconcileUtil.createRpcResultCondenser("meter update"),
423 MoreExecutors.directExecutor());
425 return Futures.transform(Futures.allAsList(singleVoidUpdateResult, singleVoidAddResult),
426 ReconcileUtil.createRpcResultCondenser("meter add/update"),
427 MoreExecutors.directExecutor());
430 ListenableFuture<RpcResult<Void>> addMissingGroups(final NodeId nodeId,
431 final InstanceIdentifier<FlowCapableNode> nodeIdent,
432 final List<ItemSyncBox<Group>> groupsAddPlan,
433 final SyncCrudCounters counters) {
434 if (groupsAddPlan.isEmpty()) {
435 LOG.trace("no groups configured for node: {} -> SKIPPING", nodeId.getValue());
436 return RpcResultBuilder.<Void>success().buildFuture();
439 ListenableFuture<RpcResult<Void>> chainedResult;
441 if (!groupsAddPlan.isEmpty()) {
442 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
443 groupCrudCounts.setAdded(ReconcileUtil.countTotalPushed(groupsAddPlan));
444 groupCrudCounts.setUpdated(ReconcileUtil.countTotalUpdated(groupsAddPlan));
446 if (LOG.isDebugEnabled()) {
447 LOG.debug("adding groups: planSteps={}, toAddTotal={}, toUpdateTotal={}",
448 groupsAddPlan.size(),
449 groupCrudCounts.getAdded(),
450 groupCrudCounts.getUpdated());
453 chainedResult = flushAddGroupPortionAndBarrier(nodeIdent, groupsAddPlan.get(0));
454 for (final ItemSyncBox<Group> groupsPortion : Iterables.skip(groupsAddPlan, 1)) {
456 Futures.transformAsync(chainedResult, input -> {
457 final ListenableFuture<RpcResult<Void>> result;
458 if (input.isSuccessful()) {
459 result = flushAddGroupPortionAndBarrier(nodeIdent, groupsPortion);
461 // pass through original unsuccessful rpcResult
462 result = Futures.immediateFuture(input);
466 }, MoreExecutors.directExecutor());
469 chainedResult = RpcResultBuilder.<Void>success().buildFuture();
471 } catch (IllegalStateException e) {
472 chainedResult = RpcResultBuilder.<Void>failed()
473 .withError(ErrorType.APPLICATION, "failed to add missing groups", e)
477 return chainedResult;
481 public SyncPlanPushStrategyIncrementalImpl setFlowForwarder(final FlowForwarder flowForwarder) {
482 this.flowForwarder = flowForwarder;
486 public SyncPlanPushStrategyIncrementalImpl setTableForwarder(final TableForwarder tableForwarder) {
487 this.tableForwarder = tableForwarder;
491 public SyncPlanPushStrategyIncrementalImpl setMeterForwarder(final MeterForwarder meterForwarder) {
492 this.meterForwarder = meterForwarder;
496 public SyncPlanPushStrategyIncrementalImpl setGroupForwarder(final GroupForwarder groupForwarder) {
497 this.groupForwarder = groupForwarder;
501 public SyncPlanPushStrategyIncrementalImpl setTransactionService(
502 final FlowCapableTransactionService transactionService) {
503 this.transactionService = transactionService;