/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
import org.opendaylight.openflowplugin.applications.frsync.util.CrudCounts;
import org.opendaylight.openflowplugin.applications.frsync.util.FxChainUtil;
import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Executes the CRUD API for flows, groups and meters using a one-by-one (incremental) strategy.
 */
61 public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy {
63 private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyIncrementalImpl.class);
65 private FlowForwarder flowForwarder;
66 private MeterForwarder meterForwarder;
67 private GroupForwarder groupForwarder;
68 private TableForwarder tableForwarder;
69 private FlowCapableTransactionService transactionService;
72 public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
73 final SynchronizationDiffInput diffInput,
74 final SyncCrudCounters counters) {
75 final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
76 final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
78 /* Tables - have to be pushed before groups */
79 // TODO enable table-update when ready
80 //resultVehicle = updateTableFeatures(nodeIdent, configTree);
82 resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
84 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
85 if (!input.isSuccessful()) {
86 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
87 //final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
88 // Futures.asList Arrays.asList(input, output),
89 // ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("TODO"));
91 return addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters);
94 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingGroups"));
95 resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
97 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
98 if (!input.isSuccessful()) {
99 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
101 return addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters);
104 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingMeters"));
105 resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
107 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
108 if (!input.isSuccessful()) {
109 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
111 return addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters);
114 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingFlows"));
117 resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
119 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
120 if (!input.isSuccessful()) {
121 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
123 return removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters);
126 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantFlows"));
127 resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
129 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
130 if (!input.isSuccessful()) {
131 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
133 return removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters);
136 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantMeters"));
137 resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
139 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
140 if (!input.isSuccessful()) {
141 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
143 return removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters);
146 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"));
147 return resultVehicle;
151 ListenableFuture<RpcResult<Void>> addMissingFlows(final NodeId nodeId,
152 final InstanceIdentifier<FlowCapableNode> nodeIdent,
153 final Map<TableKey, ItemSyncBox<Flow>> flowsInTablesSyncBox,
154 final SyncCrudCounters counters) {
155 if (flowsInTablesSyncBox.isEmpty()) {
156 LOG.trace("no tables in config for node: {} -> SKIPPING", nodeId.getValue());
157 return RpcResultBuilder.<Void>success().buildFuture();
160 final List<ListenableFuture<RpcResult<AddFlowOutput>>> allResults = new ArrayList<>();
161 final List<ListenableFuture<RpcResult<UpdateFlowOutput>>> allUpdateResults = new ArrayList<>();
162 final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
164 for (Map.Entry<TableKey, ItemSyncBox<Flow>> flowsInTableBoxEntry : flowsInTablesSyncBox.entrySet()) {
165 final TableKey tableKey = flowsInTableBoxEntry.getKey();
166 final ItemSyncBox<Flow> flowSyncBox = flowsInTableBoxEntry.getValue();
168 final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdent.child(Table.class, tableKey);
170 for (final Flow flow : flowSyncBox.getItemsToPush()) {
171 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, flow.getKey());
173 LOG.trace("adding flow {} in table {} - absent on device {} match{}",
174 flow.getId(), tableKey, nodeId, flow.getMatch());
176 allResults.add(JdkFutureAdapters.listenInPoolThread(
177 flowForwarder.add(flowIdent, flow, nodeIdent)));
178 flowCrudCounts.incAdded();
181 for (final ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowSyncBox.getItemsToUpdate()) {
182 final Flow existingFlow = flowUpdate.getOriginal();
183 final Flow updatedFlow = flowUpdate.getUpdated();
185 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, updatedFlow.getKey());
186 LOG.trace("flow {} in table {} - needs update on device {} match{}",
187 updatedFlow.getId(), tableKey, nodeId, updatedFlow.getMatch());
189 allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
190 flowForwarder.update(flowIdent, existingFlow, updatedFlow, nodeIdent)));
191 flowCrudCounts.incUpdated();
195 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
196 Futures.allAsList(allResults),
197 ReconcileUtil.<AddFlowOutput>createRpcResultCondenser("flow adding"));
199 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
200 Futures.allAsList(allUpdateResults),
201 ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("flow updating"));
203 return Futures.transform(Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
204 ReconcileUtil.<Void>createRpcResultCondenser("flow add/update"));
207 ListenableFuture<RpcResult<Void>> removeRedundantFlows(final NodeId nodeId,
208 final InstanceIdentifier<FlowCapableNode> nodeIdent,
209 final Map<TableKey, ItemSyncBox<Flow>> removalPlan,
210 final SyncCrudCounters counters) {
211 if (removalPlan.isEmpty()) {
212 LOG.trace("no tables in operational for node: {} -> SKIPPING", nodeId.getValue());
213 return RpcResultBuilder.<Void>success().buildFuture();
216 final List<ListenableFuture<RpcResult<RemoveFlowOutput>>> allResults = new ArrayList<>();
217 final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
219 for (final Map.Entry<TableKey, ItemSyncBox<Flow>> flowsPerTable : removalPlan.entrySet()) {
220 final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
221 nodeIdent.child(Table.class, flowsPerTable.getKey());
223 // loop flows on device and check if the are configured
224 for (final Flow flow : flowsPerTable.getValue().getItemsToPush()) {
225 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
226 tableIdent.child(Flow.class, flow.getKey());
227 allResults.add(JdkFutureAdapters.listenInPoolThread(
228 flowForwarder.remove(flowIdent, flow, nodeIdent)));
229 flowCrudCounts.incRemoved();
233 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
234 Futures.allAsList(allResults), ReconcileUtil.<RemoveFlowOutput>createRpcResultCondenser("flow remove"));
235 return Futures.transform(singleVoidResult,
236 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
240 ListenableFuture<RpcResult<Void>> removeRedundantMeters(final NodeId nodeId,
241 final InstanceIdentifier<FlowCapableNode> nodeIdent,
242 final ItemSyncBox<Meter> meterRemovalPlan,
243 final SyncCrudCounters counters) {
244 if (meterRemovalPlan.isEmpty()) {
245 LOG.trace("no meters on device for node: {} -> SKIPPING", nodeId.getValue());
246 return RpcResultBuilder.<Void>success().buildFuture();
249 final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
251 final List<ListenableFuture<RpcResult<RemoveMeterOutput>>> allResults = new ArrayList<>();
252 for (Meter meter : meterRemovalPlan.getItemsToPush()) {
253 LOG.trace("removing meter {} - absent in config {}",
254 meter.getMeterId(), nodeId);
255 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
256 nodeIdent.child(Meter.class, meter.getKey());
257 allResults.add(JdkFutureAdapters.listenInPoolThread(
258 meterForwarder.remove(meterIdent, meter, nodeIdent)));
259 meterCrudCounts.incRemoved();
262 return Futures.transform(Futures.allAsList(allResults),
263 ReconcileUtil.<RemoveMeterOutput>createRpcResultCondenser("meter remove"));
266 ListenableFuture<RpcResult<Void>> removeRedundantGroups(final NodeId nodeId,
267 final InstanceIdentifier<FlowCapableNode> nodeIdent,
268 final List<ItemSyncBox<Group>> groupsRemovalPlan,
269 final SyncCrudCounters counters) {
270 if (groupsRemovalPlan.isEmpty()) {
271 LOG.trace("no groups on device for node: {} -> SKIPPING", nodeId.getValue());
272 return RpcResultBuilder.<Void>success().buildFuture();
275 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
277 ListenableFuture<RpcResult<Void>> chainedResult = RpcResultBuilder.<Void>success().buildFuture();
279 groupCrudCounts.setRemoved(ReconcileUtil.countTotalPushed(groupsRemovalPlan));
280 if (LOG.isDebugEnabled()) {
281 LOG.debug("removing groups: planSteps={}, toRemoveTotal={}",
282 groupsRemovalPlan.size(), groupCrudCounts.getRemoved());
284 Collections.reverse(groupsRemovalPlan);
285 for (final ItemSyncBox<Group> groupsPortion : groupsRemovalPlan) {
287 Futures.transform(chainedResult, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
289 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input)
291 final ListenableFuture<RpcResult<Void>> result;
292 if (input.isSuccessful()) {
293 result = flushRemoveGroupPortionAndBarrier(nodeIdent, groupsPortion);
295 // pass through original unsuccessful rpcResult
296 result = Futures.immediateFuture(input);
303 } catch (IllegalStateException e) {
304 chainedResult = RpcResultBuilder.<Void>failed()
305 .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
309 return chainedResult;
312 private ListenableFuture<RpcResult<Void>> flushRemoveGroupPortionAndBarrier(
313 final InstanceIdentifier<FlowCapableNode> nodeIdent,
314 final ItemSyncBox<Group> groupsPortion) {
315 List<ListenableFuture<RpcResult<RemoveGroupOutput>>> allResults = new ArrayList<>();
316 for (Group group : groupsPortion.getItemsToPush()) {
317 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
318 allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.remove(groupIdent, group, nodeIdent)));
321 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
322 Futures.allAsList(allResults),
323 ReconcileUtil.<RemoveGroupOutput>createRpcResultCondenser("group remove"));
325 return Futures.transform(singleVoidResult,
326 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
329 ListenableFuture<RpcResult<Void>> updateTableFeatures(final InstanceIdentifier<FlowCapableNode> nodeIdent,
330 final FlowCapableNode flowCapableNodeConfigured) {
331 // CHECK if while pushing the update, updateTableInput can be null to emulate a table add
332 final List<Table> tableList = ReconcileUtil.safeTables(flowCapableNodeConfigured);
334 final List<ListenableFuture<RpcResult<UpdateTableOutput>>> allResults = new ArrayList<>();
335 for (Table table : tableList) {
336 TableKey tableKey = table.getKey();
337 KeyedInstanceIdentifier<TableFeatures, TableFeaturesKey> tableFeaturesII = nodeIdent
338 .child(TableFeatures.class, new TableFeaturesKey(tableKey.getId()));
339 List<TableFeatures> tableFeatures = flowCapableNodeConfigured.getTableFeatures();
340 if (tableFeatures != null) {
341 for (TableFeatures tableFeaturesItem : tableFeatures) {
342 // TODO uncomment java.lang.NullPointerException
344 // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer.serializeHeader(AbstractOxmMatchEntrySerializer.java:31
345 // allResults.add(JdkFutureAdapters.listenInPoolThread(
346 // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent)));
351 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
352 Futures.allAsList(allResults),
353 ReconcileUtil.<UpdateTableOutput>createRpcResultCondenser("table update"));
355 return Futures.transform(singleVoidResult,
356 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
359 private ListenableFuture<RpcResult<Void>> flushAddGroupPortionAndBarrier(
360 final InstanceIdentifier<FlowCapableNode> nodeIdent,
361 final ItemSyncBox<Group> groupsPortion) {
362 final List<ListenableFuture<RpcResult<AddGroupOutput>>> allResults = new ArrayList<>();
363 final List<ListenableFuture<RpcResult<UpdateGroupOutput>>> allUpdateResults = new ArrayList<>();
365 for (Group group : groupsPortion.getItemsToPush()) {
366 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
367 allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.add(groupIdent, group, nodeIdent)));
371 for (ItemSyncBox.ItemUpdateTuple<Group> groupTuple : groupsPortion.getItemsToUpdate()) {
372 final Group existingGroup = groupTuple.getOriginal();
373 final Group group = groupTuple.getUpdated();
375 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
376 allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
377 groupForwarder.update(groupIdent, existingGroup, group, nodeIdent)));
380 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
381 Futures.allAsList(allResults), ReconcileUtil.<AddGroupOutput>createRpcResultCondenser("group add"));
383 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
384 Futures.allAsList(allUpdateResults),
385 ReconcileUtil.<UpdateGroupOutput>createRpcResultCondenser("group update"));
387 final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
388 Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
389 ReconcileUtil.<Void>createRpcResultCondenser("group add/update"));
392 return Futures.transform(summaryResult, ReconcileUtil.chainBarrierFlush(
393 PathUtil.digNodePath(nodeIdent), transactionService));
396 ListenableFuture<RpcResult<Void>> addMissingMeters(final NodeId nodeId,
397 final InstanceIdentifier<FlowCapableNode> nodeIdent,
398 final ItemSyncBox<Meter> syncBox,
399 final SyncCrudCounters counters) {
400 if (syncBox.isEmpty()) {
401 LOG.trace("no meters configured for node: {} -> SKIPPING", nodeId.getValue());
402 return RpcResultBuilder.<Void>success().buildFuture();
405 final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
407 final List<ListenableFuture<RpcResult<AddMeterOutput>>> allResults = new ArrayList<>();
408 final List<ListenableFuture<RpcResult<UpdateMeterOutput>>> allUpdateResults = new ArrayList<>();
409 for (Meter meter : syncBox.getItemsToPush()) {
410 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, meter.getKey());
411 LOG.debug("adding meter {} - absent on device {}",
412 meter.getMeterId(), nodeId);
413 allResults.add(JdkFutureAdapters.listenInPoolThread(
414 meterForwarder.add(meterIdent, meter, nodeIdent)));
415 meterCrudCounts.incAdded();
418 for (ItemSyncBox.ItemUpdateTuple<Meter> meterTuple : syncBox.getItemsToUpdate()) {
419 final Meter existingMeter = meterTuple.getOriginal();
420 final Meter updated = meterTuple.getUpdated();
421 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, updated.getKey());
422 LOG.trace("meter {} - needs update on device {}", updated.getMeterId(), nodeId);
423 allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
424 meterForwarder.update(meterIdent, existingMeter, updated, nodeIdent)));
425 meterCrudCounts.incUpdated();
428 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
429 Futures.allAsList(allResults), ReconcileUtil.<AddMeterOutput>createRpcResultCondenser("meter add"));
431 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
432 Futures.allAsList(allUpdateResults),
433 ReconcileUtil.<UpdateMeterOutput>createRpcResultCondenser("meter update"));
435 return Futures.transform(Futures.allAsList(singleVoidUpdateResult, singleVoidAddResult),
436 ReconcileUtil.<Void>createRpcResultCondenser("meter add/update"));
439 ListenableFuture<RpcResult<Void>> addMissingGroups(final NodeId nodeId,
440 final InstanceIdentifier<FlowCapableNode> nodeIdent,
441 final List<ItemSyncBox<Group>> groupsAddPlan,
442 final SyncCrudCounters counters) {
443 if (groupsAddPlan.isEmpty()) {
444 LOG.trace("no groups configured for node: {} -> SKIPPING", nodeId.getValue());
445 return RpcResultBuilder.<Void>success().buildFuture();
448 ListenableFuture<RpcResult<Void>> chainedResult;
450 if (!groupsAddPlan.isEmpty()) {
451 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
452 groupCrudCounts.setAdded(ReconcileUtil.countTotalPushed(groupsAddPlan));
453 groupCrudCounts.setUpdated(ReconcileUtil.countTotalUpdated(groupsAddPlan));
455 if (LOG.isDebugEnabled()) {
456 LOG.debug("adding groups: planSteps={}, toAddTotal={}, toUpdateTotal={}",
457 groupsAddPlan.size(),
458 groupCrudCounts.getAdded(),
459 groupCrudCounts.getUpdated());
462 chainedResult = flushAddGroupPortionAndBarrier(nodeIdent, groupsAddPlan.get(0));
463 for (final ItemSyncBox<Group> groupsPortion : Iterables.skip(groupsAddPlan, 1)) {
465 Futures.transform(chainedResult, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
467 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input)
469 final ListenableFuture<RpcResult<Void>> result;
470 if (input.isSuccessful()) {
471 result = flushAddGroupPortionAndBarrier(nodeIdent, groupsPortion);
473 // pass through original unsuccessful rpcResult
474 result = Futures.immediateFuture(input);
482 chainedResult = RpcResultBuilder.<Void>success().buildFuture();
484 } catch (IllegalStateException e) {
485 chainedResult = RpcResultBuilder.<Void>failed()
486 .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
490 return chainedResult;
494 public SyncPlanPushStrategyIncrementalImpl setFlowForwarder(final FlowForwarder flowForwarder) {
495 this.flowForwarder = flowForwarder;
499 public SyncPlanPushStrategyIncrementalImpl setTableForwarder(final TableForwarder tableForwarder) {
500 this.tableForwarder = tableForwarder;
504 public SyncPlanPushStrategyIncrementalImpl setMeterForwarder(final MeterForwarder meterForwarder) {
505 this.meterForwarder = meterForwarder;
509 public SyncPlanPushStrategyIncrementalImpl setGroupForwarder(final GroupForwarder groupForwarder) {
510 this.groupForwarder = groupForwarder;
514 public SyncPlanPushStrategyIncrementalImpl setTransactionService(final FlowCapableTransactionService transactionService) {
515 this.transactionService = transactionService;