Bug 5575 main SyncReactor + utils 51/38151/8
authorAndrej Leitner <anleitne@cisco.com>
Tue, 26 Apr 2016 13:50:04 +0000 (15:50 +0200)
committerAndrej Leitner <anleitne@cisco.com>
Tue, 24 May 2016 06:32:06 +0000 (08:32 +0200)
  - synchronization reactor implementation, applicable for both - syncup and reconciliation
  - useful util methods

Change-Id: I4a0ef8043e3e9b66a92c49deb86a83b083782ccc
Signed-off-by: Andrej Leitner <anleitne@cisco.com>
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/SyncReactor.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProvider.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorImpl.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/markandsweep/SwitchFlowId.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/CrudCounts.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/FlowCapableNodeLookups.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/ItemSyncBox.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/PathUtil.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/ReconcileUtil.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/SyncCrudCounters.java [new file with mode: 0644]

diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/SyncReactor.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/SyncReactor.java
new file mode 100644 (file)
index 0000000..12cffd1
--- /dev/null
@@ -0,0 +1,29 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Device synchronization API.
+ */
+public interface SyncReactor {
+    /**
+     * @param flowcapableNodePath path to openflow augmentation of node
+     * @param configTree configured node
+     * @param operationalTree device reflection
+     * @return synchronization outcome
+     */
+    ListenableFuture<Boolean> syncup(InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+            FlowCapableNode configTree, FlowCapableNode operationalTree) throws InterruptedException;
+
+}
index 9c9457c8540a1ccc7cf87ca93c671faa402555fb..eb5b46f292bd08291d93e8c388fc3805df17a543 100644 (file)
@@ -90,6 +90,8 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
         final TableForwarder tableForwarder = new TableForwarder(salTableService);
 
         {
+            final SyncReactorImpl syncReactorImpl = new SyncReactorImpl();
+            
             final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
             final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
             final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot,
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorImpl.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorImpl.java
new file mode 100644 (file)
index 0000000..9f271d6
--- /dev/null
@@ -0,0 +1,791 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.markandsweep.SwitchFlowId;
+import org.opendaylight.openflowplugin.applications.frsync.util.CrudCounts;
+import org.opendaylight.openflowplugin.applications.frsync.util.FlowCapableNodeLookups;
+import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Synchronization reactor implementation, applicable for both - syncup and reconciliation.
+ */
+public class SyncReactorImpl implements SyncReactor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SyncReactorImpl.class);
+
+    private FlowForwarder flowForwarder;
+    private TableForwarder tableForwarder;
+    private MeterForwarder meterForwarder;
+    private GroupForwarder groupForwarder;
+    private FlowCapableTransactionService transactionService;
+
+    @Override
+    public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> nodeIdent,
+                                            final FlowCapableNode configTree, final FlowCapableNode operationalTree) {
+
+        LOG.trace("syncup {} cfg:{} oper:{}", nodeIdent, configTree == null ? "is null" : "non null", operationalTree == null ? "is null" : "non null");
+        final SyncCrudCounters counters = new SyncCrudCounters();
+        /**
+         * reconciliation strategy - phase 1: - add/update missing objects in following order -
+         * table features - groups (reordered) - meters - flows
+         **/
+        ListenableFuture<RpcResult<Void>> resultVehicle = null;
+        final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
+
+        /* Tables - have to be pushed before groups */
+        resultVehicle = updateTableFeatures(nodeIdent, configTree);
+        resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
+            @Override
+            public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
+                if (!input.isSuccessful()) {
+                    //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
+                    //final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
+                    //        Futures.asList Arrays.asList(input, output),
+                    //        ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("TODO"));
+                }
+                return addMissingGroups(nodeIdent, configTree, operationalTree, counters);
+            }
+        });
+        Futures.addCallback(resultVehicle, logResultCallback(nodeId, "addMissingGroups"));
+        resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
+            @Override
+            public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
+                if (!input.isSuccessful()) {
+                    //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
+                }
+                return addMissingMeters(nodeIdent, configTree, operationalTree, counters);
+            }
+        });
+        Futures.addCallback(resultVehicle, logResultCallback(nodeId, "addMissingMeters"));
+        resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
+            @Override
+            public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
+                if (!input.isSuccessful()) {
+                    //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
+                }
+                return addMissingFlows(nodeIdent, configTree, operationalTree, counters);
+            }
+        });
+        Futures.addCallback(resultVehicle, logResultCallback(nodeId, "addMissingFlows"));
+
+        /**
+         * reconciliation strategy - phase 2: - remove redundand objects in following order - flows
+         * - meters - groups (reordered)
+         **/
+        resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
+            @Override
+            public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
+                if (!input.isSuccessful()) {
+                    //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
+                }
+                return removeRedundantFlows(nodeIdent, configTree, operationalTree, counters);
+            }
+        });
+        Futures.addCallback(resultVehicle, logResultCallback(nodeId, "removeRedundantFlows"));
+        resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
+            @Override
+            public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
+                if (!input.isSuccessful()) {
+                    //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
+                }
+                return removeRedundantMeters(nodeIdent, configTree, operationalTree, counters);
+            }
+        });
+        Futures.addCallback(resultVehicle, logResultCallback(nodeId, "removeRedundantMeters"));
+        resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
+            @Override
+            public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
+                if (!input.isSuccessful()) {
+                    //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
+                }
+                return removeRedundantGroups(nodeIdent, configTree, operationalTree, counters);
+            }
+        });
+        Futures.addCallback(resultVehicle, logResultCallback(nodeId, "removeRedundantGroups"));
+
+        // log final result
+        Futures.addCallback(resultVehicle, logResultCallback(nodeId, "final result"));
+
+        return Futures.transform(resultVehicle, new Function<RpcResult<Void>, Boolean>() {
+            @Override
+            public Boolean apply(RpcResult<Void> input) {
+                if (input == null) {
+                    return false;
+                }
+
+                if (LOG.isDebugEnabled()) {
+                    final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
+                    final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
+                    final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
+                    LOG.debug("sync-outcome[{}] (added/updated/removed): flow={}/{}/{}, meter={}/{}/{}, group={}/{}/{}",
+                            nodeId.getValue(),
+                            flowCrudCounts.getAdded(),
+                            flowCrudCounts.getUpdated(),
+                            flowCrudCounts.getRemoved(),
+                            meterCrudCounts.getAdded(),
+                            meterCrudCounts.getUpdated(),
+                            meterCrudCounts.getRemoved(),
+                            groupCrudCounts.getAdded(),
+                            groupCrudCounts.getUpdated(),
+                            groupCrudCounts.getRemoved()
+                    );
+                }
+
+                return input.isSuccessful();
+            }
+        });
+    }
+
+    private FutureCallback<RpcResult<Void>> logResultCallback(final NodeId nodeId, final String prefix) {
+        return new FutureCallback<RpcResult<Void>>() {
+            @Override
+            public void onSuccess(@Nullable final RpcResult<Void> result) {
+                if (result != null) {
+                    if (result.isSuccessful()) {
+                        LOG.debug(prefix + " finished successfully: {}", nodeId.getValue());
+                    } else {
+                        final Collection<RpcError> errors = MoreObjects.firstNonNull(result.getErrors(), ImmutableList.<RpcError>of());
+                        LOG.debug(prefix + " failed: {} -> {}", nodeId.getValue(), Arrays.toString(errors.toArray()));
+                    }
+                } else {
+                    LOG.debug(prefix + "reconciliation failed: {} -> null result", nodeId.getValue());
+                }
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                LOG.debug(prefix + "reconciliation failed seriously: {}", nodeId.getValue(), t);
+            }
+        };
+    }
+
+    public SyncReactorImpl setFlowForwarder(final FlowForwarder flowForwarder) {
+        this.flowForwarder = flowForwarder;
+        return this;
+    }
+
+    public SyncReactorImpl setTableForwarder(final TableForwarder tableForwarder) {
+        this.tableForwarder = tableForwarder;
+        return this;
+    }
+
+    public SyncReactorImpl setMeterForwarder(final MeterForwarder meterForwarder) {
+        this.meterForwarder = meterForwarder;
+        return this;
+    }
+
+    public SyncReactorImpl setGroupForwarder(final GroupForwarder groupForwarder) {
+        this.groupForwarder = groupForwarder;
+        return this;
+    }
+
+    public SyncReactorImpl setTransactionService(final FlowCapableTransactionService transactionService) {
+        this.transactionService = transactionService;
+        return this;
+    }
+
+    ListenableFuture<RpcResult<Void>> updateTableFeatures(final InstanceIdentifier<FlowCapableNode> nodeIdent,
+                                                          final FlowCapableNode flowCapableNodeConfigured) {
+        // CHECK if while pushing the update, updateTableInput can be null to emulate a table add
+        final List<Table> tableList = safeTables(flowCapableNodeConfigured);
+
+        final List<ListenableFuture<RpcResult<UpdateTableOutput>>> allResults = new ArrayList<>();
+        for (Table table : tableList) {
+            TableKey tableKey = table.getKey();
+            KeyedInstanceIdentifier<TableFeatures, TableFeaturesKey> tableFeaturesII = nodeIdent
+                    .child(TableFeatures.class, new TableFeaturesKey(tableKey.getId()));
+            List<TableFeatures> tableFeatures = flowCapableNodeConfigured.getTableFeatures();
+            if (tableFeatures != null) {
+                for (TableFeatures tableFeaturesItem : tableFeatures) {
+                    // TODO uncomment java.lang.NullPointerException
+                    // at
+                    // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer.serializeHeader(AbstractOxmMatchEntrySerializer.java:31
+                    // allResults.add(JdkFutureAdapters.listenInPoolThread(
+                    // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent)));
+                }
+            }
+        }
+
+        final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
+                Futures.allAsList(allResults),
+                ReconcileUtil.<UpdateTableOutput>createRpcResultCondenser("table update"));
+
+        return Futures.transform(singleVoidResult,
+                ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
+    }
+
+
+    @VisibleForTesting
+    ListenableFuture<RpcResult<Void>> addMissingGroups(final InstanceIdentifier<FlowCapableNode> nodeIdent,
+                                                       final FlowCapableNode flowCapableNodeConfigured,
+                                                       final FlowCapableNode flowCapableNodeOperational,
+                                                       final SyncCrudCounters counters) {
+        final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
+        final List<Group> groupsConfigured = safeGroups(flowCapableNodeConfigured);
+        if (groupsConfigured.isEmpty()) {
+            LOG.trace("no groups configured for node: {} -> SKIPPING", nodeId.getValue());
+            return RpcResultBuilder.<Void>success().buildFuture();
+        }
+
+        final List<Group> groupsOperational = safeGroups(flowCapableNodeOperational);
+
+        return addMissingGroups(nodeId, nodeIdent, groupsConfigured, groupsOperational, counters);
+    }
+
+    protected ListenableFuture<RpcResult<Void>> addMissingGroups(NodeId nodeId,
+                                                                 final InstanceIdentifier<FlowCapableNode> nodeIdent,
+                                                                 final List<Group> groupsConfigured,
+                                                                 final List<Group> groupsOperational,
+                                                                 final SyncCrudCounters counters) {
+
+        final Map<Long, Group> groupOperationalMap = FlowCapableNodeLookups.wrapGroupsToMap(groupsOperational);
+
+        final List<Group> pendingGroups = new ArrayList<>();
+        pendingGroups.addAll(groupsConfigured);
+
+        ListenableFuture<RpcResult<Void>> chainedResult;
+        try {
+            final List<ItemSyncBox<Group>> groupsAddPlan =
+                    ReconcileUtil.resolveAndDivideGroups(nodeId, groupOperationalMap, pendingGroups);
+            if (!groupsAddPlan.isEmpty()) {
+                final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
+                groupCrudCounts.setAdded(ReconcileUtil.countTotalAdds(groupsAddPlan));
+                groupCrudCounts.setUpdated(ReconcileUtil.countTotalUpdated(groupsAddPlan));
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("adding groups: inputGroups={}, planSteps={}, toAddTotal={}, toUpdateTotal={}",
+                            pendingGroups.size(), groupsAddPlan.size(),
+                            groupCrudCounts.getAdded(),
+                            groupCrudCounts.getUpdated());
+                }
+
+                chainedResult = flushAddGroupPortionAndBarrier(nodeIdent, groupsAddPlan.get(0));
+                for (final ItemSyncBox<Group> groupsPortion : Iterables.skip(groupsAddPlan, 1)) {
+                    chainedResult =
+                            Futures.transform(chainedResult, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
+                                @Override
+                                public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input)
+                                        throws Exception {
+                                    final ListenableFuture<RpcResult<Void>> result;
+                                    if (input.isSuccessful()) {
+                                        result = flushAddGroupPortionAndBarrier(nodeIdent, groupsPortion);
+                                    } else {
+                                        // pass through original unsuccessful rpcResult
+                                        result = Futures.immediateFuture(input);
+                                    }
+
+                                    return result;
+                                }
+                            });
+                }
+            } else {
+                chainedResult = RpcResultBuilder.<Void>success().buildFuture();
+            }
+        } catch (IllegalStateException e) {
+            chainedResult = RpcResultBuilder.<Void>failed()
+                    .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
+                    .buildFuture();
+        }
+
+        return chainedResult;
+    }
+
+    private ListenableFuture<RpcResult<Void>> flushAddGroupPortionAndBarrier(
+            final InstanceIdentifier<FlowCapableNode> nodeIdent,
+            final ItemSyncBox<Group> groupsPortion) {
+        final List<ListenableFuture<RpcResult<AddGroupOutput>>> allResults = new ArrayList<>();
+        final List<ListenableFuture<RpcResult<UpdateGroupOutput>>> allUpdateResults = new ArrayList<>();
+
+        for (Group group : groupsPortion.getItemsToAdd()) {
+            final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
+            allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.add(groupIdent, group, nodeIdent)));
+
+        }
+
+        for (ItemSyncBox.ItemUpdateTuple<Group> groupTuple : groupsPortion.getItemsToUpdate()) {
+            final Group existingGroup = groupTuple.getOriginal();
+            final Group group = groupTuple.getUpdated();
+
+            final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
+            allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
+                    groupForwarder.update(groupIdent, existingGroup, group, nodeIdent)));
+        }
+
+        final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
+                Futures.allAsList(allResults), ReconcileUtil.<AddGroupOutput>createRpcResultCondenser("group add"));
+
+        final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
+                Futures.allAsList(allUpdateResults),
+                ReconcileUtil.<UpdateGroupOutput>createRpcResultCondenser("group update"));
+
+        final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
+                Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
+                ReconcileUtil.<Void>createRpcResultCondenser("group add/update"));
+
+
+        return Futures.transform(summaryResult,
+                ReconcileUtil.chainBarrierFlush(
+                        PathUtil.digNodePath(nodeIdent), transactionService));
+    }
+
+    ListenableFuture<RpcResult<Void>> addMissingMeters(final InstanceIdentifier<FlowCapableNode> nodeIdent,
+                                                       final FlowCapableNode flowCapableNodeConfigured,
+                                                       final FlowCapableNode flowCapableNodeOperational,
+                                                       final SyncCrudCounters counters) {
+        final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
+        final List<Meter> metersConfigured = safeMeters(flowCapableNodeConfigured);
+        if (metersConfigured.isEmpty()) {
+            LOG.trace("no meters configured for node: {} -> SKIPPING", nodeId.getValue());
+            return RpcResultBuilder.<Void>success().buildFuture();
+        }
+
+        final List<Meter> metersOperational = safeMeters(flowCapableNodeOperational);
+
+        return addMissingMeters(nodeId, nodeIdent, metersConfigured, metersOperational, counters);
+    }
+
+
+    protected ListenableFuture<RpcResult<Void>> addMissingMeters(NodeId nodeId,
+                                                                 final InstanceIdentifier<FlowCapableNode> nodeIdent,
+                                                                 List<Meter> metersConfigured,
+                                                                 List<Meter> metersOperational,
+                                                                 final SyncCrudCounters counters) {
+
+        final Map<MeterId, Meter> meterOperationalMap = FlowCapableNodeLookups.wrapMetersToMap(metersOperational);
+        final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
+
+        final List<ListenableFuture<RpcResult<AddMeterOutput>>> allResults = new ArrayList<>();
+        final List<ListenableFuture<RpcResult<UpdateMeterOutput>>> allUpdateResults = new ArrayList<>();
+        for (Meter meter : metersConfigured) {
+            final Meter existingMeter = meterOperationalMap.get(meter.getMeterId());
+            final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, meter.getKey());
+
+            if (existingMeter == null) {
+                LOG.debug("adding meter {} - absent on device {}",
+                        meter.getMeterId(), nodeId);
+                allResults.add(JdkFutureAdapters.listenInPoolThread(
+                        meterForwarder.add(meterIdent, meter, nodeIdent)));
+                meterCrudCounts.incAdded();
+            } else {
+                // compare content and eventually update
+                LOG.trace("meter {} - already present on device {} .. comparing", meter.getMeterId(), nodeId);
+                if (!meter.equals(existingMeter)) {
+                    LOG.trace("meter {} - needs update on device {}", meter.getMeterId(), nodeId);
+                    allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
+                            meterForwarder.update(meterIdent, existingMeter, meter, nodeIdent)));
+                    meterCrudCounts.incUpdated();
+                } else {
+                    LOG.trace("meter {} - on device {} is equal to the configured one", meter.getMeterId(), nodeId);
+                }
+            }
+        }
+
+        final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
+                Futures.allAsList(allResults), ReconcileUtil.<AddMeterOutput>createRpcResultCondenser("meter add"));
+
+        final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
+                Futures.allAsList(allUpdateResults),
+                ReconcileUtil.<UpdateMeterOutput>createRpcResultCondenser("meter update"));
+
+        final ListenableFuture<RpcResult<Void>> summaryResults = Futures.transform(
+                Futures.allAsList(singleVoidUpdateResult, singleVoidAddResult),
+                ReconcileUtil.<Void>createRpcResultCondenser("meter add/update"));
+
+        return summaryResults;
+
+        /*
+        return Futures.transform(summaryResults,
+                ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
+                */
+    }
+
+    @VisibleForTesting
+    ListenableFuture<RpcResult<Void>> addMissingFlows(final InstanceIdentifier<FlowCapableNode> nodeIdent,
+                                                      final FlowCapableNode flowCapableNodeConfigured,
+                                                      final FlowCapableNode flowCapableNodeOperational,
+                                                      final SyncCrudCounters counters) {
+        final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
+        final List<Table> tablesConfigured = safeTables(flowCapableNodeConfigured);
+        if (tablesConfigured.isEmpty()) {
+            LOG.trace("no tables in config for node: {} -> SKIPPING", nodeId.getValue());
+            return RpcResultBuilder.<Void>success().buildFuture();
+        }
+
+        final List<Table> tablesOperational = safeTables(flowCapableNodeOperational);
+
+        return addMissingFlows(nodeId, nodeIdent, tablesConfigured, tablesOperational, counters);
+    }
+
+    protected ListenableFuture<RpcResult<Void>> addMissingFlows(NodeId nodeId,
+                                                                final InstanceIdentifier<FlowCapableNode> nodeIdent,
+                                                                List<Table> tablesConfigured, List<Table> tablesOperational,
+                                                                final SyncCrudCounters counters) {
+
+        final Map<Short, Table> tableOperationalMap = FlowCapableNodeLookups.wrapTablesToMap(tablesOperational);
+        final List<ListenableFuture<RpcResult<AddFlowOutput>>> allResults = new ArrayList<>();
+        final List<ListenableFuture<RpcResult<UpdateFlowOutput>>> allUpdateResults = new ArrayList<>();
+        final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
+
+        for (final Table tableConfigured : tablesConfigured) {
+            final List<Flow> flowsConfigured = tableConfigured.getFlow();
+            if (flowsConfigured == null || flowsConfigured.isEmpty()) {
+                continue;
+            }
+
+            final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
+                    nodeIdent.child(Table.class, tableConfigured.getKey());
+
+            // lookup table (on device)
+            final Table tableOperational = tableOperationalMap.get(tableConfigured.getId());
+            // wrap existing (on device) flows in current table into map
+            final Map<SwitchFlowId, Flow> flowOperationalMap = FlowCapableNodeLookups.wrapFlowsToMap(
+                    tableOperational != null
+                            ? tableOperational.getFlow()
+                            : null);
+
+
+            // loop configured flows and check if already present on device
+            for (final Flow flow : flowsConfigured) {
+                final Flow existingFlow = FlowCapableNodeLookups.flowMapLookupExisting(flow, flowOperationalMap);
+                final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, flow.getKey());
+
+                if (existingFlow == null) {
+                    LOG.debug("adding flow {} in table {} - absent on device {} match{}",
+                            flow.getId(), tableConfigured.getKey(), nodeId, flow.getMatch());
+
+                    allResults.add(JdkFutureAdapters.listenInPoolThread(
+                            flowForwarder.add(flowIdent, flow, nodeIdent)));
+                    flowCrudCounts.incAdded();
+                } else {
+                    LOG.trace("flow {} in table {} - already present on device {} .. comparing match{}",
+                            flow.getId(), tableConfigured.getKey(), nodeId, flow.getMatch());
+                    // check instructions and eventually update
+                    if (!Objects.equals(flow.getInstructions(), existingFlow.getInstructions())) {
+                        LOG.trace("flow {} in table {} - needs update on device {} match{}",
+                                flow.getId(), tableConfigured.getKey(), nodeId, flow.getMatch());
+                        allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
+                                flowForwarder.update(flowIdent, existingFlow, flow, nodeIdent)));
+                        flowCrudCounts.incUpdated();
+                    } else {
+                        LOG.trace("flow {} in table {} - is equal to configured one on device {} match{}",
+                                flow.getId(), tableConfigured.getKey(), nodeId, flow.getMatch());
+                    }
+                }
+            }
+        }
+
+        final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
+                Futures.allAsList(allResults),
+                ReconcileUtil.<AddFlowOutput>createRpcResultCondenser("flow adding"));
+
+        final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
+                Futures.allAsList(allUpdateResults),
+                ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("flow updating"));
+
+        final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
+                Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
+                ReconcileUtil.<Void>createRpcResultCondenser("flow add/update"));
+
+        return summaryResult;
+
+        /*
+        return Futures.transform(summaryResult,
+                ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
+                */
+    }
+
+    @VisibleForTesting
+    ListenableFuture<RpcResult<Void>> removeRedundantFlows(
+            final InstanceIdentifier<FlowCapableNode> nodeIdent,
+            final FlowCapableNode flowCapableNodeConfigured,
+            final FlowCapableNode flowCapableNodeOperational,
+            final SyncCrudCounters counters) {
+        final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
+        final List<Table> tablesOperational = safeTables(flowCapableNodeOperational);
+
+        if (tablesOperational.isEmpty()) {
+            LOG.trace("no tables in operational for node: {} -> SKIPPING", nodeId.getValue());
+            return RpcResultBuilder.<Void>success().buildFuture();
+        }
+
+        final List<Table> tablesConfigured = safeTables(flowCapableNodeConfigured);
+
+        return removeRedundantFlows(nodeId, nodeIdent, tablesConfigured, tablesOperational, counters);
+    }
+
+    protected ListenableFuture<RpcResult<Void>> removeRedundantFlows(
+            NodeId nodeId, final InstanceIdentifier<FlowCapableNode> nodeIdent,
+            final List<Table> tablesConfigured, final List<Table> tablesOperational, final SyncCrudCounters counters) {
+        final Map<Short, Table> tableConfigMap = FlowCapableNodeLookups.wrapTablesToMap(tablesConfigured);
+        final List<ListenableFuture<RpcResult<RemoveFlowOutput>>> allResults = new ArrayList<>();
+        final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
+
+        for (final Table tableOperational : tablesOperational) {
+            final List<Flow> flowsOperational = tableOperational.getFlow();
+            if (flowsOperational == null || flowsOperational.isEmpty()) {
+                continue;
+            }
+
+            final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
+                    nodeIdent.child(Table.class, tableOperational.getKey());
+
+            // lookup configured table
+            final Table tableConfig = tableConfigMap.get(tableOperational.getId());
+            // wrap configured flows in current table into map
+            final Map<SwitchFlowId, Flow> flowConfigMap = FlowCapableNodeLookups.wrapFlowsToMap(
+                    tableConfig != null
+                            ? tableConfig.getFlow()
+                            : null);
+
+            // loop flows on device and check if the are configured
+            for (final Flow flow : flowsOperational) {
+                final Flow existingFlow = FlowCapableNodeLookups.flowMapLookupExisting(flow, flowConfigMap);
+                if (existingFlow == null) {
+                    LOG.trace("removing flow {} in table {} - absent in config {}, match {}",
+                            flow.getId(), tableOperational.getKey(), nodeId, flow.getMatch());
+
+                    final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
+                            tableIdent.child(Flow.class, flow.getKey());
+                    allResults.add(JdkFutureAdapters.listenInPoolThread(
+                            flowForwarder.remove(flowIdent, flow, nodeIdent)));
+                    flowCrudCounts.incRemoved();
+                } else {
+                    LOG.trace("skipping flow {} in table {} - present in config {}, match {}",
+                            flow.getId(), tableOperational.getKey(), nodeId, flow.getMatch());
+                }
+            }
+        }
+
+        final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
+                Futures.allAsList(allResults), ReconcileUtil.<RemoveFlowOutput>createRpcResultCondenser("flow remove"));
+        return Futures.transform(singleVoidResult,
+                ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
+
+    }
+
+    @VisibleForTesting
+    ListenableFuture<RpcResult<Void>> removeRedundantMeters(final InstanceIdentifier<FlowCapableNode> nodeIdent,
+                                                            final FlowCapableNode flowCapableNodeConfigured,
+                                                            final FlowCapableNode flowCapableNodeOperational,
+                                                            final SyncCrudCounters counters) {
+
+        final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
+        final List<Meter> metersOperational = safeMeters(flowCapableNodeOperational);
+        if (metersOperational.isEmpty()) {
+            LOG.trace("no meters on device for node: {} -> SKIPPING", nodeId.getValue());
+            return RpcResultBuilder.<Void>success().buildFuture();
+        }
+
+        final List<Meter> metersConfigured = safeMeters(flowCapableNodeConfigured);
+
+        return removeRedundantMeters(nodeId, nodeIdent, metersConfigured, metersOperational, counters);
+    }
+
+
+    protected ListenableFuture<RpcResult<Void>> removeRedundantMeters(NodeId nodeId,
+                                                                      final InstanceIdentifier<FlowCapableNode> nodeIdent,
+                                                                      List<Meter> metersConfigured,
+                                                                      List<Meter> metersOperational,
+                                                                      final SyncCrudCounters counters) {
+
+        final Map<MeterId, Meter> meterConfigMap = FlowCapableNodeLookups.wrapMetersToMap(metersConfigured);
+        final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
+
+        final List<ListenableFuture<RpcResult<RemoveMeterOutput>>> allResults = new ArrayList<>();
+        for (Meter meter : metersOperational) {
+            if (!meterConfigMap.containsKey(meter.getMeterId())) {
+                LOG.trace("removing meter {} - absent in config {}",
+                        meter.getMeterId(), nodeId);
+                final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
+                        nodeIdent.child(Meter.class, meter.getKey());
+                allResults.add(JdkFutureAdapters.listenInPoolThread(
+                        meterForwarder.remove(meterIdent, meter, nodeIdent)));
+                meterCrudCounts.incRemoved();
+            } else {
+                LOG.trace("skipping meter {} - present in config {}",
+                        meter.getMeterId(), nodeId);
+            }
+        }
+
+        final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
+                Futures.allAsList(allResults),
+                ReconcileUtil.<RemoveMeterOutput>createRpcResultCondenser("meter remove"));
+        return singleVoidResult;
+        /*
+        return Futures.transform(singleVoidResult,
+                ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
+                */
+    }
+
+    @VisibleForTesting
+    ListenableFuture<RpcResult<Void>> removeRedundantGroups(final InstanceIdentifier<FlowCapableNode> nodeIdent,
+                                                            final FlowCapableNode flowCapableNodeConfigured,
+                                                            final FlowCapableNode flowCapableNodeOperational,
+                                                            final SyncCrudCounters counters) {
+        final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
+        final List<Group> groupsOperational = safeGroups(flowCapableNodeOperational);
+        if (groupsOperational == null || groupsOperational.isEmpty()) {
+            LOG.trace("no groups on device for node: {} -> SKIPPING", nodeId.getValue());
+            return RpcResultBuilder.<Void>success().buildFuture();
+        }
+
+        final List<Group> groupsConfigured = safeGroups(flowCapableNodeConfigured);
+
+        return removeRedundantGroups(nodeId, nodeIdent, groupsConfigured, groupsOperational, counters);
+    }
+
+    ListenableFuture<RpcResult<Void>> removeRedundantGroups(NodeId nodeId,
+                                                            final InstanceIdentifier<FlowCapableNode> nodeIdent,
+                                                            List<Group> groupsConfigured, List<Group> groupsOperational,
+                                                            final SyncCrudCounters counters) {
+
+        final Map<Long, Group> groupConfigMap = FlowCapableNodeLookups.wrapGroupsToMap(groupsConfigured);
+        final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
+
+        final List<Group> pendingGroups = new ArrayList<>();
+        pendingGroups.addAll(groupsOperational);
+
+        ListenableFuture<RpcResult<Void>> chainedResult;
+        try {
+            final List<ItemSyncBox<Group>> groupsRemovePlan =
+                    ReconcileUtil.resolveAndDivideGroups(nodeId, groupConfigMap, pendingGroups, false);
+            if (!groupsRemovePlan.isEmpty()) {
+                groupCrudCounts.setRemoved(ReconcileUtil.countTotalAdds(groupsRemovePlan));
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("removing groups: inputGroups={}, planSteps={}, toRemoveTotal={}",
+                            pendingGroups.size(), groupsRemovePlan.size(),
+                            groupCrudCounts.getRemoved());
+                }
+                Collections.reverse(groupsRemovePlan);
+                chainedResult = flushRemoveGroupPortionAndBarrier(nodeIdent, groupsRemovePlan.get(0));
+                for (final ItemSyncBox<Group> groupsPortion : Iterables.skip(groupsRemovePlan, 1)) {
+                    chainedResult =
+                            Futures.transform(chainedResult, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
+                                @Override
+                                public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input)
+                                        throws Exception {
+                                    final ListenableFuture<RpcResult<Void>> result;
+                                    if (input.isSuccessful()) {
+                                        result = flushRemoveGroupPortionAndBarrier(nodeIdent, groupsPortion);
+                                    } else {
+                                        // pass through original unsuccessful rpcResult
+                                        result = Futures.immediateFuture(input);
+                                    }
+
+                                    return result;
+                                }
+                            });
+                }
+            } else {
+                chainedResult = RpcResultBuilder.<Void>success().buildFuture();
+            }
+        } catch (IllegalStateException e) {
+            chainedResult = RpcResultBuilder.<Void>failed()
+                    .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
+                    .buildFuture();
+        }
+
+        return chainedResult;
+    }
+
+    static List<Group> safeGroups(FlowCapableNode node) {
+        if (node == null) {
+            return Collections.emptyList();
+        }
+
+        return MoreObjects.firstNonNull(node.getGroup(), ImmutableList.<Group>of());
+    }
+
+    static List<Table> safeTables(FlowCapableNode node) {
+        if (node == null) {
+            return Collections.emptyList();
+        }
+
+        return MoreObjects.firstNonNull(node.getTable(), ImmutableList.<Table>of());
+    }
+
+    static List<Meter> safeMeters(FlowCapableNode node) {
+        if (node == null) {
+            return Collections.emptyList();
+        }
+
+        return MoreObjects.firstNonNull(node.getMeter(), ImmutableList.<Meter>of());
+    }
+
+    private ListenableFuture<RpcResult<Void>> flushRemoveGroupPortionAndBarrier(
+            final InstanceIdentifier<FlowCapableNode> nodeIdent,
+            final ItemSyncBox<Group> groupsPortion) {
+        List<ListenableFuture<RpcResult<RemoveGroupOutput>>> allResults = new ArrayList<>();
+        for (Group group : groupsPortion.getItemsToAdd()) {
+            final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
+            allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.remove(groupIdent, group, nodeIdent)));
+        }
+
+        final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
+                Futures.allAsList(allResults),
+                ReconcileUtil.<RemoveGroupOutput>createRpcResultCondenser("group remove"));
+
+        return Futures.transform(singleVoidResult,
+                ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
+    }
+}
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/markandsweep/SwitchFlowId.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/markandsweep/SwitchFlowId.java
new file mode 100644 (file)
index 0000000..86c8cad
--- /dev/null
@@ -0,0 +1,68 @@
+/**\r
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.\r
+ *\r
+ * This program and the accompanying materials are made available under the\r
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+ * and is available at http://www.eclipse.org/legal/epl-v10.html\r
+ */\r
+\r
+package org.opendaylight.openflowplugin.applications.frsync.markandsweep;\r
+\r
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;\r
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;\r
+\r
+/**\r
+ * Identifier of {@link Flow} on device. Switch does not know about flow-id but,\r
+ * it uses combination of these unique fields: table-id, priority, match.\r
+ */\r
+public class SwitchFlowId {\r
+\r
+    private final Short tableId;\r
+\r
+    private final Integer priority;\r
+\r
+    private final Match match;\r
+\r
+    public SwitchFlowId(Flow flow) {\r
+        this.tableId = flow.getTableId();\r
+        this.priority = flow.getPriority();\r
+        this.match = flow.getMatch();\r
+    }\r
+\r
+    @Override\r
+    public int hashCode() {\r
+        final int prime = 31;\r
+        int result = 1;\r
+        result = prime * result + ((match == null) ? 0 : match.hashCode());\r
+        result = prime * result + ((priority == null) ? 0 : priority.hashCode());\r
+        result = prime * result + ((tableId == null) ? 0 : tableId.hashCode());\r
+        return result;\r
+    }\r
+\r
+    @Override\r
+    public boolean equals(Object obj) {\r
+        if (this == obj)\r
+            return true;\r
+        if (obj == null)\r
+            return false;\r
+        if (getClass() != obj.getClass())\r
+            return false;\r
+        SwitchFlowId other = (SwitchFlowId) obj;\r
+        if (match == null) {\r
+            if (other.match != null)\r
+                return false;\r
+        } else if (!match.equals(other.match))\r
+            return false;\r
+        if (priority == null) {\r
+            if (other.priority != null)\r
+                return false;\r
+        } else if (!priority.equals(other.priority))\r
+            return false;\r
+        if (tableId == null) {\r
+            if (other.tableId != null)\r
+                return false;\r
+        } else if (!tableId.equals(other.tableId))\r
+            return false;\r
+        return true;\r
+    }\r
+}\r
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/CrudCounts.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/CrudCounts.java
new file mode 100644 (file)
index 0000000..a6fdd2d
--- /dev/null
@@ -0,0 +1,53 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.applications.frsync.util;
+
+/**
+ * General placeholder for add/update/remove counts.
+ */
+public class CrudCounts {
+    private int added;
+    private int updated;
+    private int removed;
+
+    public int getAdded() {
+        return added;
+    }
+
+    public void setAdded(final int added) {
+        this.added = added;
+    }
+
+    public int getUpdated() {
+        return updated;
+    }
+
+    public void setUpdated(final int updated) {
+        this.updated = updated;
+    }
+
+    public int getRemoved() {
+        return removed;
+    }
+
+    public void setRemoved(final int removed) {
+        this.removed = removed;
+    }
+
+    public void incAdded() {
+        added++;
+    }
+
+    public void incUpdated() {
+        updated++;
+    }
+
+    public void incRemoved() {
+        removed++;
+    }
+}
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/FlowCapableNodeLookups.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/FlowCapableNodeLookups.java
new file mode 100644 (file)
index 0000000..df3a3db
--- /dev/null
@@ -0,0 +1,111 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.util;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.opendaylight.openflowplugin.applications.frsync.markandsweep.SwitchFlowId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helpers for flow lookups in {@link FlowCapableNode}.
+ */
+public final class FlowCapableNodeLookups {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlowCapableNodeLookups.class);
+
+    private FlowCapableNodeLookups() {
+        throw new IllegalAccessError("non instantiable util class");
+    }
+
+    @Nonnull
+    public static Map<Short, Table> wrapTablesToMap(@Nullable final List<Table> tables) {
+        final Map<Short, Table> tableMap;
+
+        if (tables == null) {
+            tableMap = Collections.emptyMap();
+        } else {
+            LOG.trace("tables found: {}", tables.size());
+            tableMap = new HashMap<>();
+            for (Table table : tables) {
+                tableMap.put(table.getId(), table);
+            }
+        }
+
+        return tableMap;
+    }
+
+    @Nonnull
+    public static Map<SwitchFlowId, Flow> wrapFlowsToMap(@Nullable final List<Flow> flows) {
+        final Map<SwitchFlowId, Flow> flowMap;
+
+        if (flows == null) {
+            flowMap = Collections.emptyMap();
+        } else {
+            LOG.trace("flows found: {}", flows.size());
+            flowMap = new HashMap<>();
+            for (Flow flow : flows) {
+                flowMap.put(new SwitchFlowId(flow), flow);
+            }
+        }
+
+        return flowMap;
+    }
+    
+    public static Flow flowMapLookupExisting(Flow flow, Map<SwitchFlowId, Flow> flowConfigMap) {
+        return flowConfigMap.get(new SwitchFlowId(flow));
+    }
+
+    @Nonnull
+    public static Map<MeterId, Meter> wrapMetersToMap(@Nullable final List<Meter> meters) {
+        final Map<MeterId, Meter> meterMap;
+
+        if (meters == null) {
+            meterMap = Collections.emptyMap();
+        } else {
+            LOG.trace("meters found: {}", meters.size());
+            meterMap = new HashMap<>();
+            for (Meter meter : meters) {
+                meterMap.put(meter.getMeterId(), meter);
+            }
+        }
+
+        return meterMap;
+    }
+
+    @Nonnull
+    public static Map<Long, Group> wrapGroupsToMap(@Nullable final List<Group> groups) {
+        final Map<Long, Group> groupMap;
+
+        if (groups == null) {
+            groupMap = Collections.emptyMap();
+        } else {
+            LOG.trace("groups found: {}", groups.size());
+            groupMap = new HashMap<>();
+            for (Group group : groups) {
+                groupMap.put(group.getGroupId().getValue(), group);
+            }
+        }
+
+        return groupMap;
+    }
+}
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/ItemSyncBox.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/ItemSyncBox.java
new file mode 100644 (file)
index 0000000..d83dfab
--- /dev/null
@@ -0,0 +1,57 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.util;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Holder for items to be pushed to device.
+ * Contains two sets of groups -set of items to be pushed and set of tuples for update.
+ */
+public class ItemSyncBox<I> {
+
+    private Set<I> itemsToAdd = new HashSet<>();
+    private Set<ItemUpdateTuple<I>> itemsToUpdate = new HashSet<>();
+
+    public Set<I> getItemsToAdd() {
+        return itemsToAdd;
+    }
+
+    public Set<ItemUpdateTuple<I>> getItemsToUpdate() {
+        return itemsToUpdate;
+    }
+
+    public boolean isEmpty() {
+        return itemsToAdd.isEmpty() && itemsToUpdate.isEmpty();
+    }
+
+    /**
+     * Tuple holder for original and updated item
+     *
+     * @param <I> basic type
+     */
+    public static final class ItemUpdateTuple<I> {
+        private final I original;
+        private final I updated;
+
+        public ItemUpdateTuple(I original, I updated) {
+            this.original = original;
+            this.updated = updated;
+        }
+
+        public I getOriginal() {
+            return original;
+        }
+
+        public I getUpdated() {
+            return updated;
+        }
+    }
+}
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/PathUtil.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/PathUtil.java
new file mode 100644 (file)
index 0000000..f4a91c2
--- /dev/null
@@ -0,0 +1,28 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.util;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+/**
+ * Basic {@link InstanceIdentifier} related tools.
+ */
+public class PathUtil {
+    public static NodeId digNodeId(final InstanceIdentifier<?> nodeIdent) {
+        return nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId();
+    }
+
+    public static InstanceIdentifier<Node> digNodePath(final InstanceIdentifier<FlowCapableNode> nodeIdent) {
+        return nodeIdent.firstIdentifierOf(Node.class);
+    }
+}
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/ReconcileUtil.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/ReconcileUtil.java
new file mode 100644 (file)
index 0000000..b041b49
--- /dev/null
@@ -0,0 +1,201 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Util methods for group reconcil task (future chaining, transforms).
+ */
+public class ReconcileUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReconcileUtil.class);
+
+    /**
+     * @param previousItemAction description for case when the triggering future contains failure
+     * @param <D>                type of rpc output (gathered in list)
+     * @return single rpc result of type Void honoring all partial rpc results
+     */
+    public static <D> Function<List<RpcResult<D>>, RpcResult<Void>> createRpcResultCondenser(final String previousItemAction) {
+        return new Function<List<RpcResult<D>>, RpcResult<Void>>() {
+            @Nullable
+            @Override
+            public RpcResult<Void> apply(@Nullable final List<RpcResult<D>> input) {
+                final RpcResultBuilder<Void> resultSink;
+                if (input != null) {
+                    List<RpcError> errors = new ArrayList<>();
+                    for (RpcResult<D> rpcResult : input) {
+                        if (!rpcResult.isSuccessful()) {
+                            errors.addAll(rpcResult.getErrors());
+                        }
+                    }
+                    if (errors.isEmpty()) {
+                        resultSink = RpcResultBuilder.success();
+                    } else {
+                        resultSink = RpcResultBuilder.<Void>failed().withRpcErrors(errors);
+                    }
+                } else {
+                    resultSink = RpcResultBuilder.<Void>failed()
+                            .withError(RpcError.ErrorType.APPLICATION, "previous " + previousItemAction + " failed");
+
+                }
+
+                return resultSink.build();
+            }
+        };
+    }
+
+    /**
+     * @param nodeIdent                     flow capable node path - target device for routed rpc
+     * @param flowCapableTransactionService barrier rpc service
+     * @return async barrier result
+     */
+    public static AsyncFunction<RpcResult<Void>, RpcResult<Void>> chainBarrierFlush(
+            final InstanceIdentifier<Node> nodeIdent,
+            final FlowCapableTransactionService flowCapableTransactionService) {
+        return new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
+            @Override
+            public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
+                final SendBarrierInput barrierInput = new SendBarrierInputBuilder()
+                        .setNode(new NodeRef(nodeIdent))
+                        .build();
+                return JdkFutureAdapters.listenInPoolThread(flowCapableTransactionService.sendBarrier(barrierInput));
+            }
+        };
+    }
+
+    public static List<ItemSyncBox<Group>> resolveAndDivideGroups(final NodeId nodeId,
+                                                                  final Map<Long, Group> installedGroupsArg,
+                                                                  final Collection<Group> pendingGroups) {
+        return resolveAndDivideGroups(nodeId, installedGroupsArg, pendingGroups, true);
+    }
+
+    public static List<ItemSyncBox<Group>> resolveAndDivideGroups(final NodeId nodeId,
+                                                                  final Map<Long, Group> installedGroupsArg,
+                                                                  final Collection<Group> pendingGroups,
+                                                                  final boolean gatherUpdates) {
+
+        final Map<Long, Group> installedGroups = new HashMap<>(installedGroupsArg);
+        final List<ItemSyncBox<Group>> plan = new ArrayList<>();
+
+        while (!Iterables.isEmpty(pendingGroups)) {
+            final ItemSyncBox<Group> stepPlan = new ItemSyncBox<>();
+            final Iterator<Group> iterator = pendingGroups.iterator();
+            final Map<Long, Group> installIncrement = new HashMap<>();
+
+            while (iterator.hasNext()) {
+                final Group group = iterator.next();
+
+                final Group existingGroup = installedGroups.get(group.getGroupId().getValue());
+                if (existingGroup != null) {
+                    if (!gatherUpdates) {
+                        iterator.remove();
+                    } else {
+                        // check buckets and eventually update
+                        if (group.equals(existingGroup)) {
+                            iterator.remove();
+                        } else {
+                            if (checkGroupPrecondition(installedGroups.keySet(), group)) {
+                                iterator.remove();
+                                LOG.trace("Group {} on device {} differs - planned for update", group.getGroupId(), nodeId);
+                                stepPlan.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingGroup, group));
+                            }
+                        }
+                    }
+                } else if (checkGroupPrecondition(installedGroups.keySet(), group)) {
+                    iterator.remove();
+                    installIncrement.put(group.getGroupId().getValue(), group);
+                    stepPlan.getItemsToAdd().add(group);
+                }
+            }
+
+            if (!stepPlan.isEmpty()) {
+                // atomic update of installed flows in order to keep plan portions clean of local group dependencies
+                installedGroups.putAll(installIncrement);
+                plan.add(stepPlan);
+            } else if (!pendingGroups.isEmpty()) {
+                LOG.warn("Failed to resolve and divide groups into preconditions-match based ordered plan: {}, " +
+                        "resolving stuck at level {}", nodeId.getValue(), plan.size());
+                throw new IllegalStateException("Failed to resolve and divide groups when matching preconditions");
+            }
+        }
+
+        return plan;
+    }
+
+    public static boolean checkGroupPrecondition(final Set<Long> installedGroupIds, final Group pendingGroup) {
+        boolean okToInstall = true;
+        // check each bucket in the pending group
+        for (Bucket bucket : pendingGroup.getBuckets().getBucket()) {
+            for (Action action : bucket.getAction()) {
+                // if the output action is a group
+                if (GroupActionCase.class.equals(action.getAction().getImplementedInterface())) {
+                    Long groupId = ((GroupActionCase) (action.getAction())).getGroupAction().getGroupId();
+                    // see if that output group is installed
+                    if (!installedGroupIds.contains(groupId)) {
+                        // if not installed, we have missing dependencies and cannot install this pending group
+                        okToInstall = false;
+                        break;
+                    }
+                }
+            }
+            if (!okToInstall) {
+                break;
+            }
+        }
+        return okToInstall;
+    }
+
+    public static <E> int countTotalAdds(final List<ItemSyncBox<E>> groupsAddPlan) {
+        int count = 0;
+        for (ItemSyncBox<E> groupItemSyncBox : groupsAddPlan) {
+            count += groupItemSyncBox.getItemsToAdd().size();
+        }
+        return count;
+    }
+
+    public static <E> int countTotalUpdated(final List<ItemSyncBox<E>> groupsAddPlan) {
+        int count = 0;
+        for (ItemSyncBox<E> groupItemSyncBox : groupsAddPlan) {
+            count += groupItemSyncBox.getItemsToUpdate().size();
+        }
+        return count;
+    }
+}
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/SyncCrudCounters.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/SyncCrudCounters.java
new file mode 100644 (file)
index 0000000..34a670c
--- /dev/null
@@ -0,0 +1,38 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.util;
+
+/**
+ * One-shot (per sync) placeholder for counts of added/updated/removed flows/groups/meters.
+ */
+public class SyncCrudCounters {
+
+    private final CrudCounts flowCrudCounts;
+    private final CrudCounts groupCrudCounts;
+    private final CrudCounts meterCrudCounts;
+
+    public SyncCrudCounters() {
+        flowCrudCounts = new CrudCounts();
+        groupCrudCounts = new CrudCounts();
+        meterCrudCounts = new CrudCounts();
+    }
+
+    public CrudCounts getFlowCrudCounts() {
+        return flowCrudCounts;
+    }
+
+    public CrudCounts getGroupCrudCounts() {
+        return groupCrudCounts;
+    }
+
+    public CrudCounts getMeterCrudCounts() {
+        return meterCrudCounts;
+    }
+
+}