<artifactId>model-flow-service</artifactId>
</dependency>
- <dependency>
- <groupId>org.opendaylight.openflowplugin</groupId>
- <artifactId>openflowplugin-common</artifactId>
- </dependency>
-
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-common</artifactId>
<artifactId>sal-common-util</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-singleton-common-api</artifactId>
+ </dependency>
+
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
package org.opendaylight.openflowplugin.applications.frsync;
import java.util.EventListener;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
import org.opendaylight.yangtools.yang.binding.DataObject;
/**
* Unifying listener for data and event changes on node.
*/
-public interface NodeListener<T extends DataObject> extends EventListener, DataTreeChangeListener<T> {
+public interface NodeListener<T extends DataObject> extends EventListener, ClusteredDataTreeChangeListener<T> {
}
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.openflowplugin.applications.frsync.NodeListener;
import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
+import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager;
import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SyncPlanPushStrategyFlatBatchImpl;
import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
private static final Logger LOG = LoggerFactory.getLogger(ForwardingRulesSyncProvider.class);
private final DataBroker dataService;
+ private final ClusterSingletonServiceProvider clusterSingletonService;
private final SalTableService salTableService;
private final SalFlatBatchService flatBatchService;
public ForwardingRulesSyncProvider(final BindingAwareBroker broker,
final DataBroker dataBroker,
- final RpcConsumerRegistry rpcRegistry) {
- Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
+ final RpcConsumerRegistry rpcRegistry,
+ final ClusterSingletonServiceProvider clusterSingletonService) {
+ Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null!");
this.dataService = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
+ this.clusterSingletonService = Preconditions.checkNotNull(clusterSingletonService,
+ "ClusterSingletonServiceProvider can not be null!");
this.salTableService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalTableService.class),
"RPC SalTableService not found.");
this.flatBatchService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalFlatBatchService.class),
.setTableForwarder(tableForwarder);
final ReconciliationRegistry reconciliationRegistry = new ReconciliationRegistry();
+ final DeviceMastershipManager deviceMastershipManager =
+ new DeviceMastershipManager(clusterSingletonService, reconciliationRegistry);
final SyncReactor syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
final SyncReactor syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, reconciliationRegistry);
final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry,
new SemaphoreKeeperGuavaImpl<>(1, true));
+ final SyncReactor syncReactorFutureZip = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
- final SyncReactor reactor = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
+ final SyncReactor reactor = new SyncReactorClusterDecorator(syncReactorFutureZip, deviceMastershipManager);
final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
final NodeListener<FlowCapableNode> nodeListenerConfig =
new SimplifiedConfigListener(reactor, configSnapshot, operationalDao);
final NodeListener<Node> nodeListenerOperational =
- new SimplifiedOperationalListener(reactor, operationalSnapshot, configDao, reconciliationRegistry);
+ new SimplifiedOperationalListener(reactor, operationalSnapshot, configDao, reconciliationRegistry, deviceMastershipManager);
dataTreeConfigChangeListener =
dataService.registerDataTreeChangeListener(nodeConfigDataTreePath, nodeListenerConfig);
@Override
public void onDataTreeChanged(Collection<DataTreeModification<FlowCapableNode>> modifications) {
- LOG.trace("Inventory Config changes {}", modifications.size());
+ LOG.trace("Config changes: {}", modifications.size());
super.onDataTreeChanged(modifications);
}
configSnapshot.updateCache(nodeId, Optional.fromNullable(modification.getRootNode().getDataAfter()));
-
final Optional<FlowCapableNode> operationalNode = operationalDao.loadByNodeId(nodeId);
if (!operationalNode.isPresent()) {
LOG.info("Skip syncup, {} operational is not present", nodeId.getValue());
* optimal case (but the switch could be reprogrammed by another person/system.</li>
* </ul>
*/
- private ListenableFuture<Boolean> onNodeAdded(InstanceIdentifier<FlowCapableNode> nodePath,
- FlowCapableNode dataAfter, FlowCapableNode operationalNode) throws InterruptedException {
+ private ListenableFuture<Boolean> onNodeAdded(final InstanceIdentifier<FlowCapableNode> nodePath,
+ final FlowCapableNode dataAfter,
+ final FlowCapableNode operationalNode) throws InterruptedException {
NodeId nodeId = PathUtil.digNodeId(nodePath);
LOG.trace("onNodeAdded {}", nodeId);
return reactor.syncup(nodePath, dataAfter, operationalNode, dsType());
* system which is updating operational store (that components is also trying to solve
* scale/performance issues on several layers).
*/
- private ListenableFuture<Boolean> onNodeUpdated(InstanceIdentifier<FlowCapableNode> nodePath,
- FlowCapableNode dataBefore, FlowCapableNode dataAfter) throws InterruptedException {
+ private ListenableFuture<Boolean> onNodeUpdated(final InstanceIdentifier<FlowCapableNode> nodePath,
+ final FlowCapableNode dataBefore,
+ final FlowCapableNode dataAfter) throws InterruptedException {
NodeId nodeId = PathUtil.digNodeId(nodePath);
LOG.trace("onNodeUpdated {}", nodeId);
return reactor.syncup(nodePath, dataAfter, dataBefore, dsType());
* probably optimized using dedicated wipe-out RPC, but it has impact on switch if it is
* programmed by two person/system
*/
- private ListenableFuture<Boolean> onNodeDeleted(InstanceIdentifier<FlowCapableNode> nodePath,
- FlowCapableNode dataBefore) throws InterruptedException {
+ private ListenableFuture<Boolean> onNodeDeleted(final InstanceIdentifier<FlowCapableNode> nodePath,
+ final FlowCapableNode dataBefore) throws InterruptedException {
NodeId nodeId = PathUtil.digNodeId(nodePath);
LOG.trace("onNodeDeleted {}", nodeId);
return reactor.syncup(nodePath, null, dataBefore, dsType());
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
+import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager;
import org.opendaylight.openflowplugin.applications.frsync.util.ModificationUtil;
import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
*/
public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node> {
private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalListener.class);
+ public static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
private final SyncReactor reactor;
private final FlowCapableNodeSnapshotDao operationalSnapshot;
private final FlowCapableNodeDao configDao;
private final ReconciliationRegistry reconciliationRegistry;
+ private final DeviceMastershipManager deviceMastershipManager;
public SimplifiedOperationalListener(final SyncReactor reactor,
final FlowCapableNodeSnapshotDao operationalSnapshot,
final FlowCapableNodeDao configDao,
- final ReconciliationRegistry reconciliationRegistry) {
+ final ReconciliationRegistry reconciliationRegistry,
+ final DeviceMastershipManager deviceMastershipManager) {
this.reactor = reactor;
this.operationalSnapshot = operationalSnapshot;
this.configDao = configDao;
this.reconciliationRegistry = reconciliationRegistry;
+ this.deviceMastershipManager = deviceMastershipManager;
}
@Override
public void onDataTreeChanged(Collection<DataTreeModification<Node>> modifications) {
- // TODO return for clustered listener if not master for device
- LOG.trace("Inventory Operational changes {}", modifications.size());
+ LOG.trace("Operational changes: {}", modifications.size());
super.onDataTreeChanged(modifications);
}
*/
protected Optional<ListenableFuture<Boolean>> processNodeModification(
DataTreeModification<Node> modification) throws InterruptedException {
-
+ final NodeId nodeId = ModificationUtil.nodeId(modification);
updateCache(modification);
- // TODO register cluster service if node added
- if (isReconciliationNeeded(modification)) {
+
+ if (isAdd(modification) || isAddLogical(modification)) {
+ deviceMastershipManager.onDeviceConnected(nodeId);
+ }
+
+ if (isRegisteredAndConsistentForReconcile(modification)) {
return reconciliation(modification);
+ } else {
+ return skipModification(modification);
}
- return skipModification(modification);
}
/**
NodeId nodeId = ModificationUtil.nodeId(modification);
if (isDelete(modification) || isDeleteLogical(modification)) {
operationalSnapshot.updateCache(nodeId, Optional.absent());
- // TODO unregister/close cluster service if node deleted
- reconciliationRegistry.unregisterIfRegistered(nodeId);
+ deviceMastershipManager.onDeviceDisconnected(nodeId);
return;
}
operationalSnapshot.updateCache(nodeId, Optional.fromNullable(ModificationUtil.flowCapableNodeAfter(modification)));
}
private Optional<ListenableFuture<Boolean>> skipModification(DataTreeModification<Node> modification) {
- LOG.trace("Skipping Inventory Operational modification {}, before {}, after {}",
+ LOG.trace("Skipping operational modification: {}, before {}, after {}",
ModificationUtil.nodeIdValue(modification),
modification.getRootNode().getDataBefore() == null ? "null" : "nonnull",
modification.getRootNode().getDataAfter() == null ? "null" : "nonnull");
return false;
}
- private boolean isReconciliationNeeded(DataTreeModification<Node> modification) {
- return isAdd(modification) || isAddLogical(modification) || isRegisteredAndConsistentForReconcile(modification);
- }
-
private Optional<ListenableFuture<Boolean>> reconciliation(DataTreeModification<Node> modification) throws InterruptedException {
final NodeId nodeId = ModificationUtil.nodeId(modification);
final Optional<FlowCapableNode> nodeConfiguration = configDao.loadByNodeId(nodeId);
final FlowCapableNode fcNode = ModificationUtil.flowCapableNodeAfter(modification);
return Optional.of(reactor.syncup(nodePath, nodeConfiguration.get(), fcNode, dsType()));
} else {
+ LOG.debug("Config not present for reconciliation: {}", nodeId.getValue());
return skipModification(modification);
}
}
.getAugmentation(FlowCapableStatisticsGatheringStatus.class);
if (gatheringStatus == null) {
- LOG.trace("Statistics gathering never started for: {}", nodeId.getValue());
+ LOG.trace("Statistics gathering never started: {}", nodeId.getValue());
return false;
}
final SnapshotGatheringStatusEnd gatheringStatusEnd = gatheringStatus.getSnapshotGatheringStatusEnd();
if (gatheringStatusEnd == null) {
- LOG.trace("Statistics gathering is not over yet for: {}", nodeId.getValue());
+ LOG.trace("Statistics gathering is not over yet: {}", nodeId.getValue());
return false;
}
if (!gatheringStatusEnd.isSucceeded()) {
- LOG.debug("Statistics gathering was not successful for: {}", nodeId.getValue());
+ LOG.trace("Statistics gathering was not successful: {}", nodeId.getValue());
return false;
}
try {
- Date timestampOfRegistration = reconciliationRegistry.getRegistration(nodeId);
- final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(ReconciliationRegistry.DATE_AND_TIME_FORMAT);
+ Date timestampOfRegistration = reconciliationRegistry.getRegistrationTimestamp(nodeId);
+ final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
Date timestampOfStatistics = simpleDateFormat.parse(gatheringStatusEnd.getEnd().getValue());
if (timestampOfStatistics.after(timestampOfRegistration)) {
- LOG.debug("Fresh operational present for: {} -> going retry!", nodeId.getValue());
+ LOG.debug("Fresh operational present: {}", nodeId.getValue());
return true;
}
} catch (ParseException e) {
LOG.error("Timestamp parsing error {}", e);
}
- LOG.debug("Fresh operational not present for: {}", nodeId.getValue());
+ LOG.debug("Fresh operational not present: {}", nodeId.getValue());
return false;
}
--- /dev/null
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager;
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decorator for cluster related issues.
+ */
+public class SyncReactorClusterDecorator implements SyncReactor {
+ private static final Logger LOG = LoggerFactory.getLogger(SyncReactorClusterDecorator.class);
+ private final SyncReactor delegate;
+ private final DeviceMastershipManager deviceMastershipManager;
+
+ public SyncReactorClusterDecorator(final SyncReactor delegate,
+ final DeviceMastershipManager deviceMastershipManager) {
+ this.delegate = delegate;
+ this.deviceMastershipManager = deviceMastershipManager;
+ }
+
+ @Override
+ public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+ final FlowCapableNode configTree,
+ final FlowCapableNode operationalTree,
+ final LogicalDatastoreType dsType) throws InterruptedException {
+ final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
+ LOG.trace("syncup cluster {}", nodeId.getValue());
+
+ if (!deviceMastershipManager.isDeviceMastered(nodeId)) {
+ LOG.debug("Skip syncup since not master for: {}", nodeId.getValue());
+ return Futures.immediateFuture(Boolean.TRUE);
+ } else {
+ return delegate.syncup(flowcapableNodePath, configTree,operationalTree, dsType);
+ }
+ }
+}
private final SyncReactor delegate;
private final ReconciliationRegistry reconciliationRegistry;
- public SyncReactorRetryDecorator(final SyncReactor delegate, ReconciliationRegistry reconciliationRegistry) {
+ public SyncReactorRetryDecorator(final SyncReactor delegate, final ReconciliationRegistry reconciliationRegistry) {
this.delegate = delegate;
this.reconciliationRegistry = reconciliationRegistry;
}
LOG.trace("syncup retry {}", nodeId.getValue());
if (dsType == LogicalDatastoreType.CONFIGURATION && reconciliationRegistry.isRegistered(nodeId)) {
- LOG.trace("Config change ignored because device is in retry [{}]", nodeId);
+ LOG.debug("Config change ignored because {} is in reconcile.", nodeId.getValue());
return Futures.immediateFuture(Boolean.FALSE);
}
return true;
} else {
reconciliationRegistry.register(nodeId);
- // TODO elicit statistics gathering if not running actually
- // triggerStatisticsGathering(nodeId);
return false;
}
}
--- /dev/null
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl.clustering;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
+import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ClusterSingletonService} clusterSingletonServiceRegistration per connected device.
+ */
+public class DeviceMastership implements ClusterSingletonService {
+ private static final Logger LOG = LoggerFactory.getLogger(DeviceMastership.class);
+ private final NodeId nodeId;
+ private final ServiceGroupIdentifier identifier;
+ private final ReconciliationRegistry reconciliationRegistry;
+ private ClusterSingletonServiceRegistration clusterSingletonServiceRegistration;
+ private boolean deviceMastered;
+
+ public DeviceMastership(final NodeId nodeId, final ReconciliationRegistry reconciliationRegistry) {
+ this.nodeId = nodeId;
+ this.identifier = ServiceGroupIdentifier.create(nodeId.getValue());
+ this.reconciliationRegistry = reconciliationRegistry;
+ this.deviceMastered = false;
+ }
+
+ @Override
+ public void instantiateServiceInstance() {
+ deviceMastered = true;
+ reconciliationRegistry.register(nodeId);
+ LOG.trace("FRS started for: {}", nodeId.getValue());
+ }
+
+ @Override
+ public ListenableFuture<Void> closeServiceInstance() {
+ deviceMastered = false;
+ reconciliationRegistry.unregisterIfRegistered(nodeId);
+ LOG.debug("FRS stopped for: {}", nodeId.getValue());
+ return Futures.immediateFuture(null);
+ }
+
+ @Override
+ public ServiceGroupIdentifier getIdentifier() {
+ return identifier;
+ }
+
+ public boolean isDeviceMastered() {
+ return deviceMastered;
+ }
+
+ public void setClusterSingletonServiceRegistration(final ClusterSingletonServiceRegistration registration) {
+ this.clusterSingletonServiceRegistration = registration;
+ }
+
+ public ClusterSingletonServiceRegistration getClusterSingletonServiceRegistration() {
+ return clusterSingletonServiceRegistration;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl.clustering;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
+import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manager for clustering service registrations of {@link DeviceMastership}.
+ */
+public class DeviceMastershipManager {
+ private static final Logger LOG = LoggerFactory.getLogger(DeviceMastershipManager.class);
+ private final ClusterSingletonServiceProvider clusterSingletonService;
+ private final ConcurrentHashMap<NodeId, DeviceMastership> deviceMasterships = new ConcurrentHashMap();
+ private final ReconciliationRegistry reconciliationRegistry;
+
+ public DeviceMastershipManager(final ClusterSingletonServiceProvider clusterSingletonService,
+ final ReconciliationRegistry reconciliationRegistry) {
+ this.clusterSingletonService = clusterSingletonService;
+ this.reconciliationRegistry = reconciliationRegistry;
+ }
+
+ public void onDeviceConnected(final NodeId nodeId) {
+ final DeviceMastership mastership = new DeviceMastership(nodeId, reconciliationRegistry);
+ final ClusterSingletonServiceRegistration registration = clusterSingletonService.registerClusterSingletonService(mastership);
+ mastership.setClusterSingletonServiceRegistration(registration);
+ deviceMasterships.put(nodeId, mastership);
+ LOG.debug("FRS service registered for: {}", nodeId.getValue());
+ }
+
+
+ public void onDeviceDisconnected(final NodeId nodeId) {
+ final DeviceMastership mastership = deviceMasterships.remove(nodeId);
+ final ClusterSingletonServiceRegistration registration = mastership.getClusterSingletonServiceRegistration();
+ if (registration != null) {
+ try {
+ registration.close();
+ } catch (Exception e) {
+ LOG.error("FRS cluster service close fail: {}", nodeId.getValue());
+ }
+ }
+ LOG.debug("FRS service unregistered for: {}", nodeId.getValue());
+ }
+
+ public boolean isDeviceMastered(final NodeId nodeId) {
+ if (deviceMasterships.get(nodeId) == null) {
+ return false;
+ } else {
+ return deviceMasterships.get(nodeId).isDeviceMastered();
+ }
+ }
+
+ @VisibleForTesting
+ ConcurrentHashMap<NodeId, DeviceMastership> getDeviceMasterships() {
+ return deviceMasterships;
+ }
+}
import org.slf4j.LoggerFactory;
/**
- * Holder of registration request for fresh operational.
+ * Holder of registration request for reconciliation (fresh operational).
*/
public class ReconciliationRegistry {
-
private static final Logger LOG = LoggerFactory.getLogger(ReconciliationRegistry.class);
private final Map<NodeId, Date> registration = new ConcurrentHashMap<>();
- public static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
public Date register(NodeId nodeId) {
Date timestamp = new Date();
registration.put(nodeId, timestamp);
LOG.debug("Registered for next consistent operational: {}", nodeId.getValue());
+ // TODO elicit statistics gathering if not running actually
return timestamp;
}
return registration.get(nodeId) != null;
}
- public Date getRegistration(NodeId nodeId) {
+ public Date getRegistrationTimestamp(NodeId nodeId) {
return registration.get(nodeId);
}
odl:use-default-for-reference-types="true">
<reference id="broker" interface="org.opendaylight.controller.sal.binding.api.BindingAwareBroker"/>
- <reference id="dataBroker" interface="org.opendaylight.controller.md.sal.binding.api.DataBroker"/>
+ <reference id="dataBroker" interface="org.opendaylight.controller.md.sal.binding.api.DataBroker" odl:type="pingpong"/>
<reference id="rpcRegistry" interface="org.opendaylight.controller.sal.binding.api.RpcProviderRegistry"/>
+ <reference id="clusterSingletonService" interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"/>
<bean id="frSync" class="org.opendaylight.openflowplugin.applications.frsync.impl.ForwardingRulesSyncProvider"
destroy-method="close">
<argument ref="broker"/>
<argument ref="dataBroker"/>
<argument ref="rpcRegistry"/>
+ <argument ref="clusterSingletonService"/>
</bean>
</blueprint>
\ No newline at end of file
import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.SalTableService;
@Mock
private BindingAwareBroker broker;
@Mock
- private BindingAwareBroker.ProviderContext providerContext;
+ private ProviderContext providerContext;
+ @Mock
+ private ClusterSingletonServiceProvider clusterSingletonService;
@Before
public void setUp() throws Exception {
}
});
- provider = new ForwardingRulesSyncProvider(broker, dataBroker, rpcRegistry);
+ provider = new ForwardingRulesSyncProvider(broker, dataBroker, rpcRegistry, clusterSingletonService);
Mockito.verify(rpcRegistry).getRpcService(SalTableService.class);
Mockito.verify(rpcRegistry).getRpcService(SalFlatBatchService.class);
Mockito.verify(broker).registerProvider(provider);
public void testOnDataTreeChangedSyncupAdd() throws InterruptedException {
Mockito.when(roTx.read(LogicalDatastoreType.OPERATIONAL, fcNodePath))
.thenReturn(Futures.immediateCheckedFuture(Optional.of(dataBefore)));
+ Mockito.when(configModification.getDataBefore()).thenReturn(null);
Mockito.when(configModification.getDataAfter()).thenReturn(dataAfter);
nodeListenerConfig.onDataTreeChanged(Collections.singleton(dataTreeModification));
Mockito.when(roTx.read(LogicalDatastoreType.OPERATIONAL, fcNodePath))
.thenReturn(Futures.immediateCheckedFuture(Optional.of(dataBefore)));
Mockito.when(configModification.getDataBefore()).thenReturn(dataBefore);
+ Mockito.when(configModification.getDataAfter()).thenReturn(null);
nodeListenerConfig.onDataTreeChanged(Collections.singleton(dataTreeModification));
Mockito.verifyZeroInteractions(reactor);
Mockito.verify(roTx).close();
}
+
}
\ No newline at end of file
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
+import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager;
import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
private InstanceIdentifier<FlowCapableNode> fcNodePath;
private SimplifiedOperationalListener nodeListenerOperational;
private final LogicalDatastoreType dsType = LogicalDatastoreType.OPERATIONAL;
- private final String timestampBefore = "0000-12-12T01:01:01.000-07:00";
- private final String timestampAfter = "9999-12-12T01:01:01.000-07:00";
- private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(ReconciliationRegistry.DATE_AND_TIME_FORMAT);
+ private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(SimplifiedOperationalListener.DATE_AND_TIME_FORMAT);
@Mock
private SyncReactor reactor;
@Mock
private FlowCapableNode fcOperationalNode;
@Mock
- private ReconciliationRegistry reconciliationRegistry;
- @Mock
private FlowCapableStatisticsGatheringStatus statisticsGatheringStatus;
@Mock
private SnapshotGatheringStatusEnd snapshotGatheringStatusEnd;
+ @Mock
+ private ReconciliationRegistry reconciliationRegistry;
+ @Mock
+ private DeviceMastershipManager deviceMastershipManager;
@Before
public void setUp() throws Exception {
final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot,
new FlowCapableNodeOdlDao(db, LogicalDatastoreType.CONFIGURATION));
- nodeListenerOperational = new SimplifiedOperationalListener(reactor, operationalSnapshot, configDao, reconciliationRegistry);
+ nodeListenerOperational = new SimplifiedOperationalListener(reactor, operationalSnapshot, configDao, reconciliationRegistry, deviceMastershipManager);
InstanceIdentifier<Node> nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
fcNodePath = nodePath.augmentation(FlowCapableNode.class);
}
@Test
- public void testOnDataTreeChangedSyncupAdd() throws InterruptedException {
+ public void testOnDataTreeChangeAddSyncup() throws Exception {
+ Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
+ operationalAdd();
+ prepareFreshOperational(true);
+
Mockito.when(roTx.read(LogicalDatastoreType.CONFIGURATION, fcNodePath))
.thenReturn(Futures.immediateCheckedFuture(Optional.of(configNode)));
Mockito.when(reactor.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
- Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+ Mockito.verify(deviceMastershipManager).onDeviceConnected(NODE_ID);
Mockito.verify(reactor).syncup(fcNodePath, configNode, fcOperationalNode, dsType);
- Mockito.verifyNoMoreInteractions(reactor);
Mockito.verify(roTx).close();
}
@Test
- public void testOnDataTreeChangedAddSkip() {
+ public void testOnDataTreeChangedAddSkip() throws Exception {
// Related to bug 5920 -> https://bugs.opendaylight.org/show_bug.cgi?id=5920
+ Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
+ operationalAdd();
+ prepareFreshOperational(true);
+
Mockito.when(roTx.read(LogicalDatastoreType.CONFIGURATION, fcNodePath))
.thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
- Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+ Mockito.verify(deviceMastershipManager).onDeviceConnected(NODE_ID);
Mockito.verifyZeroInteractions(reactor);
Mockito.verify(roTx).close();
}
@Test
- public void testOnDataTreeChangedSyncupDeletePhysical() {
+ public void testOnDataTreeChangedSyncupDeletePhysical() throws Exception {
Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+ Mockito.when(operationalModification.getDataAfter()).thenReturn(null);
Mockito.when(dataTreeModification.getRootNode().getModificationType()).thenReturn(ModificationType.DELETE);
+ Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(false);
nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+ Mockito.verify(deviceMastershipManager).onDeviceDisconnected(NODE_ID);
Mockito.verifyZeroInteractions(reactor);
}
Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
List<NodeConnector> nodeConnectorList = Mockito.mock(List.class);
Mockito.when(operationalNode.getNodeConnector()).thenReturn(nodeConnectorList);
+ Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(false);
nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+ Mockito.verify(deviceMastershipManager).onDeviceDisconnected(NODE_ID);
Mockito.verifyZeroInteractions(reactor);
}
@Test
public void testOnDataTreeChangedReconcileNotRegistered() {
- Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
- Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+ operationalUpdate();
Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(false);
nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
@Test
public void testOnDataTreeChangedReconcileButStaticsGatheringNotStarted() {
- Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
- Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
+ operationalUpdate();
Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(null);
nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
@Test
public void testOnDataTreeChangedReconcileButStaticsGatheringNotFinished() {
- Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
- Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
+ operationalUpdate();
Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(null);
@Test
public void testOnDataTreeChangedReconcileButStaticsGatheringNotSuccessful() {
- Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
- Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
+ operationalUpdate();
Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(false);
@Test
public void testOnDataTreeChangedReconcileAndFreshOperationalNotPresent() throws ParseException {
- final DateAndTime timestamp = Mockito.mock(DateAndTime.class);
- Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
- Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
- Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
- Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
- Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(true);
- Mockito.when(snapshotGatheringStatusEnd.getEnd()).thenReturn(timestamp);
- Mockito.when(snapshotGatheringStatusEnd.getEnd().getValue()).thenReturn(timestampBefore);
- Mockito.when(reconciliationRegistry.getRegistration(NODE_ID)).thenReturn(simpleDateFormat.parse(timestampAfter));
+ operationalUpdate();
+ prepareFreshOperational(false);
nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
@Test
public void testOnDataTreeChangedReconcileAndFreshOperationalPresent() throws Exception {
- final DateAndTime timestamp = Mockito.mock(DateAndTime.class);
+ Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
+ operationalUpdate();
+ prepareFreshOperational(true);
+
Mockito.when(roTx.read(LogicalDatastoreType.CONFIGURATION, fcNodePath))
.thenReturn(Futures.immediateCheckedFuture(Optional.of(configNode)));
Mockito.when(reactor.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
- Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
- Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
- Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
- Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
- Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
- Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(true);
- Mockito.when(snapshotGatheringStatusEnd.getEnd()).thenReturn(timestamp);
- Mockito.when(snapshotGatheringStatusEnd.getEnd().getValue()).thenReturn(timestampAfter);
- Mockito.when(reconciliationRegistry.getRegistration(NODE_ID)).thenReturn(simpleDateFormat.parse(timestampBefore));
nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
Mockito.verify(reactor).syncup(fcNodePath, configNode, fcOperationalNode, dsType);
- Mockito.verifyNoMoreInteractions(reactor);
Mockito.verify(roTx).close();
}
- @Test
- public void testOnDataTreeChangedReconcileAndNodeDeleted() {
- Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
- Mockito.when(dataTreeModification.getRootNode().getModificationType()).thenReturn(DataObjectModification.ModificationType.DELETE);
+ private void prepareFreshOperational(final boolean afterRegistration) throws ParseException {
+ Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
+ Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
+ Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(true);
+ Mockito.when(snapshotGatheringStatusEnd.getEnd()).thenReturn(Mockito.mock(DateAndTime.class));
+ final String timestampBefore = "0000-12-12T01:01:01.000-07:00";
+ final String timestampAfter = "9999-12-12T01:01:01.000-07:00";
+ if (afterRegistration) {
+ Mockito.when(snapshotGatheringStatusEnd.getEnd().getValue()).thenReturn(timestampAfter);
+ Mockito.when(reconciliationRegistry.getRegistrationTimestamp(NODE_ID)).thenReturn(simpleDateFormat.parse(timestampBefore));
+ } else {
+ Mockito.when(snapshotGatheringStatusEnd.getEnd().getValue()).thenReturn(timestampBefore);
+ Mockito.when(reconciliationRegistry.getRegistrationTimestamp(NODE_ID)).thenReturn(simpleDateFormat.parse(timestampAfter));
+ }
+ }
- nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+ private void operationalAdd() {
+ Mockito.when(operationalModification.getDataBefore()).thenReturn(null);
+ Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+ }
- Mockito.verify(reconciliationRegistry).unregisterIfRegistered(NODE_ID);
- Mockito.verifyZeroInteractions(reactor);
+ private void operationalUpdate() {
+ Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+ Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
}
}
--- /dev/null
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+/**
+ * Test for {@link SyncReactorClusterDecorator}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SyncReactorClusterDecoratorTest {
+
+ private static final NodeId NODE_ID = new NodeId("test-node");
+ private SyncReactorClusterDecorator reactor;
+ private InstanceIdentifier<FlowCapableNode> fcNodePath;
+
+ @Mock
+ private SyncReactorFutureZipDecorator delegate;
+ @Mock
+ private DeviceMastershipManager deviceMastershipManager;
+ @Mock
+ private FlowCapableNode fcConfigNode;
+ @Mock
+ private FlowCapableNode fcOperationalNode;
+
+ @Before
+ public void setUp() {
+ reactor = new SyncReactorClusterDecorator(delegate, deviceMastershipManager);
+
+ InstanceIdentifier<Node> nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
+ fcNodePath = nodePath.augmentation(FlowCapableNode.class);
+ }
+
+ @Test
+ public void testSyncupMaster() throws InterruptedException {
+ Mockito.when(deviceMastershipManager.isDeviceMastered(NODE_ID)).thenReturn(true);
+
+ reactor.syncup(fcNodePath, fcConfigNode, fcOperationalNode, LogicalDatastoreType.CONFIGURATION);
+
+ Mockito.verify(delegate).syncup(fcNodePath, fcConfigNode, fcOperationalNode, LogicalDatastoreType.CONFIGURATION);
+ Mockito.verifyNoMoreInteractions(delegate);
+ }
+
+ @Test
+ public void testSyncupSlave() throws InterruptedException {
+ Mockito.when(deviceMastershipManager.isDeviceMastered(NODE_ID)).thenReturn(false);
+
+ reactor.syncup(fcNodePath, fcConfigNode, fcOperationalNode, LogicalDatastoreType.CONFIGURATION);
+
+ Mockito.verifyZeroInteractions(delegate);
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl.clustering;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
+import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+
+/**
+ * Test for {@link DeviceMastershipManager}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class DeviceMastershipManagerTest {
+ private static final NodeId NODE_ID = new NodeId("testNode");
+ private DeviceMastershipManager deviceMastershipManager;
+ @Mock
+ private ClusterSingletonServiceRegistration registration;
+ @Mock
+ private ClusterSingletonServiceProvider clusterSingletonService;
+
+ @Before
+ public void setUp() throws Exception {
+ deviceMastershipManager = new DeviceMastershipManager(clusterSingletonService, new ReconciliationRegistry());
+ Mockito.when(clusterSingletonService.registerClusterSingletonService(Matchers.<ClusterSingletonService>any()))
+ .thenReturn(registration);
+ }
+
+ @Test
+ public void testOnDeviceConnectedAndDisconnected() {
+ // no context
+ Assert.assertNull(deviceMastershipManager.getDeviceMasterships().get(NODE_ID));
+ // create context - register
+ deviceMastershipManager.onDeviceConnected(NODE_ID);
+ DeviceMastership registration = deviceMastershipManager.getDeviceMasterships().get(NODE_ID);
+ Assert.assertNotNull(registration);
+ Mockito.verify(clusterSingletonService).registerClusterSingletonService(registration);
+ // destroy context - unregister
+ deviceMastershipManager.onDeviceDisconnected(NODE_ID);
+ Assert.assertNull(deviceMastershipManager.getDeviceMasterships().get(NODE_ID));
+ }
+
+ @Test
+ public void testIsDeviceMasteredOrSlaved() {
+ // no context
+ Assert.assertFalse(deviceMastershipManager.isDeviceMastered(NODE_ID));
+ deviceMastershipManager.onDeviceConnected(NODE_ID);
+ // is master
+ deviceMastershipManager.getDeviceMasterships().get(NODE_ID).instantiateServiceInstance();
+ Assert.assertTrue(deviceMastershipManager.isDeviceMastered(NODE_ID));
+ // is not master
+ deviceMastershipManager.getDeviceMasterships().get(NODE_ID).closeServiceInstance();
+ Assert.assertFalse(deviceMastershipManager.isDeviceMastered(NODE_ID));
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl.clustering;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+
+/**
+ * Test for {@link DeviceMastership}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class DeviceMastershipTest {
+ private static final NodeId NODE_ID = new NodeId("testNode");
+ private DeviceMastership deviceMastership;
+
+ @Mock
+ private DeviceMastershipManager deviceMastershipManager;
+ @Mock
+ private ReconciliationRegistry reconciliationRegistry;
+
+ @Before
+ public void setUp() throws Exception {
+ deviceMastership = new DeviceMastership(NODE_ID, reconciliationRegistry);
+ }
+
+ @Test
+ public void instantiateServiceInstance() {
+ deviceMastership.instantiateServiceInstance();
+ Mockito.verify(reconciliationRegistry).register(NODE_ID);
+ Assert.assertTrue(deviceMastership.isDeviceMastered());
+ }
+
+ @Test
+ public void closeServiceInstance() {
+ deviceMastership.closeServiceInstance();
+ Mockito.verify(reconciliationRegistry).unregisterIfRegistered(NODE_ID);
+ Assert.assertFalse(deviceMastership.isDeviceMastered());
+ }
+}
\ No newline at end of file