public static final String CONFIG_ID = "org.opendaylight.controller.cluster.datastore";
public static interface Listener {
- void onDatastoreContextUpdated(DatastoreContext context);
+ /**
+ * Invoked when the datastore properties have been updated; the supplied factory
+ * yields the new base context and per-shard contexts derived from the properties.
+ */
+ void onDatastoreContextUpdated(DatastoreContextFactory contextFactory);
}
private static final Logger LOG = LoggerFactory.getLogger(DatastoreContextConfigAdminOverlay.class);
if(introspector.update(properties)) {
if(listener != null) {
- listener.onDatastoreContextUpdated(introspector.getContext());
+ listener.onDatastoreContextUpdated(introspector.newContextFactory());
}
}
} else {
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Factory for creating DatastoreContext instances.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreContextFactory {
+ private final DatastoreContextIntrospector introspector;
+
+ /**
+ * @param introspector the introspector that supplies the base and per-shard contexts
+ */
+ public DatastoreContextFactory(DatastoreContextIntrospector introspector) {
+ this.introspector = introspector;
+ }
+
+ /**
+ * Returns the DatastoreContext for the given shard, with any shard-specific
+ * property overrides applied by the introspector.
+ */
+ public DatastoreContext getShardDatastoreContext(String forShardName) {
+ return introspector.getShardDatastoreContext(forShardName);
+ }
+
+ /**
+ * Returns the introspector's current base (non-shard-specific) DatastoreContext.
+ */
+ public DatastoreContext getBaseDatastoreContext() {
+ return introspector.getContext();
+ }
+}
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Primitives;
import java.beans.BeanInfo;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Dictionary;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
}
private DatastoreContext context;
+ private Map<String, Object> currentProperties;
public DatastoreContextIntrospector(DatastoreContext context) {
this.context = context;
return context;
}
+ /**
+ * Returns a new DatastoreContextFactory backed by this introspector.
+ */
+ public DatastoreContextFactory newContextFactory() {
+ return new DatastoreContextFactory(this);
+ }
+
+ /**
+ * Derives a DatastoreContext for the given shard by overlaying shard-scoped property
+ * overrides (keys prefixed with "&lt;shardName&gt;." or "&lt;dataStoreType&gt;.&lt;shardName&gt;.")
+ * on top of the current base context. Returns the base context unchanged if
+ * {@link #update} has not yet captured any properties.
+ */
+ public synchronized DatastoreContext getShardDatastoreContext(String forShardName) {
+ if(currentProperties == null) {
+ return context;
+ }
+
+ Builder builder = DatastoreContext.newBuilderFrom(context);
+ String dataStoreTypePrefix = context.getDataStoreType() + '.';
+ final String shardNamePrefix = forShardName + '.';
+
+ // Type-prefixed keys sort last so data-store-specific settings win over global ones.
+ List<String> keys = getSortedKeysByDatastoreType(currentProperties.keySet(), dataStoreTypePrefix);
+
+ for(String key: keys) {
+ Object value = currentProperties.get(key);
+ // Strip the data store type prefix, if present, before the shard-name check.
+ if(key.startsWith(dataStoreTypePrefix)) {
+ key = key.replaceFirst(dataStoreTypePrefix, "");
+ }
+
+ // Only keys scoped to this shard contribute to the per-shard context.
+ if(key.startsWith(shardNamePrefix)) {
+ key = key.replaceFirst(shardNamePrefix, "");
+ convertValueAndInvokeSetter(key, value, builder);
+ }
+ }
+
+ return builder.build();
+ }
+
/**
* Applies the given properties to the cached DatastoreContext and yields a new DatastoreContext
* instance which can be obtained via {@link getContext}.
* @return true if the cached DatastoreContext was updated, false otherwise.
*/
public synchronized boolean update(Dictionary<String, Object> properties) {
+ // Invalidate the cached snapshot up front; rebuilt below when properties are non-empty.
+ currentProperties = null;
if(properties == null || properties.isEmpty()) {
return false;
}
LOG.debug("In update: properties: {}", properties);
+ // Capture the raw properties so getShardDatastoreContext can re-apply
+ // shard-specific overrides later.
+ ImmutableMap.Builder<String, Object> mapBuilder = ImmutableMap.<String, Object>builder();
+
Builder builder = DatastoreContext.newBuilderFrom(context);
final String dataStoreTypePrefix = context.getDataStoreType() + '.';
+ List<String> keys = getSortedKeysByDatastoreType(Collections.list(properties.keys()), dataStoreTypePrefix);
+
+ boolean updated = false;
+ for(String key: keys) {
+ Object value = properties.get(key);
+ mapBuilder.put(key, value);
+
+ // If the key is prefixed with the data store type, strip it off.
+ if(key.startsWith(dataStoreTypePrefix)) {
+ key = key.replaceFirst(dataStoreTypePrefix, "");
+ }
+
+ if(convertValueAndInvokeSetter(key, value, builder)) {
+ updated = true;
+ }
+ }
+
+ currentProperties = mapBuilder.build();
+
+ // Only replace the cached context if at least one setter was successfully applied.
+ if(updated) {
+ context = builder.build();
+ }
+
+ return updated;
+ }
+
+ // Returns the keys sorted so that names prefixed with the data store type come last.
+ private ArrayList<String> getSortedKeysByDatastoreType(Collection<String> inKeys,
+ final String dataStoreTypePrefix) {
// Sort the property keys by putting the names prefixed with the data store type last. This
// is done so data store specific settings are applied after global settings.
- ArrayList<String> keys = Collections.list(properties.keys());
+ ArrayList<String> keys = new ArrayList<>(inKeys);
Collections.sort(keys, new Comparator<String>() {
@Override
public int compare(String key1, String key2) {
// Type-prefixed keys sort after un-prefixed ones; otherwise natural String order.
return key1.startsWith(dataStoreTypePrefix) ? 1 :
key2.startsWith(dataStoreTypePrefix) ? -1 : key1.compareTo(key2);
}
});
+ return keys;
+ }
- boolean updated = false;
- for(String key: keys) {
- Object value = properties.get(key);
- try {
- // If the key is prefixed with the data store type, strip it off.
- if(key.startsWith(dataStoreTypePrefix)) {
- key = key.replaceFirst(dataStoreTypePrefix, "");
- }
-
- key = convertToCamelCase(key);
-
- // Convert the value to the right type.
- value = convertValue(key, value);
- if(value == null) {
- continue;
- }
+ /**
+ * Converts the given raw property value to the setter's parameter type and invokes the
+ * corresponding Builder setter. Returns true if the setter was applied; conversion or
+ * invocation failures are logged and reported as false.
+ */
+ private boolean convertValueAndInvokeSetter(String inKey, Object inValue, Builder builder) {
+ String key = convertToCamelCase(inKey);
- LOG.debug("Converted value for property {}: {} ({})",
- key, value, value.getClass().getSimpleName());
+ try {
+ // Convert the value to the right type.
+ Object value = convertValue(key, inValue);
+ if(value == null) {
+ return false;
+ }
- // Call the setter method on the Builder instance.
- Method setter = builderSetters.get(key);
- setter.invoke(builder, constructorValueRecursively(
- Primitives.wrap(setter.getParameterTypes()[0]), value.toString()));
+ LOG.debug("Converted value for property {}: {} ({})",
+ key, value, value.getClass().getSimpleName());
- updated = true;
+ // Call the setter method on the Builder instance.
+ Method setter = builderSetters.get(key);
+ setter.invoke(builder, constructorValueRecursively(
+ Primitives.wrap(setter.getParameterTypes()[0]), value.toString()));
- } catch (Exception e) {
- LOG.error("Error converting value ({}) for property {}", value, key, e);
- }
- }
-
- if(updated) {
- context = builder.build();
+ return true;
+ } catch (Exception e) {
+ LOG.error("Error converting value ({}) for property {}", inValue, key, e);
}
- return updated;
+ return false;
}
private static String convertToCamelCase(String inString) {
private final TransactionContextFactory txContextFactory;
public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
- Configuration configuration, DatastoreContext datastoreContext) {
+ Configuration configuration, DatastoreContextFactory datastoreContextFactory) {
Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
- Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
+ Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
- this.type = datastoreContext.getDataStoreType();
+ this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, cluster, configuration,
- datastoreContext, shardDispatcher, shardManagerId, primaryShardInfoCache), cluster,
- configuration, datastoreContext, primaryShardInfoCache);
+ datastoreContextFactory, shardDispatcher, shardManagerId, primaryShardInfoCache), cluster,
+ configuration, datastoreContextFactory.getBaseDatastoreContext(), primaryShardInfoCache);
this.waitTillReadyTimeInMillis =
actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
this.txContextFactory = TransactionContextFactory.create(actorContext);
- datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(datastoreContext.getDataStoreMXBeanType());
- datastoreConfigMXBean.setContext(datastoreContext);
+ datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(
+ datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
+ datastoreConfigMXBean.setContext(datastoreContextFactory.getBaseDatastoreContext());
datastoreConfigMXBean.registerMBean();
- datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContext.getDataStoreMXBeanType(), actorContext);
+ datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContextFactory.getBaseDatastoreContext().
+ getDataStoreMXBeanType(), actorContext);
datastoreInfoMXBean.registerMBean();
}
}
@Override
- public void onDatastoreContextUpdated(DatastoreContext context) {
+ public void onDatastoreContextUpdated(DatastoreContextFactory contextFactory) {
LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreType());
- actorContext.setDatastoreContext(context);
- datastoreConfigMXBean.setContext(context);
+ actorContext.setDatastoreContext(contextFactory);
+ datastoreConfigMXBean.setContext(contextFactory.getBaseDatastoreContext());
}
@Override
}
private ActorRef createShardManager(ActorSystem actorSystem, ClusterWrapper cluster, Configuration configuration,
- DatastoreContext datastoreContext, String shardDispatcher, String shardManagerId,
- PrimaryShardInfoFutureCache primaryShardInfoCache){
+ DatastoreContextFactory datastoreContextFactory, String shardDispatcher,
+ String shardManagerId, PrimaryShardInfoFutureCache primaryShardInfoCache){
Exception lastException = null;
for(int i=0;i<100;i++) {
try {
return actorSystem.actorOf(
- ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch,
+ ShardManager.props(cluster, configuration, datastoreContextFactory, waitTillReadyCountDownLatch,
primaryShardInfoCache).withDispatcher(shardDispatcher).withMailbox(
ActorContext.MAILBOX), shardManagerId);
} catch (Exception e){
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
final DistributedDataStore dataStore = new DistributedDataStore(actorSystem,
- new ClusterWrapperImpl(actorSystem), config, introspector.getContext());
+ new ClusterWrapperImpl(actorSystem), config, introspector.newContextFactory());
overlay.setListener(dataStore);
private ShardManagerInfo mBean;
- private DatastoreContext datastoreContext;
+ private DatastoreContextFactory datastoreContextFactory;
private final CountDownLatch waitTillReadyCountdownLatch;
/**
*/
protected ShardManager(ClusterWrapper cluster, Configuration configuration,
- DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch,
+ DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch,
PrimaryShardInfoFutureCache primaryShardInfoCache) {
this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
- this.datastoreContext = datastoreContext;
- this.type = datastoreContext.getDataStoreType();
+ this.datastoreContextFactory = datastoreContextFactory;
+ this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
this.shardDispatcherPath =
new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
this.primaryShardInfoCache = primaryShardInfoCache;
peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
- this.datastoreContext = DatastoreContext.newBuilderFrom(datastoreContext).shardPeerAddressResolver(
- peerAddressResolver).build();
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
public static Props props(
final ClusterWrapper cluster,
final Configuration configuration,
- final DatastoreContext datastoreContext,
+ final DatastoreContextFactory datastoreContextFactory,
final CountDownLatch waitTillReadyCountdownLatch,
final PrimaryShardInfoFutureCache primaryShardInfoCache) {
Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
- return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext,
+ return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContextFactory,
waitTillReadyCountdownLatch, primaryShardInfoCache));
}
memberUnreachable((ClusterEvent.UnreachableMember)message);
} else if(message instanceof ClusterEvent.ReachableMember) {
memberReachable((ClusterEvent.ReachableMember) message);
- } else if(message instanceof DatastoreContext) {
- onDatastoreContext((DatastoreContext)message);
+ } else if(message instanceof DatastoreContextFactory) {
+ onDatastoreContextFactory((DatastoreContextFactory)message);
} else if(message instanceof RoleChangeNotification) {
onRoleChangeNotification((RoleChangeNotification) message);
} else if(message instanceof FollowerInitialSyncUpStatus){
DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
if(shardDatastoreContext == null) {
- shardDatastoreContext = datastoreContext;
+ shardDatastoreContext = newShardDatastoreContext(moduleShardConfig.getShardName());
} else {
shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
peerAddressResolver).build();
}
}
+ // Seeds a Builder from the factory's per-shard context, wiring in this manager's
+ // peer address resolver.
+ private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
+ return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
+ shardPeerAddressResolver(peerAddressResolver);
+ }
+
+ // Builds the effective DatastoreContext for the named shard.
+ private DatastoreContext newShardDatastoreContext(String shardName) {
+ return newShardDatastoreContextBuilder(shardName).build();
+ }
+
private void checkReady(){
if (isReadyWithLeaderId()) {
LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
- FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration();
+ FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
if(shardInformation.isShardInitialized()) {
// If the shard is already initialized then we'll wait enough time for the shard to
// elect a leader, ie 2 times the election timeout.
- timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig()
+ timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
.getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
}
}
}
- private void onDatastoreContext(DatastoreContext context) {
- datastoreContext = DatastoreContext.newBuilderFrom(context).shardPeerAddressResolver(
- peerAddressResolver).build();
+ private void onDatastoreContextFactory(DatastoreContextFactory factory) {
+ datastoreContextFactory = factory;
for (ShardInformation info : localShards.values()) {
- if (info.getActor() != null) {
- info.getActor().tell(datastoreContext, getSelf());
- }
+ info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
}
}
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
Map<String, String> peerAddresses = getPeerAddresses(shardName);
localShardActorNames.add(shardId.toString());
- localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext,
- shardPropsCreator, peerAddressResolver));
+ localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
+ newShardDatastoreContext(shardName), shardPropsCreator, peerAddressResolver));
}
mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
- datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
+ datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(), localShardActorNames);
mBean.setShardManager(this);
}
return mBean;
}
- private DatastoreContext getInitShardDataStoreContext() {
- return (DatastoreContext.newBuilderFrom(datastoreContext)
- .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
- .build());
- }
-
private void checkLocalShardExists(final String shardName, final ActorRef sender) {
if (localShards.containsKey(shardName)) {
String msg = String.format("Local shard %s already exists", shardName);
return;
}
- Timeout findPrimaryTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
+ Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+ getShardInitializationTimeout().duration().$times(2));
final ActorRef sender = getSender();
Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout);
ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+
+ DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
+ DisableElectionsRaftPolicy.class.getName()).build();
+
final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
- getPeerAddresses(shardName), getInitShardDataStoreContext(),
+ getPeerAddresses(shardName), datastoreContext,
new DefaultShardPropsCreator(), peerAddressResolver);
localShards.put(shardName, shardInfo);
shardInfo.setActor(newShardActor(schemaContext, shardInfo));
LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
response.getPrimaryPath(), shardId);
- Timeout addServerTimeout = new Timeout(datastoreContext
- .getShardLeaderElectionTimeout().duration().$times(4));
+ Timeout addServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(4));
Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout);
LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
// Make the local shard voting capable
- shardInfo.setDatastoreContext(datastoreContext, getSelf());
+ shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
mBean.addLocalShard(shardInfo.getShardId().toString());
sender.tell(new akka.actor.Status.Success(true), getSelf());
return localShardDataTree;
}
+ // Returns this shard's current effective DatastoreContext.
+ DatastoreContext getDatastoreContext() {
+ return datastoreContext;
+ }
+
+ // Caches the new context and, if the shard actor exists, forwards it so the
+ // running shard picks up the changed settings.
+ void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
+ this.datastoreContext = datastoreContext;
+ if (actor != null) {
+ LOG.debug ("Sending new DatastoreContext to {}", shardId);
+ actor.tell(this.datastoreContext, sender);
+ }
+ }
+
void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
void setLeaderVersion(short leaderVersion) {
this.leaderVersion = leaderVersion;
}
-
- void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
- this.datastoreContext = datastoreContext;
- //notify the datastoreContextchange
- LOG.debug ("Notifying RaftPolicy change via datastoreContextChange for {}",
- this.shardName);
- if (actor != null) {
- actor.tell(this.datastoreContext, sender);
- }
- }
}
private static class ShardManagerCreator implements Creator<ShardManager> {
final ClusterWrapper cluster;
final Configuration configuration;
- final DatastoreContext datastoreContext;
+ final DatastoreContextFactory datastoreContextFactory;
private final CountDownLatch waitTillReadyCountdownLatch;
private final PrimaryShardInfoFutureCache primaryShardInfoCache;
- ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext,
- CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) {
+ ShardManagerCreator(ClusterWrapper cluster, Configuration configuration,
+ DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch,
+ PrimaryShardInfoFutureCache primaryShardInfoCache) {
this.cluster = cluster;
this.configuration = configuration;
- this.datastoreContext = datastoreContext;
+ this.datastoreContextFactory = datastoreContextFactory;
this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
this.primaryShardInfoCache = primaryShardInfoCache;
}
@Override
public ShardManager create() throws Exception {
- return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch,
+ return new ShardManager(cluster, configuration, datastoreContextFactory, waitTillReadyCountdownLatch,
primaryShardInfoCache);
}
}
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
}
}
- public void setDatastoreContext(DatastoreContext context) {
- this.datastoreContext = context;
+ public void setDatastoreContext(DatastoreContextFactory contextFactory) {
+ this.datastoreContext = contextFactory.getBaseDatastoreContext();
setCachedProperties();
// We write the 'updated' volatile to trigger a write memory barrier so that the writes above
updated = true;
if(shardManager != null) {
- shardManager.tell(context, ActorRef.noSender());
+ shardManager.tell(contextFactory, ActorRef.noSender());
}
}
DatastoreContext context = DatastoreContext.newBuilder().build();
doReturn(context).when(mockIntrospector).getContext();
+ DatastoreContextFactory contextFactory = new DatastoreContextFactory(mockIntrospector);
+ doReturn(contextFactory).when(mockIntrospector).newContextFactory();
DatastoreContextConfigAdminOverlay.Listener mockListener =
mock(DatastoreContextConfigAdminOverlay.Listener.class);
verify(mockIntrospector).update(properties);
- verify(mockListener).onDatastoreContextUpdated(context);
+ verify(mockListener).onDatastoreContextUpdated(contextFactory);
verify(mockBundleContext, times(2)).ungetService(mockConfigAdminServiceRef);
assertEquals(false, configContext.isPersistent());
assertEquals(444, configContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize());
}
+
+ /**
+ * Verifies override precedence when deriving a shard-specific DatastoreContext:
+ * global &lt; data-store-type &lt; shard-name &lt; data-store-type.shard-name.
+ */
+ @Test
+ public void testGetDatastoreContextForShard() {
+ Dictionary<String, Object> properties = new Hashtable<>();
+ properties.put("shard-transaction-idle-timeout-in-minutes", "22"); // global setting
+ properties.put("operational.shard-transaction-idle-timeout-in-minutes", "33"); // operational override
+ properties.put("config.shard-transaction-idle-timeout-in-minutes", "44"); // config override
+ properties.put("topology.shard-transaction-idle-timeout-in-minutes", "55"); // global shard override
+
+ DatastoreContext operContext = DatastoreContext.newBuilder().dataStoreType("operational").build();
+ DatastoreContextIntrospector operIntrospector = new DatastoreContextIntrospector(operContext);
+
+ // Before update(), the shard context falls back to the built-in default.
+ DatastoreContext shardContext = operIntrospector.newContextFactory().getShardDatastoreContext("topology");
+ assertEquals(10, shardContext.getShardTransactionIdleTimeout().toMinutes());
+
+ operIntrospector.update(properties);
+ operContext = operIntrospector.getContext();
+ assertEquals(33, operContext.getShardTransactionIdleTimeout().toMinutes());
+
+ shardContext = operIntrospector.newContextFactory().getShardDatastoreContext("topology");
+ assertEquals(55, shardContext.getShardTransactionIdleTimeout().toMinutes());
+
+ DatastoreContext configContext = DatastoreContext.newBuilder().dataStoreType("config").build();
+ DatastoreContextIntrospector configIntrospector = new DatastoreContextIntrospector(configContext);
+ configIntrospector.update(properties);
+ configContext = configIntrospector.getContext();
+ assertEquals(44, configContext.getShardTransactionIdleTimeout().toMinutes());
+
+ shardContext = configIntrospector.newContextFactory().getShardDatastoreContext("topology");
+ assertEquals(55, shardContext.getShardTransactionIdleTimeout().toMinutes());
+
+ properties.put("operational.topology.shard-transaction-idle-timeout-in-minutes", "66"); // operational shard override
+ properties.put("config.topology.shard-transaction-idle-timeout-in-minutes", "77"); // config shard override
+
+ operIntrospector.update(properties);
+ shardContext = operIntrospector.newContextFactory().getShardDatastoreContext("topology");
+ assertEquals(66, shardContext.getShardTransactionIdleTimeout().toMinutes());
+
+ configIntrospector.update(properties);
+ shardContext = configIntrospector.newContextFactory().getShardDatastoreContext("topology");
+ assertEquals(77, shardContext.getShardTransactionIdleTimeout().toMinutes());
+
+ // A shard with no shard-scoped override inherits the data-store-type setting.
+ shardContext = configIntrospector.newContextFactory().getShardDatastoreContext("default");
+ assertEquals(44, shardContext.getShardTransactionIdleTimeout().toMinutes());
+ }
}
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
// Switch the leader to the follower
followerDatastoreContextBuilder.shardElectionTimeoutFactor(1);
- followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
+ sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
followerDatastoreContextBuilder.operationTimeoutInMillis(50).shardElectionTimeoutFactor(1);
- followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
+ sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
followerDatastoreContextBuilder.operationTimeoutInMillis(10).shardElectionTimeoutFactor(1);
- followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
+ sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
followerDatastoreContextBuilder.operationTimeoutInMillis(500);
- followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
+ sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
followerTestKit.doCommit(rwTx.ready());
}
+
+ /**
+ * Pushes the builder's current settings to the data store via a mocked
+ * DatastoreContextFactory whose base and per-shard contexts are both built lazily
+ * from the (mutable) builder.
+ */
+ private void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) {
+ DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
+ Answer<DatastoreContext> answer = new Answer<DatastoreContext>() {
+ @Override
+ public DatastoreContext answer(InvocationOnMock invocation) {
+ return builder.build();
+ }
+ };
+ Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
+ Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
+ dataStore.onDatastoreContextUpdated(mockContextFactory);
+ }
}
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
datastoreContextBuilder.dataStoreType(typeName);
DatastoreContext datastoreContext = datastoreContextBuilder.build();
+ DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
+ Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
+ Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
- DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, datastoreContext);
+ DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, mockContextFactory);
dataStore.onGlobalContextUpdated(schemaContext);
}
}, expType);
}
-}
\ No newline at end of file
+}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.AddressFromURIString;
+import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.cluster.Cluster;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.ConfigFactory;
import java.net.URI;
+import java.util.AbstractMap;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
return newShardMgrProps(new MockConfiguration());
}
+ // Wraps the given context in a mock factory that returns it for both the base
+ // context and every shard-specific lookup.
+ private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
+ DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
+ Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
+ Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
+ return mockFactory;
+ }
+
private Props newShardMgrProps(Configuration config) {
- return ShardManager.props(new MockClusterWrapper(), config, datastoreContextBuilder.build(), ready,
- primaryShardInfoCache);
+ return ShardManager.props(new MockClusterWrapper(), config,
+ newDatastoreContextFactory(datastoreContextBuilder.build()), ready, primaryShardInfoCache);
}
private Props newPropsShardMgrWithMockShardActor() {
private static final long serialVersionUID = 1L;
@Override
public ShardManager create() throws Exception {
- return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
- ready, name, shardActor, primaryShardInfoCache);
+ return new ForwardingShardManager(clusterWrapper, config, newDatastoreContextFactory(
+ datastoreContextBuilder.build()), ready, name, shardActor, primaryShardInfoCache);
}
};
return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
}
+ @Test
+ public void testPerShardDatastoreContext() throws Exception {
+ final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
+ datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
+
+ Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
+ shardElectionTimeoutFactor(6).build()).when(mockFactory).getShardDatastoreContext("default");
+
+ Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
+ shardElectionTimeoutFactor(7).build()).when(mockFactory).getShardDatastoreContext("topology");
+
+ final MockConfiguration mockConfig = new MockConfiguration() {
+ @Override
+ public Collection<String> getMemberShardNames(String memberName) {
+ return Arrays.asList("default", "topology");
+ }
+
+ @Override
+ public Collection<String> getMembersFromShardName(String shardName) {
+ return Arrays.asList("member-1");
+ }
+ };
+
+ final TestActorRef<MessageCollectorActor> defaultShardActor = TestActorRef.create(getSystem(),
+ Props.create(MessageCollectorActor.class), "default");
+ final TestActorRef<MessageCollectorActor> topologyShardActor = TestActorRef.create(getSystem(),
+ Props.create(MessageCollectorActor.class), "topology");
+
+ final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
+ new HashMap<String, Entry<ActorRef, DatastoreContext>>());
+ shardInfoMap.put("default", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(defaultShardActor, null));
+ shardInfoMap.put("topology", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(topologyShardActor, null));
+
+ final CountDownLatch newShardActorLatch = new CountDownLatch(2);
+ final Creator<ShardManager> creator = new Creator<ShardManager>() {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public ShardManager create() throws Exception {
+ return new ShardManager(new MockClusterWrapper(), mockConfig, mockFactory, ready, primaryShardInfoCache) {
+ @Override
+ protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+ Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
+ ActorRef ref = null;
+ if(entry != null) {
+ ref = entry.getKey();
+ entry.setValue(info.getDatastoreContext());
+ }
+
+ newShardActorLatch.countDown();
+ return ref;
+ }
+ };
+ }
+ };
+
+ JavaTestKit kit = new JavaTestKit(getSystem());
+
+ final ActorRef shardManager = getSystem().actorOf(Props.create(new DelegatingShardManagerCreator(creator)).
+ withDispatcher(Dispatchers.DefaultDispatcherId()));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
+
+ assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
+ assertEquals("getShardElectionTimeoutFactor", 6, shardInfoMap.get("default").getValue().
+ getShardElectionTimeoutFactor());
+ assertEquals("getShardElectionTimeoutFactor", 7, shardInfoMap.get("topology").getValue().
+ getShardElectionTimeoutFactor());
+
+ DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
+ datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
+ Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
+ shardElectionTimeoutFactor(66).build()).when(newMockFactory).getShardDatastoreContext("default");
+
+ Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
+ shardElectionTimeoutFactor(77).build()).when(newMockFactory).getShardDatastoreContext("topology");
+
+ shardManager.tell(newMockFactory, kit.getRef());
+
+ DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor, DatastoreContext.class);
+ assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
+
+ newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
+ assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
+
+ defaultShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ topologyShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
@Test
public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{
@Test
public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
- final Props persistentProps = ShardManager.props(
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
+ final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+ newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready,
+ primaryShardInfoCache);
final TestActorRef<ShardManager> shardManager =
TestActorRef.create(getSystem(), persistentProps);
@Test
public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
- final Props persistentProps = ShardManager.props(
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
+ final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+ newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready,
+ primaryShardInfoCache);
final TestActorRef<ShardManager> shardManager =
TestActorRef.create(getSystem(), persistentProps);
@Test
public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
- final Props persistentProps = ShardManager.props(
- new MockClusterWrapper(),
+ final Props persistentProps = ShardManager.props(new MockClusterWrapper(),
new MockConfiguration() {
@Override
public List<String> getMemberShardNames(String memberName) {
return Arrays.asList("default", "astronauts");
}
},
- DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
+ newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready,
+ primaryShardInfoCache);
final TestActorRef<ShardManager> shardManager =
TestActorRef.create(getSystem(), persistentProps);
TestShardManager(String shardMrgIDSuffix) {
super(new MockClusterWrapper(), new MockConfiguration(),
- DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready,
- new PrimaryShardInfoFutureCache());
+ newDatastoreContextFactory(DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()),
+ ready, new PrimaryShardInfoFutureCache());
}
@Override
private final String name;
protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
- DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
+ DatastoreContextFactory factory, CountDownLatch waitTillReadyCountdownLatch, String name,
ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
- super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache);
+ super(cluster, configuration, factory, waitTillReadyCountdownLatch, primaryShardInfoCache);
this.shardActor = shardActor;
this.name = name;
}
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.ShardDataTree;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
}
});
- dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration, datastoreContext );
+ DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
+ Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
+ Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
+
+ dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration, mockContextFactory);
dataStore.onGlobalContextUpdated(SchemaContextHelper.entityOwners());
}
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
shardTransactionCommitTimeoutInSeconds(8).build();
- actorContext.setDatastoreContext(newContext);
+ DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
+ Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
- expectMsgClass(duration("5 seconds"), DatastoreContext.class);
+ actorContext.setDatastoreContext(mockContextFactory);
+
+ expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class);
Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());