Bug 4149: Implement per-shard DatastoreContext settings
author    Tom Pantelis <tpanteli@brocade.com>
Thu, 15 Oct 2015 04:50:21 +0000 (00:50 -0400)
committer Gerrit Code Review <gerrit@opendaylight.org>
Fri, 6 Nov 2015 22:01:08 +0000 (22:01 +0000)
Added the ability to specify shard-specific settings in the .cfg file by
prefixing the property name with the shard name, similar to what we already
allow at the datastore level.
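
For example (the property keys below are taken verbatim from the new
DatastoreContextIntrospectorTest; precedence, lowest to highest, is global,
datastore-prefixed, shard-prefixed, then datastore.shard-prefixed):

    # applies to all shards of all data stores
    shard-transaction-idle-timeout-in-minutes=22
    # applies to all shards of the operational data store
    operational.shard-transaction-idle-timeout-in-minutes=33
    # applies to the topology shard of any data store
    topology.shard-transaction-idle-timeout-in-minutes=55
    # applies only to the topology shard of the operational data store
    operational.topology.shard-transaction-idle-timeout-in-minutes=66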

I added a DatastoreContextFactory that has methods to get the base
DatastoreContext and a per-shard DatastoreContext. The
DatastoreContextFactory is now passed to the ShardManager instead of the
DatastoreContext, and it uses the DatastoreContextIntrospector to overlay
the per-shard settings onto the base DatastoreContext.
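
A minimal usage sketch of the new API (the properties Dictionary is the same
one the config admin overlay already passes to the introspector's update()
method; variable names here are illustrative):

    DatastoreContext operContext =
            DatastoreContext.newBuilder().dataStoreType("operational").build();
    DatastoreContextIntrospector introspector = new DatastoreContextIntrospector(operContext);
    introspector.update(properties); // Dictionary<String, Object> parsed from the .cfg file

    DatastoreContextFactory factory = introspector.newContextFactory();
    // base context: global settings plus "operational." overrides
    DatastoreContext base = factory.getBaseDatastoreContext();
    // shard context: the base plus "topology." and "operational.topology." overrides
    DatastoreContext topologyContext = factory.getShardDatastoreContext("topology");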

Change-Id: I329c98c1577a74ebe665052f76e28da3867e2e86
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
14 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospectorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java
index 2688d0195df662f004a4d4738bf16330387b194f..448e810641053748452a2daf481970fe61dba11b 100644 (file)
@@ -29,7 +29,7 @@ public class DatastoreContextConfigAdminOverlay implements AutoCloseable {
     public static final String CONFIG_ID = "org.opendaylight.controller.cluster.datastore";
 
     public static interface Listener {
-        void onDatastoreContextUpdated(DatastoreContext context);
+        void onDatastoreContextUpdated(DatastoreContextFactory contextFactory);
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(DatastoreContextConfigAdminOverlay.class);
@@ -72,7 +72,7 @@ public class DatastoreContextConfigAdminOverlay implements AutoCloseable {
 
                 if(introspector.update(properties)) {
                     if(listener != null) {
-                        listener.onDatastoreContextUpdated(introspector.getContext());
+                        listener.onDatastoreContextUpdated(introspector.newContextFactory());
                     }
                 }
             } else {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextFactory.java
new file mode 100644 (file)
index 0000000..4652451
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Factory for creating DatastoreContext instances.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreContextFactory {
+    private final DatastoreContextIntrospector introspector;
+
+    public DatastoreContextFactory(DatastoreContextIntrospector introspector) {
+        this.introspector = introspector;
+    }
+
+    public DatastoreContext getShardDatastoreContext(String forShardName) {
+        return introspector.getShardDatastoreContext(forShardName);
+    }
+
+    public DatastoreContext getBaseDatastoreContext() {
+        return introspector.getContext();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java
index 6a4d0a6f6d5bfc712b80950c149c733dfee43deb..094bb4a9e1d323debd66f207cab3626217c93116 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.primitives.Primitives;
 import java.beans.BeanInfo;
@@ -19,10 +20,12 @@ import java.beans.PropertyDescriptor;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Dictionary;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
@@ -180,6 +183,7 @@ public class DatastoreContextIntrospector {
     }
 
     private DatastoreContext context;
+    private Map<String, Object> currentProperties;
 
     public DatastoreContextIntrospector(DatastoreContext context) {
         this.context = context;
@@ -189,6 +193,36 @@ public class DatastoreContextIntrospector {
         return context;
     }
 
+    public DatastoreContextFactory newContextFactory() {
+        return new DatastoreContextFactory(this);
+    }
+
+    public synchronized DatastoreContext getShardDatastoreContext(String forShardName) {
+        if(currentProperties == null) {
+            return context;
+        }
+
+        Builder builder = DatastoreContext.newBuilderFrom(context);
+        String dataStoreTypePrefix = context.getDataStoreType() + '.';
+        final String shardNamePrefix = forShardName + '.';
+
+        List<String> keys = getSortedKeysByDatastoreType(currentProperties.keySet(), dataStoreTypePrefix);
+
+        for(String key: keys) {
+            Object value = currentProperties.get(key);
+            if(key.startsWith(dataStoreTypePrefix)) {
+                key = key.replaceFirst(dataStoreTypePrefix, "");
+            }
+
+            if(key.startsWith(shardNamePrefix)) {
+                key = key.replaceFirst(shardNamePrefix, "");
+                convertValueAndInvokeSetter(key, value, builder);
+            }
+        }
+
+        return builder.build();
+    }
+
     /**
      * Applies the given properties to the cached DatastoreContext and yields a new DatastoreContext
      * instance which can be obtained via {@link getContext}.
@@ -197,19 +231,50 @@ public class DatastoreContextIntrospector {
      * @return true if the cached DatastoreContext was updated, false otherwise.
      */
     public synchronized boolean update(Dictionary<String, Object> properties) {
+        currentProperties = null;
         if(properties == null || properties.isEmpty()) {
             return false;
         }
 
         LOG.debug("In update: properties: {}", properties);
 
+        ImmutableMap.Builder<String, Object> mapBuilder = ImmutableMap.<String, Object>builder();
+
         Builder builder = DatastoreContext.newBuilderFrom(context);
 
         final String dataStoreTypePrefix = context.getDataStoreType() + '.';
 
+        List<String> keys = getSortedKeysByDatastoreType(Collections.list(properties.keys()), dataStoreTypePrefix);
+
+        boolean updated = false;
+        for(String key: keys) {
+            Object value = properties.get(key);
+            mapBuilder.put(key, value);
+
+            // If the key is prefixed with the data store type, strip it off.
+            if(key.startsWith(dataStoreTypePrefix)) {
+                key = key.replaceFirst(dataStoreTypePrefix, "");
+            }
+
+            if(convertValueAndInvokeSetter(key, value, builder)) {
+                updated = true;
+            }
+        }
+
+        currentProperties = mapBuilder.build();
+
+        if(updated) {
+            context = builder.build();
+        }
+
+        return updated;
+    }
+
+    private ArrayList<String> getSortedKeysByDatastoreType(Collection<String> inKeys,
+            final String dataStoreTypePrefix) {
         // Sort the property keys by putting the names prefixed with the data store type last. This
         // is done so data store specific settings are applied after global settings.
-        ArrayList<String> keys = Collections.list(properties.keys());
+        ArrayList<String> keys = new ArrayList<>(inKeys);
         Collections.sort(keys, new Comparator<String>() {
             @Override
             public int compare(String key1, String key2) {
@@ -217,44 +282,33 @@ public class DatastoreContextIntrospector {
                            key2.startsWith(dataStoreTypePrefix) ? -1 : key1.compareTo(key2);
             }
         });
+        return keys;
+    }
 
-        boolean updated = false;
-        for(String key: keys) {
-            Object value = properties.get(key);
-            try {
-                // If the key is prefixed with the data store type, strip it off.
-                if(key.startsWith(dataStoreTypePrefix)) {
-                    key = key.replaceFirst(dataStoreTypePrefix, "");
-                }
-
-                key = convertToCamelCase(key);
-
-                // Convert the value to the right type.
-                value = convertValue(key, value);
-                if(value == null) {
-                    continue;
-                }
+    private boolean convertValueAndInvokeSetter(String inKey, Object inValue, Builder builder) {
+        String key = convertToCamelCase(inKey);
 
-                LOG.debug("Converted value for property {}: {} ({})",
-                        key, value, value.getClass().getSimpleName());
+        try {
+            // Convert the value to the right type.
+            Object value = convertValue(key, inValue);
+            if(value == null) {
+                return false;
+            }
 
-                // Call the setter method on the Builder instance.
-                Method setter = builderSetters.get(key);
-                setter.invoke(builder, constructorValueRecursively(
-                        Primitives.wrap(setter.getParameterTypes()[0]), value.toString()));
+            LOG.debug("Converted value for property {}: {} ({})",
+                    key, value, value.getClass().getSimpleName());
 
-                updated = true;
+            // Call the setter method on the Builder instance.
+            Method setter = builderSetters.get(key);
+            setter.invoke(builder, constructorValueRecursively(
+                    Primitives.wrap(setter.getParameterTypes()[0]), value.toString()));
 
-            } catch (Exception e) {
-                LOG.error("Error converting value ({}) for property {}", value, key, e);
-            }
-        }
-
-        if(updated) {
-            context = builder.build();
+            return true;
+        } catch (Exception e) {
+            LOG.error("Error converting value ({}) for property {}", inValue, key, e);
         }
 
-        return updated;
+        return false;
     }
 
     private static String convertToCamelCase(String inString) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
index 49f5388842d9b690609f063842fc60ce425ec78f..7f2416efc36cc17ce9101fc968ca45a1edbc3adf 100644 (file)
@@ -67,13 +67,13 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
     private final TransactionContextFactory txContextFactory;
 
     public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
-            Configuration configuration, DatastoreContext datastoreContext) {
+            Configuration configuration, DatastoreContextFactory datastoreContextFactory) {
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
-        Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
+        Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
 
-        this.type = datastoreContext.getDataStoreType();
+        this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
 
         String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
 
@@ -84,19 +84,21 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
 
         PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
         actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, cluster, configuration,
-                datastoreContext, shardDispatcher, shardManagerId, primaryShardInfoCache), cluster,
-                configuration, datastoreContext, primaryShardInfoCache);
+                datastoreContextFactory, shardDispatcher, shardManagerId, primaryShardInfoCache), cluster,
+                configuration, datastoreContextFactory.getBaseDatastoreContext(), primaryShardInfoCache);
 
         this.waitTillReadyTimeInMillis =
                 actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
 
         this.txContextFactory = TransactionContextFactory.create(actorContext);
 
-        datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(datastoreContext.getDataStoreMXBeanType());
-        datastoreConfigMXBean.setContext(datastoreContext);
+        datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(
+                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
+        datastoreConfigMXBean.setContext(datastoreContextFactory.getBaseDatastoreContext());
         datastoreConfigMXBean.registerMBean();
 
-        datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContext.getDataStoreMXBeanType(), actorContext);
+        datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContextFactory.getBaseDatastoreContext().
+                getDataStoreMXBeanType(), actorContext);
         datastoreInfoMXBean.registerMBean();
     }
 
@@ -177,11 +179,11 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
     }
 
     @Override
-    public void onDatastoreContextUpdated(DatastoreContext context) {
+    public void onDatastoreContextUpdated(DatastoreContextFactory contextFactory) {
         LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreType());
 
-        actorContext.setDatastoreContext(context);
-        datastoreConfigMXBean.setContext(context);
+        actorContext.setDatastoreContext(contextFactory);
+        datastoreConfigMXBean.setContext(contextFactory.getBaseDatastoreContext());
     }
 
     @Override
@@ -224,14 +226,14 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
     }
 
     private ActorRef createShardManager(ActorSystem actorSystem, ClusterWrapper cluster, Configuration configuration,
-                                        DatastoreContext datastoreContext, String shardDispatcher, String shardManagerId,
-                                        PrimaryShardInfoFutureCache primaryShardInfoCache){
+                                        DatastoreContextFactory datastoreContextFactory, String shardDispatcher,
+                                        String shardManagerId, PrimaryShardInfoFutureCache primaryShardInfoCache){
         Exception lastException = null;
 
         for(int i=0;i<100;i++) {
             try {
                 return actorSystem.actorOf(
-                        ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch,
+                        ShardManager.props(cluster, configuration, datastoreContextFactory, waitTillReadyCountDownLatch,
                                 primaryShardInfoCache).withDispatcher(shardDispatcher).withMailbox(
                                         ActorContext.MAILBOX), shardManagerId);
             } catch (Exception e){
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
index 25029b6f1800f099e8731e087517e0743d82518d..e5aa33a0f4caa98495178eb8f842a9f5e8c89904 100644 (file)
@@ -29,7 +29,7 @@ public class DistributedDataStoreFactory {
 
         Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
         final DistributedDataStore dataStore = new DistributedDataStore(actorSystem,
-                new ClusterWrapperImpl(actorSystem), config, introspector.getContext());
+                new ClusterWrapperImpl(actorSystem), config, introspector.newContextFactory());
 
         overlay.setListener(dataStore);
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
index 724c8d2c03dc787549f511f60ac3e289ea763978..4f3d4aa7f931b1af11e1198de001d02831ac6d63 100644 (file)
@@ -117,7 +117,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private ShardManagerInfo mBean;
 
-    private DatastoreContext datastoreContext;
+    private DatastoreContextFactory datastoreContextFactory;
 
     private final CountDownLatch waitTillReadyCountdownLatch;
 
@@ -130,21 +130,19 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     /**
      */
     protected ShardManager(ClusterWrapper cluster, Configuration configuration,
-            DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch,
+            DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch,
             PrimaryShardInfoFutureCache primaryShardInfoCache) {
 
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
-        this.datastoreContext = datastoreContext;
-        this.type = datastoreContext.getDataStoreType();
+        this.datastoreContextFactory = datastoreContextFactory;
+        this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
         this.primaryShardInfoCache = primaryShardInfoCache;
 
         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
-        this.datastoreContext = DatastoreContext.newBuilderFrom(datastoreContext).shardPeerAddressResolver(
-                peerAddressResolver).build();
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
@@ -155,7 +153,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     public static Props props(
             final ClusterWrapper cluster,
             final Configuration configuration,
-            final DatastoreContext datastoreContext,
+            final DatastoreContextFactory datastoreContextFactory,
             final CountDownLatch waitTillReadyCountdownLatch,
             final PrimaryShardInfoFutureCache primaryShardInfoCache) {
 
@@ -164,7 +162,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
         Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
 
-        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext,
+        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContextFactory,
                 waitTillReadyCountdownLatch, primaryShardInfoCache));
     }
 
@@ -195,8 +193,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             memberUnreachable((ClusterEvent.UnreachableMember)message);
         } else if(message instanceof ClusterEvent.ReachableMember) {
             memberReachable((ClusterEvent.ReachableMember) message);
-        } else if(message instanceof DatastoreContext) {
-            onDatastoreContext((DatastoreContext)message);
+        } else if(message instanceof DatastoreContextFactory) {
+            onDatastoreContextFactory((DatastoreContextFactory)message);
         } else if(message instanceof RoleChangeNotification) {
             onRoleChangeNotification((RoleChangeNotification) message);
         } else if(message instanceof FollowerInitialSyncUpStatus){
@@ -239,7 +237,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
             DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
             if(shardDatastoreContext == null) {
-                shardDatastoreContext = datastoreContext;
+                shardDatastoreContext = newShardDatastoreContext(moduleShardConfig.getShardName());
             } else {
                 shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
                         peerAddressResolver).build();
@@ -266,6 +264,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
+        return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
+                shardPeerAddressResolver(peerAddressResolver);
+    }
+
+    private DatastoreContext newShardDatastoreContext(String shardName) {
+        return newShardDatastoreContextBuilder(shardName).build();
+    }
+
     private void checkReady(){
         if (isReadyWithLeaderId()) {
             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
@@ -443,11 +450,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
 
-                FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration();
+                FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
                 if(shardInformation.isShardInitialized()) {
                     // If the shard is already initialized then we'll wait enough time for the shard to
                     // elect a leader, ie 2 times the election timeout.
-                    timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig()
+                    timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
                 }
 
@@ -574,13 +581,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onDatastoreContext(DatastoreContext context) {
-        datastoreContext = DatastoreContext.newBuilderFrom(context).shardPeerAddressResolver(
-                peerAddressResolver).build();
+    private void onDatastoreContextFactory(DatastoreContextFactory factory) {
+        datastoreContextFactory = factory;
         for (ShardInformation info : localShards.values()) {
-            if (info.getActor() != null) {
-                info.getActor().tell(datastoreContext, getSelf());
-            }
+            info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
         }
     }
 
@@ -699,12 +703,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
             localShardActorNames.add(shardId.toString());
-            localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext,
-                    shardPropsCreator, peerAddressResolver));
+            localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
+                    newShardDatastoreContext(shardName), shardPropsCreator, peerAddressResolver));
         }
 
         mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
-                datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
+                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(), localShardActorNames);
 
         mBean.setShardManager(this);
     }
@@ -755,12 +759,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return mBean;
     }
 
-    private DatastoreContext getInitShardDataStoreContext() {
-        return (DatastoreContext.newBuilderFrom(datastoreContext)
-                .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
-                .build());
-    }
-
     private void checkLocalShardExists(final String shardName, final ActorRef sender) {
         if (localShards.containsKey(shardName)) {
             String msg = String.format("Local shard %s already exists", shardName);
@@ -802,7 +800,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
-        Timeout findPrimaryTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
+        Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+                getShardInitializationTimeout().duration().$times(2));
 
         final ActorRef sender = getSender();
         Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout);
@@ -835,8 +834,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+
+        DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
+                DisableElectionsRaftPolicy.class.getName()).build();
+
         final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
-                          getPeerAddresses(shardName), getInitShardDataStoreContext(),
+                          getPeerAddresses(shardName), datastoreContext,
                           new DefaultShardPropsCreator(), peerAddressResolver);
         localShards.put(shardName, shardInfo);
         shardInfo.setActor(newShardActor(schemaContext, shardInfo));
@@ -845,8 +848,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
                 response.getPrimaryPath(), shardId);
 
-        Timeout addServerTimeout = new Timeout(datastoreContext
-                       .getShardLeaderElectionTimeout().duration().$times(4));
+        Timeout addServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(4));
         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
             new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout);
 
@@ -884,7 +886,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
 
             // Make the local shard voting capable
-            shardInfo.setDatastoreContext(datastoreContext, getSelf());
+            shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
 
             mBean.addLocalShard(shardInfo.getShardId().toString());
             sender.tell(new akka.actor.Status.Success(true), getSelf());
@@ -998,6 +1000,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return localShardDataTree;
         }
 
+        DatastoreContext getDatastoreContext() {
+            return datastoreContext;
+        }
+
+        void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
+            this.datastoreContext = datastoreContext;
+            if (actor != null) {
+                LOG.debug ("Sending new DatastoreContext to {}", shardId);
+                actor.tell(this.datastoreContext, sender);
+            }
+        }
+
         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
             LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
 
@@ -1135,16 +1149,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         void setLeaderVersion(short leaderVersion) {
             this.leaderVersion = leaderVersion;
         }
-
-        void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
-            this.datastoreContext = datastoreContext;
-            //notify the datastoreContextchange
-            LOG.debug ("Notifying RaftPolicy change via datastoreContextChange for {}",
-                 this.shardName);
-            if (actor != null) {
-                actor.tell(this.datastoreContext, sender);
-            }
-        }
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {
@@ -1152,22 +1156,23 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         final ClusterWrapper cluster;
         final Configuration configuration;
-        final DatastoreContext datastoreContext;
+        final DatastoreContextFactory datastoreContextFactory;
         private final CountDownLatch waitTillReadyCountdownLatch;
         private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
-        ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext,
-                CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) {
+        ShardManagerCreator(ClusterWrapper cluster, Configuration configuration,
+                DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch,
+                PrimaryShardInfoFutureCache primaryShardInfoCache) {
             this.cluster = cluster;
             this.configuration = configuration;
-            this.datastoreContext = datastoreContext;
+            this.datastoreContextFactory = datastoreContextFactory;
             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
             this.primaryShardInfoCache = primaryShardInfoCache;
         }
 
         @Override
         public ShardManager create() throws Exception {
-            return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch,
+            return new ShardManager(cluster, configuration, datastoreContextFactory, waitTillReadyCountdownLatch,
                     primaryShardInfoCache);
         }
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
index bc492887f97eeee69f9b77c5ef5308057768ed49..792064cd6700cc2d0cf6a09423e8964ee980eab1 100644 (file)
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
@@ -172,8 +173,8 @@ public class ActorContext {
         }
     }
 
-    public void setDatastoreContext(DatastoreContext context) {
-        this.datastoreContext = context;
+    public void setDatastoreContext(DatastoreContextFactory contextFactory) {
+        this.datastoreContext = contextFactory.getBaseDatastoreContext();
         setCachedProperties();
 
         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
@@ -186,7 +187,7 @@ public class ActorContext {
         updated = true;
 
         if(shardManager != null) {
-            shardManager.tell(context, ActorRef.noSender());
+            shardManager.tell(contextFactory, ActorRef.noSender());
         }
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java
index ecfcdefcb28d2eeecc24e934180372178ba7678f..700b96a87e35040c9821996d595f7a64b4c23623 100644 (file)
@@ -96,6 +96,8 @@ public class DatastoreContextConfigAdminOverlayTest {
 
         DatastoreContext context = DatastoreContext.newBuilder().build();
         doReturn(context).when(mockIntrospector).getContext();
+        DatastoreContextFactory contextFactory = new DatastoreContextFactory(mockIntrospector);
+        doReturn(contextFactory).when(mockIntrospector).newContextFactory();
 
         DatastoreContextConfigAdminOverlay.Listener mockListener =
                 mock(DatastoreContextConfigAdminOverlay.Listener.class);
@@ -122,7 +124,7 @@ public class DatastoreContextConfigAdminOverlayTest {
 
         verify(mockIntrospector).update(properties);
 
-        verify(mockListener).onDatastoreContextUpdated(context);
+        verify(mockListener).onDatastoreContextUpdated(contextFactory);
 
         verify(mockBundleContext, times(2)).ungetService(mockConfigAdminServiceRef);
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospectorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospectorTest.java
index f2857f515e5535c453c162441d4b7b9cd03b777c..2bd2e61296daec3f41d9264b78d8652d3159c762 100644 (file)
@@ -185,4 +185,49 @@ public class DatastoreContextIntrospectorTest {
         assertEquals(false, configContext.isPersistent());
         assertEquals(444, configContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize());
     }
+
+    @Test
+    public void testGetDatastoreContextForShard() {
+        Dictionary<String, Object> properties = new Hashtable<>();
+        properties.put("shard-transaction-idle-timeout-in-minutes", "22"); // global setting
+        properties.put("operational.shard-transaction-idle-timeout-in-minutes", "33"); // operational override
+        properties.put("config.shard-transaction-idle-timeout-in-minutes", "44"); // config override
+        properties.put("topology.shard-transaction-idle-timeout-in-minutes", "55"); // global shard override
+
+        DatastoreContext operContext = DatastoreContext.newBuilder().dataStoreType("operational").build();
+        DatastoreContextIntrospector operIntrospector = new DatastoreContextIntrospector(operContext);
+
+        DatastoreContext shardContext = operIntrospector.newContextFactory().getShardDatastoreContext("topology");
+        assertEquals(10, shardContext.getShardTransactionIdleTimeout().toMinutes());
+
+        operIntrospector.update(properties);
+        operContext = operIntrospector.getContext();
+        assertEquals(33, operContext.getShardTransactionIdleTimeout().toMinutes());
+
+        shardContext = operIntrospector.newContextFactory().getShardDatastoreContext("topology");
+        assertEquals(55, shardContext.getShardTransactionIdleTimeout().toMinutes());
+
+        DatastoreContext configContext = DatastoreContext.newBuilder().dataStoreType("config").build();
+        DatastoreContextIntrospector configIntrospector = new DatastoreContextIntrospector(configContext);
+        configIntrospector.update(properties);
+        configContext = configIntrospector.getContext();
+        assertEquals(44, configContext.getShardTransactionIdleTimeout().toMinutes());
+
+        shardContext = configIntrospector.newContextFactory().getShardDatastoreContext("topology");
+        assertEquals(55, shardContext.getShardTransactionIdleTimeout().toMinutes());
+
+        properties.put("operational.topology.shard-transaction-idle-timeout-in-minutes", "66"); // operational shard override
+        properties.put("config.topology.shard-transaction-idle-timeout-in-minutes", "77"); // config shard override
+
+        operIntrospector.update(properties);
+        shardContext = operIntrospector.newContextFactory().getShardDatastoreContext("topology");
+        assertEquals(66, shardContext.getShardTransactionIdleTimeout().toMinutes());
+
+        configIntrospector.update(properties);
+        shardContext = configIntrospector.newContextFactory().getShardDatastoreContext("topology");
+        assertEquals(77, shardContext.getShardTransactionIdleTimeout().toMinutes());
+
+        shardContext = configIntrospector.newContextFactory().getShardDatastoreContext("default");
+        assertEquals(44, shardContext.getShardTransactionIdleTimeout().toMinutes());
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
index c48105e8be1c71f2dc31518e93a6b5f2b0442958..5f935a6aa61e1e528ad7b7ca77faba5d274f8ff4 100644 (file)
@@ -34,6 +34,9 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
@@ -482,7 +485,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
         // Switch the leader to the follower
 
         followerDatastoreContextBuilder.shardElectionTimeoutFactor(1);
-        followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
+        sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
 
         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
 
@@ -580,7 +583,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
 
         followerDatastoreContextBuilder.operationTimeoutInMillis(50).shardElectionTimeoutFactor(1);
-        followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
+        sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
 
         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
 
@@ -612,7 +615,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
         Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 
         followerDatastoreContextBuilder.operationTimeoutInMillis(10).shardElectionTimeoutFactor(1);
-        followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
+        sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
 
         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
 
@@ -646,7 +649,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
 
         followerDatastoreContextBuilder.operationTimeoutInMillis(500);
-        followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
+        sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
 
         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
 
@@ -654,4 +657,17 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
         followerTestKit.doCommit(rwTx.ready());
     }
+
+    private void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) {
+        DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
+        Answer<DatastoreContext> answer = new Answer<DatastoreContext>() {
+            @Override
+            public DatastoreContext answer(InvocationOnMock invocation) {
+                return builder.build();
+            }
+        };
+        Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
+        Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
+        dataStore.onDatastoreContextUpdated(mockContextFactory);
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
index 1a7bbb21b3867134e50e163ee429392b1da12fde..9c07a61e604486a5f04d0488c4393c9dfecf9396 100644 (file)
@@ -18,6 +18,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
+import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
@@ -64,8 +65,11 @@ public class IntegrationTestKit extends ShardTestKit {
         datastoreContextBuilder.dataStoreType(typeName);
 
         DatastoreContext datastoreContext = datastoreContextBuilder.build();
+        DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
+        Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
+        Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
 
-        DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, datastoreContext);
+        DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, mockContextFactory);
 
         dataStore.onGlobalContextUpdated(schemaContext);
 
@@ -192,4 +196,4 @@ public class IntegrationTestKit extends ShardTestKit {
             }
         }, expType);
     }
-}
\ No newline at end of file
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
index bd7e7d65c663f7195b4d112836799280bf558057..6f3946d42219ed963b0ed03da2448a675165cf12 100644 (file)
@@ -20,6 +20,7 @@ import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.AddressFromURIString;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Status;
 import akka.cluster.Cluster;
@@ -38,15 +39,21 @@ import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
 import java.net.URI;
+import java.util.AbstractMap;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
@@ -142,9 +149,16 @@ public class ShardManagerTest extends AbstractActorTest {
         return newShardMgrProps(new MockConfiguration());
     }
 
+    private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
+        DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
+        Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
+        Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
+        return mockFactory;
+    }
+
     private Props newShardMgrProps(Configuration config) {
-        return ShardManager.props(new MockClusterWrapper(), config, datastoreContextBuilder.build(), ready,
-                primaryShardInfoCache);
+        return ShardManager.props(new MockClusterWrapper(), config,
+                newDatastoreContextFactory(datastoreContextBuilder.build()), ready, primaryShardInfoCache);
     }
 
     private Props newPropsShardMgrWithMockShardActor() {
@@ -158,14 +172,102 @@ public class ShardManagerTest extends AbstractActorTest {
             private static final long serialVersionUID = 1L;
             @Override
             public ShardManager create() throws Exception {
-                return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
-                        ready, name, shardActor, primaryShardInfoCache);
+                return new ForwardingShardManager(clusterWrapper, config, newDatastoreContextFactory(
+                        datastoreContextBuilder.build()), ready, name, shardActor, primaryShardInfoCache);
             }
         };
 
         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
     }
 
+    @Test
+    public void testPerShardDatastoreContext() throws Exception {
+        final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
+                datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
+
+        Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
+                shardElectionTimeoutFactor(6).build()).when(mockFactory).getShardDatastoreContext("default");
+
+        Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
+                shardElectionTimeoutFactor(7).build()).when(mockFactory).getShardDatastoreContext("topology");
+
+        final MockConfiguration mockConfig = new MockConfiguration() {
+            @Override
+            public Collection<String> getMemberShardNames(String memberName) {
+                return Arrays.asList("default", "topology");
+            }
+
+            @Override
+            public Collection<String> getMembersFromShardName(String shardName) {
+                return Arrays.asList("member-1");
+            }
+        };
+
+        final TestActorRef<MessageCollectorActor> defaultShardActor = TestActorRef.create(getSystem(),
+                Props.create(MessageCollectorActor.class), "default");
+        final TestActorRef<MessageCollectorActor> topologyShardActor = TestActorRef.create(getSystem(),
+                Props.create(MessageCollectorActor.class), "topology");
+
+        final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
+                new HashMap<String, Entry<ActorRef, DatastoreContext>>());
+        shardInfoMap.put("default", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(defaultShardActor, null));
+        shardInfoMap.put("topology", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(topologyShardActor, null));
+
+        final CountDownLatch newShardActorLatch = new CountDownLatch(2);
+        final Creator<ShardManager> creator = new Creator<ShardManager>() {
+            private static final long serialVersionUID = 1L;
+            @Override
+            public ShardManager create() throws Exception {
+                return new ShardManager(new MockClusterWrapper(), mockConfig, mockFactory, ready, primaryShardInfoCache) {
+                    @Override
+                    protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+                        Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
+                        ActorRef ref = null;
+                        if(entry != null) {
+                            ref = entry.getKey();
+                            entry.setValue(info.getDatastoreContext());
+                        }
+
+                        newShardActorLatch.countDown();
+                        return ref;
+                    }
+                };
+            }
+        };
+
+        JavaTestKit kit = new JavaTestKit(getSystem());
+
+        final ActorRef shardManager = getSystem().actorOf(Props.create(new DelegatingShardManagerCreator(creator)).
+                    withDispatcher(Dispatchers.DefaultDispatcherId()));
+
+        shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
+
+        assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
+        assertEquals("getShardElectionTimeoutFactor", 6, shardInfoMap.get("default").getValue().
+                getShardElectionTimeoutFactor());
+        assertEquals("getShardElectionTimeoutFactor", 7, shardInfoMap.get("topology").getValue().
+                getShardElectionTimeoutFactor());
+
+        DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
+                datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
+        Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
+                shardElectionTimeoutFactor(66).build()).when(newMockFactory).getShardDatastoreContext("default");
+
+        Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
+                shardElectionTimeoutFactor(77).build()).when(newMockFactory).getShardDatastoreContext("topology");
+
+        shardManager.tell(newMockFactory, kit.getRef());
+
+        DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor, DatastoreContext.class);
+        assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
+
+        newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
+        assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
+
+        defaultShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        topologyShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    }
+
     @Test
     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -842,10 +944,9 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
-        final Props persistentProps = ShardManager.props(
-                new MockClusterWrapper(),
-                new MockConfiguration(),
-                DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
+        final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+                newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready,
+                primaryShardInfoCache);
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -876,10 +977,9 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
-        final Props persistentProps = ShardManager.props(
-                new MockClusterWrapper(),
-                new MockConfiguration(),
-                DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
+        final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+                newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready,
+                primaryShardInfoCache);
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -904,15 +1004,15 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
-        final Props persistentProps = ShardManager.props(
-                new MockClusterWrapper(),
+        final Props persistentProps = ShardManager.props(new MockClusterWrapper(),
                 new MockConfiguration() {
                     @Override
                     public List<String> getMemberShardNames(String memberName) {
                         return Arrays.asList("default", "astronauts");
                     }
                 },
-                DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
+                newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready,
+                primaryShardInfoCache);
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -1192,8 +1292,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
         TestShardManager(String shardMrgIDSuffix) {
             super(new MockClusterWrapper(), new MockConfiguration(),
-                    DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready,
-                    new PrimaryShardInfoFutureCache());
+                    newDatastoreContextFactory(DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()),
+                    ready, new PrimaryShardInfoFutureCache());
         }
 
         @Override
@@ -1252,9 +1352,9 @@ public class ShardManagerTest extends AbstractActorTest {
         private final String name;
 
         protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
-                DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
+                DatastoreContextFactory factory, CountDownLatch waitTillReadyCountdownLatch, String name,
                 ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
-            super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache);
+            super(cluster, configuration, factory, waitTillReadyCountdownLatch, primaryShardInfoCache);
             this.shardActor = shardActor;
             this.name = name;
         }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java
index ce2ef2dd7aa8dc94840fd222e906d87ddab87e4e..54a27e08bb069934012eac07b0ca4c2b034f3662 100644 (file)
@@ -34,7 +34,9 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
@@ -91,7 +93,11 @@ public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnersh
             }
         });
 
-        dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration, datastoreContext );
+        DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
+        Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
+        Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
+
+        dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration, mockContextFactory);
 
         dataStore.onGlobalContextUpdated(SchemaContextHelper.entityOwners());
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
index 6a5d6ef43442a91b31192131c114bae3b2e84567..7e484ebde3d79bcadbb182162c55c23c56675d78 100644 (file)
@@ -43,6 +43,7 @@ import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
@@ -374,9 +375,12 @@ public class ActorContextTest extends AbstractActorTest{
             DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
                     shardTransactionCommitTimeoutInSeconds(8).build();
 
-            actorContext.setDatastoreContext(newContext);
+            DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
+            Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
 
-            expectMsgClass(duration("5 seconds"), DatastoreContext.class);
+            actorContext.setDatastoreContext(mockContextFactory);
+
+            expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class);
 
             Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());