Bug 4149: Implement per-shard DatastoreContext settings
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 724c8d2c03dc787549f511f60ac3e289ea763978..4f3d4aa7f931b1af11e1198de001d02831ac6d63 100644 (file)
@@ -117,7 +117,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private ShardManagerInfo mBean;
 
-    private DatastoreContext datastoreContext;
+    private DatastoreContextFactory datastoreContextFactory;
 
     private final CountDownLatch waitTillReadyCountdownLatch;
 
@@ -130,21 +130,19 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     /**
      */
     protected ShardManager(ClusterWrapper cluster, Configuration configuration,
-            DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch,
+            DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch,
             PrimaryShardInfoFutureCache primaryShardInfoCache) {
 
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
-        this.datastoreContext = datastoreContext;
-        this.type = datastoreContext.getDataStoreType();
+        this.datastoreContextFactory = datastoreContextFactory;
+        this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
         this.primaryShardInfoCache = primaryShardInfoCache;
 
         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
-        this.datastoreContext = DatastoreContext.newBuilderFrom(datastoreContext).shardPeerAddressResolver(
-                peerAddressResolver).build();
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
@@ -155,7 +153,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     public static Props props(
             final ClusterWrapper cluster,
             final Configuration configuration,
-            final DatastoreContext datastoreContext,
+            final DatastoreContextFactory datastoreContextFactory,
             final CountDownLatch waitTillReadyCountdownLatch,
             final PrimaryShardInfoFutureCache primaryShardInfoCache) {
 
@@ -164,7 +162,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
         Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
 
-        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext,
+        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContextFactory,
                 waitTillReadyCountdownLatch, primaryShardInfoCache));
     }
 
@@ -195,8 +193,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             memberUnreachable((ClusterEvent.UnreachableMember)message);
         } else if(message instanceof ClusterEvent.ReachableMember) {
             memberReachable((ClusterEvent.ReachableMember) message);
-        } else if(message instanceof DatastoreContext) {
-            onDatastoreContext((DatastoreContext)message);
+        } else if(message instanceof DatastoreContextFactory) {
+            onDatastoreContextFactory((DatastoreContextFactory)message);
         } else if(message instanceof RoleChangeNotification) {
             onRoleChangeNotification((RoleChangeNotification) message);
         } else if(message instanceof FollowerInitialSyncUpStatus){
@@ -239,7 +237,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
             DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
             if(shardDatastoreContext == null) {
-                shardDatastoreContext = datastoreContext;
+                shardDatastoreContext = newShardDatastoreContext(moduleShardConfig.getShardName());
             } else {
                 shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
                         peerAddressResolver).build();
@@ -266,6 +264,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
+        return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
+                shardPeerAddressResolver(peerAddressResolver);
+    }
+
+    private DatastoreContext newShardDatastoreContext(String shardName) {
+        return newShardDatastoreContextBuilder(shardName).build();
+    }
+
     private void checkReady(){
         if (isReadyWithLeaderId()) {
             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
@@ -443,11 +450,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
 
-                FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration();
+                FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
                 if(shardInformation.isShardInitialized()) {
                     // If the shard is already initialized then we'll wait enough time for the shard to
                     // elect a leader, ie 2 times the election timeout.
-                    timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig()
+                    timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
                 }
 
@@ -574,13 +581,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onDatastoreContext(DatastoreContext context) {
-        datastoreContext = DatastoreContext.newBuilderFrom(context).shardPeerAddressResolver(
-                peerAddressResolver).build();
+    private void onDatastoreContextFactory(DatastoreContextFactory factory) {
+        datastoreContextFactory = factory;
         for (ShardInformation info : localShards.values()) {
-            if (info.getActor() != null) {
-                info.getActor().tell(datastoreContext, getSelf());
-            }
+            info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
         }
     }
 
@@ -699,12 +703,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
             localShardActorNames.add(shardId.toString());
-            localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext,
-                    shardPropsCreator, peerAddressResolver));
+            localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
+                    newShardDatastoreContext(shardName), shardPropsCreator, peerAddressResolver));
         }
 
         mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
-                datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
+                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(), localShardActorNames);
 
         mBean.setShardManager(this);
     }
@@ -755,12 +759,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return mBean;
     }
 
-    private DatastoreContext getInitShardDataStoreContext() {
-        return (DatastoreContext.newBuilderFrom(datastoreContext)
-                .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
-                .build());
-    }
-
     private void checkLocalShardExists(final String shardName, final ActorRef sender) {
         if (localShards.containsKey(shardName)) {
             String msg = String.format("Local shard %s already exists", shardName);
@@ -802,7 +800,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
-        Timeout findPrimaryTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
+        Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+                getShardInitializationTimeout().duration().$times(2));
 
         final ActorRef sender = getSender();
         Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout);
@@ -835,8 +834,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+
+        DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
+                DisableElectionsRaftPolicy.class.getName()).build();
+
         final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
-                          getPeerAddresses(shardName), getInitShardDataStoreContext(),
+                          getPeerAddresses(shardName), datastoreContext,
                           new DefaultShardPropsCreator(), peerAddressResolver);
         localShards.put(shardName, shardInfo);
         shardInfo.setActor(newShardActor(schemaContext, shardInfo));
@@ -845,8 +848,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
                 response.getPrimaryPath(), shardId);
 
-        Timeout addServerTimeout = new Timeout(datastoreContext
-                       .getShardLeaderElectionTimeout().duration().$times(4));
+        Timeout addServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(4));
         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
             new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout);
 
@@ -884,7 +886,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
 
             // Make the local shard voting capable
-            shardInfo.setDatastoreContext(datastoreContext, getSelf());
+            shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
 
             mBean.addLocalShard(shardInfo.getShardId().toString());
             sender.tell(new akka.actor.Status.Success(true), getSelf());
@@ -998,6 +1000,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return localShardDataTree;
         }
 
+        DatastoreContext getDatastoreContext() {
+            return datastoreContext;
+        }
+
+        void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
+            this.datastoreContext = datastoreContext;
+            if (actor != null) {
+                LOG.debug ("Sending new DatastoreContext to {}", shardId);
+                actor.tell(this.datastoreContext, sender);
+            }
+        }
+
         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
             LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
 
@@ -1135,16 +1149,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         void setLeaderVersion(short leaderVersion) {
             this.leaderVersion = leaderVersion;
         }
-
-        void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
-            this.datastoreContext = datastoreContext;
-            //notify the datastoreContextchange
-            LOG.debug ("Notifying RaftPolicy change via datastoreContextChange for {}",
-                 this.shardName);
-            if (actor != null) {
-                actor.tell(this.datastoreContext, sender);
-            }
-        }
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {
@@ -1152,22 +1156,23 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         final ClusterWrapper cluster;
         final Configuration configuration;
-        final DatastoreContext datastoreContext;
+        final DatastoreContextFactory datastoreContextFactory;
         private final CountDownLatch waitTillReadyCountdownLatch;
         private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
-        ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext,
-                CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) {
+        ShardManagerCreator(ClusterWrapper cluster, Configuration configuration,
+                DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch,
+                PrimaryShardInfoFutureCache primaryShardInfoCache) {
             this.cluster = cluster;
             this.configuration = configuration;
-            this.datastoreContext = datastoreContext;
+            this.datastoreContextFactory = datastoreContextFactory;
             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
             this.primaryShardInfoCache = primaryShardInfoCache;
         }
 
         @Override
         public ShardManager create() throws Exception {
-            return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch,
+            return new ShardManager(cluster, configuration, datastoreContextFactory, waitTillReadyCountdownLatch,
                     primaryShardInfoCache);
         }
     }