/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
import org.opendaylight.openflowplugin.applications.frsync.util.CrudCounts;
import org.opendaylight.openflowplugin.applications.frsync.util.FxChainUtil;
import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
60 * Execute CRUD API for flow + group + meter involving one-by-one (incremental) strategy.
62 public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy {
64 private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyIncrementalImpl.class);
66 private FlowForwarder flowForwarder;
67 private MeterForwarder meterForwarder;
68 private GroupForwarder groupForwarder;
69 private TableForwarder tableForwarder;
70 private FlowCapableTransactionService transactionService;
73 public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
74 final SynchronizationDiffInput diffInput,
75 final SyncCrudCounters counters) {
76 final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
77 final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
79 /* Tables - have to be pushed before groups */
80 // TODO enable table-update when ready
81 //resultVehicle = updateTableFeatures(nodeIdent, configTree);
83 resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
85 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
86 if (!input.isSuccessful()) {
87 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
88 //final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
89 // Futures.asList Arrays.asList(input, output),
90 // ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("TODO"));
92 return addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters);
94 }, MoreExecutors.directExecutor());
95 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingGroups"),
96 MoreExecutors.directExecutor());
97 resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
99 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
100 if (!input.isSuccessful()) {
101 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
103 return addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters);
105 }, MoreExecutors.directExecutor());
106 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingMeters"),
107 MoreExecutors.directExecutor());
108 resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
110 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
111 if (!input.isSuccessful()) {
112 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
114 return addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters);
116 }, MoreExecutors.directExecutor());
117 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingFlows"),
118 MoreExecutors.directExecutor());
121 resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
123 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
124 if (!input.isSuccessful()) {
125 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
127 return removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters);
129 }, MoreExecutors.directExecutor());
130 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantFlows"),
131 MoreExecutors.directExecutor());
132 resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
134 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
135 if (!input.isSuccessful()) {
136 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
138 return removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters);
140 }, MoreExecutors.directExecutor());
141 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantMeters"),
142 MoreExecutors.directExecutor());
143 resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
145 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
146 if (!input.isSuccessful()) {
147 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
149 return removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters);
151 }, MoreExecutors.directExecutor());
152 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"),
153 MoreExecutors.directExecutor());
154 return resultVehicle;
158 ListenableFuture<RpcResult<Void>> addMissingFlows(final NodeId nodeId,
159 final InstanceIdentifier<FlowCapableNode> nodeIdent,
160 final Map<TableKey, ItemSyncBox<Flow>> flowsInTablesSyncBox,
161 final SyncCrudCounters counters) {
162 if (flowsInTablesSyncBox.isEmpty()) {
163 LOG.trace("no tables in config for node: {} -> SKIPPING", nodeId.getValue());
164 return RpcResultBuilder.<Void>success().buildFuture();
167 final List<ListenableFuture<RpcResult<AddFlowOutput>>> allResults = new ArrayList<>();
168 final List<ListenableFuture<RpcResult<UpdateFlowOutput>>> allUpdateResults = new ArrayList<>();
169 final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
171 for (Map.Entry<TableKey, ItemSyncBox<Flow>> flowsInTableBoxEntry : flowsInTablesSyncBox.entrySet()) {
172 final TableKey tableKey = flowsInTableBoxEntry.getKey();
173 final ItemSyncBox<Flow> flowSyncBox = flowsInTableBoxEntry.getValue();
175 final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdent.child(Table.class, tableKey);
177 for (final Flow flow : flowSyncBox.getItemsToPush()) {
178 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, flow.getKey());
180 LOG.trace("adding flow {} in table {} - absent on device {} match{}",
181 flow.getId(), tableKey, nodeId, flow.getMatch());
183 allResults.add(JdkFutureAdapters.listenInPoolThread(
184 flowForwarder.add(flowIdent, flow, nodeIdent)));
185 flowCrudCounts.incAdded();
188 for (final ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowSyncBox.getItemsToUpdate()) {
189 final Flow existingFlow = flowUpdate.getOriginal();
190 final Flow updatedFlow = flowUpdate.getUpdated();
192 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, updatedFlow.getKey());
193 LOG.trace("flow {} in table {} - needs update on device {} match{}",
194 updatedFlow.getId(), tableKey, nodeId, updatedFlow.getMatch());
196 allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
197 flowForwarder.update(flowIdent, existingFlow, updatedFlow, nodeIdent)));
198 flowCrudCounts.incUpdated();
202 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
203 Futures.allAsList(allResults),
204 ReconcileUtil.<AddFlowOutput>createRpcResultCondenser("flow adding"));
206 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
207 Futures.allAsList(allUpdateResults),
208 ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("flow updating"));
210 return Futures.transform(Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
211 ReconcileUtil.<Void>createRpcResultCondenser("flow add/update"));
214 ListenableFuture<RpcResult<Void>> removeRedundantFlows(final NodeId nodeId,
215 final InstanceIdentifier<FlowCapableNode> nodeIdent,
216 final Map<TableKey, ItemSyncBox<Flow>> removalPlan,
217 final SyncCrudCounters counters) {
218 if (removalPlan.isEmpty()) {
219 LOG.trace("no tables in operational for node: {} -> SKIPPING", nodeId.getValue());
220 return RpcResultBuilder.<Void>success().buildFuture();
223 final List<ListenableFuture<RpcResult<RemoveFlowOutput>>> allResults = new ArrayList<>();
224 final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
226 for (final Map.Entry<TableKey, ItemSyncBox<Flow>> flowsPerTable : removalPlan.entrySet()) {
227 final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
228 nodeIdent.child(Table.class, flowsPerTable.getKey());
230 // loop flows on device and check if the are configured
231 for (final Flow flow : flowsPerTable.getValue().getItemsToPush()) {
232 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
233 tableIdent.child(Flow.class, flow.getKey());
234 allResults.add(JdkFutureAdapters.listenInPoolThread(
235 flowForwarder.remove(flowIdent, flow, nodeIdent)));
236 flowCrudCounts.incRemoved();
240 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
241 Futures.allAsList(allResults), ReconcileUtil.<RemoveFlowOutput>createRpcResultCondenser("flow remove"));
242 return Futures.transformAsync(singleVoidResult,
243 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
244 MoreExecutors.directExecutor());
248 ListenableFuture<RpcResult<Void>> removeRedundantMeters(final NodeId nodeId,
249 final InstanceIdentifier<FlowCapableNode> nodeIdent,
250 final ItemSyncBox<Meter> meterRemovalPlan,
251 final SyncCrudCounters counters) {
252 if (meterRemovalPlan.isEmpty()) {
253 LOG.trace("no meters on device for node: {} -> SKIPPING", nodeId.getValue());
254 return RpcResultBuilder.<Void>success().buildFuture();
257 final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
259 final List<ListenableFuture<RpcResult<RemoveMeterOutput>>> allResults = new ArrayList<>();
260 for (Meter meter : meterRemovalPlan.getItemsToPush()) {
261 LOG.trace("removing meter {} - absent in config {}",
262 meter.getMeterId(), nodeId);
263 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
264 nodeIdent.child(Meter.class, meter.getKey());
265 allResults.add(JdkFutureAdapters.listenInPoolThread(
266 meterForwarder.remove(meterIdent, meter, nodeIdent)));
267 meterCrudCounts.incRemoved();
270 return Futures.transform(Futures.allAsList(allResults),
271 ReconcileUtil.<RemoveMeterOutput>createRpcResultCondenser("meter remove"));
274 ListenableFuture<RpcResult<Void>> removeRedundantGroups(final NodeId nodeId,
275 final InstanceIdentifier<FlowCapableNode> nodeIdent,
276 final List<ItemSyncBox<Group>> groupsRemovalPlan,
277 final SyncCrudCounters counters) {
278 if (groupsRemovalPlan.isEmpty()) {
279 LOG.trace("no groups on device for node: {} -> SKIPPING", nodeId.getValue());
280 return RpcResultBuilder.<Void>success().buildFuture();
283 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
285 ListenableFuture<RpcResult<Void>> chainedResult = RpcResultBuilder.<Void>success().buildFuture();
287 groupCrudCounts.setRemoved(ReconcileUtil.countTotalPushed(groupsRemovalPlan));
288 if (LOG.isDebugEnabled()) {
289 LOG.debug("removing groups: planSteps={}, toRemoveTotal={}",
290 groupsRemovalPlan.size(), groupCrudCounts.getRemoved());
292 Collections.reverse(groupsRemovalPlan);
293 for (final ItemSyncBox<Group> groupsPortion : groupsRemovalPlan) {
295 Futures.transformAsync(chainedResult, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
297 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input)
299 final ListenableFuture<RpcResult<Void>> result;
300 if (input.isSuccessful()) {
301 result = flushRemoveGroupPortionAndBarrier(nodeIdent, groupsPortion);
303 // pass through original unsuccessful rpcResult
304 result = Futures.immediateFuture(input);
309 }, MoreExecutors.directExecutor());
311 } catch (IllegalStateException e) {
312 chainedResult = RpcResultBuilder.<Void>failed()
313 .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
317 return chainedResult;
320 private ListenableFuture<RpcResult<Void>> flushRemoveGroupPortionAndBarrier(
321 final InstanceIdentifier<FlowCapableNode> nodeIdent,
322 final ItemSyncBox<Group> groupsPortion) {
323 List<ListenableFuture<RpcResult<RemoveGroupOutput>>> allResults = new ArrayList<>();
324 for (Group group : groupsPortion.getItemsToPush()) {
325 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
326 allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.remove(groupIdent, group, nodeIdent)));
329 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
330 Futures.allAsList(allResults),
331 ReconcileUtil.<RemoveGroupOutput>createRpcResultCondenser("group remove"));
333 return Futures.transformAsync(singleVoidResult,
334 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
335 MoreExecutors.directExecutor());
338 ListenableFuture<RpcResult<Void>> updateTableFeatures(final InstanceIdentifier<FlowCapableNode> nodeIdent,
339 final FlowCapableNode flowCapableNodeConfigured) {
340 // CHECK if while pushing the update, updateTableInput can be null to emulate a table add
341 final List<Table> tableList = ReconcileUtil.safeTables(flowCapableNodeConfigured);
343 final List<ListenableFuture<RpcResult<UpdateTableOutput>>> allResults = new ArrayList<>();
344 for (Table table : tableList) {
345 TableKey tableKey = table.getKey();
346 KeyedInstanceIdentifier<TableFeatures, TableFeaturesKey> tableFeaturesII = nodeIdent
347 .child(TableFeatures.class, new TableFeaturesKey(tableKey.getId()));
348 List<TableFeatures> tableFeatures = flowCapableNodeConfigured.getTableFeatures();
349 if (tableFeatures != null) {
350 for (TableFeatures tableFeaturesItem : tableFeatures) {
351 // TODO uncomment java.lang.NullPointerException
353 // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer.serializeHeader(AbstractOxmMatchEntrySerializer.java:31
354 // allResults.add(JdkFutureAdapters.listenInPoolThread(
355 // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent)));
360 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
361 Futures.allAsList(allResults),
362 ReconcileUtil.<UpdateTableOutput>createRpcResultCondenser("table update"));
364 return Futures.transformAsync(singleVoidResult,
365 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
366 MoreExecutors.directExecutor());
369 private ListenableFuture<RpcResult<Void>> flushAddGroupPortionAndBarrier(
370 final InstanceIdentifier<FlowCapableNode> nodeIdent,
371 final ItemSyncBox<Group> groupsPortion) {
372 final List<ListenableFuture<RpcResult<AddGroupOutput>>> allResults = new ArrayList<>();
373 final List<ListenableFuture<RpcResult<UpdateGroupOutput>>> allUpdateResults = new ArrayList<>();
375 for (Group group : groupsPortion.getItemsToPush()) {
376 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
377 allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.add(groupIdent, group, nodeIdent)));
381 for (ItemSyncBox.ItemUpdateTuple<Group> groupTuple : groupsPortion.getItemsToUpdate()) {
382 final Group existingGroup = groupTuple.getOriginal();
383 final Group group = groupTuple.getUpdated();
385 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
386 allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
387 groupForwarder.update(groupIdent, existingGroup, group, nodeIdent)));
390 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
391 Futures.allAsList(allResults), ReconcileUtil.<AddGroupOutput>createRpcResultCondenser("group add"));
393 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
394 Futures.allAsList(allUpdateResults),
395 ReconcileUtil.<UpdateGroupOutput>createRpcResultCondenser("group update"));
397 final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
398 Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
399 ReconcileUtil.<Void>createRpcResultCondenser("group add/update"));
402 return Futures.transformAsync(summaryResult, ReconcileUtil.chainBarrierFlush(
403 PathUtil.digNodePath(nodeIdent), transactionService), MoreExecutors.directExecutor());
406 ListenableFuture<RpcResult<Void>> addMissingMeters(final NodeId nodeId,
407 final InstanceIdentifier<FlowCapableNode> nodeIdent,
408 final ItemSyncBox<Meter> syncBox,
409 final SyncCrudCounters counters) {
410 if (syncBox.isEmpty()) {
411 LOG.trace("no meters configured for node: {} -> SKIPPING", nodeId.getValue());
412 return RpcResultBuilder.<Void>success().buildFuture();
415 final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
417 final List<ListenableFuture<RpcResult<AddMeterOutput>>> allResults = new ArrayList<>();
418 final List<ListenableFuture<RpcResult<UpdateMeterOutput>>> allUpdateResults = new ArrayList<>();
419 for (Meter meter : syncBox.getItemsToPush()) {
420 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, meter.getKey());
421 LOG.debug("adding meter {} - absent on device {}",
422 meter.getMeterId(), nodeId);
423 allResults.add(JdkFutureAdapters.listenInPoolThread(
424 meterForwarder.add(meterIdent, meter, nodeIdent)));
425 meterCrudCounts.incAdded();
428 for (ItemSyncBox.ItemUpdateTuple<Meter> meterTuple : syncBox.getItemsToUpdate()) {
429 final Meter existingMeter = meterTuple.getOriginal();
430 final Meter updated = meterTuple.getUpdated();
431 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, updated.getKey());
432 LOG.trace("meter {} - needs update on device {}", updated.getMeterId(), nodeId);
433 allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
434 meterForwarder.update(meterIdent, existingMeter, updated, nodeIdent)));
435 meterCrudCounts.incUpdated();
438 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
439 Futures.allAsList(allResults), ReconcileUtil.<AddMeterOutput>createRpcResultCondenser("meter add"));
441 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
442 Futures.allAsList(allUpdateResults),
443 ReconcileUtil.<UpdateMeterOutput>createRpcResultCondenser("meter update"));
445 return Futures.transform(Futures.allAsList(singleVoidUpdateResult, singleVoidAddResult),
446 ReconcileUtil.<Void>createRpcResultCondenser("meter add/update"));
449 ListenableFuture<RpcResult<Void>> addMissingGroups(final NodeId nodeId,
450 final InstanceIdentifier<FlowCapableNode> nodeIdent,
451 final List<ItemSyncBox<Group>> groupsAddPlan,
452 final SyncCrudCounters counters) {
453 if (groupsAddPlan.isEmpty()) {
454 LOG.trace("no groups configured for node: {} -> SKIPPING", nodeId.getValue());
455 return RpcResultBuilder.<Void>success().buildFuture();
458 ListenableFuture<RpcResult<Void>> chainedResult;
460 if (!groupsAddPlan.isEmpty()) {
461 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
462 groupCrudCounts.setAdded(ReconcileUtil.countTotalPushed(groupsAddPlan));
463 groupCrudCounts.setUpdated(ReconcileUtil.countTotalUpdated(groupsAddPlan));
465 if (LOG.isDebugEnabled()) {
466 LOG.debug("adding groups: planSteps={}, toAddTotal={}, toUpdateTotal={}",
467 groupsAddPlan.size(),
468 groupCrudCounts.getAdded(),
469 groupCrudCounts.getUpdated());
472 chainedResult = flushAddGroupPortionAndBarrier(nodeIdent, groupsAddPlan.get(0));
473 for (final ItemSyncBox<Group> groupsPortion : Iterables.skip(groupsAddPlan, 1)) {
475 Futures.transformAsync(chainedResult, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
477 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input)
479 final ListenableFuture<RpcResult<Void>> result;
480 if (input.isSuccessful()) {
481 result = flushAddGroupPortionAndBarrier(nodeIdent, groupsPortion);
483 // pass through original unsuccessful rpcResult
484 result = Futures.immediateFuture(input);
489 }, MoreExecutors.directExecutor());
492 chainedResult = RpcResultBuilder.<Void>success().buildFuture();
494 } catch (IllegalStateException e) {
495 chainedResult = RpcResultBuilder.<Void>failed()
496 .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
500 return chainedResult;
504 public SyncPlanPushStrategyIncrementalImpl setFlowForwarder(final FlowForwarder flowForwarder) {
505 this.flowForwarder = flowForwarder;
509 public SyncPlanPushStrategyIncrementalImpl setTableForwarder(final TableForwarder tableForwarder) {
510 this.tableForwarder = tableForwarder;
514 public SyncPlanPushStrategyIncrementalImpl setMeterForwarder(final MeterForwarder meterForwarder) {
515 this.meterForwarder = meterForwarder;
519 public SyncPlanPushStrategyIncrementalImpl setGroupForwarder(final GroupForwarder groupForwarder) {
520 this.groupForwarder = groupForwarder;
524 public SyncPlanPushStrategyIncrementalImpl setTransactionService(final FlowCapableTransactionService transactionService) {
525 this.transactionService = transactionService;