From c2d1b9207fe82d36db83501e1baaffe7bc7da9ae Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Thu, 15 Oct 2015 00:50:21 -0400 Subject: [PATCH] Bug 4149: Implement per-shard DatastoreContext settings Added the ability to specify shard-specific settings in the .cfg file by prefixing the shard name to the property name, similar to what we allow at the datastore level. I added a DatastoreContextFactory that has methods to get the base DatastoreContext and a per-shard DatastoreContext. The DatastoreContextFactory is now passed to the ShardManager instead of the DatastoreContext. The DatastoreContextFactory uses the DatastoreContextIntrospector to overlay per-shard settings onto the base DatastoreContext. Change-Id: I329c98c1577a74ebe665052f76e28da3867e2e86 Signed-off-by: Tom Pantelis --- .../DatastoreContextConfigAdminOverlay.java | 4 +- .../datastore/DatastoreContextFactory.java | 29 ++++ .../DatastoreContextIntrospector.java | 118 +++++++++++---- .../datastore/DistributedDataStore.java | 30 ++-- .../DistributedDataStoreFactory.java | 2 +- .../cluster/datastore/ShardManager.java | 101 +++++++------ .../cluster/datastore/utils/ActorContext.java | 7 +- ...atastoreContextConfigAdminOverlayTest.java | 4 +- .../DatastoreContextIntrospectorTest.java | 45 ++++++ ...butedDataStoreRemotingIntegrationTest.java | 24 ++- .../cluster/datastore/IntegrationTestKit.java | 8 +- .../cluster/datastore/ShardManagerTest.java | 138 +++++++++++++++--- ...DistributedEntityOwnershipServiceTest.java | 8 +- .../datastore/utils/ActorContextTest.java | 8 +- 14 files changed, 397 insertions(+), 129 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextFactory.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java index 2688d0195d..448e810641 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java @@ -29,7 +29,7 @@ public class DatastoreContextConfigAdminOverlay implements AutoCloseable { public static final String CONFIG_ID = "org.opendaylight.controller.cluster.datastore"; public static interface Listener { - void onDatastoreContextUpdated(DatastoreContext context); + void onDatastoreContextUpdated(DatastoreContextFactory contextFactory); } private static final Logger LOG = LoggerFactory.getLogger(DatastoreContextConfigAdminOverlay.class); @@ -72,7 +72,7 @@ public class DatastoreContextConfigAdminOverlay implements AutoCloseable { if(introspector.update(properties)) { if(listener != null) { - listener.onDatastoreContextUpdated(introspector.getContext()); + listener.onDatastoreContextUpdated(introspector.newContextFactory()); } } } else { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextFactory.java new file mode 100644 index 0000000000..4652451cf0 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextFactory.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +/** + * Factory for creating DatastoreContext instances. + * + * @author Thomas Pantelis + */ +public class DatastoreContextFactory { + private final DatastoreContextIntrospector introspector; + + public DatastoreContextFactory(DatastoreContextIntrospector introspector) { + this.introspector = introspector; + } + + public DatastoreContext getShardDatastoreContext(String forShardName) { + return introspector.getShardDatastoreContext(forShardName); + } + + public DatastoreContext getBaseDatastoreContext() { + return introspector.getContext(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java index 6a4d0a6f6d..094bb4a9e1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.primitives.Primitives; import java.beans.BeanInfo; @@ -19,10 +20,12 @@ import java.beans.PropertyDescriptor; import java.lang.reflect.Constructor; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.Dictionary; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.commons.lang3.StringUtils; @@ -180,6 +183,7 @@ public class DatastoreContextIntrospector { } private DatastoreContext context; + private Map currentProperties; public DatastoreContextIntrospector(DatastoreContext context) { this.context = context; @@ -189,6 +193,36 @@ public class DatastoreContextIntrospector { return context; } + public DatastoreContextFactory newContextFactory() { + return new DatastoreContextFactory(this); + } + + public synchronized DatastoreContext getShardDatastoreContext(String forShardName) { + if(currentProperties == null) { + return context; + } + + Builder builder = DatastoreContext.newBuilderFrom(context); + String dataStoreTypePrefix = context.getDataStoreType() + '.'; + final String shardNamePrefix = forShardName + '.'; + + List keys = getSortedKeysByDatastoreType(currentProperties.keySet(), dataStoreTypePrefix); + + for(String key: keys) { + Object value = currentProperties.get(key); + if(key.startsWith(dataStoreTypePrefix)) { + key = key.replaceFirst(dataStoreTypePrefix, ""); + } + + if(key.startsWith(shardNamePrefix)) { + key = key.replaceFirst(shardNamePrefix, ""); + convertValueAndInvokeSetter(key, value, builder); + } + } + + return builder.build(); + } + /** * Applies the given properties to the cached DatastoreContext and yields a new DatastoreContext * instance which can be obtained via {@link getContext}. @@ -197,19 +231,50 @@ public class DatastoreContextIntrospector { * @return true if the cached DatastoreContext was updated, false otherwise. */ public synchronized boolean update(Dictionary properties) { + currentProperties = null; if(properties == null || properties.isEmpty()) { return false; } LOG.debug("In update: properties: {}", properties); + ImmutableMap.Builder mapBuilder = ImmutableMap.builder(); + Builder builder = DatastoreContext.newBuilderFrom(context); final String dataStoreTypePrefix = context.getDataStoreType() + '.'; + List keys = getSortedKeysByDatastoreType(Collections.list(properties.keys()), dataStoreTypePrefix); + + boolean updated = false; + for(String key: keys) { + Object value = properties.get(key); + mapBuilder.put(key, value); + + // If the key is prefixed with the data store type, strip it off. + if(key.startsWith(dataStoreTypePrefix)) { + key = key.replaceFirst(dataStoreTypePrefix, ""); + } + + if(convertValueAndInvokeSetter(key, value, builder)) { + updated = true; + } + } + + currentProperties = mapBuilder.build(); + + if(updated) { + context = builder.build(); + } + + return updated; + } + + private ArrayList getSortedKeysByDatastoreType(Collection inKeys, + final String dataStoreTypePrefix) { // Sort the property keys by putting the names prefixed with the data store type last. This // is done so data store specific settings are applied after global settings. - ArrayList keys = Collections.list(properties.keys()); + ArrayList keys = new ArrayList<>(inKeys); Collections.sort(keys, new Comparator() { @Override public int compare(String key1, String key2) { @@ -217,44 +282,33 @@ public class DatastoreContextIntrospector { key2.startsWith(dataStoreTypePrefix) ? -1 : key1.compareTo(key2); } }); + return keys; + } - boolean updated = false; - for(String key: keys) { - Object value = properties.get(key); - try { - // If the key is prefixed with the data store type, strip it off. - if(key.startsWith(dataStoreTypePrefix)) { - key = key.replaceFirst(dataStoreTypePrefix, ""); - } - - key = convertToCamelCase(key); - - // Convert the value to the right type. - value = convertValue(key, value); - if(value == null) { - continue; - } + private boolean convertValueAndInvokeSetter(String inKey, Object inValue, Builder builder) { + String key = convertToCamelCase(inKey); - LOG.debug("Converted value for property {}: {} ({})", - key, value, value.getClass().getSimpleName()); + try { + // Convert the value to the right type. + Object value = convertValue(key, inValue); + if(value == null) { + return false; + } - // Call the setter method on the Builder instance. - Method setter = builderSetters.get(key); - setter.invoke(builder, constructorValueRecursively( - Primitives.wrap(setter.getParameterTypes()[0]), value.toString())); + LOG.debug("Converted value for property {}: {} ({})", + key, value, value.getClass().getSimpleName()); - updated = true; + // Call the setter method on the Builder instance. + Method setter = builderSetters.get(key); + setter.invoke(builder, constructorValueRecursively( + Primitives.wrap(setter.getParameterTypes()[0]), value.toString())); - } catch (Exception e) { - LOG.error("Error converting value ({}) for property {}", value, key, e); - } - } - - if(updated) { - context = builder.build(); + return true; + } catch (Exception e) { + LOG.error("Error converting value ({}) for property {}", inValue, key, e); } - return updated; + return false; } private static String convertToCamelCase(String inString) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 49f5388842..7f2416efc3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -67,13 +67,13 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, private final TransactionContextFactory txContextFactory; public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster, - Configuration configuration, DatastoreContext datastoreContext) { + Configuration configuration, DatastoreContextFactory datastoreContextFactory) { Preconditions.checkNotNull(actorSystem, "actorSystem should not be null"); Preconditions.checkNotNull(cluster, "cluster should not be null"); Preconditions.checkNotNull(configuration, "configuration should not be null"); - Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null"); + Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null"); - this.type = datastoreContext.getDataStoreType(); + this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreType(); String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString(); @@ -84,19 +84,21 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, cluster, configuration, - datastoreContext, shardDispatcher, shardManagerId, primaryShardInfoCache), cluster, - configuration, datastoreContext, primaryShardInfoCache); + datastoreContextFactory, shardDispatcher, shardManagerId, primaryShardInfoCache), cluster, + configuration, datastoreContextFactory.getBaseDatastoreContext(), primaryShardInfoCache); this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR; this.txContextFactory = TransactionContextFactory.create(actorContext); - datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(datastoreContext.getDataStoreMXBeanType()); - datastoreConfigMXBean.setContext(datastoreContext); + datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl( + datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType()); + datastoreConfigMXBean.setContext(datastoreContextFactory.getBaseDatastoreContext()); datastoreConfigMXBean.registerMBean(); - datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContext.getDataStoreMXBeanType(), actorContext); + datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContextFactory.getBaseDatastoreContext(). + getDataStoreMXBeanType(), actorContext); datastoreInfoMXBean.registerMBean(); } @@ -177,11 +179,11 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, } @Override - public void onDatastoreContextUpdated(DatastoreContext context) { + public void onDatastoreContextUpdated(DatastoreContextFactory contextFactory) { LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreType()); - actorContext.setDatastoreContext(context); - datastoreConfigMXBean.setContext(context); + actorContext.setDatastoreContext(contextFactory); + datastoreConfigMXBean.setContext(contextFactory.getBaseDatastoreContext()); } @Override @@ -224,14 +226,14 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, } private ActorRef createShardManager(ActorSystem actorSystem, ClusterWrapper cluster, Configuration configuration, - DatastoreContext datastoreContext, String shardDispatcher, String shardManagerId, - PrimaryShardInfoFutureCache primaryShardInfoCache){ + DatastoreContextFactory datastoreContextFactory, String shardDispatcher, + String shardManagerId, PrimaryShardInfoFutureCache primaryShardInfoCache){ Exception lastException = null; for(int i=0;i<100;i++) { try { return actorSystem.actorOf( - ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch, + ShardManager.props(cluster, configuration, datastoreContextFactory, waitTillReadyCountDownLatch, primaryShardInfoCache).withDispatcher(shardDispatcher).withMailbox( ActorContext.MAILBOX), shardManagerId); } catch (Exception e){ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java index 25029b6f18..e5aa33a0f4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java @@ -29,7 +29,7 @@ public class DistributedDataStoreFactory { Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf"); final DistributedDataStore dataStore = new DistributedDataStore(actorSystem, - new ClusterWrapperImpl(actorSystem), config, introspector.getContext()); + new ClusterWrapperImpl(actorSystem), config, introspector.newContextFactory()); overlay.setListener(dataStore); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 724c8d2c03..4f3d4aa7f9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -117,7 +117,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private ShardManagerInfo mBean; - private DatastoreContext datastoreContext; + private DatastoreContextFactory datastoreContextFactory; private final CountDownLatch waitTillReadyCountdownLatch; @@ -130,21 +130,19 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { /** */ protected ShardManager(ClusterWrapper cluster, Configuration configuration, - DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, + DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) { this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); - this.datastoreContext = datastoreContext; - this.type = datastoreContext.getDataStoreType(); + this.datastoreContextFactory = datastoreContextFactory; + this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreType(); this.shardDispatcherPath = new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; this.primaryShardInfoCache = primaryShardInfoCache; peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName()); - this.datastoreContext = DatastoreContext.newBuilderFrom(datastoreContext).shardPeerAddressResolver( - peerAddressResolver).build(); // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); @@ -155,7 +153,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { public static Props props( final ClusterWrapper cluster, final Configuration configuration, - final DatastoreContext datastoreContext, + final DatastoreContextFactory datastoreContextFactory, final CountDownLatch waitTillReadyCountdownLatch, final PrimaryShardInfoFutureCache primaryShardInfoCache) { @@ -164,7 +162,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null"); Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null"); - return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, + return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContextFactory, waitTillReadyCountdownLatch, primaryShardInfoCache)); } @@ -195,8 +193,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { memberUnreachable((ClusterEvent.UnreachableMember)message); } else if(message instanceof ClusterEvent.ReachableMember) { memberReachable((ClusterEvent.ReachableMember) message); - } else if(message instanceof DatastoreContext) { - onDatastoreContext((DatastoreContext)message); + } else if(message instanceof DatastoreContextFactory) { + onDatastoreContextFactory((DatastoreContextFactory)message); } else if(message instanceof RoleChangeNotification) { onRoleChangeNotification((RoleChangeNotification) message); } else if(message instanceof FollowerInitialSyncUpStatus){ @@ -239,7 +237,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { DatastoreContext shardDatastoreContext = createShard.getDatastoreContext(); if(shardDatastoreContext == null) { - shardDatastoreContext = datastoreContext; + shardDatastoreContext = newShardDatastoreContext(moduleShardConfig.getShardName()); } else { shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver( peerAddressResolver).build(); @@ -266,6 +264,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) { + return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)). + shardPeerAddressResolver(peerAddressResolver); + } + + private DatastoreContext newShardDatastoreContext(String shardName) { + return newShardDatastoreContextBuilder(shardName).build(); + } + private void checkReady(){ if (isReadyWithLeaderId()) { LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", @@ -443,11 +450,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName()); - FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration(); + FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration(); if(shardInformation.isShardInitialized()) { // If the shard is already initialized then we'll wait enough time for the shard to // elect a leader, ie 2 times the election timeout. - timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig() + timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig() .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS); } @@ -574,13 +581,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void onDatastoreContext(DatastoreContext context) { - datastoreContext = DatastoreContext.newBuilderFrom(context).shardPeerAddressResolver( - peerAddressResolver).build(); + private void onDatastoreContextFactory(DatastoreContextFactory factory) { + datastoreContextFactory = factory; for (ShardInformation info : localShards.values()) { - if (info.getActor() != null) { - info.getActor().tell(datastoreContext, getSelf()); - } + info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf()); } } @@ -699,12 +703,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardIdentifier shardId = getShardIdentifier(memberName, shardName); Map peerAddresses = getPeerAddresses(shardName); localShardActorNames.add(shardId.toString()); - localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext, - shardPropsCreator, peerAddressResolver)); + localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, + newShardDatastoreContext(shardName), shardPropsCreator, peerAddressResolver)); } mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type, - datastoreContext.getDataStoreMXBeanType(), localShardActorNames); + datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(), localShardActorNames); mBean.setShardManager(this); } @@ -755,12 +759,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return mBean; } - private DatastoreContext getInitShardDataStoreContext() { - return (DatastoreContext.newBuilderFrom(datastoreContext) - .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()) - .build()); - } - private void checkLocalShardExists(final String shardName, final ActorRef sender) { if (localShards.containsKey(shardName)) { String msg = String.format("Local shard %s already exists", shardName); @@ -802,7 +800,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } - Timeout findPrimaryTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2)); + Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext(). + getShardInitializationTimeout().duration().$times(2)); final ActorRef sender = getSender(); Future futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout); @@ -835,8 +834,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName()); + + DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation( + DisableElectionsRaftPolicy.class.getName()).build(); + final ShardInformation shardInfo = new ShardInformation(shardName, shardId, - getPeerAddresses(shardName), getInitShardDataStoreContext(), + getPeerAddresses(shardName), datastoreContext, new DefaultShardPropsCreator(), peerAddressResolver); localShards.put(shardName, shardInfo); shardInfo.setActor(newShardActor(schemaContext, shardInfo)); @@ -845,8 +848,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(), response.getPrimaryPath(), shardId); - Timeout addServerTimeout = new Timeout(datastoreContext - .getShardLeaderElectionTimeout().duration().$times(4)); + Timeout addServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(4)); Future futureObj = ask(getContext().actorSelection(response.getPrimaryPath()), new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout); @@ -884,7 +886,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName); // Make the local shard voting capable - shardInfo.setDatastoreContext(datastoreContext, getSelf()); + shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf()); mBean.addLocalShard(shardInfo.getShardId().toString()); sender.tell(new akka.actor.Status.Success(true), getSelf()); @@ -998,6 +1000,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return localShardDataTree; } + DatastoreContext getDatastoreContext() { + return datastoreContext; + } + + void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) { + this.datastoreContext = datastoreContext; + if (actor != null) { + LOG.debug ("Sending new DatastoreContext to {}", shardId); + actor.tell(this.datastoreContext, sender); + } + } + void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){ LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress); @@ -1135,16 +1149,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { void setLeaderVersion(short leaderVersion) { this.leaderVersion = leaderVersion; } - - void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) { - this.datastoreContext = datastoreContext; - //notify the datastoreContextchange - LOG.debug ("Notifying RaftPolicy change via datastoreContextChange for {}", - this.shardName); - if (actor != null) { - actor.tell(this.datastoreContext, sender); - } - } } private static class ShardManagerCreator implements Creator { @@ -1152,22 +1156,23 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { final ClusterWrapper cluster; final Configuration configuration; - final DatastoreContext datastoreContext; + final DatastoreContextFactory datastoreContextFactory; private final CountDownLatch waitTillReadyCountdownLatch; private final PrimaryShardInfoFutureCache primaryShardInfoCache; - ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext, - CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) { + ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, + DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch, + PrimaryShardInfoFutureCache primaryShardInfoCache) { this.cluster = cluster; this.configuration = configuration; - this.datastoreContext = datastoreContext; + this.datastoreContextFactory = datastoreContextFactory; this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; this.primaryShardInfoCache = primaryShardInfoCache; } @Override public ShardManager create() throws Exception { - return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, + return new ShardManager(cluster, configuration, datastoreContextFactory, waitTillReadyCountdownLatch, primaryShardInfoCache); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index bc492887f9..792064cd67 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; @@ -172,8 +173,8 @@ public class ActorContext { } } - public void setDatastoreContext(DatastoreContext context) { - this.datastoreContext = context; + public void setDatastoreContext(DatastoreContextFactory contextFactory) { + this.datastoreContext = contextFactory.getBaseDatastoreContext(); setCachedProperties(); // We write the 'updated' volatile to trigger a write memory barrier so that the writes above @@ -186,7 +187,7 @@ public class ActorContext { updated = true; if(shardManager != null) { - shardManager.tell(context, ActorRef.noSender()); + shardManager.tell(contextFactory, ActorRef.noSender()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java index ecfcdefcb2..700b96a87e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java @@ -96,6 +96,8 @@ public class DatastoreContextConfigAdminOverlayTest { DatastoreContext context = DatastoreContext.newBuilder().build(); doReturn(context).when(mockIntrospector).getContext(); + DatastoreContextFactory contextFactory = new DatastoreContextFactory(mockIntrospector); + doReturn(contextFactory).when(mockIntrospector).newContextFactory(); DatastoreContextConfigAdminOverlay.Listener mockListener = mock(DatastoreContextConfigAdminOverlay.Listener.class); @@ -122,7 +124,7 @@ public class DatastoreContextConfigAdminOverlayTest { verify(mockIntrospector).update(properties); - verify(mockListener).onDatastoreContextUpdated(context); + verify(mockListener).onDatastoreContextUpdated(contextFactory); verify(mockBundleContext, times(2)).ungetService(mockConfigAdminServiceRef); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospectorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospectorTest.java index f2857f515e..2bd2e61296 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospectorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospectorTest.java @@ -185,4 +185,49 @@ public class DatastoreContextIntrospectorTest { assertEquals(false, configContext.isPersistent()); assertEquals(444, configContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize()); } + + @Test + public void testGetDatastoreContextForShard() { + Dictionary properties = new Hashtable<>(); + properties.put("shard-transaction-idle-timeout-in-minutes", "22"); // global setting + properties.put("operational.shard-transaction-idle-timeout-in-minutes", "33"); // operational override + properties.put("config.shard-transaction-idle-timeout-in-minutes", "44"); // config override + properties.put("topology.shard-transaction-idle-timeout-in-minutes", "55"); // global shard override + + DatastoreContext operContext = DatastoreContext.newBuilder().dataStoreType("operational").build(); + DatastoreContextIntrospector operIntrospector = new DatastoreContextIntrospector(operContext); + + DatastoreContext shardContext = operIntrospector.newContextFactory().getShardDatastoreContext("topology"); + assertEquals(10, shardContext.getShardTransactionIdleTimeout().toMinutes()); + + operIntrospector.update(properties); + operContext = operIntrospector.getContext(); + assertEquals(33, operContext.getShardTransactionIdleTimeout().toMinutes()); + + shardContext = operIntrospector.newContextFactory().getShardDatastoreContext("topology"); + assertEquals(55, shardContext.getShardTransactionIdleTimeout().toMinutes()); + + DatastoreContext configContext = DatastoreContext.newBuilder().dataStoreType("config").build(); + DatastoreContextIntrospector configIntrospector = new DatastoreContextIntrospector(configContext); + configIntrospector.update(properties); + configContext = configIntrospector.getContext(); + assertEquals(44, configContext.getShardTransactionIdleTimeout().toMinutes()); + + shardContext = configIntrospector.newContextFactory().getShardDatastoreContext("topology"); + assertEquals(55, shardContext.getShardTransactionIdleTimeout().toMinutes()); + + properties.put("operational.topology.shard-transaction-idle-timeout-in-minutes", "66"); // operational shard override + properties.put("config.topology.shard-transaction-idle-timeout-in-minutes", "77"); // config shard override + + operIntrospector.update(properties); + shardContext = operIntrospector.newContextFactory().getShardDatastoreContext("topology"); + assertEquals(66, shardContext.getShardTransactionIdleTimeout().toMinutes()); + + configIntrospector.update(properties); + shardContext = configIntrospector.newContextFactory().getShardDatastoreContext("topology"); + assertEquals(77, shardContext.getShardTransactionIdleTimeout().toMinutes()); + + shardContext = configIntrospector.newContextFactory().getShardDatastoreContext("default"); + assertEquals(44, shardContext.getShardTransactionIdleTimeout().toMinutes()); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index c48105e8be..5f935a6aa6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -34,6 +34,9 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; @@ -482,7 +485,7 @@ public class DistributedDataStoreRemotingIntegrationTest { // Switch the leader to the follower followerDatastoreContextBuilder.shardElectionTimeoutFactor(1); - followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build()); + sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder); JavaTestKit.shutdownActorSystem(leaderSystem, null, true); @@ -580,7 +583,7 @@ public class DistributedDataStoreRemotingIntegrationTest { JavaTestKit.shutdownActorSystem(leaderSystem, null, true); followerDatastoreContextBuilder.operationTimeoutInMillis(50).shardElectionTimeoutFactor(1); - followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build()); + sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder); DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); @@ -612,7 +615,7 @@ public class DistributedDataStoreRemotingIntegrationTest { Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); followerDatastoreContextBuilder.operationTimeoutInMillis(10).shardElectionTimeoutFactor(1); - followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build()); + sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder); DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); @@ -646,7 +649,7 @@ public class DistributedDataStoreRemotingIntegrationTest { JavaTestKit.shutdownActorSystem(leaderSystem, null, true); followerDatastoreContextBuilder.operationTimeoutInMillis(500); - followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build()); + sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder); DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); @@ -654,4 +657,17 @@ public class DistributedDataStoreRemotingIntegrationTest { followerTestKit.doCommit(rwTx.ready()); } + + private void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) { + DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class); + Answer answer = new Answer() { + @Override + public DatastoreContext answer(InvocationOnMock invocation) { + return builder.build(); + } + }; + Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext(); + Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString()); + dataStore.onDatastoreContextUpdated(mockContextFactory); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java index 1a7bbb21b3..9c07a61e60 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java @@ -18,6 +18,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; @@ -64,8 +65,11 @@ public class IntegrationTestKit extends ShardTestKit { datastoreContextBuilder.dataStoreType(typeName); DatastoreContext datastoreContext = datastoreContextBuilder.build(); + DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class); + Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext(); + Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString()); - DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, datastoreContext); + DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, mockContextFactory); dataStore.onGlobalContextUpdated(schemaContext); @@ -192,4 +196,4 @@ public class IntegrationTestKit extends ShardTestKit { } }, expType); } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index bd7e7d65c6..6f3946d422 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.AddressFromURIString; +import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Status; import akka.cluster.Cluster; @@ -38,15 +39,21 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.net.URI; +import java.util.AbstractMap; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; @@ -142,9 +149,16 @@ public class ShardManagerTest extends AbstractActorTest { return newShardMgrProps(new MockConfiguration()); } + private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) { + DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class); + Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext(); + Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString()); + return mockFactory; + } + private Props newShardMgrProps(Configuration config) { - return ShardManager.props(new MockClusterWrapper(), config, datastoreContextBuilder.build(), ready, - primaryShardInfoCache); + return ShardManager.props(new MockClusterWrapper(), config, + newDatastoreContextFactory(datastoreContextBuilder.build()), ready, primaryShardInfoCache); } private Props newPropsShardMgrWithMockShardActor() { @@ -158,14 +172,102 @@ public class ShardManagerTest extends AbstractActorTest { private static final long serialVersionUID = 1L; @Override public ShardManager create() throws Exception { - return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(), - ready, name, shardActor, primaryShardInfoCache); + return new ForwardingShardManager(clusterWrapper, config, newDatastoreContextFactory( + datastoreContextBuilder.build()), ready, name, shardActor, primaryShardInfoCache); } }; return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()); } + @Test + public void testPerShardDatastoreContext() throws Exception { + final DatastoreContextFactory mockFactory = newDatastoreContextFactory( + datastoreContextBuilder.shardElectionTimeoutFactor(5).build()); + + Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()). + shardElectionTimeoutFactor(6).build()).when(mockFactory).getShardDatastoreContext("default"); + + Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()). + shardElectionTimeoutFactor(7).build()).when(mockFactory).getShardDatastoreContext("topology"); + + final MockConfiguration mockConfig = new MockConfiguration() { + @Override + public Collection getMemberShardNames(String memberName) { + return Arrays.asList("default", "topology"); + } + + @Override + public Collection getMembersFromShardName(String shardName) { + return Arrays.asList("member-1"); + } + }; + + final TestActorRef defaultShardActor = TestActorRef.create(getSystem(), + Props.create(MessageCollectorActor.class), "default"); + final TestActorRef topologyShardActor = TestActorRef.create(getSystem(), + Props.create(MessageCollectorActor.class), "topology"); + + final Map> shardInfoMap = Collections.synchronizedMap( + new HashMap>()); + shardInfoMap.put("default", new AbstractMap.SimpleEntry(defaultShardActor, null)); + shardInfoMap.put("topology", new AbstractMap.SimpleEntry(topologyShardActor, null)); + + final CountDownLatch newShardActorLatch = new CountDownLatch(2); + final Creator creator = new Creator() { + private static final long serialVersionUID = 1L; + @Override + public ShardManager create() throws Exception { + return new ShardManager(new MockClusterWrapper(), mockConfig, mockFactory, ready, primaryShardInfoCache) { + @Override + protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { + Entry entry = shardInfoMap.get(info.getShardName()); + ActorRef ref = null; + if(entry != null) { + ref = entry.getKey(); + entry.setValue(info.getDatastoreContext()); + } + + newShardActorLatch.countDown(); + return ref; + } + }; + } + }; + + JavaTestKit kit = new JavaTestKit(getSystem()); + + final ActorRef shardManager = getSystem().actorOf(Props.create(new DelegatingShardManagerCreator(creator)). + withDispatcher(Dispatchers.DefaultDispatcherId())); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef()); + + assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS)); + assertEquals("getShardElectionTimeoutFactor", 6, shardInfoMap.get("default").getValue(). + getShardElectionTimeoutFactor()); + assertEquals("getShardElectionTimeoutFactor", 7, shardInfoMap.get("topology").getValue(). + getShardElectionTimeoutFactor()); + + DatastoreContextFactory newMockFactory = newDatastoreContextFactory( + datastoreContextBuilder.shardElectionTimeoutFactor(5).build()); + Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()). + shardElectionTimeoutFactor(66).build()).when(newMockFactory).getShardDatastoreContext("default"); + + Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()). + shardElectionTimeoutFactor(77).build()).when(newMockFactory).getShardDatastoreContext("topology"); + + shardManager.tell(newMockFactory, kit.getRef()); + + DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor, DatastoreContext.class); + assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor()); + + newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class); + assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor()); + + defaultShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + topologyShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + @Test public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ @@ -842,10 +944,9 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{ - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache); + final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), + newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready, + primaryShardInfoCache); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -876,10 +977,9 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{ - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache); + final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), + newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready, + primaryShardInfoCache); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -904,15 +1004,15 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{ - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), + final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration() { @Override public List getMemberShardNames(String memberName) { return Arrays.asList("default", "astronauts"); } }, - DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache); + newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready, + primaryShardInfoCache); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -1192,8 +1292,8 @@ public class ShardManagerTest extends AbstractActorTest { TestShardManager(String shardMrgIDSuffix) { super(new MockClusterWrapper(), new MockConfiguration(), - DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready, - new PrimaryShardInfoFutureCache()); + newDatastoreContextFactory(DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()), + ready, new PrimaryShardInfoFutureCache()); } @Override @@ -1252,9 +1352,9 @@ public class ShardManagerTest extends AbstractActorTest { private final String name; protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration, - DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name, + DatastoreContextFactory factory, CountDownLatch waitTillReadyCountdownLatch, String name, ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) { - super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache); + super(cluster, configuration, factory, waitTillReadyCountdownLatch, primaryShardInfoCache); this.shardActor = shardActor; this.name = name; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java index ce2ef2dd7a..54a27e08bb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java @@ -34,7 +34,9 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.ShardDataTree; import org.opendaylight.controller.cluster.datastore.config.Configuration; @@ -91,7 +93,11 @@ public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnersh } }); - dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration, datastoreContext ); + DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class); + Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext(); + Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString()); + + dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration, mockContextFactory); dataStore.onGlobalContextUpdated(SchemaContextHelper.entityOwners()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 6a5d6ef434..7e484ebde3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -43,6 +43,7 @@ import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; @@ -374,9 +375,12 @@ public class ActorContextTest extends AbstractActorTest{ DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6). shardTransactionCommitTimeoutInSeconds(8).build(); - actorContext.setDatastoreContext(newContext); + DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class); + Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext(); - expectMsgClass(duration("5 seconds"), DatastoreContext.class); + actorContext.setDatastoreContext(mockContextFactory); + + expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class); Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext()); -- 2.36.6