Dynamically update DatastoreContext properties 71/15571/6
authortpantelis <tpanteli@brocade.com>
Thu, 19 Feb 2015 21:00:38 +0000 (16:00 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Sun, 22 Feb 2015 13:25:47 +0000 (08:25 -0500)
Added a ConfigurationListener to the DatastoreContextConfigAdminOverlay
that is registered as an OSGi service to receive ConfigurationEvents
from the Config Admin when the
org.opendaylight.controller.cluster.datastore.cfg file is modified.

The DistributedDataStore registers as a listener to the
DatastoreContextConfigAdminOverlay to get updated when the
DatastoreContext instance changes due to a ConfigurationEvent. The
DistributedDataStore sets the new DatastoreContext instance in the
ActorContext which updates its cached instance and properties and
sends the DistributedDataStore instance as a message to the ShardManager.
The ShardManager simply sends it out to all the Shards which update
their cached DatastoreContext state.

The Shard also updates the new ConfigParams instance in the base RaftActor
which sets it in its RaftActorContext. There was one place in
FollowerLogInformationImpl where it had a local cache of a property
obtained from the ConfigParams so I changed the ctor to pass the
RaftActorContext instead and obtain the property each time from the
ConfigParams.

A slight downside to this is that we'll have to be cognizant of not caching
DatastoreContext/ConfigParams properties and assuming they're static. Or
if we do cache we need to handle updates.

I toyed around with trying to restart the DistributedDataStore with a
new DatastoreContext and ActorContext and killing the previous
ShardManager and Shard actors and creating new ones. However recreating
the Shards without being disruptive to clients is tricky and risky, eg handling
infight transactions, and I didn't see a clean way to do it without
possibly causing inflight transactions to fail. .So I went with the simpler
approach of just pushing an updated DatastoreContext to the actors.

Change-Id: Ie608f61da36ac58a806208925a3c4277968c2f5b
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
19 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties

index 3e6742c..d4d1389 100644 (file)
@@ -43,6 +43,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
     private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
     private long isolatedLeaderCheckInterval = HEART_BEAT_INTERVAL.$times(1000).toMillis();
+    private FiniteDuration electionTimeOutInterval;
 
     // 12 is just an arbitrary percentage. This is the amount of the total memory that a raft actor's
     // in-memory journal can use before it needs to snapshot
@@ -52,6 +53,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
         this.heartBeatInterval = heartBeatInterval;
+        electionTimeOutInterval = null;
     }
 
     public void setSnapshotBatchCount(long snapshotBatchCount) {
@@ -72,6 +74,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     public void setElectionTimeoutFactor(long electionTimeoutFactor){
         this.electionTimeoutFactor = electionTimeoutFactor;
+        electionTimeOutInterval = null;
     }
 
     @Override
@@ -92,7 +95,11 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     @Override
     public FiniteDuration getElectionTimeOutInterval() {
-        return getHeartBeatInterval().$times(electionTimeoutFactor);
+        if(electionTimeOutInterval == null) {
+            electionTimeOutInterval = getHeartBeatInterval().$times(electionTimeoutFactor);
+        }
+
+        return electionTimeOutInterval;
     }
 
     @Override
index 04b9f16..90e1282 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.raft;
 import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import scala.concurrent.duration.FiniteDuration;
 
 public class FollowerLogInformationImpl implements FollowerLogInformation {
     private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex");
@@ -21,18 +20,17 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
 
-    private final long followerTimeoutMillis;
+    private final RaftActorContext context;
 
     private volatile long nextIndex;
 
     private volatile long matchIndex;
 
-    public FollowerLogInformationImpl(String id, long nextIndex,
-        long matchIndex, FiniteDuration followerTimeoutDuration) {
+    public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) {
         this.id = id;
-        this.nextIndex = nextIndex;
+        this.nextIndex = context.getCommitIndex();
         this.matchIndex = matchIndex;
-        this.followerTimeoutMillis = followerTimeoutDuration.toMillis();
+        this.context = context;
     }
 
     @Override
@@ -78,7 +76,8 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
     @Override
     public boolean isFollowerActive() {
         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
-        return (stopwatch.isRunning()) && (elapsed <= followerTimeoutMillis);
+        return (stopwatch.isRunning()) &&
+                (elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis());
     }
 
     @Override
@@ -107,7 +106,8 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
         builder.append("FollowerLogInformationImpl [id=").append(id).append(", nextIndex=").append(nextIndex)
                 .append(", matchIndex=").append(matchIndex).append(", stopwatch=")
                 .append(stopwatch.elapsed(TimeUnit.MILLISECONDS))
-                .append(", followerTimeoutMillis=").append(followerTimeoutMillis).append("]");
+                .append(", followerTimeoutMillis=")
+                .append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]");
         return builder.toString();
     }
 
index 285be39..edcfcc9 100644 (file)
@@ -98,7 +98,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * This context should NOT be passed directly to any other actor it is
      * only to be consumed by the RaftActorBehaviors
      */
-    private final RaftActorContext context;
+    private final RaftActorContextImpl context;
 
     /**
      * The in-memory journal
@@ -139,6 +139,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         super.preStart();
     }
 
+    @Override
+    public void postStop() {
+        if(currentBehavior != null) {
+            try {
+                currentBehavior.close();
+            } catch (Exception e) {
+                LOG.debug("{}: Error closing behavior {}", persistenceId(), currentBehavior.state());
+            }
+        }
+
+        super.postStop();
+    }
+
     @Override
     public void handleRecover(Object message) {
         if(persistence().isRecoveryApplicable()) {
@@ -511,6 +524,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return context;
     }
 
+    protected void updateConfigParams(ConfigParams configParams) {
+        context.setConfigParams(configParams);
+    }
+
     /**
      * setPeerAddress sets the address of a known peer at a later time.
      * <p>
index b71b3be..6fc5e43 100644 (file)
@@ -37,7 +37,7 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private final Logger LOG;
 
-    private final ConfigParams configParams;
+    private ConfigParams configParams;
 
     private boolean snapshotCaptureInitiated;
 
@@ -59,6 +59,10 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.LOG = logger;
     }
 
+    void setConfigParams(ConfigParams configParams) {
+        this.configParams = configParams;
+    }
+
     @Override
     public ActorRef actorOf(Props props){
         return context.actorOf(props);
index be51ba0..890d45e 100644 (file)
@@ -97,9 +97,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
         for (String followerId : context.getPeerAddresses().keySet()) {
             FollowerLogInformation followerLogInformation =
-                new FollowerLogInformationImpl(followerId,
-                    context.getCommitIndex(), -1,
-                    context.getConfigParams().getElectionTimeOutInterval());
+                new FollowerLogInformationImpl(followerId, -1, context);
 
             ftlBuilder.put(followerId, followerLogInformation);
         }
index 84d1545..5be9030 100644 (file)
@@ -7,29 +7,29 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.TimeUnit;
 import org.junit.Test;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 public class FollowerLogInformationImplTest {
 
     @Test
     public void testIsFollowerActive() {
 
-        FiniteDuration timeoutDuration =
-            new FiniteDuration(500, TimeUnit.MILLISECONDS);
-
-        FollowerLogInformation followerLogInformation =
-            new FollowerLogInformationImpl(
-                "follower1", 10, 9, timeoutDuration);
+        MockRaftActorContext context = new MockRaftActorContext();
+        context.setCommitIndex(10);
 
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
+        configParams.setElectionTimeoutFactor(1);
+        context.setConfigParams(configParams);
 
+        FollowerLogInformation followerLogInformation =
+            new FollowerLogInformationImpl("follower1", 9, context);
 
         assertFalse("Follower should be termed inactive before stopwatch starts",
             followerLogInformation.isFollowerActive());
index 297d781..c3161a5 100644 (file)
@@ -37,22 +37,7 @@ public class MockRaftActorContext implements RaftActorContext {
     private boolean snapshotCaptureInitiated;
 
     public MockRaftActorContext(){
-        electionTerm = null;
-
-        initReplicatedLog();
-    }
-
-    public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){
-        this.id = id;
-        this.system = system;
-        this.actor = actor;
-
-        final String id1 = id;
         electionTerm = new ElectionTerm() {
-            /**
-             * Identifier of the actor whose election term information this is
-             */
-            private final String id = id1;
             private long currentTerm = 1;
             private String votedFor = "";
 
@@ -81,6 +66,13 @@ public class MockRaftActorContext implements RaftActorContext {
         };
 
         configParams = new DefaultConfigParamsImpl();
+    }
+
+    public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){
+        this();
+        this.id = id;
+        this.system = system;
+        this.actor = actor;
 
         initReplicatedLog();
     }
index 20ee555..2688d01 100644 (file)
@@ -11,8 +11,11 @@ import java.io.IOException;
 import java.util.Dictionary;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.cm.Configuration;
 import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.service.cm.ConfigurationEvent;
+import org.osgi.service.cm.ConfigurationListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,12 +28,19 @@ import org.slf4j.LoggerFactory;
 public class DatastoreContextConfigAdminOverlay implements AutoCloseable {
     public static final String CONFIG_ID = "org.opendaylight.controller.cluster.datastore";
 
+    public static interface Listener {
+        void onDatastoreContextUpdated(DatastoreContext context);
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(DatastoreContextConfigAdminOverlay.class);
 
     private final DatastoreContextIntrospector introspector;
     private final BundleContext bundleContext;
+    private ServiceRegistration<?> configListenerServiceRef;
+    private Listener listener;
 
-    public DatastoreContextConfigAdminOverlay(DatastoreContextIntrospector introspector, BundleContext bundleContext) {
+    public DatastoreContextConfigAdminOverlay(DatastoreContextIntrospector introspector,
+            BundleContext bundleContext) {
         this.introspector = introspector;
         this.bundleContext = bundleContext;
 
@@ -40,9 +50,16 @@ public class DatastoreContextConfigAdminOverlay implements AutoCloseable {
             LOG.warn("No ConfigurationAdmin service found");
         } else {
             overlaySettings(configAdminServiceReference);
+
+            configListenerServiceRef = bundleContext.registerService(ConfigurationListener.class.getName(),
+                    new DatastoreConfigurationListener(), null);
         }
     }
 
+    public void setListener(Listener listener) {
+        this.listener = listener;
+    }
+
     private void overlaySettings(ServiceReference<ConfigurationAdmin> configAdminServiceReference) {
         try {
             ConfigurationAdmin configAdmin = bundleContext.getService(configAdminServiceReference);
@@ -53,7 +70,11 @@ public class DatastoreContextConfigAdminOverlay implements AutoCloseable {
 
                 LOG.debug("Overlaying settings: {}", properties);
 
-                introspector.update(properties);
+                if(introspector.update(properties)) {
+                    if(listener != null) {
+                        listener.onDatastoreContextUpdated(introspector.getContext());
+                    }
+                }
             } else {
                 LOG.debug("No Configuration found for {}", CONFIG_ID);
             }
@@ -72,5 +93,20 @@ public class DatastoreContextConfigAdminOverlay implements AutoCloseable {
 
     @Override
     public void close() {
+        listener = null;
+
+        if(configListenerServiceRef != null) {
+            configListenerServiceRef.unregister();
+        }
+    }
+
+    private class DatastoreConfigurationListener implements ConfigurationListener {
+        @Override
+        public void configurationEvent(ConfigurationEvent event) {
+            if(CONFIG_ID.equals(event.getPid()) && event.getType() == ConfigurationEvent.CM_UPDATED) {
+                LOG.debug("configurationEvent: config {} was updated", CONFIG_ID);
+                overlaySettings(event.getReference());
+            }
+        }
     }
 }
index 3ca6421..0bbeefd 100644 (file)
@@ -196,11 +196,13 @@ public class DatastoreContextIntrospector {
      * @param properties the properties to apply
      * @return true if the cached DatastoreContext was updated, false otherwise.
      */
-    public boolean update(Dictionary<String, Object> properties) {
+    public synchronized boolean update(Dictionary<String, Object> properties) {
         if(properties == null || properties.isEmpty()) {
             return false;
         }
 
+        LOG.debug("In update: properties: {}", properties);
+
         Builder builder = DatastoreContext.newBuilderFrom(context);
 
         final String dataStoreTypePrefix = context.getDataStoreType() + '.';
@@ -291,12 +293,12 @@ public class DatastoreContextIntrospector {
     }
 
     private Object constructorValueRecursively(Class<?> toType, Object fromValue) throws Exception {
-        LOG.debug("convertValueRecursively - toType: {}, fromValue {} ({})",
+        LOG.trace("convertValueRecursively - toType: {}, fromValue {} ({})",
                 toType.getSimpleName(), fromValue, fromValue.getClass().getSimpleName());
 
         Constructor<?> ctor = constructors.get(toType);
 
-        LOG.debug("Found {}", ctor);
+        LOG.trace("Found {}", ctor);
 
         if(ctor == null) {
             throw new IllegalArgumentException(String.format("Constructor not found for type %s", toType));
index 434efc9..51182de 100644 (file)
@@ -34,7 +34,8 @@ import org.slf4j.LoggerFactory;
 /**
  *
  */
-public class DistributedDataStore implements DOMStore, SchemaContextListener, AutoCloseable {
+public class DistributedDataStore implements DOMStore, SchemaContextListener,
+        DatastoreContextConfigAdminOverlay.Listener, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
     public static final int REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR = 24; // 24 times the usual operation timeout
@@ -127,6 +128,14 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
         actorContext.setSchemaContext(schemaContext);
     }
 
+    @Override
+    public void onDatastoreContextUpdated(DatastoreContext context) {
+        LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreType());
+
+        actorContext.setDatastoreContext(context);
+        datastoreConfigMXBean.setContext(context);
+    }
+
     @Override
     public void close() {
         datastoreConfigMXBean.unregisterMBean();
index f3a55c5..ee9f4f3 100644 (file)
@@ -34,6 +34,8 @@ public class DistributedDataStoreFactory {
         final DistributedDataStore dataStore = new DistributedDataStore(actorSystem,
                 new ClusterWrapperImpl(actorSystem), config, introspector.getContext());
 
+        overlay.setListener(dataStore);
+
         ShardStrategyFactory.setConfiguration(config);
         schemaService.registerSchemaContextListener(dataStore);
 
index 0672023..c509580 100644 (file)
@@ -113,9 +113,9 @@ public class Shard extends RaftActor {
     private final List<DelayedListenerRegistration> delayedListenerRegistrations =
                                                                        Lists.newArrayList();
 
-    private final DatastoreContext datastoreContext;
+    private DatastoreContext datastoreContext;
 
-    private final DataPersistenceProvider dataPersistenceProvider;
+    private DataPersistenceProvider dataPersistenceProvider;
 
     private SchemaContext schemaContext;
 
@@ -123,7 +123,7 @@ public class Shard extends RaftActor {
 
     private final ShardCommitCoordinator commitCoordinator;
 
-    private final long transactionCommitTimeout;
+    private long transactionCommitTimeout;
 
     private Cancellable txCommitTimeoutCheckSchedule;
 
@@ -175,8 +175,7 @@ public class Shard extends RaftActor {
         commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
                 datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
 
-        transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
-                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+        setTransactionCommitTimeout();
 
         // create a notifier actor for each cluster member
         roleChangeNotifier = createRoleChangeNotifier(name.toString());
@@ -185,6 +184,11 @@ public class Shard extends RaftActor {
                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
     }
 
+    private void setTransactionCommitTimeout() {
+        transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
+                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+    }
+
     private static Map<String, String> mapPeerAddresses(
         final Map<ShardIdentifier, String> peerAddresses) {
         Map<String, String> map = new HashMap<>();
@@ -216,11 +220,15 @@ public class Shard extends RaftActor {
 
     @Override
     public void postStop() {
+        LOG.info("Stopping Shard {}", persistenceId());
+
         super.postStop();
 
         if(txCommitTimeoutCheckSchedule != null) {
             txCommitTimeoutCheckSchedule.cancel();
         }
+
+        shardMBean.unregisterMBean();
     }
 
     @Override
@@ -278,6 +286,8 @@ public class Shard extends RaftActor {
                         resolved.getPeerAddress());
             } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
                 handleTransactionCommitTimeoutCheck();
+            } else if(message instanceof DatastoreContext) {
+                onDatastoreContext((DatastoreContext)message);
             } else {
                 super.onReceiveCommand(message);
             }
@@ -291,6 +301,24 @@ public class Shard extends RaftActor {
         return roleChangeNotifier;
     }
 
+    private void onDatastoreContext(DatastoreContext context) {
+        datastoreContext = context;
+
+        commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity());
+
+        setTransactionCommitTimeout();
+
+        if(datastoreContext.isPersistent() &&
+                dataPersistenceProvider instanceof NonPersistentRaftDataProvider) {
+            dataPersistenceProvider = new PersistentDataProvider();
+        } else if(!datastoreContext.isPersistent() &&
+                dataPersistenceProvider instanceof PersistentDataProvider) {
+            dataPersistenceProvider = new NonPersistentRaftDataProvider();
+        }
+
+        updateConfigParams(datastoreContext.getShardRaftConfig());
+    }
+
     private void handleTransactionCommitTimeoutCheck() {
         CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
         if(cohortEntry != null) {
index 8b95404..951bc22 100644 (file)
@@ -34,7 +34,7 @@ public class ShardCommitCoordinator {
 
     private final Queue<CohortEntry> queuedCohortEntries;
 
-    private final int queueCapacity;
+    private int queueCapacity;
 
     private final Logger log;
 
@@ -54,6 +54,10 @@ public class ShardCommitCoordinator {
         queuedCohortEntries = new LinkedList<>();
     }
 
+    public void setQueueCapacity(int queueCapacity) {
+        this.queueCapacity = queueCapacity;
+    }
+
     /**
      * This method caches a cohort entry for the given transactions ID in preparation for the
      * subsequent 3-phase commit.
index 426a2e0..775cae3 100644 (file)
@@ -39,7 +39,6 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
@@ -90,9 +89,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final String shardDispatcherPath;
 
-    private ShardManagerInfoMBean mBean;
+    private ShardManagerInfo mBean;
 
-    private final DatastoreContext datastoreContext;
+    private DatastoreContext datastoreContext;
 
     private Collection<String> knownModules = Collections.emptySet();
 
@@ -132,6 +131,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext));
     }
 
+    @Override
+    public void postStop() {
+        LOG.info("Stopping ShardManager");
+
+        mBean.unregisterMBean();
+    }
+
     @Override
     public void handleCommand(Object message) throws Exception {
         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
@@ -148,6 +154,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             memberRemoved((ClusterEvent.MemberRemoved) message);
         } else if(message instanceof ClusterEvent.UnreachableMember) {
             ignoreMessage(message);
+        } else if(message instanceof DatastoreContext) {
+            onDatastoreContext((DatastoreContext)message);
         } else{
             unknownMessage(message);
         }
@@ -258,6 +266,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private void onDatastoreContext(DatastoreContext context) {
+        datastoreContext = context;
+        for (ShardInformation info : localShards.values()) {
+            if (info.getActor() != null) {
+                info.getActor().tell(datastoreContext, getSelf());
+            }
+        }
+    }
+
     /**
      * Notifies all the local shards of a change in the schema context
      *
index 26e6318..7eede29 100644 (file)
@@ -85,18 +85,19 @@ public class ActorContext {
     private final ActorRef shardManager;
     private final ClusterWrapper clusterWrapper;
     private final Configuration configuration;
-    private final DatastoreContext datastoreContext;
-    private final FiniteDuration operationDuration;
-    private final Timeout operationTimeout;
+    private DatastoreContext datastoreContext;
+    private FiniteDuration operationDuration;
+    private Timeout operationTimeout;
     private final String selfAddressHostPort;
-    private final RateLimiter txRateLimiter;
+    private RateLimiter txRateLimiter;
     private final MetricRegistry metricRegistry = new MetricRegistry();
     private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
     private final int transactionOutstandingOperationLimit;
-    private final Timeout transactionCommitOperationTimeout;
+    private Timeout transactionCommitOperationTimeout;
     private final Dispatchers dispatchers;
 
     private volatile SchemaContext schemaContext;
+    private volatile boolean updated;
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
@@ -112,14 +113,9 @@ public class ActorContext {
         this.clusterWrapper = clusterWrapper;
         this.configuration = configuration;
         this.datastoreContext = datastoreContext;
-        this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
 
-        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
-        operationTimeout = new Timeout(operationDuration);
-        transactionCommitOperationTimeout =  new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
-                TimeUnit.SECONDS));
-
+        setCachedProperties();
 
         Address selfAddress = clusterWrapper.getSelfAddress();
         if (selfAddress != null && !selfAddress.host().isEmpty()) {
@@ -133,6 +129,16 @@ public class ActorContext {
 
     }
 
+    private void setCachedProperties() {
+        txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
+
+        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
+        operationTimeout = new Timeout(operationDuration);
+
+        transactionCommitOperationTimeout =  new Timeout(Duration.create(
+                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
+    }
+
     public DatastoreContext getDatastoreContext() {
         return datastoreContext;
     }
@@ -157,7 +163,25 @@ public class ActorContext {
         this.schemaContext = schemaContext;
 
         if(shardManager != null) {
-            shardManager.tell(new UpdateSchemaContext(schemaContext), null);
+            shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
+        }
+    }
+
+    public void setDatastoreContext(DatastoreContext context) {
+        this.datastoreContext = context;
+        setCachedProperties();
+
+        // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
+        // will be published immediately even though they may not be immediately visible to other
+        // threads due to unsynchronized reads. That's OK though - we're going for eventual
+        // consistency here as immediately visible updates to these members aren't critical. These
+        // members could've been made volatile but wanted to avoid volatile reads as these are
+        // accessed often and updates will be infrequent.
+
+        updated = true;
+
+        if(shardManager != null) {
+            shardManager.tell(context, ActorRef.noSender());
         }
     }
 
index 3693c01..ecfcdef 100644 (file)
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import java.io.IOException;
 import java.util.Dictionary;
 import java.util.Hashtable;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.cm.Configuration;
 import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.service.cm.ConfigurationEvent;
+import org.osgi.service.cm.ConfigurationListener;
 
 /**
  * Unit tests for DatastoreContextConfigAdminOverlay.
  *
  * @author Thomas Pantelis
  */
+@SuppressWarnings("unchecked")
 public class DatastoreContextConfigAdminOverlayTest {
 
-    @SuppressWarnings("unchecked")
-    @Test
-    public void test() throws IOException {
-        BundleContext mockBundleContext = mock(BundleContext.class);
-        ServiceReference<ConfigurationAdmin> mockServiceRef = mock(ServiceReference.class);
-        ConfigurationAdmin mockConfigAdmin = mock(ConfigurationAdmin.class);
-        Configuration mockConfig = mock(Configuration.class);
-        DatastoreContextIntrospector mockIntrospector = mock(DatastoreContextIntrospector.class);
+    @Mock
+    private BundleContext mockBundleContext;
+
+    @Mock
+    private ServiceReference<ConfigurationAdmin> mockConfigAdminServiceRef;
+
+    @Mock
+    private ConfigurationAdmin mockConfigAdmin;
+
+    @Mock
+    private Configuration mockConfig;
+
+    @Mock
+    private DatastoreContextIntrospector mockIntrospector;
+
+    @Mock
+    private ServiceRegistration<?> configListenerServiceReg;
+
+    @Before
+    public void setup() throws IOException {
+        MockitoAnnotations.initMocks(this);
 
-        doReturn(mockServiceRef).when(mockBundleContext).getServiceReference(ConfigurationAdmin.class);
-        doReturn(mockConfigAdmin).when(mockBundleContext).getService(mockServiceRef);
+        doReturn(mockConfigAdminServiceRef).when(mockBundleContext).getServiceReference(ConfigurationAdmin.class);
+        doReturn(mockConfigAdmin).when(mockBundleContext).getService(mockConfigAdminServiceRef);
+        doReturn(configListenerServiceReg).when(mockBundleContext).registerService(
+                eq(ConfigurationListener.class.getName()), any(), any(Dictionary.class));
 
         doReturn(mockConfig).when(mockConfigAdmin).getConfiguration(DatastoreContextConfigAdminOverlay.CONFIG_ID);
 
         doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(mockConfig).getPid();
 
+    }
+
+    @Test
+    public void testUpdateOnConstruction() {
+        Dictionary<String, Object> properties = new Hashtable<>();
+        properties.put("property", "value");
+        doReturn(properties).when(mockConfig).getProperties();
+
+        DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+                mockIntrospector, mockBundleContext);
+
+        verify(mockIntrospector).update(properties);
+
+        verify(mockBundleContext).ungetService(mockConfigAdminServiceRef);
+
+        overlay.close();
+    }
+
+    @Test
+    public void testUpdateOnConfigurationEvent() {
+        DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+                mockIntrospector, mockBundleContext);
+
+        reset(mockIntrospector);
+
+        DatastoreContext context = DatastoreContext.newBuilder().build();
+        doReturn(context).when(mockIntrospector).getContext();
+
+        DatastoreContextConfigAdminOverlay.Listener mockListener =
+                mock(DatastoreContextConfigAdminOverlay.Listener.class);
+
+        overlay.setListener(mockListener);
+
         Dictionary<String, Object> properties = new Hashtable<>();
         properties.put("property", "value");
         doReturn(properties).when(mockConfig).getProperties();
 
-        try(DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
-                mockIntrospector, mockBundleContext)) {
-        }
+        doReturn(true).when(mockIntrospector).update(properties);
+
+        ArgumentCaptor<ConfigurationListener> configListener =
+                ArgumentCaptor.forClass(ConfigurationListener.class);
+        verify(mockBundleContext).registerService(eq(ConfigurationListener.class.getName()),
+                configListener.capture(), any(Dictionary.class));
+
+        ConfigurationEvent configEvent = mock(ConfigurationEvent.class);
+        doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(configEvent).getPid();
+        doReturn(mockConfigAdminServiceRef).when(configEvent).getReference();
+        doReturn(ConfigurationEvent.CM_UPDATED).when(configEvent).getType();
+
+        configListener.getValue().configurationEvent(configEvent);
 
         verify(mockIntrospector).update(properties);
 
-        verify(mockBundleContext).ungetService(mockServiceRef);
+        verify(mockListener).onDatastoreContextUpdated(context);
+
+        verify(mockBundleContext, times(2)).ungetService(mockConfigAdminServiceRef);
+
+        overlay.close();
+
+        verify(configListenerServiceReg).unregister();
+    }
+
+    @Test
+    public void testConfigurationEventWithDifferentPid() {
+        DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+                mockIntrospector, mockBundleContext);
+
+        reset(mockIntrospector);
+
+        ArgumentCaptor<ConfigurationListener> configListener =
+                ArgumentCaptor.forClass(ConfigurationListener.class);
+        verify(mockBundleContext).registerService(eq(ConfigurationListener.class.getName()),
+                configListener.capture(), any(Dictionary.class));
+
+        ConfigurationEvent configEvent = mock(ConfigurationEvent.class);
+        doReturn("other-pid").when(configEvent).getPid();
+        doReturn(mockConfigAdminServiceRef).when(configEvent).getReference();
+        doReturn(ConfigurationEvent.CM_UPDATED).when(configEvent).getType();
+
+        configListener.getValue().configurationEvent(configEvent);
+
+        verify(mockIntrospector, times(0)).update(any(Dictionary.class));
+
+        overlay.close();
+    }
+
+    @Test
+    public void testConfigurationEventWithNonUpdateEventType() {
+        DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+                mockIntrospector, mockBundleContext);
+
+        reset(mockIntrospector);
+
+        ArgumentCaptor<ConfigurationListener> configListener =
+                ArgumentCaptor.forClass(ConfigurationListener.class);
+        verify(mockBundleContext).registerService(eq(ConfigurationListener.class.getName()),
+                configListener.capture(), any(Dictionary.class));
+
+        ConfigurationEvent configEvent = mock(ConfigurationEvent.class);
+        doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(configEvent).getPid();
+        doReturn(mockConfigAdminServiceRef).when(configEvent).getReference();
+        doReturn(ConfigurationEvent.CM_DELETED).when(configEvent).getType();
+
+        configListener.getValue().configurationEvent(configEvent);
+
+        verify(mockIntrospector, times(0)).update(any(Dictionary.class));
+
+        overlay.close();
     }
 }
index 4b0651a..e036935 100644 (file)
@@ -1564,6 +1564,31 @@ public class ShardTest extends AbstractActorTest {
 
     }
 
+    @Test
+    public void testOnDatastoreContext() {
+        new ShardTestKit(getSystem()) {{
+            dataStoreContextBuilder.persistent(true);
+
+            TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
+
+            assertEquals("isRecoveryApplicable", true,
+                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+            waitUntilLeader(shard);
+
+            shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
+
+            assertEquals("isRecoveryApplicable", false,
+                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+            shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
+
+            assertEquals("isRecoveryApplicable", true,
+                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
 
     private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
index 3c6a0ce..fd41c49 100644 (file)
@@ -17,6 +17,7 @@ import com.google.common.base.Optional;
 import com.typesafe.config.ConfigFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.time.StopWatch;
+import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
@@ -339,4 +340,29 @@ public class ActorContextTest extends AbstractActorTest{
 
     }
 
+    @Test
+    public void testSetDatastoreContext() {
+        new JavaTestKit(getSystem()) {{
+            ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
+                            mock(Configuration.class), DatastoreContext.newBuilder().
+                                operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
+
+            assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
+            assertEquals("getTransactionCommitOperationTimeout", 7,
+                    actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+
+            DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
+                    shardTransactionCommitTimeoutInSeconds(8).build();
+
+            actorContext.setDatastoreContext(newContext);
+
+            expectMsgClass(duration("5 seconds"), DatastoreContext.class);
+
+            Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
+
+            assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
+            assertEquals("getTransactionCommitOperationTimeout", 8,
+                    actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+        }};
+    }
 }
index 9ed3d27..3dd752e 100644 (file)
@@ -3,4 +3,4 @@ org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a
 org.slf4j.simpleLogger.logFile=System.out
 org.slf4j.simpleLogger.showShortLogName=true
 org.slf4j.simpleLogger.levelInBrackets=true
-org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=trace
\ No newline at end of file
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug
\ No newline at end of file

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.