From: tpantelis Date: Thu, 19 Feb 2015 21:00:38 +0000 (-0500) Subject: Dynamically update DatastoreContext properties X-Git-Tag: release/lithium~456^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=a23ab6d60b7b57184a8fe59e282e46b448c86d6a Dynamically update DatastoreContext properties Added a ConfigurationListener to the DatastoreContextConfigAdminOverlay that is registered as an OSGi service to receive ConfigurationEvents from the Config Admin when the org.opendaylight.controller.cluster.datastore.cfg file is modified. The DistributedDataStore registers as a listener to the DatastoreContextConfigAdminOverlay to get updated when the DatastoreContext instance changes due to a ConfigurationEvent. The DistributedDataStore sets the new DatastoreContext instance in the ActorContext which updates its cached instance and properties and sends the DistributedDataStore instance as a message to the ShardManager. The ShardManager simply sends it out to all the Shards which update their cached DatastoreContext state. The Shard also updates the new ConfigParams instance in the base RaftActor which sets it in its RaftActorContext. There was one place in FollowerLogInformationImpl where it had a local cache of a property obtained from the ConfigParams so I changed the ctor to pass the RaftActorContext instead and obtain the property each time from the ConfigParams. A slight downside to this is that we'll have to be cognizant of not caching DatastoreContext/ConfigParams properties and assuming they're static. Or if we do cache we need to handle updates. I toyed around with trying to restart the DistributedDataStore with a new DatastoreContext and ActorContext and killing the previous ShardManager and Shard actors and creating new ones. However recreating the Shards without being disruptive to clients is tricky and risky, eg handling infight transactions, and I didn't see a clean way to do it without possibly causing inflight transactions to fail. .So I went with the simpler approach of just pushing an updated DatastoreContext to the actors. Change-Id: Ie608f61da36ac58a806208925a3c4277968c2f5b Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java index 3e6742c17d..d4d13899eb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java @@ -43,6 +43,7 @@ public class DefaultConfigParamsImpl implements ConfigParams { private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT; private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE; private long isolatedLeaderCheckInterval = HEART_BEAT_INTERVAL.$times(1000).toMillis(); + private FiniteDuration electionTimeOutInterval; // 12 is just an arbitrary percentage. This is the amount of the total memory that a raft actor's // in-memory journal can use before it needs to snapshot @@ -52,6 +53,7 @@ public class DefaultConfigParamsImpl implements ConfigParams { public void setHeartBeatInterval(FiniteDuration heartBeatInterval) { this.heartBeatInterval = heartBeatInterval; + electionTimeOutInterval = null; } public void setSnapshotBatchCount(long snapshotBatchCount) { @@ -72,6 +74,7 @@ public class DefaultConfigParamsImpl implements ConfigParams { public void setElectionTimeoutFactor(long electionTimeoutFactor){ this.electionTimeoutFactor = electionTimeoutFactor; + electionTimeOutInterval = null; } @Override @@ -92,7 +95,11 @@ public class DefaultConfigParamsImpl implements ConfigParams { @Override public FiniteDuration getElectionTimeOutInterval() { - return getHeartBeatInterval().$times(electionTimeoutFactor); + if(electionTimeOutInterval == null) { + electionTimeOutInterval = getHeartBeatInterval().$times(electionTimeoutFactor); + } + + return electionTimeOutInterval; } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index 04b9f163f4..90e1282561 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.raft; import com.google.common.base.Stopwatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import scala.concurrent.duration.FiniteDuration; public class FollowerLogInformationImpl implements FollowerLogInformation { private static final AtomicLongFieldUpdater NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex"); @@ -21,18 +20,17 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { private final Stopwatch stopwatch = Stopwatch.createUnstarted(); - private final long followerTimeoutMillis; + private final RaftActorContext context; private volatile long nextIndex; private volatile long matchIndex; - public FollowerLogInformationImpl(String id, long nextIndex, - long matchIndex, FiniteDuration followerTimeoutDuration) { + public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) { this.id = id; - this.nextIndex = nextIndex; + this.nextIndex = context.getCommitIndex(); this.matchIndex = matchIndex; - this.followerTimeoutMillis = followerTimeoutDuration.toMillis(); + this.context = context; } @Override @@ -78,7 +76,8 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { @Override public boolean isFollowerActive() { long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); - return (stopwatch.isRunning()) && (elapsed <= followerTimeoutMillis); + return (stopwatch.isRunning()) && + (elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis()); } @Override @@ -107,7 +106,8 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { builder.append("FollowerLogInformationImpl [id=").append(id).append(", nextIndex=").append(nextIndex) .append(", matchIndex=").append(matchIndex).append(", stopwatch=") .append(stopwatch.elapsed(TimeUnit.MILLISECONDS)) - .append(", followerTimeoutMillis=").append(followerTimeoutMillis).append("]"); + .append(", followerTimeoutMillis=") + .append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]"); return builder.toString(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 285be39c0b..edcfcc979c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -98,7 +98,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * This context should NOT be passed directly to any other actor it is * only to be consumed by the RaftActorBehaviors */ - private final RaftActorContext context; + private final RaftActorContextImpl context; /** * The in-memory journal @@ -139,6 +139,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { super.preStart(); } + @Override + public void postStop() { + if(currentBehavior != null) { + try { + currentBehavior.close(); + } catch (Exception e) { + LOG.debug("{}: Error closing behavior {}", persistenceId(), currentBehavior.state()); + } + } + + super.postStop(); + } + @Override public void handleRecover(Object message) { if(persistence().isRecoveryApplicable()) { @@ -511,6 +524,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return context; } + protected void updateConfigParams(ConfigParams configParams) { + context.setConfigParams(configParams); + } + /** * setPeerAddress sets the address of a known peer at a later time. *

diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index b71b3be352..6fc5e4369b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -37,7 +37,7 @@ public class RaftActorContextImpl implements RaftActorContext { private final Logger LOG; - private final ConfigParams configParams; + private ConfigParams configParams; private boolean snapshotCaptureInitiated; @@ -59,6 +59,10 @@ public class RaftActorContextImpl implements RaftActorContext { this.LOG = logger; } + void setConfigParams(ConfigParams configParams) { + this.configParams = configParams; + } + @Override public ActorRef actorOf(Props props){ return context.actorOf(props); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index be51ba069c..890d45e8fb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -97,9 +97,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { final Builder ftlBuilder = ImmutableMap.builder(); for (String followerId : context.getPeerAddresses().keySet()) { FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(followerId, - context.getCommitIndex(), -1, - context.getConfigParams().getElectionTimeOutInterval()); + new FollowerLogInformationImpl(followerId, -1, context); ftlBuilder.put(followerId, followerLogInformation); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java index 84d1545a65..5be9030f59 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java @@ -7,29 +7,29 @@ */ package org.opendaylight.controller.cluster.raft; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.concurrent.TimeUnit; import org.junit.Test; import scala.concurrent.duration.FiniteDuration; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - public class FollowerLogInformationImplTest { @Test public void testIsFollowerActive() { - FiniteDuration timeoutDuration = - new FiniteDuration(500, TimeUnit.MILLISECONDS); - - FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl( - "follower1", 10, 9, timeoutDuration); + MockRaftActorContext context = new MockRaftActorContext(); + context.setCommitIndex(10); + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS)); + configParams.setElectionTimeoutFactor(1); + context.setConfigParams(configParams); + FollowerLogInformation followerLogInformation = + new FollowerLogInformationImpl("follower1", 9, context); assertFalse("Follower should be termed inactive before stopwatch starts", followerLogInformation.isFollowerActive()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 297d781251..c3161a592f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -37,22 +37,7 @@ public class MockRaftActorContext implements RaftActorContext { private boolean snapshotCaptureInitiated; public MockRaftActorContext(){ - electionTerm = null; - - initReplicatedLog(); - } - - public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){ - this.id = id; - this.system = system; - this.actor = actor; - - final String id1 = id; electionTerm = new ElectionTerm() { - /** - * Identifier of the actor whose election term information this is - */ - private final String id = id1; private long currentTerm = 1; private String votedFor = ""; @@ -81,6 +66,13 @@ public class MockRaftActorContext implements RaftActorContext { }; configParams = new DefaultConfigParamsImpl(); + } + + public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){ + this(); + this.id = id; + this.system = system; + this.actor = actor; initReplicatedLog(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java index 20ee55519b..2688d0195d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java @@ -11,8 +11,11 @@ import java.io.IOException; import java.util.Dictionary; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceReference; +import org.osgi.framework.ServiceRegistration; import org.osgi.service.cm.Configuration; import org.osgi.service.cm.ConfigurationAdmin; +import org.osgi.service.cm.ConfigurationEvent; +import org.osgi.service.cm.ConfigurationListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,12 +28,19 @@ import org.slf4j.LoggerFactory; public class DatastoreContextConfigAdminOverlay implements AutoCloseable { public static final String CONFIG_ID = "org.opendaylight.controller.cluster.datastore"; + public static interface Listener { + void onDatastoreContextUpdated(DatastoreContext context); + } + private static final Logger LOG = LoggerFactory.getLogger(DatastoreContextConfigAdminOverlay.class); private final DatastoreContextIntrospector introspector; private final BundleContext bundleContext; + private ServiceRegistration configListenerServiceRef; + private Listener listener; - public DatastoreContextConfigAdminOverlay(DatastoreContextIntrospector introspector, BundleContext bundleContext) { + public DatastoreContextConfigAdminOverlay(DatastoreContextIntrospector introspector, + BundleContext bundleContext) { this.introspector = introspector; this.bundleContext = bundleContext; @@ -40,9 +50,16 @@ public class DatastoreContextConfigAdminOverlay implements AutoCloseable { LOG.warn("No ConfigurationAdmin service found"); } else { overlaySettings(configAdminServiceReference); + + configListenerServiceRef = bundleContext.registerService(ConfigurationListener.class.getName(), + new DatastoreConfigurationListener(), null); } } + public void setListener(Listener listener) { + this.listener = listener; + } + private void overlaySettings(ServiceReference configAdminServiceReference) { try { ConfigurationAdmin configAdmin = bundleContext.getService(configAdminServiceReference); @@ -53,7 +70,11 @@ public class DatastoreContextConfigAdminOverlay implements AutoCloseable { LOG.debug("Overlaying settings: {}", properties); - introspector.update(properties); + if(introspector.update(properties)) { + if(listener != null) { + listener.onDatastoreContextUpdated(introspector.getContext()); + } + } } else { LOG.debug("No Configuration found for {}", CONFIG_ID); } @@ -72,5 +93,20 @@ public class DatastoreContextConfigAdminOverlay implements AutoCloseable { @Override public void close() { + listener = null; + + if(configListenerServiceRef != null) { + configListenerServiceRef.unregister(); + } + } + + private class DatastoreConfigurationListener implements ConfigurationListener { + @Override + public void configurationEvent(ConfigurationEvent event) { + if(CONFIG_ID.equals(event.getPid()) && event.getType() == ConfigurationEvent.CM_UPDATED) { + LOG.debug("configurationEvent: config {} was updated", CONFIG_ID); + overlaySettings(event.getReference()); + } + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java index 3ca64210be..0bbeefd6fa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java @@ -196,11 +196,13 @@ public class DatastoreContextIntrospector { * @param properties the properties to apply * @return true if the cached DatastoreContext was updated, false otherwise. */ - public boolean update(Dictionary properties) { + public synchronized boolean update(Dictionary properties) { if(properties == null || properties.isEmpty()) { return false; } + LOG.debug("In update: properties: {}", properties); + Builder builder = DatastoreContext.newBuilderFrom(context); final String dataStoreTypePrefix = context.getDataStoreType() + '.'; @@ -291,12 +293,12 @@ public class DatastoreContextIntrospector { } private Object constructorValueRecursively(Class toType, Object fromValue) throws Exception { - LOG.debug("convertValueRecursively - toType: {}, fromValue {} ({})", + LOG.trace("convertValueRecursively - toType: {}, fromValue {} ({})", toType.getSimpleName(), fromValue, fromValue.getClass().getSimpleName()); Constructor ctor = constructors.get(toType); - LOG.debug("Found {}", ctor); + LOG.trace("Found {}", ctor); if(ctor == null) { throw new IllegalArgumentException(String.format("Constructor not found for type %s", toType)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 434efc9111..51182deb1d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -34,7 +34,8 @@ import org.slf4j.LoggerFactory; /** * */ -public class DistributedDataStore implements DOMStore, SchemaContextListener, AutoCloseable { +public class DistributedDataStore implements DOMStore, SchemaContextListener, + DatastoreContextConfigAdminOverlay.Listener, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); public static final int REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR = 24; // 24 times the usual operation timeout @@ -127,6 +128,14 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au actorContext.setSchemaContext(schemaContext); } + @Override + public void onDatastoreContextUpdated(DatastoreContext context) { + LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreType()); + + actorContext.setDatastoreContext(context); + datastoreConfigMXBean.setContext(context); + } + @Override public void close() { datastoreConfigMXBean.unregisterMBean(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java index f3a55c55a7..ee9f4f3ad5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java @@ -34,6 +34,8 @@ public class DistributedDataStoreFactory { final DistributedDataStore dataStore = new DistributedDataStore(actorSystem, new ClusterWrapperImpl(actorSystem), config, introspector.getContext()); + overlay.setListener(dataStore); + ShardStrategyFactory.setConfiguration(config); schemaService.registerSchemaContextListener(dataStore); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 0672023fcb..c509580ac6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -113,9 +113,9 @@ public class Shard extends RaftActor { private final List delayedListenerRegistrations = Lists.newArrayList(); - private final DatastoreContext datastoreContext; + private DatastoreContext datastoreContext; - private final DataPersistenceProvider dataPersistenceProvider; + private DataPersistenceProvider dataPersistenceProvider; private SchemaContext schemaContext; @@ -123,7 +123,7 @@ public class Shard extends RaftActor { private final ShardCommitCoordinator commitCoordinator; - private final long transactionCommitTimeout; + private long transactionCommitTimeout; private Cancellable txCommitTimeoutCheckSchedule; @@ -175,8 +175,7 @@ public class Shard extends RaftActor { commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES), datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString()); - transactionCommitTimeout = TimeUnit.MILLISECONDS.convert( - datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); + setTransactionCommitTimeout(); // create a notifier actor for each cluster member roleChangeNotifier = createRoleChangeNotifier(name.toString()); @@ -185,6 +184,11 @@ public class Shard extends RaftActor { getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis()); } + private void setTransactionCommitTimeout() { + transactionCommitTimeout = TimeUnit.MILLISECONDS.convert( + datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); + } + private static Map mapPeerAddresses( final Map peerAddresses) { Map map = new HashMap<>(); @@ -216,11 +220,15 @@ public class Shard extends RaftActor { @Override public void postStop() { + LOG.info("Stopping Shard {}", persistenceId()); + super.postStop(); if(txCommitTimeoutCheckSchedule != null) { txCommitTimeoutCheckSchedule.cancel(); } + + shardMBean.unregisterMBean(); } @Override @@ -278,6 +286,8 @@ public class Shard extends RaftActor { resolved.getPeerAddress()); } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) { handleTransactionCommitTimeoutCheck(); + } else if(message instanceof DatastoreContext) { + onDatastoreContext((DatastoreContext)message); } else { super.onReceiveCommand(message); } @@ -291,6 +301,24 @@ public class Shard extends RaftActor { return roleChangeNotifier; } + private void onDatastoreContext(DatastoreContext context) { + datastoreContext = context; + + commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity()); + + setTransactionCommitTimeout(); + + if(datastoreContext.isPersistent() && + dataPersistenceProvider instanceof NonPersistentRaftDataProvider) { + dataPersistenceProvider = new PersistentDataProvider(); + } else if(!datastoreContext.isPersistent() && + dataPersistenceProvider instanceof PersistentDataProvider) { + dataPersistenceProvider = new NonPersistentRaftDataProvider(); + } + + updateConfigParams(datastoreContext.getShardRaftConfig()); + } + private void handleTransactionCommitTimeoutCheck() { CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry(); if(cohortEntry != null) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 8b95404c4e..951bc22545 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -34,7 +34,7 @@ public class ShardCommitCoordinator { private final Queue queuedCohortEntries; - private final int queueCapacity; + private int queueCapacity; private final Logger log; @@ -54,6 +54,10 @@ public class ShardCommitCoordinator { queuedCohortEntries = new LinkedList<>(); } + public void setQueueCapacity(int queueCapacity) { + this.queueCapacity = queueCapacity; + } + /** * This method caches a cohort entry for the given transactions ID in preparation for the * subsequent 3-phase commit. diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 426a2e0934..775cae35e2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -39,7 +39,6 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; -import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; @@ -90,9 +89,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final String shardDispatcherPath; - private ShardManagerInfoMBean mBean; + private ShardManagerInfo mBean; - private final DatastoreContext datastoreContext; + private DatastoreContext datastoreContext; private Collection knownModules = Collections.emptySet(); @@ -132,6 +131,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext)); } + @Override + public void postStop() { + LOG.info("Stopping ShardManager"); + + mBean.unregisterMBean(); + } + @Override public void handleCommand(Object message) throws Exception { if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) { @@ -148,6 +154,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { memberRemoved((ClusterEvent.MemberRemoved) message); } else if(message instanceof ClusterEvent.UnreachableMember) { ignoreMessage(message); + } else if(message instanceof DatastoreContext) { + onDatastoreContext((DatastoreContext)message); } else{ unknownMessage(message); } @@ -258,6 +266,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void onDatastoreContext(DatastoreContext context) { + datastoreContext = context; + for (ShardInformation info : localShards.values()) { + if (info.getActor() != null) { + info.getActor().tell(datastoreContext, getSelf()); + } + } + } + /** * Notifies all the local shards of a change in the schema context * diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 26e6318f6d..7eede29b65 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -85,18 +85,19 @@ public class ActorContext { private final ActorRef shardManager; private final ClusterWrapper clusterWrapper; private final Configuration configuration; - private final DatastoreContext datastoreContext; - private final FiniteDuration operationDuration; - private final Timeout operationTimeout; + private DatastoreContext datastoreContext; + private FiniteDuration operationDuration; + private Timeout operationTimeout; private final String selfAddressHostPort; - private final RateLimiter txRateLimiter; + private RateLimiter txRateLimiter; private final MetricRegistry metricRegistry = new MetricRegistry(); private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build(); private final int transactionOutstandingOperationLimit; - private final Timeout transactionCommitOperationTimeout; + private Timeout transactionCommitOperationTimeout; private final Dispatchers dispatchers; private volatile SchemaContext schemaContext; + private volatile boolean updated; public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration) { @@ -112,14 +113,9 @@ public class ActorContext { this.clusterWrapper = clusterWrapper; this.configuration = configuration; this.datastoreContext = datastoreContext; - this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit()); this.dispatchers = new Dispatchers(actorSystem.dispatchers()); - operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); - operationTimeout = new Timeout(operationDuration); - transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(), - TimeUnit.SECONDS)); - + setCachedProperties(); Address selfAddress = clusterWrapper.getSelfAddress(); if (selfAddress != null && !selfAddress.host().isEmpty()) { @@ -133,6 +129,16 @@ public class ActorContext { } + private void setCachedProperties() { + txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit()); + + operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); + operationTimeout = new Timeout(operationDuration); + + transactionCommitOperationTimeout = new Timeout(Duration.create( + datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS)); + } + public DatastoreContext getDatastoreContext() { return datastoreContext; } @@ -157,7 +163,25 @@ public class ActorContext { this.schemaContext = schemaContext; if(shardManager != null) { - shardManager.tell(new UpdateSchemaContext(schemaContext), null); + shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); + } + } + + public void setDatastoreContext(DatastoreContext context) { + this.datastoreContext = context; + setCachedProperties(); + + // We write the 'updated' volatile to trigger a write memory barrier so that the writes above + // will be published immediately even though they may not be immediately visible to other + // threads due to unsynchronized reads. That's OK though - we're going for eventual + // consistency here as immediately visible updates to these members aren't critical. These + // members could've been made volatile but wanted to avoid volatile reads as these are + // accessed often and updates will be infrequent. + + updated = true; + + if(shardManager != null) { + shardManager.tell(context, ActorRef.noSender()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java index 3693c01b42..ecfcdefcb2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java @@ -7,51 +7,175 @@ */ package org.opendaylight.controller.cluster.datastore; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.io.IOException; import java.util.Dictionary; import java.util.Hashtable; +import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceReference; +import org.osgi.framework.ServiceRegistration; import org.osgi.service.cm.Configuration; import org.osgi.service.cm.ConfigurationAdmin; +import org.osgi.service.cm.ConfigurationEvent; +import org.osgi.service.cm.ConfigurationListener; /** * Unit tests for DatastoreContextConfigAdminOverlay. * * @author Thomas Pantelis */ +@SuppressWarnings("unchecked") public class DatastoreContextConfigAdminOverlayTest { - @SuppressWarnings("unchecked") - @Test - public void test() throws IOException { - BundleContext mockBundleContext = mock(BundleContext.class); - ServiceReference mockServiceRef = mock(ServiceReference.class); - ConfigurationAdmin mockConfigAdmin = mock(ConfigurationAdmin.class); - Configuration mockConfig = mock(Configuration.class); - DatastoreContextIntrospector mockIntrospector = mock(DatastoreContextIntrospector.class); + @Mock + private BundleContext mockBundleContext; + + @Mock + private ServiceReference mockConfigAdminServiceRef; + + @Mock + private ConfigurationAdmin mockConfigAdmin; + + @Mock + private Configuration mockConfig; + + @Mock + private DatastoreContextIntrospector mockIntrospector; + + @Mock + private ServiceRegistration configListenerServiceReg; + + @Before + public void setup() throws IOException { + MockitoAnnotations.initMocks(this); - doReturn(mockServiceRef).when(mockBundleContext).getServiceReference(ConfigurationAdmin.class); - doReturn(mockConfigAdmin).when(mockBundleContext).getService(mockServiceRef); + doReturn(mockConfigAdminServiceRef).when(mockBundleContext).getServiceReference(ConfigurationAdmin.class); + doReturn(mockConfigAdmin).when(mockBundleContext).getService(mockConfigAdminServiceRef); + doReturn(configListenerServiceReg).when(mockBundleContext).registerService( + eq(ConfigurationListener.class.getName()), any(), any(Dictionary.class)); doReturn(mockConfig).when(mockConfigAdmin).getConfiguration(DatastoreContextConfigAdminOverlay.CONFIG_ID); doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(mockConfig).getPid(); + } + + @Test + public void testUpdateOnConstruction() { + Dictionary properties = new Hashtable<>(); + properties.put("property", "value"); + doReturn(properties).when(mockConfig).getProperties(); + + DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay( + mockIntrospector, mockBundleContext); + + verify(mockIntrospector).update(properties); + + verify(mockBundleContext).ungetService(mockConfigAdminServiceRef); + + overlay.close(); + } + + @Test + public void testUpdateOnConfigurationEvent() { + DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay( + mockIntrospector, mockBundleContext); + + reset(mockIntrospector); + + DatastoreContext context = DatastoreContext.newBuilder().build(); + doReturn(context).when(mockIntrospector).getContext(); + + DatastoreContextConfigAdminOverlay.Listener mockListener = + mock(DatastoreContextConfigAdminOverlay.Listener.class); + + overlay.setListener(mockListener); + Dictionary properties = new Hashtable<>(); properties.put("property", "value"); doReturn(properties).when(mockConfig).getProperties(); - try(DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay( - mockIntrospector, mockBundleContext)) { - } + doReturn(true).when(mockIntrospector).update(properties); + + ArgumentCaptor configListener = + ArgumentCaptor.forClass(ConfigurationListener.class); + verify(mockBundleContext).registerService(eq(ConfigurationListener.class.getName()), + configListener.capture(), any(Dictionary.class)); + + ConfigurationEvent configEvent = mock(ConfigurationEvent.class); + doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(configEvent).getPid(); + doReturn(mockConfigAdminServiceRef).when(configEvent).getReference(); + doReturn(ConfigurationEvent.CM_UPDATED).when(configEvent).getType(); + + configListener.getValue().configurationEvent(configEvent); verify(mockIntrospector).update(properties); - verify(mockBundleContext).ungetService(mockServiceRef); + verify(mockListener).onDatastoreContextUpdated(context); + + verify(mockBundleContext, times(2)).ungetService(mockConfigAdminServiceRef); + + overlay.close(); + + verify(configListenerServiceReg).unregister(); + } + + @Test + public void testConfigurationEventWithDifferentPid() { + DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay( + mockIntrospector, mockBundleContext); + + reset(mockIntrospector); + + ArgumentCaptor configListener = + ArgumentCaptor.forClass(ConfigurationListener.class); + verify(mockBundleContext).registerService(eq(ConfigurationListener.class.getName()), + configListener.capture(), any(Dictionary.class)); + + ConfigurationEvent configEvent = mock(ConfigurationEvent.class); + doReturn("other-pid").when(configEvent).getPid(); + doReturn(mockConfigAdminServiceRef).when(configEvent).getReference(); + doReturn(ConfigurationEvent.CM_UPDATED).when(configEvent).getType(); + + configListener.getValue().configurationEvent(configEvent); + + verify(mockIntrospector, times(0)).update(any(Dictionary.class)); + + overlay.close(); + } + + @Test + public void testConfigurationEventWithNonUpdateEventType() { + DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay( + mockIntrospector, mockBundleContext); + + reset(mockIntrospector); + + ArgumentCaptor configListener = + ArgumentCaptor.forClass(ConfigurationListener.class); + verify(mockBundleContext).registerService(eq(ConfigurationListener.class.getName()), + configListener.capture(), any(Dictionary.class)); + + ConfigurationEvent configEvent = mock(ConfigurationEvent.class); + doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(configEvent).getPid(); + doReturn(mockConfigAdminServiceRef).when(configEvent).getReference(); + doReturn(ConfigurationEvent.CM_DELETED).when(configEvent).getType(); + + configListener.getValue().configurationEvent(configEvent); + + verify(mockIntrospector, times(0)).update(any(Dictionary.class)); + + overlay.close(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 4b0651a48e..e03693569b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -1564,6 +1564,31 @@ public class ShardTest extends AbstractActorTest { } + @Test + public void testOnDatastoreContext() { + new ShardTestKit(getSystem()) {{ + dataStoreContextBuilder.persistent(true); + + TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext"); + + assertEquals("isRecoveryApplicable", true, + shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + + waitUntilLeader(shard); + + shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender()); + + assertEquals("isRecoveryApplicable", false, + shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + + shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender()); + + assertEquals("isRecoveryApplicable", true, + shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } private NormalizedNode readStore(final InMemoryDOMDataStore store) throws ReadFailedException { DOMStoreReadTransaction transaction = store.newReadOnlyTransaction(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 3c6a0cef5c..fd41c49390 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -17,6 +17,7 @@ import com.google.common.base.Optional; import com.typesafe.config.ConfigFactory; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.time.StopWatch; +import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; @@ -339,4 +340,29 @@ public class ActorContextTest extends AbstractActorTest{ } + @Test + public void testSetDatastoreContext() { + new JavaTestKit(getSystem()) {{ + ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class), + mock(Configuration.class), DatastoreContext.newBuilder(). + operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build()); + + assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds()); + assertEquals("getTransactionCommitOperationTimeout", 7, + actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); + + DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6). + shardTransactionCommitTimeoutInSeconds(8).build(); + + actorContext.setDatastoreContext(newContext); + + expectMsgClass(duration("5 seconds"), DatastoreContext.class); + + Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext()); + + assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds()); + assertEquals("getTransactionCommitOperationTimeout", 8, + actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); + }}; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties index 9ed3d276a3..3dd752ec30 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties @@ -3,4 +3,4 @@ org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a org.slf4j.simpleLogger.logFile=System.out org.slf4j.simpleLogger.showShortLogName=true org.slf4j.simpleLogger.levelInBrackets=true -org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=trace \ No newline at end of file +org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug \ No newline at end of file