Bug 5577 Retry mechanism 09/39309/19
authorAndrej Leitner <anleitne@cisco.com>
Mon, 23 May 2016 08:02:01 +0000 (10:02 +0200)
committerAndrej Leitner <anleitne@cisco.com>
Mon, 20 Jun 2016 14:36:13 +0000 (16:36 +0200)
  - added RetryDecorator
  - added RetryRegistry for fresh operational registration
  - added successor of operational listener for retry
  - updated FutureZipDecorator compression
  - changed wiring in provider and using now only one reactor
  - added and updated unit tests

Change-Id: I29d34ab1ce2cc3181cab73066f9ca1078e2df0a1
Signed-off-by: Andrej Leitner <anleitne@cisco.com>
21 files changed:
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/SyncReactor.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProvider.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedConfigListener.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalListener.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalRetryListener.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureDecorator.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureWithCompressionDecorator.java [deleted file]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecorator.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecorator.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorImpl.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorRetryDecorator.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/RetryRegistry.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/ZipQueueEntry.java [new file with mode: 0644]
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedConfigListenerTest.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalListenerTest.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalRetryListenerTest.java [new file with mode: 0644]
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecoratorTest.java [new file with mode: 0644]
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecoratorTest.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorImplTest.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorRetryDecoratorTest.java [new file with mode: 0644]
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/util/RetryRegistryTest.java [new file with mode: 0644]

index 141e3b465ae915b87fa06c0e1490d9c7ae7f68bd..430310bb4d339a737abe62f932f8de3d81fe7e6e 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.openflowplugin.applications.frsync;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
@@ -20,9 +21,11 @@ public interface SyncReactor {
      * @param flowcapableNodePath path to openflow augmentation of node
      * @param configTree configured node
      * @param operationalTree device reflection
+     * @param dsType type of DS change
      * @return synchronization outcome
      */
     ListenableFuture<Boolean> syncup(InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-            FlowCapableNode configTree, FlowCapableNode operationalTree) throws InterruptedException;
+                                     FlowCapableNode configTree, FlowCapableNode operationalTree,
+                                     LogicalDatastoreType dsType) throws InterruptedException;
 
 }
index a35cde26003ea1ab3dc2008615f3deabc32e1e36..9376f8320e079121629197966fdd2b2404aa6edb 100644 (file)
@@ -27,6 +27,7 @@ import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDa
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
 import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SyncPlanPushStrategyFlatBatchImpl;
+import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
 import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
 import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
@@ -66,7 +67,6 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
     private ListenerRegistration<NodeListener> dataTreeConfigChangeListener;
     private ListenerRegistration<NodeListener> dataTreeOperationalChangeListener;
 
-
     public ForwardingRulesSyncProvider(final BindingAwareBroker broker,
                                        final DataBroker dataBroker,
                                        final RpcConsumerRegistry rpcRegistry) {
@@ -106,12 +106,14 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
                 .setFlatBatchService(flatBatchService)
                 .setTableForwarder(tableForwarder);
 
-        final SyncReactorImpl syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
-        final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorImpl,
+        final RetryRegistry retryRegistry = new RetryRegistry();
+
+        final SyncReactor syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
+        final SyncReactor syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, retryRegistry);
+        final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry,
                 new SemaphoreKeeperGuavaImpl<InstanceIdentifier<FlowCapableNode>>(1, true));
 
-        final SyncReactor cfgReactor = new SyncReactorFutureWithCompressionDecorator(syncReactorGuard, syncThreadPool);
-        final SyncReactor operReactor = new SyncReactorFutureWithCompressionDecorator(syncReactorGuard, syncThreadPool);
+        final SyncReactor reactor = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
 
         final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
         final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
@@ -120,8 +122,10 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
         final FlowCapableNodeDao operationalDao = new FlowCapableNodeCachedDao(operationalSnapshot,
                 new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.OPERATIONAL));
 
-        final NodeListener<FlowCapableNode> nodeListenerConfig = new SimplifiedConfigListener(cfgReactor, configSnapshot, operationalDao);
-        final NodeListener<Node> nodeListenerOperational = new SimplifiedOperationalListener(operReactor, operationalSnapshot, configDao);
+        final NodeListener<FlowCapableNode> nodeListenerConfig =
+                new SimplifiedConfigListener(reactor, configSnapshot, operationalDao);
+        final NodeListener<Node> nodeListenerOperational =
+                new SimplifiedOperationalRetryListener(reactor, operationalSnapshot, configDao, retryRegistry);
 
         try {
             SimpleTaskRetryLooper looper1 = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
index c2298aa7dec155ac6b99bfbe26794d47a0de3779..298fb6d15df4505c3286139f4088d2afe912882c 100644 (file)
@@ -63,7 +63,7 @@ public class SimplifiedConfigListener extends AbstractFrmSyncListener<FlowCapabl
         final Optional<FlowCapableNode> operationalNode = operationalDao.loadByNodeId(nodeId);
         if (!operationalNode.isPresent()) {
             LOG.info("Skip syncup, {} operational is not present", nodeId.getValue());
-            return Optional.absent();// we try to reconfigure switch is alive
+            return Optional.absent();
         }
 
         final DataObjectModification<FlowCapableNode> configModification = modification.getRootNode();
@@ -95,9 +95,7 @@ public class SimplifiedConfigListener extends AbstractFrmSyncListener<FlowCapabl
             FlowCapableNode dataBefore, FlowCapableNode dataAfter, FlowCapableNode operationalNode)
                     throws InterruptedException {
         LOG.trace("onNodeAdded {}", nodePath);
-        
-        final ListenableFuture<Boolean> endResult =
-                reactor.syncup(nodePath, dataAfter, operationalNode);
+        final ListenableFuture<Boolean> endResult = reactor.syncup(nodePath, dataAfter, operationalNode, dsType());
         return endResult;
     }
 
@@ -112,9 +110,7 @@ public class SimplifiedConfigListener extends AbstractFrmSyncListener<FlowCapabl
             FlowCapableNode dataBefore, FlowCapableNode dataAfter, FlowCapableNode operationalNodeNode)
                     throws InterruptedException {
         LOG.trace("onNodeUpdated {}", nodePath);
-        
-        final ListenableFuture<Boolean> endResult =
-                reactor.syncup(nodePath, dataAfter, dataBefore);
+        final ListenableFuture<Boolean> endResult = reactor.syncup(nodePath, dataAfter, dataBefore, dsType());
         return endResult;
     }
 
@@ -126,9 +122,7 @@ public class SimplifiedConfigListener extends AbstractFrmSyncListener<FlowCapabl
     protected ListenableFuture<Boolean> onNodeDeleted(InstanceIdentifier<FlowCapableNode> nodePath,
             FlowCapableNode dataBefore, FlowCapableNode operationalNode) throws InterruptedException {
         LOG.trace("onNodeDeleted {}", nodePath);
-        
-        final ListenableFuture<Boolean> endResult =
-                reactor.syncup(nodePath, null, dataBefore);
+        final ListenableFuture<Boolean> endResult = reactor.syncup(nodePath, null, dataBefore, dsType());
         return endResult;
     }
 
index 573ca80d68d66a9314271af5653d167561d037b9..2563fa9dedac977487d8595e02dd4a673c9fcb89 100644 (file)
@@ -36,12 +36,12 @@ import org.slf4j.LoggerFactory;
 public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node> {
     private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalListener.class);
 
-    private final SyncReactor reactor;
-    private FlowCapableNodeSnapshotDao operationalSnapshot;
-    private FlowCapableNodeDao configDao;
+    protected final SyncReactor reactor;
+    protected final FlowCapableNodeSnapshotDao operationalSnapshot;
+    protected final FlowCapableNodeDao configDao;
 
-    public SimplifiedOperationalListener(SyncReactor reactor,
-            FlowCapableNodeSnapshotDao operationalSnapshot, FlowCapableNodeDao configDao) {
+    public SimplifiedOperationalListener(SyncReactor reactor, FlowCapableNodeSnapshotDao operationalSnapshot,
+                                         FlowCapableNodeDao configDao) {
         this.reactor = reactor;
         this.operationalSnapshot = operationalSnapshot;
         this.configDao = configDao;
@@ -65,13 +65,11 @@ public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node>
      */
     protected Optional<ListenableFuture<Boolean>> processNodeModification(
             DataTreeModification<Node> modification) throws ReadFailedException, InterruptedException {
-        updateCache(modification);
 
-        if (isAdd(modification) || isAddLogical(modification)) {
+        updateCache(modification);
+        if (isReconciliationNeeded(modification)) {
             return reconciliation(modification);
         }
-        // TODO: else = explicit reconciliation required
-
         return skipModification(modification);
     }
 
@@ -79,32 +77,28 @@ public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node>
      * Remove if delete. Update only if FlowCapableNode Augmentation modified.
      *
      * @param modification Datastore modification
+     * @return true for cache update, false for cache remove
      */
-    protected void updateCache(DataTreeModification<Node> modification) {
-        try {
-            boolean isDelete = isDelete(modification) || isDeleteLogical(modification);
-            if (isDelete) {
-                operationalSnapshot.updateCache(nodeId(modification), Optional.<FlowCapableNode>absent());
-                return;
-            }
-
-            operationalSnapshot.updateCache(nodeId(modification), Optional.fromNullable(flowCapableNodeAfter(modification)));
-        } catch(Exception e) {
-            LOG.error("update cache failed {}", nodeId(modification), e);
+    protected boolean updateCache(DataTreeModification<Node> modification) {
+        if (isDelete(modification) || isDeleteLogical(modification)) {
+            operationalSnapshot.updateCache(nodeId(modification), Optional.<FlowCapableNode>absent());
+            return false;
         }
+        operationalSnapshot.updateCache(nodeId(modification), Optional.fromNullable(flowCapableNodeAfter(modification)));
+        return true;
     }
 
-    protected Optional<ListenableFuture<Boolean>> skipModification(DataTreeModification<Node> modification) {
+    private Optional<ListenableFuture<Boolean>> skipModification(DataTreeModification<Node> modification) {
         LOG.trace("Skipping Inventory Operational modification {}, before {}, after {}", nodeIdValue(modification),
                 modification.getRootNode().getDataBefore() == null ? "null" : "nonnull",
                 modification.getRootNode().getDataAfter() == null ? "null" : "nonnull");
-        return Optional.absent();// skip otherwise event
+        return Optional.absent();
     }
 
     /**
      * ModificationType.DELETE
      */
-    protected boolean isDelete(DataTreeModification<Node> modification) {
+    private boolean isDelete(DataTreeModification<Node> modification) {
         if (ModificationType.DELETE == modification.getRootNode().getModificationType()) {
             LOG.trace("Delete {} (physical)", nodeIdValue(modification));
             return true;
@@ -116,7 +110,7 @@ public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node>
     /**
      * All connectors disappeared from operational store (logical delete).
      */
-    protected boolean isDeleteLogical(DataTreeModification<Node> modification) {
+    private boolean isDeleteLogical(DataTreeModification<Node> modification) {
         final DataObjectModification<Node> rootNode = modification.getRootNode();
         if (!safeConnectorsEmpty(rootNode.getDataBefore()) && safeConnectorsEmpty(rootNode.getDataAfter())) {
             LOG.trace("Delete {} (logical)", nodeIdValue(modification));
@@ -126,7 +120,7 @@ public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node>
         return false;
     }
 
-    protected boolean isAdd(DataTreeModification<Node> modification) {
+    private boolean isAdd(DataTreeModification<Node> modification) {
         final DataObjectModification<Node> rootNode = modification.getRootNode();
         final Node dataAfter = rootNode.getDataAfter();
         final Node dataBefore = rootNode.getDataBefore();
@@ -141,7 +135,7 @@ public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node>
     /**
      * All connectors appeared in operational store (logical add).
      */
-    protected boolean isAddLogical(DataTreeModification<Node> modification) {
+    private boolean isAddLogical(DataTreeModification<Node> modification) {
         final DataObjectModification<Node> rootNode = modification.getRootNode();
         if (safeConnectorsEmpty(rootNode.getDataBefore()) && !safeConnectorsEmpty(rootNode.getDataAfter())) {
             LOG.trace("Add {} (logical)", nodeIdValue(modification));
@@ -151,20 +145,22 @@ public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node>
         return false;
     }
 
-    protected Optional<ListenableFuture<Boolean>> reconciliation(
-            DataTreeModification<Node> modification) throws InterruptedException {
-        final NodeId nodeId = nodeId(modification);
-
-        LOG.debug("Reconciliation: {}", nodeId.getValue());
+    protected boolean isReconciliationNeeded(DataTreeModification<Node> modification) {
+        return isAdd(modification) || isAddLogical(modification);
+    }
 
+    private Optional<ListenableFuture<Boolean>> reconciliation(DataTreeModification<Node> modification) throws InterruptedException {
+        final NodeId nodeId = nodeId(modification);
         final Optional<FlowCapableNode> nodeConfiguration = configDao.loadByNodeId(nodeId);
-        final InstanceIdentifier<FlowCapableNode> nodePath = InstanceIdentifier.create(Nodes.class)
-                .child(Node.class, new NodeKey(nodeId(modification))).augmentation(FlowCapableNode.class);
 
-        if (nodeConfiguration.isPresent())
-            return Optional.of(reactor.syncup(nodePath, nodeConfiguration.get(), flowCapableNodeAfter(modification)));
-        else
+        if (nodeConfiguration.isPresent()) {
+            LOG.debug("Reconciliation: {}", nodeId.getValue());
+            final InstanceIdentifier<FlowCapableNode> nodePath = InstanceIdentifier.create(Nodes.class)
+                    .child(Node.class, new NodeKey(nodeId(modification))).augmentation(FlowCapableNode.class);
+            return Optional.of(reactor.syncup(nodePath, nodeConfiguration.get(), flowCapableNodeAfter(modification), dsType()));
+        } else {
             return skipModification(modification);
+        }
     }
 
     static FlowCapableNode flowCapableNodeAfter(DataTreeModification<Node> modification) {
@@ -199,7 +195,6 @@ public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node>
         final DataObjectModification<Node> rootNode = modification.getRootNode();
         final Node dataAfter = rootNode.getDataAfter();
 
-
         if (dataAfter != null) {
             return dataAfter.getId();
         }
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalRetryListener.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalRetryListener.java
new file mode 100644 (file)
index 0000000..849a278
--- /dev/null
@@ -0,0 +1,114 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
+import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Modified {@link SimplifiedOperationalListener} for usage of retry mechanism.
+ */
+public class SimplifiedOperationalRetryListener extends SimplifiedOperationalListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalRetryListener.class);
+    private final RetryRegistry retryRegistry;
+
+    public SimplifiedOperationalRetryListener(SyncReactor reactor, FlowCapableNodeSnapshotDao operationalSnapshot,
+                                              FlowCapableNodeDao configDao, RetryRegistry retryRegistry) {
+        super(reactor, operationalSnapshot, configDao);
+        this.retryRegistry = retryRegistry;
+    }
+
+    /**
+     * Adding condition check for retry.
+     *
+     * @param modification operational datastore modification
+     * @return true if reconciliation is needed, false otherwise
+     */
+    protected boolean isReconciliationNeeded(DataTreeModification<Node> modification) {
+        return super.isReconciliationNeeded(modification) || isRegisteredAndConsistentForRetry(modification);
+    }
+
+    /**
+     * If node is removed unregister for retry in addition.
+     *
+     * @param modification operational datastore modification
+     * @return true for cache update, false for cache remove and retry unregister
+     */
+    protected boolean updateCache(DataTreeModification<Node> modification) {
+        boolean nodeUpdated = super.updateCache(modification);
+        if (!nodeUpdated) { // node removed if not updated
+            retryRegistry.unregisterIfRegistered(nodeId(modification));
+        }
+        return nodeUpdated;
+    }
+
+    /**
+     * Check if retry should be proceeded.
+     *
+     * @param modification operational modification
+     * @return true if device is registered for retry and actual modification is consistent, false otherwise
+     */
+    @VisibleForTesting
+    boolean isRegisteredAndConsistentForRetry(DataTreeModification<Node> modification) {
+        final NodeId nodeId = PathUtil.digNodeId(modification.getRootPath().getRootIdentifier());
+
+        if (!retryRegistry.isRegistered(nodeId)) {
+            return false;
+        }
+
+        final FlowCapableStatisticsGatheringStatus gatheringStatus = modification.getRootNode().getDataAfter()
+                .getAugmentation(FlowCapableStatisticsGatheringStatus.class);
+
+        if (gatheringStatus == null) {
+            LOG.trace("Statistics gathering never started for: {}", nodeId.getValue());
+            return false;
+        }
+
+        final SnapshotGatheringStatusEnd gatheringStatusEnd = gatheringStatus.getSnapshotGatheringStatusEnd();
+
+        if (gatheringStatusEnd == null) {
+            LOG.trace("Statistics gathering is not over yet for: {}", nodeId.getValue());
+            return false;
+        }
+
+        if (!gatheringStatusEnd.isSucceeded()) {
+            LOG.debug("Statistics gathering was not successful for: {}", nodeId.getValue());
+            return false;
+        }
+
+        try {
+            Date timestampOfRegistration = retryRegistry.getRegistration(nodeId);;
+            final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(RetryRegistry.DATE_AND_TIME_FORMAT);
+            Date timestampOfStatistics = simpleDateFormat.parse(gatheringStatusEnd.getEnd().getValue());
+            if (timestampOfStatistics.after(timestampOfRegistration)) {
+                LOG.debug("Fresh operational present for: {} -> going retry!", nodeId.getValue());
+                return true;
+            }
+        } catch (ParseException e) {
+            LOG.error("Timestamp parsing error {}", e);
+        }
+        LOG.debug("Fresh operational not present for: {}", nodeId.getValue());
+        return false;
+    }
+}
index 33db6ea9c54f467253abea1125571fba0a2f2f0d..1eaa81731f3bfe05ec3178381e82beac9ea76bfb 100644 (file)
@@ -13,6 +13,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import java.util.concurrent.Callable;\r
 import java.util.concurrent.TimeUnit;\r
 import java.util.concurrent.TimeoutException;\r
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;\r
 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
@@ -39,7 +40,8 @@ public class SyncReactorFutureDecorator implements SyncReactor {
     }\r
 \r
     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
-            final FlowCapableNode configTree, final FlowCapableNode operationalTree) throws InterruptedException {\r
+                                            final FlowCapableNode configTree, final FlowCapableNode operationalTree,\r
+                                            final LogicalDatastoreType dsType) throws InterruptedException {\r
         final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
         LOG.trace("syncup {}", nodeId.getValue());\r
 \r
@@ -48,7 +50,7 @@ public class SyncReactorFutureDecorator implements SyncReactor {
                 final String oldThreadName = updateThreadName(nodeId);\r
 \r
                 try {\r
-                    final Boolean ret = doSyncupInFuture(flowcapableNodePath, configTree, operationalTree)\r
+                    final Boolean ret = doSyncupInFuture(flowcapableNodePath, configTree, operationalTree, dsType)\r
                             .get(10000, TimeUnit.MILLISECONDS);\r
                     LOG.trace("ret {} {}", nodeId.getValue(), ret);\r
                     return true;\r
@@ -60,17 +62,17 @@ public class SyncReactorFutureDecorator implements SyncReactor {
                 }\r
             }\r
         });\r
-        \r
+\r
         return syncup;\r
     }\r
 \r
     protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
-            final FlowCapableNode configTree, final FlowCapableNode operationalTree)\r
-                    throws InterruptedException {\r
+                                                         final FlowCapableNode configTree, final FlowCapableNode operationalTree,\r
+                                                         final LogicalDatastoreType dsType) throws InterruptedException {\r
         final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
         LOG.trace("doSyncupInFuture {}", nodeId.getValue());\r
 \r
-        return delegate.syncup(flowcapableNodePath, configTree, operationalTree);\r
+        return delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);\r
     }\r
 \r
     static String threadName() {\r
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureWithCompressionDecorator.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureWithCompressionDecorator.java
deleted file mode 100644 (file)
index ce7c579..0000000
+++ /dev/null
@@ -1,104 +0,0 @@
-/**\r
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.\r
- *\r
- * This program and the accompanying materials are made available under the\r
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
- * and is available at http://www.eclipse.org/legal/epl-v10.html\r
- */\r
-\r
-package org.opendaylight.openflowplugin.applications.frsync.impl;\r
-\r
-import com.google.common.util.concurrent.Futures;\r
-import com.google.common.util.concurrent.ListenableFuture;\r
-import com.google.common.util.concurrent.ListeningExecutorService;\r
-import java.util.HashMap;\r
-import java.util.Map;\r
-import java.util.concurrent.Semaphore;\r
-import javax.annotation.concurrent.GuardedBy;\r
-import org.apache.commons.lang3.tuple.Pair;\r
-import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
-import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-/**\r
- * Enriches {@link SyncReactorFutureDecorator} with state compression.\r
- */\r
-public class SyncReactorFutureWithCompressionDecorator extends SyncReactorFutureDecorator {\r
-\r
-    private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureWithCompressionDecorator.class);\r
-\r
-    @GuardedBy("compressionGuard")\r
-    final Map<InstanceIdentifier<FlowCapableNode>, Pair<FlowCapableNode, FlowCapableNode>> compressionQueue =\r
-            new HashMap<>();\r
-    final Semaphore compressionGuard = new Semaphore(1, false);\r
-\r
-    public SyncReactorFutureWithCompressionDecorator(SyncReactor delegate, ListeningExecutorService executorService) {\r
-        super(delegate, executorService);\r
-    }\r
-\r
-    public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
-            final FlowCapableNode configTree, final FlowCapableNode operationalTree) throws InterruptedException {\r
-        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
-        LOG.trace("syncup {}", nodeId.getValue());\r
-\r
-        try {\r
-            compressionGuard.acquire();\r
-\r
-            final boolean newFutureNecessary = updateCompressionState(flowcapableNodePath, configTree, operationalTree);\r
-            if (newFutureNecessary) {\r
-                super.syncup(flowcapableNodePath, configTree, operationalTree);\r
-            }\r
-            return Futures.immediateFuture(true);\r
-        } finally {\r
-            compressionGuard.release();\r
-        }\r
-    }\r
-\r
-    protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
-            final FlowCapableNode configTree, final FlowCapableNode operationalTree)\r
-                    throws InterruptedException {\r
-        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
-        LOG.trace("doSyncupInFuture {}", nodeId.getValue());\r
-\r
-        final Pair<FlowCapableNode, FlowCapableNode> lastCompressionState =\r
-                removeLastCompressionState(flowcapableNodePath);\r
-        if (lastCompressionState == null) {\r
-            return Futures.immediateFuture(true);\r
-        } else {\r
-            return super.doSyncupInFuture(flowcapableNodePath,\r
-                    lastCompressionState.getLeft(), lastCompressionState.getRight());\r
-        }\r
-    }\r
-\r
-    protected boolean updateCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
-            final FlowCapableNode configTree, final FlowCapableNode operationalTree) {\r
-        final Pair<FlowCapableNode, FlowCapableNode> previous = compressionQueue.get(flowcapableNodePath);\r
-        if (previous != null) {\r
-            final FlowCapableNode previousOperational = previous.getRight();\r
-            compressionQueue.put(flowcapableNodePath, Pair.of(configTree, previousOperational));\r
-            return false;\r
-        } else {\r
-            compressionQueue.put(flowcapableNodePath, Pair.of(configTree, operationalTree));\r
-            return true;\r
-        }\r
-    }\r
-\r
-    protected Pair<FlowCapableNode/* config */, FlowCapableNode/* operational */> removeLastCompressionState(\r
-            final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {\r
-        try {\r
-            try {\r
-                compressionGuard.acquire();\r
-            } catch (InterruptedException e) {\r
-                return null;\r
-            }\r
-\r
-            return compressionQueue.remove(flowcapableNodePath);\r
-        } finally {\r
-            compressionGuard.release();\r
-        }\r
-    }\r
-}\r
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecorator.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecorator.java
new file mode 100644 (file)
index 0000000..15b79ec
--- /dev/null
@@ -0,0 +1,125 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.ZipQueueEntry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Enriches {@link SyncReactorFutureDecorator} with state compression.
+ */
+public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureZipDecorator.class);
+
+    @GuardedBy("compressionGuard")
+    protected final Map<InstanceIdentifier<FlowCapableNode>, ZipQueueEntry> compressionQueue = new HashMap<>();
+    protected final Semaphore compressionGuard = new Semaphore(1, false);
+
+    public SyncReactorFutureZipDecorator(SyncReactor delegate, ListeningExecutorService executorService) {
+        super(delegate, executorService);
+    }
+
+    public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+                                            final FlowCapableNode configTree, final FlowCapableNode operationalTree,
+                                            final LogicalDatastoreType dsType) throws InterruptedException {
+        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
+        LOG.trace("syncup zip {}", nodeId.getValue());
+
+        try {
+            compressionGuard.acquire();
+
+            final boolean newFutureNecessary = updateCompressionState(flowcapableNodePath, configTree, operationalTree, dsType);
+            if (newFutureNecessary) {
+                super.syncup(flowcapableNodePath, configTree, operationalTree, dsType);
+            }
+            return Futures.immediateFuture(true);
+        } finally {
+            compressionGuard.release();
+        }
+    }
+
+    protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+                                                         final FlowCapableNode configTree, final FlowCapableNode operationalTree,
+                                                         final LogicalDatastoreType dsType) throws InterruptedException {
+        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
+        LOG.trace("doSyncupInFuture zip {}", nodeId.getValue());
+
+        final ZipQueueEntry lastCompressionState = removeLastCompressionState(flowcapableNodePath);
+        if (lastCompressionState == null) {
+            return Futures.immediateFuture(true);
+        } else {
+            return super.doSyncupInFuture(flowcapableNodePath,
+                    lastCompressionState.getLeft(), lastCompressionState.getRight(), dsType);
+        }
+    }
+
+    /**
+     * If there is config delta in compression queue for the device and new configuration is coming,
+     * update its zip queue entry. Create/replace zip queue entry for the device with operational delta otherwise.
+     */
+    protected boolean updateCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+                                             final FlowCapableNode configTree, final FlowCapableNode operationalTree,
+                                             final LogicalDatastoreType dsType) {
+        final ZipQueueEntry previousEntry = compressionQueue.get(flowcapableNodePath);
+
+        if (previousEntry != null && dsType == LogicalDatastoreType.CONFIGURATION
+                && previousEntry.getDsType() == LogicalDatastoreType.CONFIGURATION) {
+            putOptimizedConfigDelta(flowcapableNodePath, configTree, previousEntry);
+        } else {
+            putLatestOperationalDelta(flowcapableNodePath, configTree, operationalTree, dsType);
+        }
+        return previousEntry == null;
+    }
+
+    private void putOptimizedConfigDelta(InstanceIdentifier<FlowCapableNode> flowcapableNodePath, FlowCapableNode configTree,
+                                         ZipQueueEntry previous) {
+        compressionQueue.put(flowcapableNodePath, new ZipQueueEntry(configTree, previous.getRight(), previous.getDsType()));
+    }
+
+    private void putLatestOperationalDelta(InstanceIdentifier<FlowCapableNode> flowcapableNodePath, FlowCapableNode configTree,
+                                           FlowCapableNode operationalTree, LogicalDatastoreType dsType) {
+        compressionQueue.put(flowcapableNodePath, new ZipQueueEntry(configTree, operationalTree, dsType));
+    }
+
+    private ZipQueueEntry removeLastCompressionState(
+            final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
+        try {
+            try {
+                compressionGuard.acquire();
+            } catch (InterruptedException e) {
+                return null;
+            }
+
+            return compressionQueue.remove(flowcapableNodePath);
+        } finally {
+            compressionGuard.release();
+        }
+    }
+
+    @VisibleForTesting
+    Map<InstanceIdentifier<FlowCapableNode>, ZipQueueEntry> getCompressionQueue() {
+        return compressionQueue;
+    }
+}
\ No newline at end of file
index 92627831e7b8f007a086a5434a82dec0522f1f31..90ead252bc52be1c458fb118b7e2b9da0199206e 100644 (file)
@@ -15,6 +15,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import java.util.concurrent.Semaphore;\r
 import java.util.concurrent.TimeUnit;\r
 import javax.annotation.Nullable;\r
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;\r
 import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;\r
 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
@@ -41,7 +42,8 @@ public class SyncReactorGuardDecorator implements SyncReactor {
     }\r
 \r
     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
-            final FlowCapableNode configTree, final FlowCapableNode operationalTree) throws InterruptedException {\r
+                                            final FlowCapableNode configTree, final FlowCapableNode operationalTree,\r
+                                            final LogicalDatastoreType dsType) throws InterruptedException {\r
         final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
         LOG.trace("syncup {}", nodeId.getValue());\r
 \r
@@ -55,10 +57,10 @@ public class SyncReactorGuardDecorator implements SyncReactor {
                         formatNanos(stampAfterGuard - stampBeforeGuard),\r
                         guard, threadName());\r
             }\r
-            \r
+\r
             final ListenableFuture<Boolean> endResult =\r
-                    delegate.syncup(flowcapableNodePath, configTree, operationalTree);//TODO handle InteruptedException\r
-            \r
+                    delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);//TODO handle InteruptedException\r
+\r
             Futures.addCallback(endResult, new FutureCallback<Boolean>() {\r
                 @Override\r
                 public void onSuccess(@Nullable final Boolean result) {\r
index de4819e11d79787886ffb527125f3890ea35ad60..0528470789958bd1b5c035bc00e60dcaf6161d10 100644 (file)
@@ -18,6 +18,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
 import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SynchronizationDiffInput;
@@ -56,7 +57,8 @@ public class SyncReactorImpl implements SyncReactor {
 
     @Override
     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> nodeIdent,
-                                            final FlowCapableNode configTree, final FlowCapableNode operationalTree) {
+                                            final FlowCapableNode configTree, final FlowCapableNode operationalTree,
+                                            final LogicalDatastoreType dsType) {
 
         LOG.trace("syncup {} cfg:{} oper:{}", nodeIdent, configTree == null ? "is null" : "non null", operationalTree == null ? "is null" : "non null");
         final SyncCrudCounters counters = new SyncCrudCounters();
@@ -121,6 +123,7 @@ public class SyncReactorImpl implements SyncReactor {
                     );
                 }
 
+                LOG.trace("syncup errors: {}", input.getErrors());
                 return input.isSuccessful();
             }
         });
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorRetryDecorator.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorRetryDecorator.java
new file mode 100644 (file)
index 0000000..9a6669b
--- /dev/null
@@ -0,0 +1,69 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Adding retry mechanism in case of unsuccessful syncup.
+ */
+public class SyncReactorRetryDecorator implements SyncReactor{
+
+    private static final Logger LOG = LoggerFactory.getLogger(SyncReactorRetryDecorator.class);
+
+    private final SyncReactor delegate;
+    private final RetryRegistry retryRegistry;
+
+    public SyncReactorRetryDecorator (final SyncReactor delegate, RetryRegistry retryRegistry) {
+        this.delegate = delegate;
+        this.retryRegistry = retryRegistry;
+    }
+
+    public ListenableFuture<Boolean> syncup (final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+                                             final FlowCapableNode configTree, final FlowCapableNode operationalTree,
+                                             final LogicalDatastoreType dsType) throws InterruptedException {
+
+        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
+        LOG.trace("syncup retry {}", nodeId.getValue());
+
+        if (dsType == LogicalDatastoreType.CONFIGURATION && retryRegistry.isRegistered(nodeId)) {
+            LOG.trace("Config change ignored because device is in retry [{}]", nodeId);
+            return Futures.immediateFuture(Boolean.FALSE);
+        }
+
+        ListenableFuture<Boolean> syncupResult = delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);
+
+        return Futures.transform(syncupResult, new Function<Boolean, Boolean>() {
+            @Override
+            public Boolean apply(Boolean result) {
+                LOG.trace("syncup ret in retry {}", result);
+                if (result) {
+                    retryRegistry.unregisterIfRegistered(nodeId);
+                    return true;
+                } else {
+                    retryRegistry.register(nodeId);
+                    // TODO  elicit statistics gathering if not running actually
+                    // triggerStatisticsGathering(nodeId);
+                    return false;
+                }
+            }
+        });
+    }
+}
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/RetryRegistry.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/RetryRegistry.java
new file mode 100644 (file)
index 0000000..eedc6c1
--- /dev/null
@@ -0,0 +1,52 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.util;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holder of registration request for fresh operational.
+ */
+public class RetryRegistry {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RetryRegistry.class);
+    private final Map<NodeId, Date> registration = new ConcurrentHashMap<>();
+    public static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
+
+    public Date register(NodeId nodeId) {
+        Date timestamp = new Date();
+        registration.put(nodeId, timestamp);
+        if (timestamp != null) {
+            LOG.debug("Registered for next consistent operational: {}", nodeId.getValue());
+        }
+        return timestamp;
+    }
+
+    public Date unregisterIfRegistered(NodeId nodeId) {
+        Date timestamp = registration.remove(nodeId);
+        if (timestamp != null) {
+            LOG.debug("Unregistered for next consistent operational: {}", nodeId.getValue());
+        }
+        return timestamp;
+    }
+
+    public boolean isRegistered(NodeId nodeId) {
+        return registration.get(nodeId) != null;
+    }
+
+    public Date getRegistration(NodeId nodeId) {
+        return registration.get(nodeId);
+    }
+
+}
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/ZipQueueEntry.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/ZipQueueEntry.java
new file mode 100644 (file)
index 0000000..13077a1
--- /dev/null
@@ -0,0 +1,41 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.util;
+
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+
+/**
+ * Simple compression queue entry for {@link org.opendaylight.openflowplugin.applications.frsync.impl.SyncReactorFutureZipDecorator}.
+ */
+public class ZipQueueEntry {
+    private FlowCapableNode after;
+    private FlowCapableNode before;
+    private LogicalDatastoreType dsTypeBefore;
+
+    public ZipQueueEntry(final FlowCapableNode after, final FlowCapableNode before, final LogicalDatastoreType dsTypeBefore) {
+        this.after = after;
+        this.before = before;
+        this.dsTypeBefore = dsTypeBefore;
+
+    }
+
+    public FlowCapableNode getLeft() {
+        return after;
+    }
+
+    public FlowCapableNode getRight() {
+        return before;
+    }
+
+    public LogicalDatastoreType getDsType() {
+        return dsTypeBefore;
+    }
+
+}
index 7e37a1f9a5487b930e4b6286edd4f46277f59129..4c20088ded8fa1897bc91bf5c00357c90e348e02 100644 (file)
@@ -43,32 +43,44 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 @RunWith(MockitoJUnitRunner.class)
 public class SimplifiedConfigListenerTest {
 
+    private static final NodeId NODE_ID = new NodeId("testNode");
+    private InstanceIdentifier<FlowCapableNode> fcNodePath;
+    private SimplifiedConfigListener nodeListenerConfig;
+    private LogicalDatastoreType dsType = LogicalDatastoreType.CONFIGURATION;
+
     @Mock
     private SyncReactor reactor;
     @Mock
-    private DataBroker db;
+    private ReadOnlyTransaction roTx;
     @Mock
     private DataTreeModification<FlowCapableNode> dataTreeModification;
     @Mock
-    private ReadOnlyTransaction roTx;
-    @Mock
     private DataObjectModification<FlowCapableNode> configModification;
-
-    private InstanceIdentifier<FlowCapableNode> nodePath;
-    private SimplifiedConfigListener nodeListenerConfig;
+    @Mock
+    private FlowCapableNode dataBefore;
+    @Mock
+    private FlowCapableNode dataAfter;
 
     @Before
     public void setUp() throws Exception {
-        final FlowCapableNodeSnapshotDao configSnaphot = new FlowCapableNodeSnapshotDao();
-        final FlowCapableNodeSnapshotDao operationalSnaphot = new FlowCapableNodeSnapshotDao();
-        final FlowCapableNodeDao operationalDao = new FlowCapableNodeCachedDao(operationalSnaphot,
+        final DataBroker db = Mockito.mock(DataBroker.class);
+        final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
+        final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
+        final FlowCapableNodeDao operationalDao = new FlowCapableNodeCachedDao(operationalSnapshot,
                 new FlowCapableNodeOdlDao(db, LogicalDatastoreType.OPERATIONAL));
 
-        
-        nodeListenerConfig = new SimplifiedConfigListener(reactor, configSnaphot, operationalDao);
-        nodePath = InstanceIdentifier.create(Nodes.class)
-                .child(Node.class, new NodeKey(new NodeId("testNode")))
+        nodeListenerConfig = new SimplifiedConfigListener(reactor, configSnapshot, operationalDao);
+        fcNodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID))
                 .augmentation(FlowCapableNode.class);
+
+        final DataTreeIdentifier<FlowCapableNode> dataTreeIdentifier =
+                new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, fcNodePath);
+
+        Mockito.when(db.newReadOnlyTransaction()).thenReturn(roTx);
+        Mockito.when(dataTreeModification.getRootPath()).thenReturn(dataTreeIdentifier);
+        Mockito.when(dataTreeModification.getRootNode()).thenReturn(configModification);
+        Mockito.when(reactor.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+                Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
     }
 
     @Test
@@ -77,24 +89,53 @@ public class SimplifiedConfigListenerTest {
     }
 
     @Test
-    public void testOnDataTreeChanged() throws Exception {
-        final FlowCapableNode configTree = Mockito.mock(FlowCapableNode.class);
-        final FlowCapableNode operationalTree = Mockito.mock(FlowCapableNode.class);
-        final DataTreeIdentifier<FlowCapableNode> dataTreeIdentifier =
-                new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, nodePath);
+    public void testOnDataTreeChangedSyncupAdd() throws InterruptedException {
+        Mockito.when(roTx.read(LogicalDatastoreType.OPERATIONAL, fcNodePath))
+                .thenReturn(Futures.immediateCheckedFuture(Optional.of(dataBefore)));
+        Mockito.when(configModification.getDataAfter()).thenReturn(dataAfter);
 
-        Mockito.when(dataTreeModification.getRootPath()).thenReturn(dataTreeIdentifier);
-        Mockito.when(dataTreeModification.getRootNode()).thenReturn(configModification);
-        Mockito.when(configModification.getDataAfter()).thenReturn(configTree);
-        Mockito.when(db.newReadOnlyTransaction()).thenReturn(roTx);
-        Mockito.doReturn(Futures.immediateCheckedFuture(Optional.of(operationalTree))).when(
-                roTx).read(LogicalDatastoreType.OPERATIONAL, nodePath);
-        Mockito.when(reactor.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(),Matchers.<FlowCapableNode>any(),Matchers.<FlowCapableNode>any()))
-                .thenReturn(Futures.immediateFuture(Boolean.TRUE));
+        nodeListenerConfig.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+        Mockito.verify(reactor).syncup(fcNodePath, dataAfter, dataBefore, dsType);
+        Mockito.verifyNoMoreInteractions(reactor);
+        Mockito.verify(roTx).close();
+    }
+
+    @Test
+    public void testOnDataTreeChangedSyncupUpdate() throws InterruptedException {
+        Mockito.when(roTx.read(LogicalDatastoreType.OPERATIONAL, fcNodePath))
+                .thenReturn(Futures.immediateCheckedFuture(Optional.of(dataBefore)));
+        Mockito.when(configModification.getDataBefore()).thenReturn(dataBefore);
+        Mockito.when(configModification.getDataAfter()).thenReturn(dataAfter);
+
+        nodeListenerConfig.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+        Mockito.verify(reactor).syncup(fcNodePath, dataAfter, dataBefore, dsType);
+        Mockito.verifyNoMoreInteractions(reactor);
+        Mockito.verify(roTx).close();
+    }
+
+    @Test
+    public void testOnDataTreeChangedSyncupDelete() throws InterruptedException {
+        Mockito.when(roTx.read(LogicalDatastoreType.OPERATIONAL, fcNodePath))
+                .thenReturn(Futures.immediateCheckedFuture(Optional.of(dataBefore)));
+        Mockito.when(configModification.getDataBefore()).thenReturn(dataBefore);
+
+        nodeListenerConfig.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+        Mockito.verify(reactor).syncup(fcNodePath, null, dataBefore, dsType);
+        Mockito.verifyNoMoreInteractions(reactor);
+        Mockito.verify(roTx).close();
+    }
+
+    @Test
+    public void testOnDataTreeChangedSkip() {
+        Mockito.when(roTx.read(LogicalDatastoreType.OPERATIONAL, fcNodePath)).
+                thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
 
         nodeListenerConfig.onDataTreeChanged(Collections.singleton(dataTreeModification));
 
-        Mockito.verify(reactor).syncup(nodePath, configTree, operationalTree);
+        Mockito.verifyZeroInteractions(reactor);
         Mockito.verify(roTx).close();
     }
 }
\ No newline at end of file
index 04005d2c0cfb72f477029265b7fddf7bbc063729..30383c0a2200dcfce9414281b36e9e343aaa6acc 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.openflowplugin.applications.frsync.impl;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
 import java.util.Collections;
+import java.util.List;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -21,6 +22,7 @@ import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
@@ -33,6 +35,7 @@ import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSn
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -43,7 +46,11 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 @RunWith(MockitoJUnitRunner.class)
 public class SimplifiedOperationalListenerTest {
 
-    public static final NodeId NODE_ID = new NodeId("testNode");
+    private static final NodeId NODE_ID = new NodeId("testNode");
+    private InstanceIdentifier<FlowCapableNode> fcNodePath;
+    private SimplifiedOperationalListener nodeListenerOperational;
+    private final LogicalDatastoreType dsType = LogicalDatastoreType.OPERATIONAL;
+
     @Mock
     private SyncReactor reactor;
     @Mock
@@ -51,37 +58,34 @@ public class SimplifiedOperationalListenerTest {
     @Mock
     private DataTreeModification<Node> dataTreeModification;
     @Mock
+    private DataObjectModification<Node> operationalModification;
+    @Mock
     private FlowCapableNode configNode;
     @Mock
+    private Node operationalNode;
+    @Mock
     private FlowCapableNode fcOperationalNode;
 
-    private InstanceIdentifier<Node> nodePath;
-    private InstanceIdentifier<FlowCapableNode> fcNodePath;
-    private SimplifiedOperationalListener nodeListenerOperational;
-
     @Before
     public void setUp() throws Exception {
         final DataBroker db = Mockito.mock(DataBroker.class);
-        final DataObjectModification<Node> operationalModification = Mockito.mock(DataObjectModification.class);
-        final Node operationalNode = Mockito.mock(Node.class);
-
-        final FlowCapableNodeSnapshotDao configSnaphot = new FlowCapableNodeSnapshotDao();
-        final FlowCapableNodeSnapshotDao operationalSnaphot = new FlowCapableNodeSnapshotDao();
-        final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnaphot,
+        final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
+        final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
+        final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot,
                 new FlowCapableNodeOdlDao(db, LogicalDatastoreType.CONFIGURATION));
-        nodeListenerOperational = new SimplifiedOperationalListener(reactor, operationalSnaphot, configDao);
-        nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
+
+        nodeListenerOperational = new SimplifiedOperationalListener(reactor, operationalSnapshot, configDao);
+        InstanceIdentifier<Node> nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
         fcNodePath = nodePath.augmentation(FlowCapableNode.class);
 
         final DataTreeIdentifier<Node> dataTreeIdentifier =
                 new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, nodePath);
 
-        Mockito.when(operationalNode.getId()).thenReturn(NODE_ID);
         Mockito.when(db.newReadOnlyTransaction()).thenReturn(roTx);
-        Mockito.when(operationalNode.getAugmentation(FlowCapableNode.class)).thenReturn(fcOperationalNode);
+        Mockito.when(operationalNode.getId()).thenReturn(NODE_ID);
         Mockito.when(dataTreeModification.getRootPath()).thenReturn(dataTreeIdentifier);
         Mockito.when(dataTreeModification.getRootNode()).thenReturn(operationalModification);
-        Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+        Mockito.when(operationalNode.getAugmentation(FlowCapableNode.class)).thenReturn(fcOperationalNode);
     }
 
     @Test
@@ -90,27 +94,51 @@ public class SimplifiedOperationalListenerTest {
     }
 
     @Test
-    public void testOnDataTreeChangedSyncup() throws Exception {
-        Mockito.when(reactor.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(),Matchers.<FlowCapableNode>any(),
-                Matchers.<FlowCapableNode>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
+    public void testOnDataTreeChangedSyncupAdd() throws InterruptedException {
         Mockito.when(roTx.read(LogicalDatastoreType.CONFIGURATION, fcNodePath))
                 .thenReturn(Futures.immediateCheckedFuture(Optional.of(configNode)));
+        Mockito.when(reactor.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+                Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
+        Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
 
         nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
 
-        Mockito.verify(reactor, Mockito.times(1)).syncup(fcNodePath, configNode, fcOperationalNode);
+        Mockito.verify(reactor).syncup(fcNodePath, configNode, fcOperationalNode, dsType);
+        Mockito.verifyNoMoreInteractions(reactor);
         Mockito.verify(roTx).close();
     }
 
     @Test
-    public void testOnDataTreeChangedSkip() throws Exception {
+    public void testOnDataTreeChangedAddSkip() {
         // Related to bug 5920 -> https://bugs.opendaylight.org/show_bug.cgi?id=5920
         Mockito.when(roTx.read(LogicalDatastoreType.CONFIGURATION, fcNodePath))
                 .thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
+        Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
 
         nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
 
         Mockito.verifyZeroInteractions(reactor);
         Mockito.verify(roTx).close();
     }
+
+    @Test
+    public void testOnDataTreeChangedSyncupDeletePhysical() {
+        Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+        Mockito.when(dataTreeModification.getRootNode().getModificationType()).thenReturn(ModificationType.DELETE);
+
+        nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+        Mockito.verifyZeroInteractions(reactor);
+    }
+
+    @Test
+    public void testOnDataTreeChangedSyncupDeleteLogical() {
+        Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+        List<NodeConnector> nodeConnectorList = Mockito.mock(List.class);
+        Mockito.when(operationalNode.getNodeConnector()).thenReturn(nodeConnectorList);
+
+        nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+        Mockito.verifyZeroInteractions(reactor);
+    }
 }
diff --git a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalRetryListenerTest.java b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalRetryListenerTest.java
new file mode 100644 (file)
index 0000000..fd23c4a
--- /dev/null
@@ -0,0 +1,205 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeCachedDao;
+import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
+import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
+import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
+import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.DateAndTime;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+/**
+ * Test for {@link SimplifiedOperationalRetryListener}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SimplifiedOperationalRetryListenerTest {
+
+    private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(RetryRegistry.DATE_AND_TIME_FORMAT);
+    private static final NodeId NODE_ID = new NodeId("testNode");
+    private InstanceIdentifier<FlowCapableNode> fcNodePath;
+    private SimplifiedOperationalRetryListener nodeListenerOperational;
+    private final String timestampBefore = "0000-12-12T01:01:01.000-07:00";
+    private final String timestampAfter = "9999-12-12T01:01:01.000-07:00";
+    private final LogicalDatastoreType dsType = LogicalDatastoreType.OPERATIONAL;
+
+    @Mock
+    private SyncReactor reactor;
+    @Mock
+    private ReadOnlyTransaction roTx;
+    @Mock
+    private DataTreeModification<Node> dataTreeModification;
+    @Mock
+    private DataObjectModification<Node> operationalModification;
+    @Mock
+    private FlowCapableNode configNode;
+    @Mock
+    private Node operationalNode;
+    @Mock
+    private FlowCapableNode fcOperationalNode;
+    @Mock
+    private RetryRegistry retryRegistry;
+    @Mock
+    private FlowCapableStatisticsGatheringStatus statisticsGatheringStatus;
+    @Mock
+    private SnapshotGatheringStatusEnd snapshotGatheringStatusEnd;
+
+    @Before
+    public void setUp() throws Exception {
+        final DataBroker db = Mockito.mock(DataBroker.class);
+        final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
+        final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
+        final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot,
+                new FlowCapableNodeOdlDao(db, LogicalDatastoreType.CONFIGURATION));
+
+        nodeListenerOperational = new SimplifiedOperationalRetryListener(reactor, operationalSnapshot, configDao, retryRegistry);
+        InstanceIdentifier<Node> nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
+        fcNodePath = nodePath.augmentation(FlowCapableNode.class);
+
+        final DataTreeIdentifier<Node> dataTreeIdentifier =
+                new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, nodePath);
+
+        Mockito.when(db.newReadOnlyTransaction()).thenReturn(roTx);
+        Mockito.when(operationalNode.getId()).thenReturn(NODE_ID);
+        Mockito.when(dataTreeModification.getRootPath()).thenReturn(dataTreeIdentifier);
+        Mockito.when(dataTreeModification.getRootNode()).thenReturn(operationalModification);
+        Mockito.when(operationalNode.getAugmentation(FlowCapableNode.class)).thenReturn(fcOperationalNode);
+    }
+
+    @Test
+    public void testOnDataTreeChangedRetryNotRegistered() {
+        Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+        Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+        Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(false);
+
+        nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+        Mockito.verifyZeroInteractions(reactor);
+    }
+
+    @Test
+    public void testOnDataTreeChangedRetryButStaticsGatheringNotStarted() {
+        Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+        Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+        Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(true);
+        Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(null);
+
+        nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+        Mockito.verifyZeroInteractions(reactor);
+    }
+
+    @Test
+    public void testOnDataTreeChangedRetryButStaticsGatheringNotFinished() {
+        Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+        Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+        Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(true);
+        Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
+        Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(null);
+
+        nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+        Mockito.verifyZeroInteractions(reactor);
+    }
+
+    @Test
+    public void testOnDataTreeChangedRetryButStaticsGatheringNotSuccessful() {
+        Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+        Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+        Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(true);
+        Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
+        Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
+        Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(false);
+
+        nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+        Mockito.verifyZeroInteractions(reactor);
+    }
+
+    @Test
+    public void testOnDataTreeChangedRetryAndFreshOperationalNotPresent() throws ParseException {
+        final DateAndTime timestamp = Mockito.mock(DateAndTime.class);
+        Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+        Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+        Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(true);
+        Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
+        Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
+        Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(true);
+        Mockito.when(snapshotGatheringStatusEnd.getEnd()).thenReturn(timestamp);
+        Mockito.when(snapshotGatheringStatusEnd.getEnd().getValue()).thenReturn(timestampBefore);
+        Mockito.when(retryRegistry.getRegistration(NODE_ID)).thenReturn(simpleDateFormat.parse(timestampAfter));
+
+        nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+        Mockito.verifyZeroInteractions(reactor);
+    }
+
+    @Test
+    public void testOnDataTreeChangedRetryAndFreshOperationalPresent() throws Exception {
+        final DateAndTime timestamp = Mockito.mock(DateAndTime.class);
+        Mockito.when(roTx.read(LogicalDatastoreType.CONFIGURATION, fcNodePath))
+                .thenReturn(Futures.immediateCheckedFuture(Optional.of(configNode)));
+        Mockito.when(reactor.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+                Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
+        Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+        Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+        Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(true);
+        Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
+        Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
+        Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(true);
+        Mockito.when(snapshotGatheringStatusEnd.getEnd()).thenReturn(timestamp);
+        Mockito.when(snapshotGatheringStatusEnd.getEnd().getValue()).thenReturn(timestampAfter);
+        Mockito.when(retryRegistry.getRegistration(NODE_ID)).thenReturn(simpleDateFormat.parse(timestampBefore));
+
+        nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+        Mockito.verify(reactor).syncup(fcNodePath, configNode, fcOperationalNode, dsType);
+        Mockito.verifyNoMoreInteractions(reactor);
+        Mockito.verify(roTx).close();
+    }
+
+    @Test
+    public void testOnDataTreeChangedRetryAndNodeDeleted() {
+        Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+        Mockito.when(dataTreeModification.getRootNode().getModificationType()).thenReturn(DataObjectModification.ModificationType.DELETE);
+
+        nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+        Mockito.verify(retryRegistry).unregisterIfRegistered(NODE_ID);
+        Mockito.verifyZeroInteractions(reactor);
+    }
+
+}
\ No newline at end of file
diff --git a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecoratorTest.java b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecoratorTest.java
new file mode 100644 (file)
index 0000000..f9fb441
--- /dev/null
@@ -0,0 +1,186 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InOrder;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test for {@link SyncReactorFutureZipDecorator}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SyncReactorFutureZipDecoratorTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureZipDecoratorTest.class);
+    private static final NodeId NODE_ID = new NodeId("testNode");
+    private SyncReactorFutureZipDecorator reactor;
+    private InstanceIdentifier<FlowCapableNode> fcNodePath;
+    private ListeningExecutorService syncThreadPool;
+
+    @Mock
+    private SyncReactorGuardDecorator delegate;
+
+    @Before
+    public void setUp() {
+        syncThreadPool = FrmExecutors.instance()
+                .newFixedThreadPool(1, new ThreadFactoryBuilder()
+                        .setNameFormat(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX + "%d")
+                        .setDaemon(false)
+                        .build());
+
+        reactor = new SyncReactorFutureZipDecorator(delegate, syncThreadPool);
+        fcNodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID))
+                .augmentation(FlowCapableNode.class);
+    }
+
+    @Test
+    public void testSyncupWithOptimizedConfigDeltaCompression() throws Exception {
+        final FlowCapableNode dataBefore = Mockito.mock(FlowCapableNode.class);
+        final FlowCapableNode dataAfter = Mockito.mock(FlowCapableNode.class);
+        final FlowCapableNode dataAfter2 = Mockito.mock(FlowCapableNode.class);
+        final CountDownLatch latchForFirst = new CountDownLatch(1);
+        final CountDownLatch latchForNext = new CountDownLatch(1);
+        final LogicalDatastoreType dsType = LogicalDatastoreType.CONFIGURATION;
+
+        Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+                Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenAnswer(new Answer<ListenableFuture<Boolean>>() {
+                    @Override
+                    public ListenableFuture<Boolean> answer(final InvocationOnMock invocationOnMock) throws Throwable {
+                        LOG.info("unlocking next configs");
+                        latchForNext.countDown();
+                        latchForFirst.await();
+                        LOG.info("unlocking first delegate");
+                        return Futures.immediateFuture(Boolean.TRUE);
+                    }
+                }).thenReturn(Futures.immediateFuture(Boolean.TRUE));
+
+        final List<ListenableFuture<Boolean>> allResults = new ArrayList<>();
+        allResults.add(reactor.syncup(fcNodePath, dataBefore, null, dsType));
+        latchForNext.await();
+
+        allResults.add(reactor.syncup(fcNodePath, dataAfter, dataBefore, dsType));
+        allResults.add(reactor.syncup(fcNodePath, null, dataAfter, dsType));
+        allResults.add(reactor.syncup(fcNodePath, dataAfter2, null, dsType));
+        latchForFirst.countDown();
+
+        Futures.allAsList(allResults).get(1, TimeUnit.SECONDS);
+        LOG.info("all configs done");
+
+        syncThreadPool.shutdown();
+        boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
+        if (!terminated) {
+            LOG.info("thread pool not terminated.");
+            syncThreadPool.shutdownNow();
+        }
+        final InOrder inOrder = Mockito.inOrder(delegate);
+        inOrder.verify(delegate).syncup(fcNodePath, dataBefore, null, dsType);
+        inOrder.verify(delegate).syncup(fcNodePath, dataAfter2, dataBefore, dsType);
+        inOrder.verifyNoMoreInteractions();
+    }
+
+    @Test
+    public void testSyncupConfigEmptyQueue() throws Exception {
+        final FlowCapableNode dataBefore = Mockito.mock(FlowCapableNode.class);
+        final FlowCapableNode dataAfter = Mockito.mock(FlowCapableNode.class);
+        final CountDownLatch latchForNext = new CountDownLatch(1);
+        final LogicalDatastoreType dsType = LogicalDatastoreType.CONFIGURATION;
+
+        Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+                Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenAnswer(new Answer<ListenableFuture<Boolean>>() {
+            @Override
+            public ListenableFuture<Boolean> answer(final InvocationOnMock invocationOnMock) throws Throwable {
+                LOG.info("unlocking next config");
+                latchForNext.countDown();
+                return Futures.immediateFuture(Boolean.TRUE);
+            }
+            }).thenReturn(Futures.immediateFuture(Boolean.TRUE));
+
+        reactor.syncup(fcNodePath, dataBefore, null, dsType);
+        latchForNext.await();
+        reactor.syncup(fcNodePath, dataAfter, dataBefore, dsType);
+
+        boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
+        if (!terminated) {
+            LOG.info("thread pool not terminated.");
+            syncThreadPool.shutdownNow();
+        }
+        final InOrder inOrder = Mockito.inOrder(delegate);
+        inOrder.verify(delegate).syncup(fcNodePath, dataBefore, null, dsType);
+        inOrder.verify(delegate).syncup(fcNodePath, dataAfter, dataBefore, dsType);
+        inOrder.verifyNoMoreInteractions();
+
+    }
+
+    @Test
+    public void testSyncupRewriteZipEntryWithOperationalDelta() throws Exception {
+        final FlowCapableNode configBefore = Mockito.mock(FlowCapableNode.class);
+        final FlowCapableNode configAfter = Mockito.mock(FlowCapableNode.class);
+        final FlowCapableNode configActual = Mockito.mock(FlowCapableNode.class);
+        final FlowCapableNode freshOperational = Mockito.mock(FlowCapableNode.class);
+        final CountDownLatch latchForFirst = new CountDownLatch(1);
+        final CountDownLatch latchForNext = new CountDownLatch(1);
+
+        Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+                Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenAnswer(new Answer<ListenableFuture<Boolean>>() {
+            @Override
+            public ListenableFuture<Boolean> answer(final InvocationOnMock invocationOnMock) throws Throwable {
+                LOG.info("unlocking for fresh operational");
+                latchForNext.countDown();
+                latchForFirst.await();
+                LOG.info("unlocking first delegate");
+                return Futures.immediateFuture(Boolean.TRUE);
+            }
+        }).thenReturn(Futures.immediateFuture(Boolean.TRUE));
+
+        reactor.syncup(fcNodePath, configAfter, configBefore, LogicalDatastoreType.CONFIGURATION);
+        latchForNext.await();
+
+        reactor.syncup(fcNodePath, configActual, freshOperational, LogicalDatastoreType.OPERATIONAL);
+        latchForFirst.countDown();
+
+        syncThreadPool.shutdown();
+        boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
+        if (!terminated) {
+            LOG.info("thread pool not terminated.");
+            syncThreadPool.shutdownNow();
+        }
+        Mockito.verify(delegate, Mockito.times(1)).syncup(fcNodePath, configActual, freshOperational, LogicalDatastoreType.OPERATIONAL);
+    }
+
+    @After
+    public void tearDown() {
+        syncThreadPool.shutdownNow();
+    }
+}
\ No newline at end of file
index 8eda58c26c695ac73a9665e44fc6192582bafd8a..d10a228bb7c94c0fa831a19b624807d2c19a9b52 100644 (file)
@@ -16,6 +16,7 @@ import org.mockito.Matchers;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
@@ -33,9 +34,10 @@ public class SyncReactorGuardDecoratorTest {
     private static final NodeId NODE_ID = new NodeId("test-node");
     private SyncReactorGuardDecorator reactor;
     private InstanceIdentifier<FlowCapableNode> fcNodePath;
+    private final LogicalDatastoreType dsType = LogicalDatastoreType.CONFIGURATION;
 
     @Mock
-    private SyncReactorImpl delegate;
+    private SyncReactorRetryDecorator delegate;
     @Mock
     private FlowCapableNode fcConfigNode;
     @Mock
@@ -43,7 +45,8 @@ public class SyncReactorGuardDecoratorTest {
 
     @Before
     public void setUp() throws Exception {
-        reactor = new SyncReactorGuardDecorator(delegate, new SemaphoreKeeperGuavaImpl<InstanceIdentifier<FlowCapableNode>>(1, true));
+        final SemaphoreKeeperGuavaImpl semaphoreKeeper = new SemaphoreKeeperGuavaImpl<InstanceIdentifier<FlowCapableNode>>(1, true);
+        reactor = new SyncReactorGuardDecorator(delegate, semaphoreKeeper);
         InstanceIdentifier<Node> nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
         fcNodePath = nodePath.augmentation(FlowCapableNode.class);
 
@@ -54,19 +57,24 @@ public class SyncReactorGuardDecoratorTest {
 
     @Test
     public void testSyncupSuccess() throws Exception {
-        Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(),Matchers.<FlowCapableNode>any(),
-                Matchers.<FlowCapableNode>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
-        reactor.syncup(fcNodePath, fcConfigNode, fcOperationalNode);
-        Mockito.verify(delegate).syncup(fcNodePath, fcConfigNode, fcOperationalNode);
+        Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+                Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
 
+        reactor.syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
+
+        Mockito.verify(delegate).syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
+        Mockito.verifyNoMoreInteractions(delegate);
     }
 
     @Test
     public void testSyncupFail() throws Exception {
-        Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(),Matchers.<FlowCapableNode>any(),
-                Matchers.<FlowCapableNode>any())).thenReturn(Futures.immediateFailedFuture(new Exception()));
-        reactor.syncup(fcNodePath, fcConfigNode, fcOperationalNode);
-        Mockito.verify(delegate).syncup(fcNodePath, fcConfigNode, fcOperationalNode);
+        Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+                Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFailedFuture(new Exception()));
+
+        reactor.syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
+
+        Mockito.verify(delegate).syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
+        Mockito.verifyNoMoreInteractions(delegate);
 
     }
 
index 17f470bb2e363dda02a3760b691ea148f365be78..588a4019369bafdfb9983a4df32ecb11fb5d820d 100644 (file)
@@ -22,6 +22,7 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
 import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SynchronizationDiffInput;
 import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
@@ -54,13 +55,12 @@ public class SyncReactorImplTest {
     private static final NodeId NODE_ID = new NodeId("unit-nodeId");
     private static final InstanceIdentifier<FlowCapableNode> NODE_IDENT = InstanceIdentifier.create(Nodes.class)
             .child(Node.class, new NodeKey(NODE_ID)).augmentation(FlowCapableNode.class);
-
     private SyncReactorImpl reactor;
+
     @Mock
     private DataBroker db;
     @Mock
     private SyncPlanPushStrategy syncPlanPushStrategy;
-
     @Captor
     private ArgumentCaptor<Group> groupCaptor;
     @Captor
@@ -107,7 +107,7 @@ public class SyncReactorImplTest {
                 Matchers.<SyncCrudCounters>any()))
                 .thenReturn(RpcResultBuilder.<Void>success().buildFuture());
 
-        final ListenableFuture<Boolean> syncupResult = reactor.syncup(NODE_IDENT, configFcn, operationalFcn);
+        final ListenableFuture<Boolean> syncupResult = reactor.syncup(NODE_IDENT, configFcn, operationalFcn, LogicalDatastoreType.CONFIGURATION);
         try {
             Assert.assertTrue(syncupResult.isDone());
             final Boolean voidRpcResult = syncupResult.get(2, TimeUnit.SECONDS);
diff --git a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorRetryDecoratorTest.java b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorRetryDecoratorTest.java
new file mode 100644 (file)
index 0000000..e118a34
--- /dev/null
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.util.concurrent.Futures;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+/**
+ * Test for {@link SyncReactorRetryDecorator}
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SyncReactorRetryDecoratorTest {
+
+    private static final NodeId NODE_ID = new NodeId("test-node");
+    private SyncReactorRetryDecorator reactor;
+    private InstanceIdentifier<FlowCapableNode> fcNodePath;
+    final LogicalDatastoreType dsType = LogicalDatastoreType.CONFIGURATION;
+
+    @Mock
+    private SyncReactorImpl delegate;
+    @Mock
+    private RetryRegistry retryRegistry;
+    @Mock
+    private FlowCapableNode fcConfigNode;
+    @Mock
+    private FlowCapableNode fcOperationalNode;
+
+    @Before
+    public void setUp() {
+        reactor = new SyncReactorRetryDecorator(delegate, retryRegistry);
+        InstanceIdentifier<Node> nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
+        fcNodePath = nodePath.augmentation(FlowCapableNode.class);
+
+        final Node operationalNode = Mockito.mock(Node.class);
+        Mockito.when(operationalNode.getId()).thenReturn(NODE_ID);
+        Mockito.when(operationalNode.getAugmentation(FlowCapableNode.class)).thenReturn(fcOperationalNode);
+    }
+
+    @Test
+    public void testSyncupSuccess() throws InterruptedException {
+        Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+                Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
+
+        reactor.syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
+
+        Mockito.verify(delegate).syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
+        Mockito.verifyNoMoreInteractions(delegate);
+        Mockito.verify(retryRegistry).unregisterIfRegistered(NODE_ID);
+    }
+
+    @Test
+    public void testSyncupFail() throws InterruptedException {
+        Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+                Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFuture(Boolean.FALSE));
+
+        reactor.syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
+
+        Mockito.verify(delegate).syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
+        Mockito.verifyNoMoreInteractions(delegate);
+        Mockito.verify(retryRegistry).register(NODE_ID);
+    }
+
+    @Test
+    public void testSyncupConfigIgnoreInRetry() throws InterruptedException {
+        Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(true);
+
+        reactor.syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
+
+        Mockito.verifyZeroInteractions(delegate);
+    }
+
+}
\ No newline at end of file
diff --git a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/util/RetryRegistryTest.java b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/util/RetryRegistryTest.java
new file mode 100644 (file)
index 0000000..115bb2c
--- /dev/null
@@ -0,0 +1,55 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.util;
+
+import java.util.Date;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+
+/**
+ * Test for {@link RetryRegistry}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class RetryRegistryTest {
+
+    private static final NodeId NODE_ID = new NodeId("testNode");
+    private RetryRegistry retryRegistry;
+
+    @Before
+    public void setUp() throws Exception {
+        retryRegistry = new RetryRegistry();
+    }
+
+    @Test
+    public void testRegister() {
+        Date timestamp = retryRegistry.register(NODE_ID);
+        Assert.assertEquals(true, retryRegistry.isRegistered(NODE_ID));
+        Assert.assertNotNull(timestamp);
+    }
+
+    @Test
+    public void testUnregisterIfRegistered() {
+        retryRegistry.register(NODE_ID);
+        Date timestamp = retryRegistry.unregisterIfRegistered(NODE_ID);
+        Assert.assertEquals(false, retryRegistry.isRegistered(NODE_ID));
+        Assert.assertNotNull(timestamp);
+    }
+
+    @Test
+    public void testUnregisterIfNotRegistered() {
+        Date timestamp = retryRegistry.unregisterIfRegistered(NODE_ID);
+        Assert.assertEquals(false, retryRegistry.isRegistered(NODE_ID));
+        Assert.assertNull(timestamp);
+    }
+
+}