Dynamically update DatastoreContext properties 71/15571/6
authortpantelis <tpanteli@brocade.com>
Thu, 19 Feb 2015 21:00:38 +0000 (16:00 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Sun, 22 Feb 2015 13:25:47 +0000 (08:25 -0500)
Added a ConfigurationListener to the DatastoreContextConfigAdminOverlay
that is registered as an OSGi service to receive ConfigurationEvents
from the Config Admin when the
org.opendaylight.controller.cluster.datastore.cfg file is modified.

The DistributedDataStore registers as a listener to the
DatastoreContextConfigAdminOverlay to get updated when the
DatastoreContext instance changes due to a ConfigurationEvent. The
DistributedDataStore sets the new DatastoreContext instance in the
ActorContext which updates its cached instance and properties and
sends the DistributedDataStore instance as a message to the ShardManager.
The ShardManager simply sends it out to all the Shards which update
their cached DatastoreContext state.

The Shard also updates the new ConfigParams instance in the base RaftActor
which sets it in its RaftActorContext. There was one place in
FollowerLogInformationImpl where it had a local cache of a property
obtained from the ConfigParams so I changed the ctor to pass the
RaftActorContext instead and obtain the property each time from the
ConfigParams.

A slight downside to this is that we'll have to be cognizant of not caching
DatastoreContext/ConfigParams properties and assuming they're static. Or
if we do cache we need to handle updates.

I toyed around with trying to restart the DistributedDataStore with a
new DatastoreContext and ActorContext and killing the previous
ShardManager and Shard actors and creating new ones. However recreating
the Shards without being disruptive to clients is tricky and risky, eg handling
infight transactions, and I didn't see a clean way to do it without
possibly causing inflight transactions to fail. .So I went with the simpler
approach of just pushing an updated DatastoreContext to the actors.

Change-Id: Ie608f61da36ac58a806208925a3c4277968c2f5b
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
19 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties

index 3e6742c17d37c178d30c7d570490ff993c068806..d4d13899eb770e4ee8bb88a634dd85ec2cfb7bbc 100644 (file)
@@ -43,6 +43,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
     private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
     private long isolatedLeaderCheckInterval = HEART_BEAT_INTERVAL.$times(1000).toMillis();
+    private FiniteDuration electionTimeOutInterval;
 
     // 12 is just an arbitrary percentage. This is the amount of the total memory that a raft actor's
     // in-memory journal can use before it needs to snapshot
@@ -52,6 +53,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
         this.heartBeatInterval = heartBeatInterval;
+        electionTimeOutInterval = null;
     }
 
     public void setSnapshotBatchCount(long snapshotBatchCount) {
@@ -72,6 +74,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     public void setElectionTimeoutFactor(long electionTimeoutFactor){
         this.electionTimeoutFactor = electionTimeoutFactor;
+        electionTimeOutInterval = null;
     }
 
     @Override
@@ -92,7 +95,11 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     @Override
     public FiniteDuration getElectionTimeOutInterval() {
-        return getHeartBeatInterval().$times(electionTimeoutFactor);
+        if(electionTimeOutInterval == null) {
+            electionTimeOutInterval = getHeartBeatInterval().$times(electionTimeoutFactor);
+        }
+
+        return electionTimeOutInterval;
     }
 
     @Override
index 04b9f163f4b6ad7be0f423e485902f7b3d5ccb79..90e128256132437c5f2acd4cb861e0e74a759d44 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.raft;
 import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import scala.concurrent.duration.FiniteDuration;
 
 public class FollowerLogInformationImpl implements FollowerLogInformation {
     private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex");
@@ -21,18 +20,17 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
 
-    private final long followerTimeoutMillis;
+    private final RaftActorContext context;
 
     private volatile long nextIndex;
 
     private volatile long matchIndex;
 
-    public FollowerLogInformationImpl(String id, long nextIndex,
-        long matchIndex, FiniteDuration followerTimeoutDuration) {
+    public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) {
         this.id = id;
-        this.nextIndex = nextIndex;
+        this.nextIndex = context.getCommitIndex();
         this.matchIndex = matchIndex;
-        this.followerTimeoutMillis = followerTimeoutDuration.toMillis();
+        this.context = context;
     }
 
     @Override
@@ -78,7 +76,8 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
     @Override
     public boolean isFollowerActive() {
         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
-        return (stopwatch.isRunning()) && (elapsed <= followerTimeoutMillis);
+        return (stopwatch.isRunning()) &&
+                (elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis());
     }
 
     @Override
@@ -107,7 +106,8 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
         builder.append("FollowerLogInformationImpl [id=").append(id).append(", nextIndex=").append(nextIndex)
                 .append(", matchIndex=").append(matchIndex).append(", stopwatch=")
                 .append(stopwatch.elapsed(TimeUnit.MILLISECONDS))
-                .append(", followerTimeoutMillis=").append(followerTimeoutMillis).append("]");
+                .append(", followerTimeoutMillis=")
+                .append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]");
         return builder.toString();
     }
 
index 285be39c0b3286c2abbdaf3c08e09c1f85c0224f..edcfcc979c1161f6094fecc50cd68be76bebbb62 100644 (file)
@@ -98,7 +98,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * This context should NOT be passed directly to any other actor it is
      * only to be consumed by the RaftActorBehaviors
      */
-    private final RaftActorContext context;
+    private final RaftActorContextImpl context;
 
     /**
      * The in-memory journal
@@ -139,6 +139,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         super.preStart();
     }
 
+    @Override
+    public void postStop() {
+        if(currentBehavior != null) {
+            try {
+                currentBehavior.close();
+            } catch (Exception e) {
+                LOG.debug("{}: Error closing behavior {}", persistenceId(), currentBehavior.state());
+            }
+        }
+
+        super.postStop();
+    }
+
     @Override
     public void handleRecover(Object message) {
         if(persistence().isRecoveryApplicable()) {
@@ -511,6 +524,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return context;
     }
 
+    protected void updateConfigParams(ConfigParams configParams) {
+        context.setConfigParams(configParams);
+    }
+
     /**
      * setPeerAddress sets the address of a known peer at a later time.
      * <p>
index b71b3be3522e1082e46f56f681a0ee49440ab79d..6fc5e4369bb4879e1e8dd2ca01f92b98d5ec4ba4 100644 (file)
@@ -37,7 +37,7 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private final Logger LOG;
 
-    private final ConfigParams configParams;
+    private ConfigParams configParams;
 
     private boolean snapshotCaptureInitiated;
 
@@ -59,6 +59,10 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.LOG = logger;
     }
 
+    void setConfigParams(ConfigParams configParams) {
+        this.configParams = configParams;
+    }
+
     @Override
     public ActorRef actorOf(Props props){
         return context.actorOf(props);
index be51ba069cc5056636646566d1db00b30154073a..890d45e8fb664b20a4f27e8996d38b15e27a0120 100644 (file)
@@ -97,9 +97,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
         for (String followerId : context.getPeerAddresses().keySet()) {
             FollowerLogInformation followerLogInformation =
-                new FollowerLogInformationImpl(followerId,
-                    context.getCommitIndex(), -1,
-                    context.getConfigParams().getElectionTimeOutInterval());
+                new FollowerLogInformationImpl(followerId, -1, context);
 
             ftlBuilder.put(followerId, followerLogInformation);
         }
index 84d1545a65e3302462b69f9ee96f8dd982955654..5be9030f5957bd361c73ead2d11353eb5d178f95 100644 (file)
@@ -7,29 +7,29 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.TimeUnit;
 import org.junit.Test;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 public class FollowerLogInformationImplTest {
 
     @Test
     public void testIsFollowerActive() {
 
-        FiniteDuration timeoutDuration =
-            new FiniteDuration(500, TimeUnit.MILLISECONDS);
-
-        FollowerLogInformation followerLogInformation =
-            new FollowerLogInformationImpl(
-                "follower1", 10, 9, timeoutDuration);
+        MockRaftActorContext context = new MockRaftActorContext();
+        context.setCommitIndex(10);
 
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
+        configParams.setElectionTimeoutFactor(1);
+        context.setConfigParams(configParams);
 
+        FollowerLogInformation followerLogInformation =
+            new FollowerLogInformationImpl("follower1", 9, context);
 
         assertFalse("Follower should be termed inactive before stopwatch starts",
             followerLogInformation.isFollowerActive());
index 297d781251cc69854363800171599dcdc2bdc1c7..c3161a592feb979b11b6032c735fb41a90ffa574 100644 (file)
@@ -37,22 +37,7 @@ public class MockRaftActorContext implements RaftActorContext {
     private boolean snapshotCaptureInitiated;
 
     public MockRaftActorContext(){
-        electionTerm = null;
-
-        initReplicatedLog();
-    }
-
-    public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){
-        this.id = id;
-        this.system = system;
-        this.actor = actor;
-
-        final String id1 = id;
         electionTerm = new ElectionTerm() {
-            /**
-             * Identifier of the actor whose election term information this is
-             */
-            private final String id = id1;
             private long currentTerm = 1;
             private String votedFor = "";
 
@@ -81,6 +66,13 @@ public class MockRaftActorContext implements RaftActorContext {
         };
 
         configParams = new DefaultConfigParamsImpl();
+    }
+
+    public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){
+        this();
+        this.id = id;
+        this.system = system;
+        this.actor = actor;
 
         initReplicatedLog();
     }
index 20ee55519b945e42ffb3e06acf53b53d42a93556..2688d0195df662f004a4d4738bf16330387b194f 100644 (file)
@@ -11,8 +11,11 @@ import java.io.IOException;
 import java.util.Dictionary;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.cm.Configuration;
 import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.service.cm.ConfigurationEvent;
+import org.osgi.service.cm.ConfigurationListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,12 +28,19 @@ import org.slf4j.LoggerFactory;
 public class DatastoreContextConfigAdminOverlay implements AutoCloseable {
     public static final String CONFIG_ID = "org.opendaylight.controller.cluster.datastore";
 
+    public static interface Listener {
+        void onDatastoreContextUpdated(DatastoreContext context);
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(DatastoreContextConfigAdminOverlay.class);
 
     private final DatastoreContextIntrospector introspector;
     private final BundleContext bundleContext;
+    private ServiceRegistration<?> configListenerServiceRef;
+    private Listener listener;
 
-    public DatastoreContextConfigAdminOverlay(DatastoreContextIntrospector introspector, BundleContext bundleContext) {
+    public DatastoreContextConfigAdminOverlay(DatastoreContextIntrospector introspector,
+            BundleContext bundleContext) {
         this.introspector = introspector;
         this.bundleContext = bundleContext;
 
@@ -40,9 +50,16 @@ public class DatastoreContextConfigAdminOverlay implements AutoCloseable {
             LOG.warn("No ConfigurationAdmin service found");
         } else {
             overlaySettings(configAdminServiceReference);
+
+            configListenerServiceRef = bundleContext.registerService(ConfigurationListener.class.getName(),
+                    new DatastoreConfigurationListener(), null);
         }
     }
 
+    public void setListener(Listener listener) {
+        this.listener = listener;
+    }
+
     private void overlaySettings(ServiceReference<ConfigurationAdmin> configAdminServiceReference) {
         try {
             ConfigurationAdmin configAdmin = bundleContext.getService(configAdminServiceReference);
@@ -53,7 +70,11 @@ public class DatastoreContextConfigAdminOverlay implements AutoCloseable {
 
                 LOG.debug("Overlaying settings: {}", properties);
 
-                introspector.update(properties);
+                if(introspector.update(properties)) {
+                    if(listener != null) {
+                        listener.onDatastoreContextUpdated(introspector.getContext());
+                    }
+                }
             } else {
                 LOG.debug("No Configuration found for {}", CONFIG_ID);
             }
@@ -72,5 +93,20 @@ public class DatastoreContextConfigAdminOverlay implements AutoCloseable {
 
     @Override
     public void close() {
+        listener = null;
+
+        if(configListenerServiceRef != null) {
+            configListenerServiceRef.unregister();
+        }
+    }
+
+    private class DatastoreConfigurationListener implements ConfigurationListener {
+        @Override
+        public void configurationEvent(ConfigurationEvent event) {
+            if(CONFIG_ID.equals(event.getPid()) && event.getType() == ConfigurationEvent.CM_UPDATED) {
+                LOG.debug("configurationEvent: config {} was updated", CONFIG_ID);
+                overlaySettings(event.getReference());
+            }
+        }
     }
 }
index 3ca64210bea11986689765fdeee5936f2ba99bc3..0bbeefd6fa64f3c5db3caf2e79f655dc4450a7c0 100644 (file)
@@ -196,11 +196,13 @@ public class DatastoreContextIntrospector {
      * @param properties the properties to apply
      * @return true if the cached DatastoreContext was updated, false otherwise.
      */
-    public boolean update(Dictionary<String, Object> properties) {
+    public synchronized boolean update(Dictionary<String, Object> properties) {
         if(properties == null || properties.isEmpty()) {
             return false;
         }
 
+        LOG.debug("In update: properties: {}", properties);
+
         Builder builder = DatastoreContext.newBuilderFrom(context);
 
         final String dataStoreTypePrefix = context.getDataStoreType() + '.';
@@ -291,12 +293,12 @@ public class DatastoreContextIntrospector {
     }
 
     private Object constructorValueRecursively(Class<?> toType, Object fromValue) throws Exception {
-        LOG.debug("convertValueRecursively - toType: {}, fromValue {} ({})",
+        LOG.trace("convertValueRecursively - toType: {}, fromValue {} ({})",
                 toType.getSimpleName(), fromValue, fromValue.getClass().getSimpleName());
 
         Constructor<?> ctor = constructors.get(toType);
 
-        LOG.debug("Found {}", ctor);
+        LOG.trace("Found {}", ctor);
 
         if(ctor == null) {
             throw new IllegalArgumentException(String.format("Constructor not found for type %s", toType));
index 434efc9111f48e3910af12386d3cd1d4f5e7360a..51182deb1dcd6586ee1d421d477e3072363d4a83 100644 (file)
@@ -34,7 +34,8 @@ import org.slf4j.LoggerFactory;
 /**
  *
  */
-public class DistributedDataStore implements DOMStore, SchemaContextListener, AutoCloseable {
+public class DistributedDataStore implements DOMStore, SchemaContextListener,
+        DatastoreContextConfigAdminOverlay.Listener, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
     public static final int REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR = 24; // 24 times the usual operation timeout
@@ -127,6 +128,14 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
         actorContext.setSchemaContext(schemaContext);
     }
 
+    @Override
+    public void onDatastoreContextUpdated(DatastoreContext context) {
+        LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreType());
+
+        actorContext.setDatastoreContext(context);
+        datastoreConfigMXBean.setContext(context);
+    }
+
     @Override
     public void close() {
         datastoreConfigMXBean.unregisterMBean();
index f3a55c55a720fe21e8718cc4a346c13706074f49..ee9f4f3ad5276e5586da6135902835f0896f2b6f 100644 (file)
@@ -34,6 +34,8 @@ public class DistributedDataStoreFactory {
         final DistributedDataStore dataStore = new DistributedDataStore(actorSystem,
                 new ClusterWrapperImpl(actorSystem), config, introspector.getContext());
 
+        overlay.setListener(dataStore);
+
         ShardStrategyFactory.setConfiguration(config);
         schemaService.registerSchemaContextListener(dataStore);
 
index 0672023fcbfe0e207036006d85ca9d1509a94a38..c509580ac657f920564cc2c4db1a43f91fa7d325 100644 (file)
@@ -113,9 +113,9 @@ public class Shard extends RaftActor {
     private final List<DelayedListenerRegistration> delayedListenerRegistrations =
                                                                        Lists.newArrayList();
 
-    private final DatastoreContext datastoreContext;
+    private DatastoreContext datastoreContext;
 
-    private final DataPersistenceProvider dataPersistenceProvider;
+    private DataPersistenceProvider dataPersistenceProvider;
 
     private SchemaContext schemaContext;
 
@@ -123,7 +123,7 @@ public class Shard extends RaftActor {
 
     private final ShardCommitCoordinator commitCoordinator;
 
-    private final long transactionCommitTimeout;
+    private long transactionCommitTimeout;
 
     private Cancellable txCommitTimeoutCheckSchedule;
 
@@ -175,8 +175,7 @@ public class Shard extends RaftActor {
         commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
                 datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
 
-        transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
-                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+        setTransactionCommitTimeout();
 
         // create a notifier actor for each cluster member
         roleChangeNotifier = createRoleChangeNotifier(name.toString());
@@ -185,6 +184,11 @@ public class Shard extends RaftActor {
                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
     }
 
+    private void setTransactionCommitTimeout() {
+        transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
+                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+    }
+
     private static Map<String, String> mapPeerAddresses(
         final Map<ShardIdentifier, String> peerAddresses) {
         Map<String, String> map = new HashMap<>();
@@ -216,11 +220,15 @@ public class Shard extends RaftActor {
 
     @Override
     public void postStop() {
+        LOG.info("Stopping Shard {}", persistenceId());
+
         super.postStop();
 
         if(txCommitTimeoutCheckSchedule != null) {
             txCommitTimeoutCheckSchedule.cancel();
         }
+
+        shardMBean.unregisterMBean();
     }
 
     @Override
@@ -278,6 +286,8 @@ public class Shard extends RaftActor {
                         resolved.getPeerAddress());
             } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
                 handleTransactionCommitTimeoutCheck();
+            } else if(message instanceof DatastoreContext) {
+                onDatastoreContext((DatastoreContext)message);
             } else {
                 super.onReceiveCommand(message);
             }
@@ -291,6 +301,24 @@ public class Shard extends RaftActor {
         return roleChangeNotifier;
     }
 
+    private void onDatastoreContext(DatastoreContext context) {
+        datastoreContext = context;
+
+        commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity());
+
+        setTransactionCommitTimeout();
+
+        if(datastoreContext.isPersistent() &&
+                dataPersistenceProvider instanceof NonPersistentRaftDataProvider) {
+            dataPersistenceProvider = new PersistentDataProvider();
+        } else if(!datastoreContext.isPersistent() &&
+                dataPersistenceProvider instanceof PersistentDataProvider) {
+            dataPersistenceProvider = new NonPersistentRaftDataProvider();
+        }
+
+        updateConfigParams(datastoreContext.getShardRaftConfig());
+    }
+
     private void handleTransactionCommitTimeoutCheck() {
         CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
         if(cohortEntry != null) {
index 8b95404c4e6bdd06e2f4bb9d234eeab4465dd81f..951bc22545804af11bbc37fd8ef28d03cd53fc02 100644 (file)
@@ -34,7 +34,7 @@ public class ShardCommitCoordinator {
 
     private final Queue<CohortEntry> queuedCohortEntries;
 
-    private final int queueCapacity;
+    private int queueCapacity;
 
     private final Logger log;
 
@@ -54,6 +54,10 @@ public class ShardCommitCoordinator {
         queuedCohortEntries = new LinkedList<>();
     }
 
+    public void setQueueCapacity(int queueCapacity) {
+        this.queueCapacity = queueCapacity;
+    }
+
     /**
      * This method caches a cohort entry for the given transactions ID in preparation for the
      * subsequent 3-phase commit.
index 426a2e0934f173560647a13569b22d1e06f632b2..775cae35e22843cafd223bce22eb5232a230868f 100644 (file)
@@ -39,7 +39,6 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
@@ -90,9 +89,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final String shardDispatcherPath;
 
-    private ShardManagerInfoMBean mBean;
+    private ShardManagerInfo mBean;
 
-    private final DatastoreContext datastoreContext;
+    private DatastoreContext datastoreContext;
 
     private Collection<String> knownModules = Collections.emptySet();
 
@@ -132,6 +131,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext));
     }
 
+    @Override
+    public void postStop() {
+        LOG.info("Stopping ShardManager");
+
+        mBean.unregisterMBean();
+    }
+
     @Override
     public void handleCommand(Object message) throws Exception {
         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
@@ -148,6 +154,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             memberRemoved((ClusterEvent.MemberRemoved) message);
         } else if(message instanceof ClusterEvent.UnreachableMember) {
             ignoreMessage(message);
+        } else if(message instanceof DatastoreContext) {
+            onDatastoreContext((DatastoreContext)message);
         } else{
             unknownMessage(message);
         }
@@ -258,6 +266,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private void onDatastoreContext(DatastoreContext context) {
+        datastoreContext = context;
+        for (ShardInformation info : localShards.values()) {
+            if (info.getActor() != null) {
+                info.getActor().tell(datastoreContext, getSelf());
+            }
+        }
+    }
+
     /**
      * Notifies all the local shards of a change in the schema context
      *
index 26e6318f6d4d14aa5944a909696bbaa8b5f7f207..7eede29b65690db530fd4b9cfb9acb130365fcb6 100644 (file)
@@ -85,18 +85,19 @@ public class ActorContext {
     private final ActorRef shardManager;
     private final ClusterWrapper clusterWrapper;
     private final Configuration configuration;
-    private final DatastoreContext datastoreContext;
-    private final FiniteDuration operationDuration;
-    private final Timeout operationTimeout;
+    private DatastoreContext datastoreContext;
+    private FiniteDuration operationDuration;
+    private Timeout operationTimeout;
     private final String selfAddressHostPort;
-    private final RateLimiter txRateLimiter;
+    private RateLimiter txRateLimiter;
     private final MetricRegistry metricRegistry = new MetricRegistry();
     private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
     private final int transactionOutstandingOperationLimit;
-    private final Timeout transactionCommitOperationTimeout;
+    private Timeout transactionCommitOperationTimeout;
     private final Dispatchers dispatchers;
 
     private volatile SchemaContext schemaContext;
+    private volatile boolean updated;
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
@@ -112,14 +113,9 @@ public class ActorContext {
         this.clusterWrapper = clusterWrapper;
         this.configuration = configuration;
         this.datastoreContext = datastoreContext;
-        this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
 
-        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
-        operationTimeout = new Timeout(operationDuration);
-        transactionCommitOperationTimeout =  new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
-                TimeUnit.SECONDS));
-
+        setCachedProperties();
 
         Address selfAddress = clusterWrapper.getSelfAddress();
         if (selfAddress != null && !selfAddress.host().isEmpty()) {
@@ -133,6 +129,16 @@ public class ActorContext {
 
     }
 
+    private void setCachedProperties() {
+        txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
+
+        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
+        operationTimeout = new Timeout(operationDuration);
+
+        transactionCommitOperationTimeout =  new Timeout(Duration.create(
+                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
+    }
+
     public DatastoreContext getDatastoreContext() {
         return datastoreContext;
     }
@@ -157,7 +163,25 @@ public class ActorContext {
         this.schemaContext = schemaContext;
 
         if(shardManager != null) {
-            shardManager.tell(new UpdateSchemaContext(schemaContext), null);
+            shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
+        }
+    }
+
+    public void setDatastoreContext(DatastoreContext context) {
+        this.datastoreContext = context;
+        setCachedProperties();
+
+        // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
+        // will be published immediately even though they may not be immediately visible to other
+        // threads due to unsynchronized reads. That's OK though - we're going for eventual
+        // consistency here as immediately visible updates to these members aren't critical. These
+        // members could've been made volatile but wanted to avoid volatile reads as these are
+        // accessed often and updates will be infrequent.
+
+        updated = true;
+
+        if(shardManager != null) {
+            shardManager.tell(context, ActorRef.noSender());
         }
     }
 
index 3693c01b42348f59d3b26daa3ed184138e3d2bed..ecfcdefcb28d2eeecc24e934180372178ba7678f 100644 (file)
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import java.io.IOException;
 import java.util.Dictionary;
 import java.util.Hashtable;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.cm.Configuration;
 import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.service.cm.ConfigurationEvent;
+import org.osgi.service.cm.ConfigurationListener;
 
 /**
  * Unit tests for DatastoreContextConfigAdminOverlay.
  *
  * @author Thomas Pantelis
  */
+@SuppressWarnings("unchecked")
 public class DatastoreContextConfigAdminOverlayTest {
 
-    @SuppressWarnings("unchecked")
-    @Test
-    public void test() throws IOException {
-        BundleContext mockBundleContext = mock(BundleContext.class);
-        ServiceReference<ConfigurationAdmin> mockServiceRef = mock(ServiceReference.class);
-        ConfigurationAdmin mockConfigAdmin = mock(ConfigurationAdmin.class);
-        Configuration mockConfig = mock(Configuration.class);
-        DatastoreContextIntrospector mockIntrospector = mock(DatastoreContextIntrospector.class);
+    @Mock
+    private BundleContext mockBundleContext;
+
+    @Mock
+    private ServiceReference<ConfigurationAdmin> mockConfigAdminServiceRef;
+
+    @Mock
+    private ConfigurationAdmin mockConfigAdmin;
+
+    @Mock
+    private Configuration mockConfig;
+
+    @Mock
+    private DatastoreContextIntrospector mockIntrospector;
+
+    @Mock
+    private ServiceRegistration<?> configListenerServiceReg;
+
+    @Before
+    public void setup() throws IOException {
+        MockitoAnnotations.initMocks(this);
 
-        doReturn(mockServiceRef).when(mockBundleContext).getServiceReference(ConfigurationAdmin.class);
-        doReturn(mockConfigAdmin).when(mockBundleContext).getService(mockServiceRef);
+        doReturn(mockConfigAdminServiceRef).when(mockBundleContext).getServiceReference(ConfigurationAdmin.class);
+        doReturn(mockConfigAdmin).when(mockBundleContext).getService(mockConfigAdminServiceRef);
+        doReturn(configListenerServiceReg).when(mockBundleContext).registerService(
+                eq(ConfigurationListener.class.getName()), any(), any(Dictionary.class));
 
         doReturn(mockConfig).when(mockConfigAdmin).getConfiguration(DatastoreContextConfigAdminOverlay.CONFIG_ID);
 
         doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(mockConfig).getPid();
 
+    }
+
+    @Test
+    public void testUpdateOnConstruction() {
+        Dictionary<String, Object> properties = new Hashtable<>();
+        properties.put("property", "value");
+        doReturn(properties).when(mockConfig).getProperties();
+
+        DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+                mockIntrospector, mockBundleContext);
+
+        verify(mockIntrospector).update(properties);
+
+        verify(mockBundleContext).ungetService(mockConfigAdminServiceRef);
+
+        overlay.close();
+    }
+
+    @Test
+    public void testUpdateOnConfigurationEvent() {
+        DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+                mockIntrospector, mockBundleContext);
+
+        reset(mockIntrospector);
+
+        DatastoreContext context = DatastoreContext.newBuilder().build();
+        doReturn(context).when(mockIntrospector).getContext();
+
+        DatastoreContextConfigAdminOverlay.Listener mockListener =
+                mock(DatastoreContextConfigAdminOverlay.Listener.class);
+
+        overlay.setListener(mockListener);
+
         Dictionary<String, Object> properties = new Hashtable<>();
         properties.put("property", "value");
         doReturn(properties).when(mockConfig).getProperties();
 
-        try(DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
-                mockIntrospector, mockBundleContext)) {
-        }
+        doReturn(true).when(mockIntrospector).update(properties);
+
+        ArgumentCaptor<ConfigurationListener> configListener =
+                ArgumentCaptor.forClass(ConfigurationListener.class);
+        verify(mockBundleContext).registerService(eq(ConfigurationListener.class.getName()),
+                configListener.capture(), any(Dictionary.class));
+
+        ConfigurationEvent configEvent = mock(ConfigurationEvent.class);
+        doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(configEvent).getPid();
+        doReturn(mockConfigAdminServiceRef).when(configEvent).getReference();
+        doReturn(ConfigurationEvent.CM_UPDATED).when(configEvent).getType();
+
+        configListener.getValue().configurationEvent(configEvent);
 
         verify(mockIntrospector).update(properties);
 
-        verify(mockBundleContext).ungetService(mockServiceRef);
+        verify(mockListener).onDatastoreContextUpdated(context);
+
+        verify(mockBundleContext, times(2)).ungetService(mockConfigAdminServiceRef);
+
+        overlay.close();
+
+        verify(configListenerServiceReg).unregister();
+    }
+
+    @Test
+    public void testConfigurationEventWithDifferentPid() {
+        DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+                mockIntrospector, mockBundleContext);
+
+        reset(mockIntrospector);
+
+        ArgumentCaptor<ConfigurationListener> configListener =
+                ArgumentCaptor.forClass(ConfigurationListener.class);
+        verify(mockBundleContext).registerService(eq(ConfigurationListener.class.getName()),
+                configListener.capture(), any(Dictionary.class));
+
+        ConfigurationEvent configEvent = mock(ConfigurationEvent.class);
+        doReturn("other-pid").when(configEvent).getPid();
+        doReturn(mockConfigAdminServiceRef).when(configEvent).getReference();
+        doReturn(ConfigurationEvent.CM_UPDATED).when(configEvent).getType();
+
+        configListener.getValue().configurationEvent(configEvent);
+
+        verify(mockIntrospector, times(0)).update(any(Dictionary.class));
+
+        overlay.close();
+    }
+
+    @Test
+    public void testConfigurationEventWithNonUpdateEventType() {
+        DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+                mockIntrospector, mockBundleContext);
+
+        reset(mockIntrospector);
+
+        ArgumentCaptor<ConfigurationListener> configListener =
+                ArgumentCaptor.forClass(ConfigurationListener.class);
+        verify(mockBundleContext).registerService(eq(ConfigurationListener.class.getName()),
+                configListener.capture(), any(Dictionary.class));
+
+        ConfigurationEvent configEvent = mock(ConfigurationEvent.class);
+        doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(configEvent).getPid();
+        doReturn(mockConfigAdminServiceRef).when(configEvent).getReference();
+        doReturn(ConfigurationEvent.CM_DELETED).when(configEvent).getType();
+
+        configListener.getValue().configurationEvent(configEvent);
+
+        verify(mockIntrospector, times(0)).update(any(Dictionary.class));
+
+        overlay.close();
     }
 }
index 4b0651a48eb07f4814bd990386bbe94d32159dfd..e03693569b0812b3db941990b1dc56957c62568b 100644 (file)
@@ -1564,6 +1564,31 @@ public class ShardTest extends AbstractActorTest {
 
     }
 
+    @Test
+    public void testOnDatastoreContext() {
+        new ShardTestKit(getSystem()) {{
+            dataStoreContextBuilder.persistent(true);
+
+            TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
+
+            assertEquals("isRecoveryApplicable", true,
+                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+            waitUntilLeader(shard);
+
+            shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
+
+            assertEquals("isRecoveryApplicable", false,
+                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+            shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
+
+            assertEquals("isRecoveryApplicable", true,
+                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
 
     private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
index 3c6a0cef5c605fbb23e3c9501676f67d95805e9d..fd41c49390b484fd0d4343befa2f920d542e2f74 100644 (file)
@@ -17,6 +17,7 @@ import com.google.common.base.Optional;
 import com.typesafe.config.ConfigFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.time.StopWatch;
+import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
@@ -339,4 +340,29 @@ public class ActorContextTest extends AbstractActorTest{
 
     }
 
+    @Test
+    public void testSetDatastoreContext() {
+        new JavaTestKit(getSystem()) {{
+            ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
+                            mock(Configuration.class), DatastoreContext.newBuilder().
+                                operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
+
+            assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
+            assertEquals("getTransactionCommitOperationTimeout", 7,
+                    actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+
+            DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
+                    shardTransactionCommitTimeoutInSeconds(8).build();
+
+            actorContext.setDatastoreContext(newContext);
+
+            expectMsgClass(duration("5 seconds"), DatastoreContext.class);
+
+            Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
+
+            assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
+            assertEquals("getTransactionCommitOperationTimeout", 8,
+                    actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+        }};
+    }
 }
index 9ed3d276a3b59c3d15fd3417888f42ddf70f6073..3dd752ec30f14eecd43d8934d6ecf210687d9c4a 100644 (file)
@@ -3,4 +3,4 @@ org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a
 org.slf4j.simpleLogger.logFile=System.out
 org.slf4j.simpleLogger.showShortLogName=true
 org.slf4j.simpleLogger.levelInBrackets=true
-org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=trace
\ No newline at end of file
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug
\ No newline at end of file