2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
11 import com.google.common.collect.Iterables;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.JdkFutureAdapters;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.List;
20 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
21 import org.opendaylight.openflowplugin.applications.frsync.util.CrudCounts;
22 import org.opendaylight.openflowplugin.applications.frsync.util.FxChainUtil;
23 import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
24 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
25 import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
26 import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput;
48 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
49 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.common.RpcError;
51 import org.opendaylight.yangtools.yang.common.RpcResult;
52 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
57 * Execute CRUD API for flow + group + meter involving one-by-one (incremental) strategy.
59 public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy {
61 private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyIncrementalImpl.class);
63 private FlowForwarder flowForwarder;
64 private MeterForwarder meterForwarder;
65 private GroupForwarder groupForwarder;
66 private TableForwarder tableForwarder;
67 private FlowCapableTransactionService transactionService;
70 public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
71 final SynchronizationDiffInput diffInput,
72 final SyncCrudCounters counters) {
73 final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
74 final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
76 /* Tables - have to be pushed before groups */
77 // TODO enable table-update when ready
78 //resultVehicle = updateTableFeatures(nodeIdent, configTree);
80 resultVehicle = Futures.transformAsync(resultVehicle, input -> {
81 // if (!input.isSuccessful()) {
82 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
83 //final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
84 // Futures.asList Arrays.asList(input, output),
85 // ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("TODO"));
87 return addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters);
88 }, MoreExecutors.directExecutor());
89 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingGroups"),
90 MoreExecutors.directExecutor());
91 resultVehicle = Futures.transformAsync(resultVehicle, input -> {
92 // if (!input.isSuccessful()) {
93 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
95 return addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters);
96 }, MoreExecutors.directExecutor());
97 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingMeters"),
98 MoreExecutors.directExecutor());
99 resultVehicle = Futures.transformAsync(resultVehicle, input -> {
100 // if (!input.isSuccessful()) {
101 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
103 return addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters);
104 }, MoreExecutors.directExecutor());
105 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingFlows"),
106 MoreExecutors.directExecutor());
109 resultVehicle = Futures.transformAsync(resultVehicle, input -> {
110 // if (!input.isSuccessful()) {
111 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
113 return removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters);
114 }, MoreExecutors.directExecutor());
115 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantFlows"),
116 MoreExecutors.directExecutor());
117 resultVehicle = Futures.transformAsync(resultVehicle, input -> {
118 // if (!input.isSuccessful()) {
119 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
121 return removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters);
122 }, MoreExecutors.directExecutor());
123 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantMeters"),
124 MoreExecutors.directExecutor());
125 resultVehicle = Futures.transformAsync(resultVehicle, input -> {
126 // if (!input.isSuccessful()) {
127 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
129 return removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters);
130 }, MoreExecutors.directExecutor());
131 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"),
132 MoreExecutors.directExecutor());
133 return resultVehicle;
137 ListenableFuture<RpcResult<Void>> addMissingFlows(final NodeId nodeId,
138 final InstanceIdentifier<FlowCapableNode> nodeIdent,
139 final Map<TableKey, ItemSyncBox<Flow>> flowsInTablesSyncBox,
140 final SyncCrudCounters counters) {
141 if (flowsInTablesSyncBox.isEmpty()) {
142 LOG.trace("no tables in config for node: {} -> SKIPPING", nodeId.getValue());
143 return RpcResultBuilder.<Void>success().buildFuture();
146 final List<ListenableFuture<RpcResult<AddFlowOutput>>> allResults = new ArrayList<>();
147 final List<ListenableFuture<RpcResult<UpdateFlowOutput>>> allUpdateResults = new ArrayList<>();
148 final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
150 for (Map.Entry<TableKey, ItemSyncBox<Flow>> flowsInTableBoxEntry : flowsInTablesSyncBox.entrySet()) {
151 final TableKey tableKey = flowsInTableBoxEntry.getKey();
152 final ItemSyncBox<Flow> flowSyncBox = flowsInTableBoxEntry.getValue();
154 final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdent.child(Table.class, tableKey);
156 for (final Flow flow : flowSyncBox.getItemsToPush()) {
157 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, flow.key());
159 LOG.trace("adding flow {} in table {} - absent on device {} match{}",
160 flow.getId(), tableKey, nodeId, flow.getMatch());
162 allResults.add(JdkFutureAdapters.listenInPoolThread(
163 flowForwarder.add(flowIdent, flow, nodeIdent)));
164 flowCrudCounts.incAdded();
167 for (final ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowSyncBox.getItemsToUpdate()) {
168 final Flow existingFlow = flowUpdate.getOriginal();
169 final Flow updatedFlow = flowUpdate.getUpdated();
171 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class,
173 LOG.trace("flow {} in table {} - needs update on device {} match{}",
174 updatedFlow.getId(), tableKey, nodeId, updatedFlow.getMatch());
176 allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
177 flowForwarder.update(flowIdent, existingFlow, updatedFlow, nodeIdent)));
178 flowCrudCounts.incUpdated();
182 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
183 Futures.allAsList(allResults),
184 ReconcileUtil.createRpcResultCondenser("flow adding"),
185 MoreExecutors.directExecutor());
187 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
188 Futures.allAsList(allUpdateResults),
189 ReconcileUtil.createRpcResultCondenser("flow updating"),
190 MoreExecutors.directExecutor());
192 return Futures.transform(Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
193 ReconcileUtil.createRpcResultCondenser("flow add/update"),
194 MoreExecutors.directExecutor());
197 ListenableFuture<RpcResult<Void>> removeRedundantFlows(final NodeId nodeId,
198 final InstanceIdentifier<FlowCapableNode> nodeIdent,
199 final Map<TableKey, ItemSyncBox<Flow>> removalPlan,
200 final SyncCrudCounters counters) {
201 if (removalPlan.isEmpty()) {
202 LOG.trace("no tables in operational for node: {} -> SKIPPING", nodeId.getValue());
203 return RpcResultBuilder.<Void>success().buildFuture();
206 final List<ListenableFuture<RpcResult<RemoveFlowOutput>>> allResults = new ArrayList<>();
207 final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
209 for (final Map.Entry<TableKey, ItemSyncBox<Flow>> flowsPerTable : removalPlan.entrySet()) {
210 final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
211 nodeIdent.child(Table.class, flowsPerTable.getKey());
213 // loop flows on device and check if the are configured
214 for (final Flow flow : flowsPerTable.getValue().getItemsToPush()) {
215 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
216 tableIdent.child(Flow.class, flow.key());
217 allResults.add(JdkFutureAdapters.listenInPoolThread(
218 flowForwarder.remove(flowIdent, flow, nodeIdent)));
219 flowCrudCounts.incRemoved();
223 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
224 Futures.allAsList(allResults),
225 ReconcileUtil.createRpcResultCondenser("flow remove"),
226 MoreExecutors.directExecutor());
228 return Futures.transformAsync(singleVoidResult,
229 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
230 MoreExecutors.directExecutor());
234 ListenableFuture<RpcResult<Void>> removeRedundantMeters(final NodeId nodeId,
235 final InstanceIdentifier<FlowCapableNode> nodeIdent,
236 final ItemSyncBox<Meter> meterRemovalPlan,
237 final SyncCrudCounters counters) {
238 if (meterRemovalPlan.isEmpty()) {
239 LOG.trace("no meters on device for node: {} -> SKIPPING", nodeId.getValue());
240 return RpcResultBuilder.<Void>success().buildFuture();
243 final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
245 final List<ListenableFuture<RpcResult<RemoveMeterOutput>>> allResults = new ArrayList<>();
246 for (Meter meter : meterRemovalPlan.getItemsToPush()) {
247 LOG.trace("removing meter {} - absent in config {}",
248 meter.getMeterId(), nodeId);
249 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
250 nodeIdent.child(Meter.class, meter.key());
251 allResults.add(JdkFutureAdapters.listenInPoolThread(
252 meterForwarder.remove(meterIdent, meter, nodeIdent)));
253 meterCrudCounts.incRemoved();
256 return Futures.transform(Futures.allAsList(allResults),
257 ReconcileUtil.createRpcResultCondenser("meter remove"),
258 MoreExecutors.directExecutor());
261 ListenableFuture<RpcResult<Void>> removeRedundantGroups(final NodeId nodeId,
262 final InstanceIdentifier<FlowCapableNode> nodeIdent,
263 final List<ItemSyncBox<Group>> groupsRemovalPlan,
264 final SyncCrudCounters counters) {
265 if (groupsRemovalPlan.isEmpty()) {
266 LOG.trace("no groups on device for node: {} -> SKIPPING", nodeId.getValue());
267 return RpcResultBuilder.<Void>success().buildFuture();
270 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
272 ListenableFuture<RpcResult<Void>> chainedResult = RpcResultBuilder.<Void>success().buildFuture();
274 groupCrudCounts.setRemoved(ReconcileUtil.countTotalPushed(groupsRemovalPlan));
275 if (LOG.isDebugEnabled()) {
276 LOG.debug("removing groups: planSteps={}, toRemoveTotal={}",
277 groupsRemovalPlan.size(), groupCrudCounts.getRemoved());
279 Collections.reverse(groupsRemovalPlan);
280 for (final ItemSyncBox<Group> groupsPortion : groupsRemovalPlan) {
281 chainedResult = Futures.transformAsync(chainedResult, input -> {
282 final ListenableFuture<RpcResult<Void>> result;
283 if (input.isSuccessful()) {
284 result = flushRemoveGroupPortionAndBarrier(nodeIdent, groupsPortion);
286 // pass through original unsuccessful rpcResult
287 result = Futures.immediateFuture(input);
291 }, MoreExecutors.directExecutor());
293 } catch (IllegalStateException e) {
294 chainedResult = RpcResultBuilder.<Void>failed()
295 .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
299 return chainedResult;
302 private ListenableFuture<RpcResult<Void>> flushRemoveGroupPortionAndBarrier(
303 final InstanceIdentifier<FlowCapableNode> nodeIdent,
304 final ItemSyncBox<Group> groupsPortion) {
305 List<ListenableFuture<RpcResult<RemoveGroupOutput>>> allResults = new ArrayList<>();
306 for (Group group : groupsPortion.getItemsToPush()) {
307 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
308 allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.remove(groupIdent, group, nodeIdent)));
311 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
312 Futures.allAsList(allResults),
313 ReconcileUtil.createRpcResultCondenser("group remove"),
314 MoreExecutors.directExecutor());
316 return Futures.transformAsync(singleVoidResult,
317 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
318 MoreExecutors.directExecutor());
321 ListenableFuture<RpcResult<Void>> updateTableFeatures(final InstanceIdentifier<FlowCapableNode> nodeIdent,
322 final FlowCapableNode flowCapableNodeConfigured) {
323 // CHECK if while pushing the update, updateTableInput can be null to emulate a table add
324 //final List<Table> tableList = ReconcileUtil.safeTables(flowCapableNodeConfigured);
326 final List<ListenableFuture<RpcResult<UpdateTableOutput>>> allResults = new ArrayList<>();
327 // for (Table table : tableList) {
328 // List<TableFeatures> tableFeatures = flowCapableNodeConfigured.getTableFeatures();
329 // if (tableFeatures != null) {
330 // for (TableFeatures tableFeaturesItem : tableFeatures) {
331 // // TODO uncomment java.lang.NullPointerException
333 // // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer
334 // // .serializeHeader(AbstractOxmMatchEntrySerializer.java:31
335 // // allResults.add(JdkFutureAdapters.listenInPoolThread(
336 // // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent)));
341 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
342 Futures.allAsList(allResults),
343 ReconcileUtil.createRpcResultCondenser("table update"),
344 MoreExecutors.directExecutor());
346 return Futures.transformAsync(singleVoidResult,
347 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
348 MoreExecutors.directExecutor());
351 private ListenableFuture<RpcResult<Void>> flushAddGroupPortionAndBarrier(
352 final InstanceIdentifier<FlowCapableNode> nodeIdent,
353 final ItemSyncBox<Group> groupsPortion) {
354 final List<ListenableFuture<RpcResult<AddGroupOutput>>> allResults = new ArrayList<>();
355 final List<ListenableFuture<RpcResult<UpdateGroupOutput>>> allUpdateResults = new ArrayList<>();
357 for (Group group : groupsPortion.getItemsToPush()) {
358 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
359 allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.add(groupIdent, group, nodeIdent)));
363 for (ItemSyncBox.ItemUpdateTuple<Group> groupTuple : groupsPortion.getItemsToUpdate()) {
364 final Group existingGroup = groupTuple.getOriginal();
365 final Group group = groupTuple.getUpdated();
367 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
368 allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
369 groupForwarder.update(groupIdent, existingGroup, group, nodeIdent)));
372 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
373 Futures.allAsList(allResults),
374 ReconcileUtil.createRpcResultCondenser("group add"),
375 MoreExecutors.directExecutor());
377 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
378 Futures.allAsList(allUpdateResults),
379 ReconcileUtil.createRpcResultCondenser("group update"),
380 MoreExecutors.directExecutor());
382 final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
383 Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
384 ReconcileUtil.createRpcResultCondenser("group add/update"),
385 MoreExecutors.directExecutor());
388 return Futures.transformAsync(summaryResult, ReconcileUtil.chainBarrierFlush(
389 PathUtil.digNodePath(nodeIdent), transactionService), MoreExecutors.directExecutor());
392 ListenableFuture<RpcResult<Void>> addMissingMeters(final NodeId nodeId,
393 final InstanceIdentifier<FlowCapableNode> nodeIdent,
394 final ItemSyncBox<Meter> syncBox,
395 final SyncCrudCounters counters) {
396 if (syncBox.isEmpty()) {
397 LOG.trace("no meters configured for node: {} -> SKIPPING", nodeId.getValue());
398 return RpcResultBuilder.<Void>success().buildFuture();
401 final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
403 final List<ListenableFuture<RpcResult<AddMeterOutput>>> allResults = new ArrayList<>();
404 final List<ListenableFuture<RpcResult<UpdateMeterOutput>>> allUpdateResults = new ArrayList<>();
405 for (Meter meter : syncBox.getItemsToPush()) {
406 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, meter.key());
407 LOG.debug("adding meter {} - absent on device {}",
408 meter.getMeterId(), nodeId);
409 allResults.add(JdkFutureAdapters.listenInPoolThread(
410 meterForwarder.add(meterIdent, meter, nodeIdent)));
411 meterCrudCounts.incAdded();
414 for (ItemSyncBox.ItemUpdateTuple<Meter> meterTuple : syncBox.getItemsToUpdate()) {
415 final Meter existingMeter = meterTuple.getOriginal();
416 final Meter updated = meterTuple.getUpdated();
417 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, updated.key());
418 LOG.trace("meter {} - needs update on device {}", updated.getMeterId(), nodeId);
419 allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
420 meterForwarder.update(meterIdent, existingMeter, updated, nodeIdent)));
421 meterCrudCounts.incUpdated();
424 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
425 Futures.allAsList(allResults),
426 ReconcileUtil.createRpcResultCondenser("meter add"),
427 MoreExecutors.directExecutor());
429 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
430 Futures.allAsList(allUpdateResults),
431 ReconcileUtil.createRpcResultCondenser("meter update"),
432 MoreExecutors.directExecutor());
434 return Futures.transform(Futures.allAsList(singleVoidUpdateResult, singleVoidAddResult),
435 ReconcileUtil.createRpcResultCondenser("meter add/update"),
436 MoreExecutors.directExecutor());
439 ListenableFuture<RpcResult<Void>> addMissingGroups(final NodeId nodeId,
440 final InstanceIdentifier<FlowCapableNode> nodeIdent,
441 final List<ItemSyncBox<Group>> groupsAddPlan,
442 final SyncCrudCounters counters) {
443 if (groupsAddPlan.isEmpty()) {
444 LOG.trace("no groups configured for node: {} -> SKIPPING", nodeId.getValue());
445 return RpcResultBuilder.<Void>success().buildFuture();
448 ListenableFuture<RpcResult<Void>> chainedResult;
450 if (!groupsAddPlan.isEmpty()) {
451 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
452 groupCrudCounts.setAdded(ReconcileUtil.countTotalPushed(groupsAddPlan));
453 groupCrudCounts.setUpdated(ReconcileUtil.countTotalUpdated(groupsAddPlan));
455 if (LOG.isDebugEnabled()) {
456 LOG.debug("adding groups: planSteps={}, toAddTotal={}, toUpdateTotal={}",
457 groupsAddPlan.size(),
458 groupCrudCounts.getAdded(),
459 groupCrudCounts.getUpdated());
462 chainedResult = flushAddGroupPortionAndBarrier(nodeIdent, groupsAddPlan.get(0));
463 for (final ItemSyncBox<Group> groupsPortion : Iterables.skip(groupsAddPlan, 1)) {
465 Futures.transformAsync(chainedResult, input -> {
466 final ListenableFuture<RpcResult<Void>> result;
467 if (input.isSuccessful()) {
468 result = flushAddGroupPortionAndBarrier(nodeIdent, groupsPortion);
470 // pass through original unsuccessful rpcResult
471 result = Futures.immediateFuture(input);
475 }, MoreExecutors.directExecutor());
478 chainedResult = RpcResultBuilder.<Void>success().buildFuture();
480 } catch (IllegalStateException e) {
481 chainedResult = RpcResultBuilder.<Void>failed()
482 .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
486 return chainedResult;
490 public SyncPlanPushStrategyIncrementalImpl setFlowForwarder(final FlowForwarder flowForwarder) {
491 this.flowForwarder = flowForwarder;
495 public SyncPlanPushStrategyIncrementalImpl setTableForwarder(final TableForwarder tableForwarder) {
496 this.tableForwarder = tableForwarder;
500 public SyncPlanPushStrategyIncrementalImpl setMeterForwarder(final MeterForwarder meterForwarder) {
501 this.meterForwarder = meterForwarder;
505 public SyncPlanPushStrategyIncrementalImpl setGroupForwarder(final GroupForwarder groupForwarder) {
506 this.groupForwarder = groupForwarder;
510 public SyncPlanPushStrategyIncrementalImpl setTransactionService(
511 final FlowCapableTransactionService transactionService) {
512 this.transactionService = transactionService;