2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
11 import com.google.common.collect.Iterables;
12 import com.google.common.util.concurrent.AsyncFunction;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.JdkFutureAdapters;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.List;
20 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
21 import org.opendaylight.openflowplugin.applications.frsync.impl.FlowForwarder;
22 import org.opendaylight.openflowplugin.applications.frsync.impl.GroupForwarder;
23 import org.opendaylight.openflowplugin.applications.frsync.impl.MeterForwarder;
24 import org.opendaylight.openflowplugin.applications.frsync.impl.TableForwarder;
25 import org.opendaylight.openflowplugin.applications.frsync.util.CrudCounts;
26 import org.opendaylight.openflowplugin.applications.frsync.util.FxChainUtil;
27 import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
28 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
29 import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
30 import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
54 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
55 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
56 import org.opendaylight.yangtools.yang.common.RpcError;
57 import org.opendaylight.yangtools.yang.common.RpcResult;
58 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
63 * Execute CRUD API for flow + group + meter involving one-by-one (incremental) strategy.
65 public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy {
67 private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyIncrementalImpl.class);
69 private FlowForwarder flowForwarder;
70 private TableForwarder tableForwarder;
71 private MeterForwarder meterForwarder;
72 private GroupForwarder groupForwarder;
73 private FlowCapableTransactionService transactionService;
76 public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
77 final SynchronizationDiffInput diffInput,
78 final SyncCrudCounters counters) {
79 final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
80 final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
82 /* Tables - have to be pushed before groups */
83 // TODO enable table-update when ready
84 //resultVehicle = updateTableFeatures(nodeIdent, configTree);
86 resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
88 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
89 if (!input.isSuccessful()) {
90 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
91 //final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
92 // Futures.asList Arrays.asList(input, output),
93 // ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("TODO"));
95 return addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters);
98 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingGroups"));
99 resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
101 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
102 if (!input.isSuccessful()) {
103 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
105 return addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters);
108 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingMeters"));
109 resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
111 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
112 if (!input.isSuccessful()) {
113 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
115 return addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters);
118 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingFlows"));
121 resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
123 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
124 if (!input.isSuccessful()) {
125 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
127 return removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters);
130 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantFlows"));
131 resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
133 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
134 if (!input.isSuccessful()) {
135 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
137 return removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters);
140 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantMeters"));
141 resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
143 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
144 if (!input.isSuccessful()) {
145 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
147 return removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters);
150 Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"));
151 return resultVehicle;
155 ListenableFuture<RpcResult<Void>> addMissingFlows(final NodeId nodeId,
156 final InstanceIdentifier<FlowCapableNode> nodeIdent,
157 final Map<TableKey, ItemSyncBox<Flow>> flowsInTablesSyncBox,
158 final SyncCrudCounters counters) {
159 if (flowsInTablesSyncBox.isEmpty()) {
160 LOG.trace("no tables in config for node: {} -> SKIPPING", nodeId.getValue());
161 return RpcResultBuilder.<Void>success().buildFuture();
164 final List<ListenableFuture<RpcResult<AddFlowOutput>>> allResults = new ArrayList<>();
165 final List<ListenableFuture<RpcResult<UpdateFlowOutput>>> allUpdateResults = new ArrayList<>();
166 final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
168 for (Map.Entry<TableKey, ItemSyncBox<Flow>> flowsInTableBoxEntry : flowsInTablesSyncBox.entrySet()) {
169 final TableKey tableKey = flowsInTableBoxEntry.getKey();
170 final ItemSyncBox<Flow> flowSyncBox = flowsInTableBoxEntry.getValue();
172 final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdent.child(Table.class, tableKey);
174 for (final Flow flow : flowSyncBox.getItemsToPush()) {
175 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, flow.getKey());
177 LOG.trace("adding flow {} in table {} - absent on device {} match{}",
178 flow.getId(), tableKey, nodeId, flow.getMatch());
180 allResults.add(JdkFutureAdapters.listenInPoolThread(
181 flowForwarder.add(flowIdent, flow, nodeIdent)));
182 flowCrudCounts.incAdded();
185 for (final ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowSyncBox.getItemsToUpdate()) {
186 final Flow existingFlow = flowUpdate.getOriginal();
187 final Flow updatedFlow = flowUpdate.getUpdated();
189 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, updatedFlow.getKey());
190 LOG.trace("flow {} in table {} - needs update on device {} match{}",
191 updatedFlow.getId(), tableKey, nodeId, updatedFlow.getMatch());
193 allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
194 flowForwarder.update(flowIdent, existingFlow, updatedFlow, nodeIdent)));
195 flowCrudCounts.incUpdated();
199 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
200 Futures.allAsList(allResults),
201 ReconcileUtil.<AddFlowOutput>createRpcResultCondenser("flow adding"));
203 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
204 Futures.allAsList(allUpdateResults),
205 ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("flow updating"));
207 final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
208 Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
209 ReconcileUtil.<Void>createRpcResultCondenser("flow add/update"));
211 return summaryResult;
214 return Futures.transform(summaryResult,
215 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
219 ListenableFuture<RpcResult<Void>> removeRedundantFlows(final NodeId nodeId,
220 final InstanceIdentifier<FlowCapableNode> nodeIdent,
221 final Map<TableKey, ItemSyncBox<Flow>> removalPlan,
222 final SyncCrudCounters counters) {
223 if (removalPlan.isEmpty()) {
224 LOG.trace("no tables in operational for node: {} -> SKIPPING", nodeId.getValue());
225 return RpcResultBuilder.<Void>success().buildFuture();
228 final List<ListenableFuture<RpcResult<RemoveFlowOutput>>> allResults = new ArrayList<>();
229 final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
231 for (final Map.Entry<TableKey, ItemSyncBox<Flow>> flowsPerTable : removalPlan.entrySet()) {
232 final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
233 nodeIdent.child(Table.class, flowsPerTable.getKey());
235 // loop flows on device and check if the are configured
236 for (final Flow flow : flowsPerTable.getValue().getItemsToPush()) {
237 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
238 tableIdent.child(Flow.class, flow.getKey());
239 allResults.add(JdkFutureAdapters.listenInPoolThread(
240 flowForwarder.remove(flowIdent, flow, nodeIdent)));
241 flowCrudCounts.incRemoved();
245 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
246 Futures.allAsList(allResults), ReconcileUtil.<RemoveFlowOutput>createRpcResultCondenser("flow remove"));
247 return Futures.transform(singleVoidResult,
248 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
252 ListenableFuture<RpcResult<Void>> removeRedundantMeters(final NodeId nodeId,
253 final InstanceIdentifier<FlowCapableNode> nodeIdent,
254 final ItemSyncBox<Meter> meterRemovalPlan,
255 final SyncCrudCounters counters) {
256 if (meterRemovalPlan.isEmpty()) {
257 LOG.trace("no meters on device for node: {} -> SKIPPING", nodeId.getValue());
258 return RpcResultBuilder.<Void>success().buildFuture();
261 final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
263 final List<ListenableFuture<RpcResult<RemoveMeterOutput>>> allResults = new ArrayList<>();
264 for (Meter meter : meterRemovalPlan.getItemsToPush()) {
265 LOG.trace("removing meter {} - absent in config {}",
266 meter.getMeterId(), nodeId);
267 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
268 nodeIdent.child(Meter.class, meter.getKey());
269 allResults.add(JdkFutureAdapters.listenInPoolThread(
270 meterForwarder.remove(meterIdent, meter, nodeIdent)));
271 meterCrudCounts.incRemoved();
274 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
275 Futures.allAsList(allResults),
276 ReconcileUtil.<RemoveMeterOutput>createRpcResultCondenser("meter remove"));
277 return singleVoidResult;
279 return Futures.transform(singleVoidResult,
280 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
284 ListenableFuture<RpcResult<Void>> removeRedundantGroups(final NodeId nodeId,
285 final InstanceIdentifier<FlowCapableNode> nodeIdent,
286 final List<ItemSyncBox<Group>> groupsRemovalPlan,
287 final SyncCrudCounters counters) {
288 if (groupsRemovalPlan.isEmpty()) {
289 LOG.trace("no groups on device for node: {} -> SKIPPING", nodeId.getValue());
290 return RpcResultBuilder.<Void>success().buildFuture();
293 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
295 ListenableFuture<RpcResult<Void>> chainedResult = RpcResultBuilder.<Void>success().buildFuture();
297 groupCrudCounts.setRemoved(ReconcileUtil.countTotalPushed(groupsRemovalPlan));
298 if (LOG.isDebugEnabled()) {
299 LOG.debug("removing groups: planSteps={}, toRemoveTotal={}",
300 groupsRemovalPlan.size(), groupCrudCounts.getRemoved());
302 Collections.reverse(groupsRemovalPlan);
303 for (final ItemSyncBox<Group> groupsPortion : groupsRemovalPlan) {
305 Futures.transform(chainedResult, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
307 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input)
309 final ListenableFuture<RpcResult<Void>> result;
310 if (input.isSuccessful()) {
311 result = flushRemoveGroupPortionAndBarrier(nodeIdent, groupsPortion);
313 // pass through original unsuccessful rpcResult
314 result = Futures.immediateFuture(input);
321 } catch (IllegalStateException e) {
322 chainedResult = RpcResultBuilder.<Void>failed()
323 .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
327 return chainedResult;
330 private ListenableFuture<RpcResult<Void>> flushRemoveGroupPortionAndBarrier(
331 final InstanceIdentifier<FlowCapableNode> nodeIdent,
332 final ItemSyncBox<Group> groupsPortion) {
333 List<ListenableFuture<RpcResult<RemoveGroupOutput>>> allResults = new ArrayList<>();
334 for (Group group : groupsPortion.getItemsToPush()) {
335 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
336 allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.remove(groupIdent, group, nodeIdent)));
339 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
340 Futures.allAsList(allResults),
341 ReconcileUtil.<RemoveGroupOutput>createRpcResultCondenser("group remove"));
343 return Futures.transform(singleVoidResult,
344 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
347 ListenableFuture<RpcResult<Void>> updateTableFeatures(final InstanceIdentifier<FlowCapableNode> nodeIdent,
348 final FlowCapableNode flowCapableNodeConfigured) {
349 // CHECK if while pushing the update, updateTableInput can be null to emulate a table add
350 final List<Table> tableList = ReconcileUtil.safeTables(flowCapableNodeConfigured);
352 final List<ListenableFuture<RpcResult<UpdateTableOutput>>> allResults = new ArrayList<>();
353 for (Table table : tableList) {
354 TableKey tableKey = table.getKey();
355 KeyedInstanceIdentifier<TableFeatures, TableFeaturesKey> tableFeaturesII = nodeIdent
356 .child(TableFeatures.class, new TableFeaturesKey(tableKey.getId()));
357 List<TableFeatures> tableFeatures = flowCapableNodeConfigured.getTableFeatures();
358 if (tableFeatures != null) {
359 for (TableFeatures tableFeaturesItem : tableFeatures) {
360 // TODO uncomment java.lang.NullPointerException
362 // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer.serializeHeader(AbstractOxmMatchEntrySerializer.java:31
363 // allResults.add(JdkFutureAdapters.listenInPoolThread(
364 // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent)));
369 final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
370 Futures.allAsList(allResults),
371 ReconcileUtil.<UpdateTableOutput>createRpcResultCondenser("table update"));
373 return Futures.transform(singleVoidResult,
374 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
377 private ListenableFuture<RpcResult<Void>> flushAddGroupPortionAndBarrier(
378 final InstanceIdentifier<FlowCapableNode> nodeIdent,
379 final ItemSyncBox<Group> groupsPortion) {
380 final List<ListenableFuture<RpcResult<AddGroupOutput>>> allResults = new ArrayList<>();
381 final List<ListenableFuture<RpcResult<UpdateGroupOutput>>> allUpdateResults = new ArrayList<>();
383 for (Group group : groupsPortion.getItemsToPush()) {
384 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
385 allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.add(groupIdent, group, nodeIdent)));
389 for (ItemSyncBox.ItemUpdateTuple<Group> groupTuple : groupsPortion.getItemsToUpdate()) {
390 final Group existingGroup = groupTuple.getOriginal();
391 final Group group = groupTuple.getUpdated();
393 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
394 allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
395 groupForwarder.update(groupIdent, existingGroup, group, nodeIdent)));
398 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
399 Futures.allAsList(allResults), ReconcileUtil.<AddGroupOutput>createRpcResultCondenser("group add"));
401 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
402 Futures.allAsList(allUpdateResults),
403 ReconcileUtil.<UpdateGroupOutput>createRpcResultCondenser("group update"));
405 final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
406 Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
407 ReconcileUtil.<Void>createRpcResultCondenser("group add/update"));
410 return Futures.transform(summaryResult,
411 ReconcileUtil.chainBarrierFlush(
412 PathUtil.digNodePath(nodeIdent), transactionService));
415 ListenableFuture<RpcResult<Void>> addMissingMeters(final NodeId nodeId,
416 final InstanceIdentifier<FlowCapableNode> nodeIdent,
417 final ItemSyncBox<Meter> syncBox,
418 final SyncCrudCounters counters) {
419 if (syncBox.isEmpty()) {
420 LOG.trace("no meters configured for node: {} -> SKIPPING", nodeId.getValue());
421 return RpcResultBuilder.<Void>success().buildFuture();
424 final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
426 final List<ListenableFuture<RpcResult<AddMeterOutput>>> allResults = new ArrayList<>();
427 final List<ListenableFuture<RpcResult<UpdateMeterOutput>>> allUpdateResults = new ArrayList<>();
428 for (Meter meter : syncBox.getItemsToPush()) {
429 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, meter.getKey());
430 LOG.debug("adding meter {} - absent on device {}",
431 meter.getMeterId(), nodeId);
432 allResults.add(JdkFutureAdapters.listenInPoolThread(
433 meterForwarder.add(meterIdent, meter, nodeIdent)));
434 meterCrudCounts.incAdded();
437 for (ItemSyncBox.ItemUpdateTuple<Meter> meterTuple : syncBox.getItemsToUpdate()) {
438 final Meter existingMeter = meterTuple.getOriginal();
439 final Meter updated = meterTuple.getUpdated();
440 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, updated.getKey());
441 LOG.trace("meter {} - needs update on device {}", updated.getMeterId(), nodeId);
442 allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
443 meterForwarder.update(meterIdent, existingMeter, updated, nodeIdent)));
444 meterCrudCounts.incUpdated();
447 final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
448 Futures.allAsList(allResults), ReconcileUtil.<AddMeterOutput>createRpcResultCondenser("meter add"));
450 final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
451 Futures.allAsList(allUpdateResults),
452 ReconcileUtil.<UpdateMeterOutput>createRpcResultCondenser("meter update"));
454 final ListenableFuture<RpcResult<Void>> summaryResults = Futures.transform(
455 Futures.allAsList(singleVoidUpdateResult, singleVoidAddResult),
456 ReconcileUtil.<Void>createRpcResultCondenser("meter add/update"));
458 return summaryResults;
461 return Futures.transform(summaryResults,
462 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
466 ListenableFuture<RpcResult<Void>> addMissingGroups(final NodeId nodeId,
467 final InstanceIdentifier<FlowCapableNode> nodeIdent,
468 final List<ItemSyncBox<Group>> groupsAddPlan,
469 final SyncCrudCounters counters) {
470 if (groupsAddPlan.isEmpty()) {
471 LOG.trace("no groups configured for node: {} -> SKIPPING", nodeId.getValue());
472 return RpcResultBuilder.<Void>success().buildFuture();
475 ListenableFuture<RpcResult<Void>> chainedResult;
477 if (!groupsAddPlan.isEmpty()) {
478 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
479 groupCrudCounts.setAdded(ReconcileUtil.countTotalPushed(groupsAddPlan));
480 groupCrudCounts.setUpdated(ReconcileUtil.countTotalUpdated(groupsAddPlan));
482 if (LOG.isDebugEnabled()) {
483 LOG.debug("adding groups: planSteps={}, toAddTotal={}, toUpdateTotal={}",
484 groupsAddPlan.size(),
485 groupCrudCounts.getAdded(),
486 groupCrudCounts.getUpdated());
489 chainedResult = flushAddGroupPortionAndBarrier(nodeIdent, groupsAddPlan.get(0));
490 for (final ItemSyncBox<Group> groupsPortion : Iterables.skip(groupsAddPlan, 1)) {
492 Futures.transform(chainedResult, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
494 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input)
496 final ListenableFuture<RpcResult<Void>> result;
497 if (input.isSuccessful()) {
498 result = flushAddGroupPortionAndBarrier(nodeIdent, groupsPortion);
500 // pass through original unsuccessful rpcResult
501 result = Futures.immediateFuture(input);
509 chainedResult = RpcResultBuilder.<Void>success().buildFuture();
511 } catch (IllegalStateException e) {
512 chainedResult = RpcResultBuilder.<Void>failed()
513 .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
517 return chainedResult;
521 public SyncPlanPushStrategyIncrementalImpl setFlowForwarder(final FlowForwarder flowForwarder) {
522 this.flowForwarder = flowForwarder;
526 public SyncPlanPushStrategyIncrementalImpl setTableForwarder(final TableForwarder tableForwarder) {
527 this.tableForwarder = tableForwarder;
531 public SyncPlanPushStrategyIncrementalImpl setMeterForwarder(final MeterForwarder meterForwarder) {
532 this.meterForwarder = meterForwarder;
536 public SyncPlanPushStrategyIncrementalImpl setGroupForwarder(final GroupForwarder groupForwarder) {
537 this.groupForwarder = groupForwarder;
541 public SyncPlanPushStrategyIncrementalImpl setTransactionService(final FlowCapableTransactionService transactionService) {
542 this.transactionService = transactionService;