Bug 6170 + Bug 5919 - made FRS cluster-aware 89/41889/18
authorAndrej Leitner <anleitne@cisco.com>
Wed, 13 Jul 2016 07:47:43 +0000 (09:47 +0200)
committerAndrej Leitner <anleitne@cisco.com>
Wed, 27 Jul 2016 07:53:31 +0000 (09:53 +0200)
 - NodeListener changed to clustered DTCL
 - added ClusterRegistration per connected device
   with corresponding actions (on mastership change)
 - added ClusterServiceManager to register services
   at the provider and keep these registrations
 - added ClusterDecorator to skip modifications
   if device is not currently mastered
 - added and updated tests

Change-Id: I4fd9459e67cc5127f7eacca93adbd456a34fab0f
Signed-off-by: Andrej Leitner <anleitne@cisco.com>
17 files changed:
applications/forwardingrules-sync/pom.xml
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/NodeListener.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProvider.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedConfigListener.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalListener.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorClusterDecorator.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorRetryDecorator.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/clustering/DeviceMastership.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/clustering/DeviceMastershipManager.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/ReconciliationRegistry.java
applications/forwardingrules-sync/src/main/resources/org/opendaylight/blueprint/forwardingrules-sync.xml
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProviderTest.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedConfigListenerTest.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalListenerTest.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorClusterDecoratorTest.java [new file with mode: 0644]
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/clustering/DeviceMastershipManagerTest.java [new file with mode: 0644]
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/clustering/DeviceMastershipTest.java [new file with mode: 0644]

index 067a462f2ee9dccdbbc66a5a3de49400a003ea3d..02b725fc27beccbc72ad3160ac6c22914aad3ebe 100644 (file)
             <artifactId>model-flow-service</artifactId>
         </dependency>
 
-        <dependency>
-            <groupId>org.opendaylight.openflowplugin</groupId>
-            <artifactId>openflowplugin-common</artifactId>
-        </dependency>
-
         <dependency>
             <groupId>org.opendaylight.yangtools</groupId>
             <artifactId>yang-common</artifactId>
             <artifactId>sal-common-util</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>mdsal-singleton-common-api</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>com.google.code.findbugs</groupId>
             <artifactId>jsr305</artifactId>
index f3bc24928f31b93f4188d706568c66ae0c9883fd..05ca0c20f6ee73838a4b4ea454dc26055ad308a6 100644 (file)
@@ -9,11 +9,11 @@
 package org.opendaylight.openflowplugin.applications.frsync;
 
 import java.util.EventListener;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 
 /**
  * Unifying listener for data and event changes on node.
  */
-public interface NodeListener<T extends DataObject> extends EventListener, DataTreeChangeListener<T> {
+public interface NodeListener<T extends DataObject> extends EventListener, ClusteredDataTreeChangeListener<T> {
 }
index ab71e682238ea6285e94434e020e8e42ce0ebb53..5292ac13f28a059d0dc0018a880b6ef775f79343 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.openflowplugin.applications.frsync.NodeListener;
 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
@@ -28,6 +29,7 @@ import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeCa
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
+import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager;
 import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SyncPlanPushStrategyFlatBatchImpl;
 import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
 import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
@@ -49,6 +51,7 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
     private static final Logger LOG = LoggerFactory.getLogger(ForwardingRulesSyncProvider.class);
 
     private final DataBroker dataService;
+    private final ClusterSingletonServiceProvider clusterSingletonService;
     private final SalTableService salTableService;
     private final SalFlatBatchService flatBatchService;
 
@@ -69,9 +72,12 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
 
     public ForwardingRulesSyncProvider(final BindingAwareBroker broker,
                                        final DataBroker dataBroker,
-                                       final RpcConsumerRegistry rpcRegistry) {
-        Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
+                                       final RpcConsumerRegistry rpcRegistry,
+                                       final ClusterSingletonServiceProvider clusterSingletonService) {
+        Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null!");
         this.dataService = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
+        this.clusterSingletonService = Preconditions.checkNotNull(clusterSingletonService,
+                "ClusterSingletonServiceProvider can not be null!");
         this.salTableService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalTableService.class),
                 "RPC SalTableService not found.");
         this.flatBatchService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalFlatBatchService.class),
@@ -98,13 +104,16 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
                 .setTableForwarder(tableForwarder);
 
         final ReconciliationRegistry reconciliationRegistry = new ReconciliationRegistry();
+        final DeviceMastershipManager deviceMastershipManager =
+                new DeviceMastershipManager(clusterSingletonService, reconciliationRegistry);
 
         final SyncReactor syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
         final SyncReactor syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, reconciliationRegistry);
         final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry,
                 new SemaphoreKeeperGuavaImpl<>(1, true));
+        final SyncReactor syncReactorFutureZip = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
 
-        final SyncReactor reactor = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
+        final SyncReactor reactor = new SyncReactorClusterDecorator(syncReactorFutureZip, deviceMastershipManager);
 
         final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
         final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
@@ -116,7 +125,7 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
         final NodeListener<FlowCapableNode> nodeListenerConfig =
                 new SimplifiedConfigListener(reactor, configSnapshot, operationalDao);
         final NodeListener<Node> nodeListenerOperational =
-                new SimplifiedOperationalListener(reactor, operationalSnapshot, configDao, reconciliationRegistry);
+                new SimplifiedOperationalListener(reactor, operationalSnapshot, configDao, reconciliationRegistry, deviceMastershipManager);
 
         dataTreeConfigChangeListener =
                 dataService.registerDataTreeChangeListener(nodeConfigDataTreePath, nodeListenerConfig);
index d5249c2e555652044bd6aeac7d6db4b7b3963a6e..602ed4643617904b0d3f71e5f885a91c313563be 100644 (file)
@@ -43,7 +43,7 @@ public class SimplifiedConfigListener extends AbstractFrmSyncListener<FlowCapabl
 
     @Override
     public void onDataTreeChanged(Collection<DataTreeModification<FlowCapableNode>> modifications) {
-        LOG.trace("Inventory Config changes {}", modifications.size());
+        LOG.trace("Config changes: {}", modifications.size());
         super.onDataTreeChanged(modifications);
     }
 
@@ -59,7 +59,6 @@ public class SimplifiedConfigListener extends AbstractFrmSyncListener<FlowCapabl
 
         configSnapshot.updateCache(nodeId, Optional.fromNullable(modification.getRootNode().getDataAfter()));
 
-
         final Optional<FlowCapableNode> operationalNode = operationalDao.loadByNodeId(nodeId);
         if (!operationalNode.isPresent()) {
             LOG.info("Skip syncup, {} operational is not present", nodeId.getValue());
@@ -91,8 +90,9 @@ public class SimplifiedConfigListener extends AbstractFrmSyncListener<FlowCapabl
      * optimal case (but the switch could be reprogrammed by another person/system.</li>
      * </ul>
      */
-    private ListenableFuture<Boolean> onNodeAdded(InstanceIdentifier<FlowCapableNode> nodePath,
-                           FlowCapableNode dataAfter, FlowCapableNode operationalNode) throws InterruptedException {
+    private ListenableFuture<Boolean> onNodeAdded(final InstanceIdentifier<FlowCapableNode> nodePath,
+                                                  final FlowCapableNode dataAfter,
+                                                  final FlowCapableNode operationalNode) throws InterruptedException {
         NodeId nodeId = PathUtil.digNodeId(nodePath);
         LOG.trace("onNodeAdded {}", nodeId);
         return reactor.syncup(nodePath, dataAfter, operationalNode, dsType());
@@ -105,8 +105,9 @@ public class SimplifiedConfigListener extends AbstractFrmSyncListener<FlowCapabl
      * system which is updating operational store (that components is also trying to solve
      * scale/performance issues on several layers).
      */
-    private ListenableFuture<Boolean> onNodeUpdated(InstanceIdentifier<FlowCapableNode> nodePath,
-                          FlowCapableNode dataBefore, FlowCapableNode dataAfter) throws InterruptedException {
+    private ListenableFuture<Boolean> onNodeUpdated(final InstanceIdentifier<FlowCapableNode> nodePath,
+                                                    final FlowCapableNode dataBefore,
+                                                    final FlowCapableNode dataAfter) throws InterruptedException {
         NodeId nodeId = PathUtil.digNodeId(nodePath);
         LOG.trace("onNodeUpdated {}", nodeId);
         return reactor.syncup(nodePath, dataAfter, dataBefore, dsType());
@@ -117,8 +118,8 @@ public class SimplifiedConfigListener extends AbstractFrmSyncListener<FlowCapabl
      * probably optimized using dedicated wipe-out RPC, but it has impact on switch if it is
      * programmed by two person/system
      */
-    private ListenableFuture<Boolean> onNodeDeleted(InstanceIdentifier<FlowCapableNode> nodePath,
-                                                    FlowCapableNode dataBefore) throws InterruptedException {
+    private ListenableFuture<Boolean> onNodeDeleted(final InstanceIdentifier<FlowCapableNode> nodePath,
+                                                    final FlowCapableNode dataBefore) throws InterruptedException {
         NodeId nodeId = PathUtil.digNodeId(nodePath);
         LOG.trace("onNodeDeleted {}", nodeId);
         return reactor.syncup(nodePath, null, dataBefore, dsType());
index f3bcaf379b6b4c39ffe26a613ce23b09e6bc7674..7717bb2930e4b05e8727bb050ba926d0e0b55106 100644 (file)
@@ -22,6 +22,7 @@ import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
+import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager;
 import org.opendaylight.openflowplugin.applications.frsync.util.ModificationUtil;
 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
 import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
@@ -42,26 +43,29 @@ import org.slf4j.LoggerFactory;
  */
 public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node> {
     private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalListener.class);
+    public static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
 
     private final SyncReactor reactor;
     private final FlowCapableNodeSnapshotDao operationalSnapshot;
     private final FlowCapableNodeDao configDao;
     private final ReconciliationRegistry reconciliationRegistry;
+    private final DeviceMastershipManager deviceMastershipManager;
 
     public SimplifiedOperationalListener(final SyncReactor reactor,
                                          final FlowCapableNodeSnapshotDao operationalSnapshot,
                                          final FlowCapableNodeDao configDao,
-                                         final ReconciliationRegistry reconciliationRegistry) {
+                                         final ReconciliationRegistry reconciliationRegistry,
+                                         final DeviceMastershipManager deviceMastershipManager) {
         this.reactor = reactor;
         this.operationalSnapshot = operationalSnapshot;
         this.configDao = configDao;
         this.reconciliationRegistry = reconciliationRegistry;
+        this.deviceMastershipManager = deviceMastershipManager;
     }
 
     @Override
     public void onDataTreeChanged(Collection<DataTreeModification<Node>> modifications) {
-        // TODO return for clustered listener if not master for device
-        LOG.trace("Inventory Operational changes {}", modifications.size());
+        LOG.trace("Operational changes: {}", modifications.size());
         super.onDataTreeChanged(modifications);
     }
 
@@ -77,13 +81,18 @@ public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node>
      */
     protected Optional<ListenableFuture<Boolean>> processNodeModification(
             DataTreeModification<Node> modification) throws InterruptedException {
-
+        final NodeId nodeId = ModificationUtil.nodeId(modification);
         updateCache(modification);
-        // TODO register cluster service if node added
-        if (isReconciliationNeeded(modification)) {
+
+        if (isAdd(modification) || isAddLogical(modification)) {
+            deviceMastershipManager.onDeviceConnected(nodeId);
+        }
+
+        if (isRegisteredAndConsistentForReconcile(modification)) {
             return reconciliation(modification);
+        } else {
+            return skipModification(modification);
         }
-        return skipModification(modification);
     }
 
     /**
@@ -95,15 +104,14 @@ public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node>
         NodeId nodeId = ModificationUtil.nodeId(modification);
         if (isDelete(modification) || isDeleteLogical(modification)) {
             operationalSnapshot.updateCache(nodeId, Optional.absent());
-            // TODO unregister/close cluster service if node deleted
-            reconciliationRegistry.unregisterIfRegistered(nodeId);
+            deviceMastershipManager.onDeviceDisconnected(nodeId);
             return;
         }
         operationalSnapshot.updateCache(nodeId, Optional.fromNullable(ModificationUtil.flowCapableNodeAfter(modification)));
     }
 
     private Optional<ListenableFuture<Boolean>> skipModification(DataTreeModification<Node> modification) {
-        LOG.trace("Skipping Inventory Operational modification {}, before {}, after {}",
+        LOG.trace("Skipping operational modification: {}, before {}, after {}",
                 ModificationUtil.nodeIdValue(modification),
                 modification.getRootNode().getDataBefore() == null ? "null" : "nonnull",
                 modification.getRootNode().getDataAfter() == null ? "null" : "nonnull");
@@ -160,10 +168,6 @@ public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node>
         return false;
     }
 
-    private boolean isReconciliationNeeded(DataTreeModification<Node> modification) {
-        return isAdd(modification) || isAddLogical(modification) || isRegisteredAndConsistentForReconcile(modification);
-    }
-
     private Optional<ListenableFuture<Boolean>> reconciliation(DataTreeModification<Node> modification) throws InterruptedException {
         final NodeId nodeId = ModificationUtil.nodeId(modification);
         final Optional<FlowCapableNode> nodeConfiguration = configDao.loadByNodeId(nodeId);
@@ -176,6 +180,7 @@ public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node>
             final FlowCapableNode fcNode = ModificationUtil.flowCapableNodeAfter(modification);
             return Optional.of(reactor.syncup(nodePath, nodeConfiguration.get(), fcNode, dsType()));
         } else {
+            LOG.debug("Config not present for reconciliation: {}", nodeId.getValue());
             return skipModification(modification);
         }
     }
@@ -191,34 +196,34 @@ public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node>
                 .getAugmentation(FlowCapableStatisticsGatheringStatus.class);
 
         if (gatheringStatus == null) {
-            LOG.trace("Statistics gathering never started for: {}", nodeId.getValue());
+            LOG.trace("Statistics gathering never started: {}", nodeId.getValue());
             return false;
         }
 
         final SnapshotGatheringStatusEnd gatheringStatusEnd = gatheringStatus.getSnapshotGatheringStatusEnd();
 
         if (gatheringStatusEnd == null) {
-            LOG.trace("Statistics gathering is not over yet for: {}", nodeId.getValue());
+            LOG.trace("Statistics gathering is not over yet: {}", nodeId.getValue());
             return false;
         }
 
         if (!gatheringStatusEnd.isSucceeded()) {
-            LOG.debug("Statistics gathering was not successful for: {}", nodeId.getValue());
+            LOG.trace("Statistics gathering was not successful: {}", nodeId.getValue());
             return false;
         }
 
         try {
-            Date timestampOfRegistration = reconciliationRegistry.getRegistration(nodeId);
-            final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(ReconciliationRegistry.DATE_AND_TIME_FORMAT);
+            Date timestampOfRegistration = reconciliationRegistry.getRegistrationTimestamp(nodeId);
+            final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
             Date timestampOfStatistics = simpleDateFormat.parse(gatheringStatusEnd.getEnd().getValue());
             if (timestampOfStatistics.after(timestampOfRegistration)) {
-                LOG.debug("Fresh operational present for: {} -> going retry!", nodeId.getValue());
+                LOG.debug("Fresh operational present: {}", nodeId.getValue());
                 return true;
             }
         } catch (ParseException e) {
             LOG.error("Timestamp parsing error {}", e);
         }
-        LOG.debug("Fresh operational not present for: {}", nodeId.getValue());
+        LOG.debug("Fresh operational not present: {}", nodeId.getValue());
         return false;
     }
 
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorClusterDecorator.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorClusterDecorator.java
new file mode 100644 (file)
index 0000000..458614e
--- /dev/null
@@ -0,0 +1,52 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager;
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decorator for cluster related issues.
+ */
+public class SyncReactorClusterDecorator implements SyncReactor {
+    private static final Logger LOG = LoggerFactory.getLogger(SyncReactorClusterDecorator.class);
+    private final SyncReactor delegate;
+    private final DeviceMastershipManager deviceMastershipManager;
+
+    public SyncReactorClusterDecorator(final SyncReactor delegate,
+                                       final DeviceMastershipManager deviceMastershipManager) {
+        this.delegate = delegate;
+        this.deviceMastershipManager = deviceMastershipManager;
+    }
+
+    @Override
+    public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+                                            final FlowCapableNode configTree,
+                                            final FlowCapableNode operationalTree,
+                                            final LogicalDatastoreType dsType) throws InterruptedException {
+        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
+        LOG.trace("syncup cluster {}", nodeId.getValue());
+
+        if (!deviceMastershipManager.isDeviceMastered(nodeId)) {
+            LOG.debug("Skip syncup since not master for: {}", nodeId.getValue());
+            return Futures.immediateFuture(Boolean.TRUE);
+        } else {
+            return delegate.syncup(flowcapableNodePath, configTree,operationalTree, dsType);
+        }
+    }
+}
index fdcaea2e1a88d43deca0d21b938909c865312b0c..69029db52d441da99adf96357880c2ac85eae0ad 100644 (file)
@@ -31,7 +31,7 @@ public class SyncReactorRetryDecorator implements SyncReactor {
     private final SyncReactor delegate;
     private final ReconciliationRegistry reconciliationRegistry;
 
-    public SyncReactorRetryDecorator(final SyncReactor delegate, ReconciliationRegistry reconciliationRegistry) {
+    public SyncReactorRetryDecorator(final SyncReactor delegate, final ReconciliationRegistry reconciliationRegistry) {
         this.delegate = delegate;
         this.reconciliationRegistry = reconciliationRegistry;
     }
@@ -44,7 +44,7 @@ public class SyncReactorRetryDecorator implements SyncReactor {
         LOG.trace("syncup retry {}", nodeId.getValue());
 
         if (dsType == LogicalDatastoreType.CONFIGURATION && reconciliationRegistry.isRegistered(nodeId)) {
-            LOG.trace("Config change ignored because device is in retry [{}]", nodeId);
+            LOG.debug("Config change ignored because {} is in reconcile.", nodeId.getValue());
             return Futures.immediateFuture(Boolean.FALSE);
         }
 
@@ -59,8 +59,6 @@ public class SyncReactorRetryDecorator implements SyncReactor {
                     return true;
                 } else {
                     reconciliationRegistry.register(nodeId);
-                    // TODO  elicit statistics gathering if not running actually
-                    // triggerStatisticsGathering(nodeId);
                     return false;
                 }
             }
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/clustering/DeviceMastership.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/clustering/DeviceMastership.java
new file mode 100644 (file)
index 0000000..3a23962
--- /dev/null
@@ -0,0 +1,71 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl.clustering;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
+import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ClusterSingletonService} clusterSingletonServiceRegistration per connected device.
+ */
+public class DeviceMastership implements ClusterSingletonService {
+    private static final Logger LOG = LoggerFactory.getLogger(DeviceMastership.class);
+    private final NodeId nodeId;
+    private final ServiceGroupIdentifier identifier;
+    private final ReconciliationRegistry reconciliationRegistry;
+    private ClusterSingletonServiceRegistration clusterSingletonServiceRegistration;
+    private boolean deviceMastered;
+
+    public DeviceMastership(final NodeId nodeId, final ReconciliationRegistry reconciliationRegistry) {
+        this.nodeId = nodeId;
+        this.identifier = ServiceGroupIdentifier.create(nodeId.getValue());
+        this.reconciliationRegistry = reconciliationRegistry;
+        this.deviceMastered = false;
+    }
+
+    @Override
+    public void instantiateServiceInstance() {
+        deviceMastered = true;
+        reconciliationRegistry.register(nodeId);
+        LOG.trace("FRS started for: {}", nodeId.getValue());
+    }
+
+    @Override
+    public ListenableFuture<Void> closeServiceInstance() {
+        deviceMastered = false;
+        reconciliationRegistry.unregisterIfRegistered(nodeId);
+        LOG.debug("FRS stopped for: {}", nodeId.getValue());
+        return Futures.immediateFuture(null);
+    }
+
+    @Override
+    public ServiceGroupIdentifier getIdentifier() {
+        return identifier;
+    }
+
+    public boolean isDeviceMastered() {
+        return deviceMastered;
+    }
+
+    public void setClusterSingletonServiceRegistration(final ClusterSingletonServiceRegistration registration) {
+        this.clusterSingletonServiceRegistration = registration;
+    }
+
+    public ClusterSingletonServiceRegistration getClusterSingletonServiceRegistration() {
+        return clusterSingletonServiceRegistration;
+    }
+
+}
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/clustering/DeviceMastershipManager.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/clustering/DeviceMastershipManager.java
new file mode 100644 (file)
index 0000000..0a9a8be
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl.clustering;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
+import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manager for clustering service registrations of {@link DeviceMastership}.
+ */
+public class DeviceMastershipManager {
+    private static final Logger LOG = LoggerFactory.getLogger(DeviceMastershipManager.class);
+    private final ClusterSingletonServiceProvider clusterSingletonService;
+    private final ConcurrentHashMap<NodeId, DeviceMastership> deviceMasterships = new ConcurrentHashMap();
+    private final ReconciliationRegistry reconciliationRegistry;
+
+    public DeviceMastershipManager(final ClusterSingletonServiceProvider clusterSingletonService,
+                                   final ReconciliationRegistry reconciliationRegistry) {
+        this.clusterSingletonService = clusterSingletonService;
+        this.reconciliationRegistry = reconciliationRegistry;
+    }
+
+    public void onDeviceConnected(final NodeId nodeId) {
+        final DeviceMastership mastership = new DeviceMastership(nodeId, reconciliationRegistry);
+        final ClusterSingletonServiceRegistration registration = clusterSingletonService.registerClusterSingletonService(mastership);
+        mastership.setClusterSingletonServiceRegistration(registration);
+        deviceMasterships.put(nodeId, mastership);
+        LOG.debug("FRS service registered for: {}", nodeId.getValue());
+    }
+
+
+    public void onDeviceDisconnected(final NodeId nodeId) {
+        final DeviceMastership mastership = deviceMasterships.remove(nodeId);
+        final ClusterSingletonServiceRegistration registration = mastership.getClusterSingletonServiceRegistration();
+        if (registration != null) {
+            try {
+                registration.close();
+            } catch (Exception e) {
+                LOG.error("FRS cluster service close fail: {}", nodeId.getValue());
+            }
+        }
+        LOG.debug("FRS service unregistered for: {}", nodeId.getValue());
+    }
+
+    public boolean isDeviceMastered(final NodeId nodeId) {
+        if (deviceMasterships.get(nodeId) == null) {
+            return false;
+        } else {
+            return deviceMasterships.get(nodeId).isDeviceMastered();
+        }
+    }
+
+    @VisibleForTesting
+    ConcurrentHashMap<NodeId, DeviceMastership> getDeviceMasterships() {
+        return deviceMasterships;
+    }
+}
index 5b18b41900c59125941b023541105abfbeaeb276..e9fac578706b12db3f1ace7c9051cc27c80ac253 100644 (file)
@@ -16,18 +16,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Holder of registration request for fresh operational.
+ * Holder of registration request for reconciliation (fresh operational).
  */
 public class ReconciliationRegistry {
-
     private static final Logger LOG = LoggerFactory.getLogger(ReconciliationRegistry.class);
     private final Map<NodeId, Date> registration = new ConcurrentHashMap<>();
-    public static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
 
     public Date register(NodeId nodeId) {
         Date timestamp = new Date();
         registration.put(nodeId, timestamp);
         LOG.debug("Registered for next consistent operational: {}", nodeId.getValue());
+        // TODO  elicit statistics gathering if not running actually
         return timestamp;
     }
 
@@ -43,7 +42,7 @@ public class ReconciliationRegistry {
         return registration.get(nodeId) != null;
     }
 
-    public Date getRegistration(NodeId nodeId) {
+    public Date getRegistrationTimestamp(NodeId nodeId) {
         return registration.get(nodeId);
     }
 
index c60e9600e736638782cea112a49c5651e5506602..e1f24449239d298a1f222537faa40fbfe5317d58 100644 (file)
            odl:use-default-for-reference-types="true">
 
     <reference id="broker" interface="org.opendaylight.controller.sal.binding.api.BindingAwareBroker"/>
-    <reference id="dataBroker" interface="org.opendaylight.controller.md.sal.binding.api.DataBroker"/>
+    <reference id="dataBroker" interface="org.opendaylight.controller.md.sal.binding.api.DataBroker" odl:type="pingpong"/>
     <reference id="rpcRegistry" interface="org.opendaylight.controller.sal.binding.api.RpcProviderRegistry"/>
+    <reference id="clusterSingletonService" interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"/>
 
     <bean id="frSync" class="org.opendaylight.openflowplugin.applications.frsync.impl.ForwardingRulesSyncProvider"
           destroy-method="close">
         <argument ref="broker"/>
         <argument ref="dataBroker"/>
         <argument ref="rpcRegistry"/>
+        <argument ref="clusterSingletonService"/>
     </bean>
 
 </blueprint>
\ No newline at end of file
index 76fb1fad803b57f91e90879625058f6bb523502f..b795b2f728b73f8059ec48a185f2a90e14c6397a 100644 (file)
@@ -20,7 +20,9 @@ import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.SalTableService;
@@ -40,7 +42,9 @@ public class ForwardingRulesSyncProviderTest {
     @Mock
     private BindingAwareBroker broker;
     @Mock
-    private BindingAwareBroker.ProviderContext providerContext;
+    private ProviderContext providerContext;
+    @Mock
+    private ClusterSingletonServiceProvider clusterSingletonService;
 
     @Before
     public void setUp() throws Exception {
@@ -53,7 +57,7 @@ public class ForwardingRulesSyncProviderTest {
                     }
                 });
 
-        provider = new ForwardingRulesSyncProvider(broker, dataBroker, rpcRegistry);
+        provider = new ForwardingRulesSyncProvider(broker, dataBroker, rpcRegistry, clusterSingletonService);
         Mockito.verify(rpcRegistry).getRpcService(SalTableService.class);
         Mockito.verify(rpcRegistry).getRpcService(SalFlatBatchService.class);
         Mockito.verify(broker).registerProvider(provider);
index 5c600464ddfa08862967492231df7c5e1d8ddbfc..c0ef37e34efd5721980655a8ec9d6aaaa9db7ebb 100644 (file)
@@ -92,6 +92,7 @@ public class SimplifiedConfigListenerTest {
     public void testOnDataTreeChangedSyncupAdd() throws InterruptedException {
         Mockito.when(roTx.read(LogicalDatastoreType.OPERATIONAL, fcNodePath))
                 .thenReturn(Futures.immediateCheckedFuture(Optional.of(dataBefore)));
+        Mockito.when(configModification.getDataBefore()).thenReturn(null);
         Mockito.when(configModification.getDataAfter()).thenReturn(dataAfter);
 
         nodeListenerConfig.onDataTreeChanged(Collections.singleton(dataTreeModification));
@@ -120,6 +121,7 @@ public class SimplifiedConfigListenerTest {
         Mockito.when(roTx.read(LogicalDatastoreType.OPERATIONAL, fcNodePath))
                 .thenReturn(Futures.immediateCheckedFuture(Optional.of(dataBefore)));
         Mockito.when(configModification.getDataBefore()).thenReturn(dataBefore);
+        Mockito.when(configModification.getDataAfter()).thenReturn(null);
 
         nodeListenerConfig.onDataTreeChanged(Collections.singleton(dataTreeModification));
 
@@ -138,4 +140,5 @@ public class SimplifiedConfigListenerTest {
         Mockito.verifyZeroInteractions(reactor);
         Mockito.verify(roTx).close();
     }
+
 }
\ No newline at end of file
index a3245c83419bf5112f7550cb034ae14d810d2d08..a47b803fa8accba498379140d4cbb96197580f00 100644 (file)
@@ -1,5 +1,5 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -34,6 +34,7 @@ import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeCa
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
+import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager;
 import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
@@ -56,9 +57,7 @@ public class SimplifiedOperationalListenerTest {
     private InstanceIdentifier<FlowCapableNode> fcNodePath;
     private SimplifiedOperationalListener nodeListenerOperational;
     private final LogicalDatastoreType dsType = LogicalDatastoreType.OPERATIONAL;
-    private final String timestampBefore = "0000-12-12T01:01:01.000-07:00";
-    private final String timestampAfter = "9999-12-12T01:01:01.000-07:00";
-    private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(ReconciliationRegistry.DATE_AND_TIME_FORMAT);
+    private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(SimplifiedOperationalListener.DATE_AND_TIME_FORMAT);
 
     @Mock
     private SyncReactor reactor;
@@ -75,11 +74,13 @@ public class SimplifiedOperationalListenerTest {
     @Mock
     private FlowCapableNode fcOperationalNode;
     @Mock
-    private ReconciliationRegistry reconciliationRegistry;
-    @Mock
     private FlowCapableStatisticsGatheringStatus statisticsGatheringStatus;
     @Mock
     private SnapshotGatheringStatusEnd snapshotGatheringStatusEnd;
+    @Mock
+    private ReconciliationRegistry reconciliationRegistry;
+    @Mock
+    private DeviceMastershipManager deviceMastershipManager;
 
     @Before
     public void setUp() throws Exception {
@@ -89,7 +90,7 @@ public class SimplifiedOperationalListenerTest {
         final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot,
                 new FlowCapableNodeOdlDao(db, LogicalDatastoreType.CONFIGURATION));
 
-        nodeListenerOperational = new SimplifiedOperationalListener(reactor, operationalSnapshot, configDao, reconciliationRegistry);
+        nodeListenerOperational = new SimplifiedOperationalListener(reactor, operationalSnapshot, configDao, reconciliationRegistry, deviceMastershipManager);
         InstanceIdentifier<Node> nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
         fcNodePath = nodePath.augmentation(FlowCapableNode.class);
 
@@ -109,40 +110,50 @@ public class SimplifiedOperationalListenerTest {
     }
 
     @Test
-    public void testOnDataTreeChangedSyncupAdd() throws InterruptedException {
+    public void testOnDataTreeChangeAddSyncup() throws Exception {
+        Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
+        operationalAdd();
+        prepareFreshOperational(true);
+
         Mockito.when(roTx.read(LogicalDatastoreType.CONFIGURATION, fcNodePath))
                 .thenReturn(Futures.immediateCheckedFuture(Optional.of(configNode)));
         Mockito.when(reactor.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
                 Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
-        Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
 
         nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
 
+        Mockito.verify(deviceMastershipManager).onDeviceConnected(NODE_ID);
         Mockito.verify(reactor).syncup(fcNodePath, configNode, fcOperationalNode, dsType);
-        Mockito.verifyNoMoreInteractions(reactor);
         Mockito.verify(roTx).close();
     }
 
     @Test
-    public void testOnDataTreeChangedAddSkip() {
+    public void testOnDataTreeChangedAddSkip() throws Exception {
         // Related to bug 5920 -> https://bugs.opendaylight.org/show_bug.cgi?id=5920
+        Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
+        operationalAdd();
+        prepareFreshOperational(true);
+
         Mockito.when(roTx.read(LogicalDatastoreType.CONFIGURATION, fcNodePath))
                 .thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
-        Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
 
         nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
 
+        Mockito.verify(deviceMastershipManager).onDeviceConnected(NODE_ID);
         Mockito.verifyZeroInteractions(reactor);
         Mockito.verify(roTx).close();
     }
 
     @Test
-    public void testOnDataTreeChangedSyncupDeletePhysical() {
+    public void testOnDataTreeChangedSyncupDeletePhysical() throws Exception {
         Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+        Mockito.when(operationalModification.getDataAfter()).thenReturn(null);
         Mockito.when(dataTreeModification.getRootNode().getModificationType()).thenReturn(ModificationType.DELETE);
+        Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(false);
 
         nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
 
+        Mockito.verify(deviceMastershipManager).onDeviceDisconnected(NODE_ID);
         Mockito.verifyZeroInteractions(reactor);
     }
 
@@ -151,16 +162,17 @@ public class SimplifiedOperationalListenerTest {
         Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
         List<NodeConnector> nodeConnectorList = Mockito.mock(List.class);
         Mockito.when(operationalNode.getNodeConnector()).thenReturn(nodeConnectorList);
+        Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(false);
 
         nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
 
+        Mockito.verify(deviceMastershipManager).onDeviceDisconnected(NODE_ID);
         Mockito.verifyZeroInteractions(reactor);
     }
 
     @Test
     public void testOnDataTreeChangedReconcileNotRegistered() {
-        Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
-        Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+        operationalUpdate();
         Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(false);
 
         nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
@@ -170,9 +182,8 @@ public class SimplifiedOperationalListenerTest {
 
     @Test
     public void testOnDataTreeChangedReconcileButStaticsGatheringNotStarted() {
-        Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
-        Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
         Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
+        operationalUpdate();
         Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(null);
 
         nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
@@ -182,9 +193,8 @@ public class SimplifiedOperationalListenerTest {
 
     @Test
     public void testOnDataTreeChangedReconcileButStaticsGatheringNotFinished() {
-        Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
-        Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
         Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
+        operationalUpdate();
         Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
         Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(null);
 
@@ -195,9 +205,8 @@ public class SimplifiedOperationalListenerTest {
 
     @Test
     public void testOnDataTreeChangedReconcileButStaticsGatheringNotSuccessful() {
-        Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
-        Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
         Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
+        operationalUpdate();
         Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
         Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
         Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(false);
@@ -209,16 +218,9 @@ public class SimplifiedOperationalListenerTest {
 
     @Test
     public void testOnDataTreeChangedReconcileAndFreshOperationalNotPresent() throws ParseException {
-        final DateAndTime timestamp = Mockito.mock(DateAndTime.class);
-        Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
-        Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
         Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
-        Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
-        Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
-        Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(true);
-        Mockito.when(snapshotGatheringStatusEnd.getEnd()).thenReturn(timestamp);
-        Mockito.when(snapshotGatheringStatusEnd.getEnd().getValue()).thenReturn(timestampBefore);
-        Mockito.when(reconciliationRegistry.getRegistration(NODE_ID)).thenReturn(simpleDateFormat.parse(timestampAfter));
+        operationalUpdate();
+        prepareFreshOperational(false);
 
         nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
 
@@ -227,36 +229,44 @@ public class SimplifiedOperationalListenerTest {
 
     @Test
     public void testOnDataTreeChangedReconcileAndFreshOperationalPresent() throws Exception {
-        final DateAndTime timestamp = Mockito.mock(DateAndTime.class);
+        Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
+        operationalUpdate();
+        prepareFreshOperational(true);
+
         Mockito.when(roTx.read(LogicalDatastoreType.CONFIGURATION, fcNodePath))
                 .thenReturn(Futures.immediateCheckedFuture(Optional.of(configNode)));
         Mockito.when(reactor.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
                 Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
-        Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
-        Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
-        Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
-        Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
-        Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
-        Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(true);
-        Mockito.when(snapshotGatheringStatusEnd.getEnd()).thenReturn(timestamp);
-        Mockito.when(snapshotGatheringStatusEnd.getEnd().getValue()).thenReturn(timestampAfter);
-        Mockito.when(reconciliationRegistry.getRegistration(NODE_ID)).thenReturn(simpleDateFormat.parse(timestampBefore));
 
         nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
 
         Mockito.verify(reactor).syncup(fcNodePath, configNode, fcOperationalNode, dsType);
-        Mockito.verifyNoMoreInteractions(reactor);
         Mockito.verify(roTx).close();
     }
 
-    @Test
-    public void testOnDataTreeChangedReconcileAndNodeDeleted() {
-        Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
-        Mockito.when(dataTreeModification.getRootNode().getModificationType()).thenReturn(DataObjectModification.ModificationType.DELETE);
+    private void prepareFreshOperational(final boolean afterRegistration) throws ParseException {
+        Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
+        Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
+        Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(true);
+        Mockito.when(snapshotGatheringStatusEnd.getEnd()).thenReturn(Mockito.mock(DateAndTime.class));
+        final String timestampBefore = "0000-12-12T01:01:01.000-07:00";
+        final String timestampAfter = "9999-12-12T01:01:01.000-07:00";
+        if (afterRegistration) {
+            Mockito.when(snapshotGatheringStatusEnd.getEnd().getValue()).thenReturn(timestampAfter);
+            Mockito.when(reconciliationRegistry.getRegistrationTimestamp(NODE_ID)).thenReturn(simpleDateFormat.parse(timestampBefore));
+        } else {
+            Mockito.when(snapshotGatheringStatusEnd.getEnd().getValue()).thenReturn(timestampBefore);
+            Mockito.when(reconciliationRegistry.getRegistrationTimestamp(NODE_ID)).thenReturn(simpleDateFormat.parse(timestampAfter));
+        }
+    }
 
-        nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+    private void operationalAdd() {
+        Mockito.when(operationalModification.getDataBefore()).thenReturn(null);
+        Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+    }
 
-        Mockito.verify(reconciliationRegistry).unregisterIfRegistered(NODE_ID);
-        Mockito.verifyZeroInteractions(reactor);
+    private void operationalUpdate() {
+        Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+        Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
     }
 }
diff --git a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorClusterDecoratorTest.java b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorClusterDecoratorTest.java
new file mode 100644 (file)
index 0000000..3754d13
--- /dev/null
@@ -0,0 +1,72 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+/**
+ * Test for {@link SyncReactorClusterDecorator}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SyncReactorClusterDecoratorTest {
+
+    private static final NodeId NODE_ID = new NodeId("test-node");
+    private SyncReactorClusterDecorator reactor;
+    private InstanceIdentifier<FlowCapableNode> fcNodePath;
+
+    @Mock
+    private SyncReactorFutureZipDecorator delegate;
+    @Mock
+    private DeviceMastershipManager deviceMastershipManager;
+    @Mock
+    private FlowCapableNode fcConfigNode;
+    @Mock
+    private FlowCapableNode fcOperationalNode;
+
+    @Before
+    public void setUp() {
+        reactor = new SyncReactorClusterDecorator(delegate, deviceMastershipManager);
+
+        InstanceIdentifier<Node> nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
+        fcNodePath = nodePath.augmentation(FlowCapableNode.class);
+    }
+
+    @Test
+    public void testSyncupMaster() throws InterruptedException {
+        Mockito.when(deviceMastershipManager.isDeviceMastered(NODE_ID)).thenReturn(true);
+
+        reactor.syncup(fcNodePath, fcConfigNode, fcOperationalNode, LogicalDatastoreType.CONFIGURATION);
+
+        Mockito.verify(delegate).syncup(fcNodePath, fcConfigNode, fcOperationalNode, LogicalDatastoreType.CONFIGURATION);
+        Mockito.verifyNoMoreInteractions(delegate);
+    }
+
+    @Test
+    public void testSyncupSlave() throws InterruptedException {
+        Mockito.when(deviceMastershipManager.isDeviceMastered(NODE_ID)).thenReturn(false);
+
+        reactor.syncup(fcNodePath, fcConfigNode, fcOperationalNode, LogicalDatastoreType.CONFIGURATION);
+
+        Mockito.verifyZeroInteractions(delegate);
+    }
+
+}
\ No newline at end of file
diff --git a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/clustering/DeviceMastershipManagerTest.java b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/clustering/DeviceMastershipManagerTest.java
new file mode 100644 (file)
index 0000000..d73df5b
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl.clustering;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
+import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+
+/**
+ * Test for {@link DeviceMastershipManager}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class DeviceMastershipManagerTest {
+    private static final NodeId NODE_ID = new NodeId("testNode");
+    private DeviceMastershipManager deviceMastershipManager;
+    @Mock
+    private ClusterSingletonServiceRegistration registration;
+    @Mock
+    private ClusterSingletonServiceProvider clusterSingletonService;
+
+    @Before
+    public void setUp() throws Exception {
+        deviceMastershipManager = new DeviceMastershipManager(clusterSingletonService, new ReconciliationRegistry());
+        Mockito.when(clusterSingletonService.registerClusterSingletonService(Matchers.<ClusterSingletonService>any()))
+                .thenReturn(registration);
+    }
+
+    @Test
+    public void testOnDeviceConnectedAndDisconnected() {
+        // no context
+        Assert.assertNull(deviceMastershipManager.getDeviceMasterships().get(NODE_ID));
+        // create context - register
+        deviceMastershipManager.onDeviceConnected(NODE_ID);
+        DeviceMastership registration = deviceMastershipManager.getDeviceMasterships().get(NODE_ID);
+        Assert.assertNotNull(registration);
+        Mockito.verify(clusterSingletonService).registerClusterSingletonService(registration);
+        // destroy context - unregister
+        deviceMastershipManager.onDeviceDisconnected(NODE_ID);
+        Assert.assertNull(deviceMastershipManager.getDeviceMasterships().get(NODE_ID));
+    }
+
+    @Test
+    public void testIsDeviceMasteredOrSlaved() {
+        // no context
+        Assert.assertFalse(deviceMastershipManager.isDeviceMastered(NODE_ID));
+        deviceMastershipManager.onDeviceConnected(NODE_ID);
+        // is master
+        deviceMastershipManager.getDeviceMasterships().get(NODE_ID).instantiateServiceInstance();
+        Assert.assertTrue(deviceMastershipManager.isDeviceMastered(NODE_ID));
+        // is not master
+        deviceMastershipManager.getDeviceMasterships().get(NODE_ID).closeServiceInstance();
+        Assert.assertFalse(deviceMastershipManager.isDeviceMastered(NODE_ID));
+    }
+
+}
diff --git a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/clustering/DeviceMastershipTest.java b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/clustering/DeviceMastershipTest.java
new file mode 100644 (file)
index 0000000..be45a9d
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl.clustering;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+
+/**
+ * Test for {@link DeviceMastership}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class DeviceMastershipTest {
+    private static final NodeId NODE_ID = new NodeId("testNode");
+    private DeviceMastership deviceMastership;
+
+    @Mock
+    private DeviceMastershipManager deviceMastershipManager;
+    @Mock
+    private ReconciliationRegistry reconciliationRegistry;
+
+    @Before
+    public void setUp() throws Exception {
+        deviceMastership = new DeviceMastership(NODE_ID, reconciliationRegistry);
+    }
+
+    @Test
+    public void instantiateServiceInstance() {
+        deviceMastership.instantiateServiceInstance();
+        Mockito.verify(reconciliationRegistry).register(NODE_ID);
+        Assert.assertTrue(deviceMastership.isDeviceMastered());
+    }
+
+    @Test
+    public void closeServiceInstance() {
+        deviceMastership.closeServiceInstance();
+        Mockito.verify(reconciliationRegistry).unregisterIfRegistered(NODE_ID);
+        Assert.assertFalse(deviceMastership.isDeviceMastered());
+    }
+}
\ No newline at end of file