From: Moiz Raja Date: Wed, 4 Mar 2015 22:11:49 +0000 (+0000) Subject: Merge "Fix to karaf-parent to copy in dependencies." X-Git-Tag: release/lithium~455 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=2292a31efc0f7779d5c62d6f2de05f57fa2fafc3;hp=361e28e5c71e09ba4c267b20d0cf7546a3f1fcaa Merge "Fix to karaf-parent to copy in dependencies." --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java index 3e6742c17d..d4d13899eb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java @@ -43,6 +43,7 @@ public class DefaultConfigParamsImpl implements ConfigParams { private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT; private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE; private long isolatedLeaderCheckInterval = HEART_BEAT_INTERVAL.$times(1000).toMillis(); + private FiniteDuration electionTimeOutInterval; // 12 is just an arbitrary percentage. This is the amount of the total memory that a raft actor's // in-memory journal can use before it needs to snapshot @@ -52,6 +53,7 @@ public class DefaultConfigParamsImpl implements ConfigParams { public void setHeartBeatInterval(FiniteDuration heartBeatInterval) { this.heartBeatInterval = heartBeatInterval; + electionTimeOutInterval = null; } public void setSnapshotBatchCount(long snapshotBatchCount) { @@ -72,6 +74,7 @@ public class DefaultConfigParamsImpl implements ConfigParams { public void setElectionTimeoutFactor(long electionTimeoutFactor){ this.electionTimeoutFactor = electionTimeoutFactor; + electionTimeOutInterval = null; } @Override @@ -92,7 +95,11 @@ public class DefaultConfigParamsImpl implements ConfigParams { @Override public FiniteDuration getElectionTimeOutInterval() { - return getHeartBeatInterval().$times(electionTimeoutFactor); + if(electionTimeOutInterval == null) { + electionTimeOutInterval = getHeartBeatInterval().$times(electionTimeoutFactor); + } + + return electionTimeOutInterval; } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index 04b9f163f4..90e1282561 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.raft; import com.google.common.base.Stopwatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import scala.concurrent.duration.FiniteDuration; public class FollowerLogInformationImpl implements FollowerLogInformation { private static final AtomicLongFieldUpdater NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex"); @@ -21,18 +20,17 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { private final Stopwatch stopwatch = Stopwatch.createUnstarted(); - private final long followerTimeoutMillis; + private final RaftActorContext context; private volatile long nextIndex; private volatile long matchIndex; - public FollowerLogInformationImpl(String id, long nextIndex, - long matchIndex, FiniteDuration followerTimeoutDuration) { + public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) { this.id = id; - this.nextIndex = nextIndex; + this.nextIndex = context.getCommitIndex(); this.matchIndex = matchIndex; - this.followerTimeoutMillis = followerTimeoutDuration.toMillis(); + this.context = context; } @Override @@ -78,7 +76,8 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { @Override public boolean isFollowerActive() { long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); - return (stopwatch.isRunning()) && (elapsed <= followerTimeoutMillis); + return (stopwatch.isRunning()) && + (elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis()); } @Override @@ -107,7 +106,8 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { builder.append("FollowerLogInformationImpl [id=").append(id).append(", nextIndex=").append(nextIndex) .append(", matchIndex=").append(matchIndex).append(", stopwatch=") .append(stopwatch.elapsed(TimeUnit.MILLISECONDS)) - .append(", followerTimeoutMillis=").append(followerTimeoutMillis).append("]"); + .append(", followerTimeoutMillis=") + .append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]"); return builder.toString(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index cb1b42aa1b..ec3f375bde 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -99,7 +99,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * This context should NOT be passed directly to any other actor it is * only to be consumed by the RaftActorBehaviors */ - private final RaftActorContext context; + private final RaftActorContextImpl context; /** * The in-memory journal @@ -140,6 +140,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { super.preStart(); } + @Override + public void postStop() { + if(currentBehavior != null) { + try { + currentBehavior.close(); + } catch (Exception e) { + LOG.debug("{}: Error closing behavior {}", persistenceId(), currentBehavior.state()); + } + } + + super.postStop(); + } + @Override public void handleRecover(Object message) { if(persistence().isRecoveryApplicable()) { @@ -515,6 +528,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return context; } + protected void updateConfigParams(ConfigParams configParams) { + context.setConfigParams(configParams); + } + /** * setPeerAddress sets the address of a known peer at a later time. *

diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index b71b3be352..6fc5e4369b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -37,7 +37,7 @@ public class RaftActorContextImpl implements RaftActorContext { private final Logger LOG; - private final ConfigParams configParams; + private ConfigParams configParams; private boolean snapshotCaptureInitiated; @@ -59,6 +59,10 @@ public class RaftActorContextImpl implements RaftActorContext { this.LOG = logger; } + void setConfigParams(ConfigParams configParams) { + this.configParams = configParams; + } + @Override public ActorRef actorOf(Props props){ return context.actorOf(props); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index be51ba069c..890d45e8fb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -97,9 +97,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { final Builder ftlBuilder = ImmutableMap.builder(); for (String followerId : context.getPeerAddresses().keySet()) { FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(followerId, - context.getCommitIndex(), -1, - context.getConfigParams().getElectionTimeOutInterval()); + new FollowerLogInformationImpl(followerId, -1, context); ftlBuilder.put(followerId, followerLogInformation); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java index 84d1545a65..5be9030f59 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java @@ -7,29 +7,29 @@ */ package org.opendaylight.controller.cluster.raft; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.concurrent.TimeUnit; import org.junit.Test; import scala.concurrent.duration.FiniteDuration; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - public class FollowerLogInformationImplTest { @Test public void testIsFollowerActive() { - FiniteDuration timeoutDuration = - new FiniteDuration(500, TimeUnit.MILLISECONDS); - - FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl( - "follower1", 10, 9, timeoutDuration); + MockRaftActorContext context = new MockRaftActorContext(); + context.setCommitIndex(10); + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS)); + configParams.setElectionTimeoutFactor(1); + context.setConfigParams(configParams); + FollowerLogInformation followerLogInformation = + new FollowerLogInformationImpl("follower1", 9, context); assertFalse("Follower should be termed inactive before stopwatch starts", followerLogInformation.isFollowerActive()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 297d781251..c3161a592f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -37,22 +37,7 @@ public class MockRaftActorContext implements RaftActorContext { private boolean snapshotCaptureInitiated; public MockRaftActorContext(){ - electionTerm = null; - - initReplicatedLog(); - } - - public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){ - this.id = id; - this.system = system; - this.actor = actor; - - final String id1 = id; electionTerm = new ElectionTerm() { - /** - * Identifier of the actor whose election term information this is - */ - private final String id = id1; private long currentTerm = 1; private String votedFor = ""; @@ -81,6 +66,13 @@ public class MockRaftActorContext implements RaftActorContext { }; configParams = new DefaultConfigParamsImpl(); + } + + public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){ + this(); + this.id = id; + this.system = system; + this.actor = actor; initReplicatedLog(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java index 20ee55519b..2688d0195d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java @@ -11,8 +11,11 @@ import java.io.IOException; import java.util.Dictionary; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceReference; +import org.osgi.framework.ServiceRegistration; import org.osgi.service.cm.Configuration; import org.osgi.service.cm.ConfigurationAdmin; +import org.osgi.service.cm.ConfigurationEvent; +import org.osgi.service.cm.ConfigurationListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,12 +28,19 @@ import org.slf4j.LoggerFactory; public class DatastoreContextConfigAdminOverlay implements AutoCloseable { public static final String CONFIG_ID = "org.opendaylight.controller.cluster.datastore"; + public static interface Listener { + void onDatastoreContextUpdated(DatastoreContext context); + } + private static final Logger LOG = LoggerFactory.getLogger(DatastoreContextConfigAdminOverlay.class); private final DatastoreContextIntrospector introspector; private final BundleContext bundleContext; + private ServiceRegistration configListenerServiceRef; + private Listener listener; - public DatastoreContextConfigAdminOverlay(DatastoreContextIntrospector introspector, BundleContext bundleContext) { + public DatastoreContextConfigAdminOverlay(DatastoreContextIntrospector introspector, + BundleContext bundleContext) { this.introspector = introspector; this.bundleContext = bundleContext; @@ -40,9 +50,16 @@ public class DatastoreContextConfigAdminOverlay implements AutoCloseable { LOG.warn("No ConfigurationAdmin service found"); } else { overlaySettings(configAdminServiceReference); + + configListenerServiceRef = bundleContext.registerService(ConfigurationListener.class.getName(), + new DatastoreConfigurationListener(), null); } } + public void setListener(Listener listener) { + this.listener = listener; + } + private void overlaySettings(ServiceReference configAdminServiceReference) { try { ConfigurationAdmin configAdmin = bundleContext.getService(configAdminServiceReference); @@ -53,7 +70,11 @@ public class DatastoreContextConfigAdminOverlay implements AutoCloseable { LOG.debug("Overlaying settings: {}", properties); - introspector.update(properties); + if(introspector.update(properties)) { + if(listener != null) { + listener.onDatastoreContextUpdated(introspector.getContext()); + } + } } else { LOG.debug("No Configuration found for {}", CONFIG_ID); } @@ -72,5 +93,20 @@ public class DatastoreContextConfigAdminOverlay implements AutoCloseable { @Override public void close() { + listener = null; + + if(configListenerServiceRef != null) { + configListenerServiceRef.unregister(); + } + } + + private class DatastoreConfigurationListener implements ConfigurationListener { + @Override + public void configurationEvent(ConfigurationEvent event) { + if(CONFIG_ID.equals(event.getPid()) && event.getType() == ConfigurationEvent.CM_UPDATED) { + LOG.debug("configurationEvent: config {} was updated", CONFIG_ID); + overlaySettings(event.getReference()); + } + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java index 3ca64210be..0bbeefd6fa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java @@ -196,11 +196,13 @@ public class DatastoreContextIntrospector { * @param properties the properties to apply * @return true if the cached DatastoreContext was updated, false otherwise. */ - public boolean update(Dictionary properties) { + public synchronized boolean update(Dictionary properties) { if(properties == null || properties.isEmpty()) { return false; } + LOG.debug("In update: properties: {}", properties); + Builder builder = DatastoreContext.newBuilderFrom(context); final String dataStoreTypePrefix = context.getDataStoreType() + '.'; @@ -291,12 +293,12 @@ public class DatastoreContextIntrospector { } private Object constructorValueRecursively(Class toType, Object fromValue) throws Exception { - LOG.debug("convertValueRecursively - toType: {}, fromValue {} ({})", + LOG.trace("convertValueRecursively - toType: {}, fromValue {} ({})", toType.getSimpleName(), fromValue, fromValue.getClass().getSimpleName()); Constructor ctor = constructors.get(toType); - LOG.debug("Found {}", ctor); + LOG.trace("Found {}", ctor); if(ctor == null) { throw new IllegalArgumentException(String.format("Constructor not found for type %s", toType)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 434efc9111..51182deb1d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -34,7 +34,8 @@ import org.slf4j.LoggerFactory; /** * */ -public class DistributedDataStore implements DOMStore, SchemaContextListener, AutoCloseable { +public class DistributedDataStore implements DOMStore, SchemaContextListener, + DatastoreContextConfigAdminOverlay.Listener, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); public static final int REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR = 24; // 24 times the usual operation timeout @@ -127,6 +128,14 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au actorContext.setSchemaContext(schemaContext); } + @Override + public void onDatastoreContextUpdated(DatastoreContext context) { + LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreType()); + + actorContext.setDatastoreContext(context); + datastoreConfigMXBean.setContext(context); + } + @Override public void close() { datastoreConfigMXBean.unregisterMBean(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java index f3a55c55a7..ee9f4f3ad5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java @@ -34,6 +34,8 @@ public class DistributedDataStoreFactory { final DistributedDataStore dataStore = new DistributedDataStore(actorSystem, new ClusterWrapperImpl(actorSystem), config, introspector.getContext()); + overlay.setListener(dataStore); + ShardStrategyFactory.setConfiguration(config); schemaService.registerSchemaContextListener(dataStore); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 0672023fcb..c509580ac6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -113,9 +113,9 @@ public class Shard extends RaftActor { private final List delayedListenerRegistrations = Lists.newArrayList(); - private final DatastoreContext datastoreContext; + private DatastoreContext datastoreContext; - private final DataPersistenceProvider dataPersistenceProvider; + private DataPersistenceProvider dataPersistenceProvider; private SchemaContext schemaContext; @@ -123,7 +123,7 @@ public class Shard extends RaftActor { private final ShardCommitCoordinator commitCoordinator; - private final long transactionCommitTimeout; + private long transactionCommitTimeout; private Cancellable txCommitTimeoutCheckSchedule; @@ -175,8 +175,7 @@ public class Shard extends RaftActor { commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES), datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString()); - transactionCommitTimeout = TimeUnit.MILLISECONDS.convert( - datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); + setTransactionCommitTimeout(); // create a notifier actor for each cluster member roleChangeNotifier = createRoleChangeNotifier(name.toString()); @@ -185,6 +184,11 @@ public class Shard extends RaftActor { getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis()); } + private void setTransactionCommitTimeout() { + transactionCommitTimeout = TimeUnit.MILLISECONDS.convert( + datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); + } + private static Map mapPeerAddresses( final Map peerAddresses) { Map map = new HashMap<>(); @@ -216,11 +220,15 @@ public class Shard extends RaftActor { @Override public void postStop() { + LOG.info("Stopping Shard {}", persistenceId()); + super.postStop(); if(txCommitTimeoutCheckSchedule != null) { txCommitTimeoutCheckSchedule.cancel(); } + + shardMBean.unregisterMBean(); } @Override @@ -278,6 +286,8 @@ public class Shard extends RaftActor { resolved.getPeerAddress()); } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) { handleTransactionCommitTimeoutCheck(); + } else if(message instanceof DatastoreContext) { + onDatastoreContext((DatastoreContext)message); } else { super.onReceiveCommand(message); } @@ -291,6 +301,24 @@ public class Shard extends RaftActor { return roleChangeNotifier; } + private void onDatastoreContext(DatastoreContext context) { + datastoreContext = context; + + commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity()); + + setTransactionCommitTimeout(); + + if(datastoreContext.isPersistent() && + dataPersistenceProvider instanceof NonPersistentRaftDataProvider) { + dataPersistenceProvider = new PersistentDataProvider(); + } else if(!datastoreContext.isPersistent() && + dataPersistenceProvider instanceof PersistentDataProvider) { + dataPersistenceProvider = new NonPersistentRaftDataProvider(); + } + + updateConfigParams(datastoreContext.getShardRaftConfig()); + } + private void handleTransactionCommitTimeoutCheck() { CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry(); if(cohortEntry != null) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 8b95404c4e..951bc22545 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -34,7 +34,7 @@ public class ShardCommitCoordinator { private final Queue queuedCohortEntries; - private final int queueCapacity; + private int queueCapacity; private final Logger log; @@ -54,6 +54,10 @@ public class ShardCommitCoordinator { queuedCohortEntries = new LinkedList<>(); } + public void setQueueCapacity(int queueCapacity) { + this.queueCapacity = queueCapacity; + } + /** * This method caches a cohort entry for the given transactions ID in preparation for the * subsequent 3-phase commit. diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 426a2e0934..775cae35e2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -39,7 +39,6 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; -import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; @@ -90,9 +89,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final String shardDispatcherPath; - private ShardManagerInfoMBean mBean; + private ShardManagerInfo mBean; - private final DatastoreContext datastoreContext; + private DatastoreContext datastoreContext; private Collection knownModules = Collections.emptySet(); @@ -132,6 +131,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext)); } + @Override + public void postStop() { + LOG.info("Stopping ShardManager"); + + mBean.unregisterMBean(); + } + @Override public void handleCommand(Object message) throws Exception { if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) { @@ -148,6 +154,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { memberRemoved((ClusterEvent.MemberRemoved) message); } else if(message instanceof ClusterEvent.UnreachableMember) { ignoreMessage(message); + } else if(message instanceof DatastoreContext) { + onDatastoreContext((DatastoreContext)message); } else{ unknownMessage(message); } @@ -258,6 +266,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void onDatastoreContext(DatastoreContext context) { + datastoreContext = context; + for (ShardInformation info : localShards.values()) { + if (info.getActor() != null) { + info.getActor().tell(datastoreContext, getSelf()); + } + } + } + /** * Notifies all the local shards of a change in the schema context * diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 26e6318f6d..7eede29b65 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -85,18 +85,19 @@ public class ActorContext { private final ActorRef shardManager; private final ClusterWrapper clusterWrapper; private final Configuration configuration; - private final DatastoreContext datastoreContext; - private final FiniteDuration operationDuration; - private final Timeout operationTimeout; + private DatastoreContext datastoreContext; + private FiniteDuration operationDuration; + private Timeout operationTimeout; private final String selfAddressHostPort; - private final RateLimiter txRateLimiter; + private RateLimiter txRateLimiter; private final MetricRegistry metricRegistry = new MetricRegistry(); private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build(); private final int transactionOutstandingOperationLimit; - private final Timeout transactionCommitOperationTimeout; + private Timeout transactionCommitOperationTimeout; private final Dispatchers dispatchers; private volatile SchemaContext schemaContext; + private volatile boolean updated; public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration) { @@ -112,14 +113,9 @@ public class ActorContext { this.clusterWrapper = clusterWrapper; this.configuration = configuration; this.datastoreContext = datastoreContext; - this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit()); this.dispatchers = new Dispatchers(actorSystem.dispatchers()); - operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); - operationTimeout = new Timeout(operationDuration); - transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(), - TimeUnit.SECONDS)); - + setCachedProperties(); Address selfAddress = clusterWrapper.getSelfAddress(); if (selfAddress != null && !selfAddress.host().isEmpty()) { @@ -133,6 +129,16 @@ public class ActorContext { } + private void setCachedProperties() { + txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit()); + + operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); + operationTimeout = new Timeout(operationDuration); + + transactionCommitOperationTimeout = new Timeout(Duration.create( + datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS)); + } + public DatastoreContext getDatastoreContext() { return datastoreContext; } @@ -157,7 +163,25 @@ public class ActorContext { this.schemaContext = schemaContext; if(shardManager != null) { - shardManager.tell(new UpdateSchemaContext(schemaContext), null); + shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); + } + } + + public void setDatastoreContext(DatastoreContext context) { + this.datastoreContext = context; + setCachedProperties(); + + // We write the 'updated' volatile to trigger a write memory barrier so that the writes above + // will be published immediately even though they may not be immediately visible to other + // threads due to unsynchronized reads. That's OK though - we're going for eventual + // consistency here as immediately visible updates to these members aren't critical. These + // members could've been made volatile but wanted to avoid volatile reads as these are + // accessed often and updates will be infrequent. + + updated = true; + + if(shardManager != null) { + shardManager.tell(context, ActorRef.noSender()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java index 3693c01b42..ecfcdefcb2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java @@ -7,51 +7,175 @@ */ package org.opendaylight.controller.cluster.datastore; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.io.IOException; import java.util.Dictionary; import java.util.Hashtable; +import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceReference; +import org.osgi.framework.ServiceRegistration; import org.osgi.service.cm.Configuration; import org.osgi.service.cm.ConfigurationAdmin; +import org.osgi.service.cm.ConfigurationEvent; +import org.osgi.service.cm.ConfigurationListener; /** * Unit tests for DatastoreContextConfigAdminOverlay. * * @author Thomas Pantelis */ +@SuppressWarnings("unchecked") public class DatastoreContextConfigAdminOverlayTest { - @SuppressWarnings("unchecked") - @Test - public void test() throws IOException { - BundleContext mockBundleContext = mock(BundleContext.class); - ServiceReference mockServiceRef = mock(ServiceReference.class); - ConfigurationAdmin mockConfigAdmin = mock(ConfigurationAdmin.class); - Configuration mockConfig = mock(Configuration.class); - DatastoreContextIntrospector mockIntrospector = mock(DatastoreContextIntrospector.class); + @Mock + private BundleContext mockBundleContext; + + @Mock + private ServiceReference mockConfigAdminServiceRef; + + @Mock + private ConfigurationAdmin mockConfigAdmin; + + @Mock + private Configuration mockConfig; + + @Mock + private DatastoreContextIntrospector mockIntrospector; + + @Mock + private ServiceRegistration configListenerServiceReg; + + @Before + public void setup() throws IOException { + MockitoAnnotations.initMocks(this); - doReturn(mockServiceRef).when(mockBundleContext).getServiceReference(ConfigurationAdmin.class); - doReturn(mockConfigAdmin).when(mockBundleContext).getService(mockServiceRef); + doReturn(mockConfigAdminServiceRef).when(mockBundleContext).getServiceReference(ConfigurationAdmin.class); + doReturn(mockConfigAdmin).when(mockBundleContext).getService(mockConfigAdminServiceRef); + doReturn(configListenerServiceReg).when(mockBundleContext).registerService( + eq(ConfigurationListener.class.getName()), any(), any(Dictionary.class)); doReturn(mockConfig).when(mockConfigAdmin).getConfiguration(DatastoreContextConfigAdminOverlay.CONFIG_ID); doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(mockConfig).getPid(); + } + + @Test + public void testUpdateOnConstruction() { + Dictionary properties = new Hashtable<>(); + properties.put("property", "value"); + doReturn(properties).when(mockConfig).getProperties(); + + DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay( + mockIntrospector, mockBundleContext); + + verify(mockIntrospector).update(properties); + + verify(mockBundleContext).ungetService(mockConfigAdminServiceRef); + + overlay.close(); + } + + @Test + public void testUpdateOnConfigurationEvent() { + DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay( + mockIntrospector, mockBundleContext); + + reset(mockIntrospector); + + DatastoreContext context = DatastoreContext.newBuilder().build(); + doReturn(context).when(mockIntrospector).getContext(); + + DatastoreContextConfigAdminOverlay.Listener mockListener = + mock(DatastoreContextConfigAdminOverlay.Listener.class); + + overlay.setListener(mockListener); + Dictionary properties = new Hashtable<>(); properties.put("property", "value"); doReturn(properties).when(mockConfig).getProperties(); - try(DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay( - mockIntrospector, mockBundleContext)) { - } + doReturn(true).when(mockIntrospector).update(properties); + + ArgumentCaptor configListener = + ArgumentCaptor.forClass(ConfigurationListener.class); + verify(mockBundleContext).registerService(eq(ConfigurationListener.class.getName()), + configListener.capture(), any(Dictionary.class)); + + ConfigurationEvent configEvent = mock(ConfigurationEvent.class); + doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(configEvent).getPid(); + doReturn(mockConfigAdminServiceRef).when(configEvent).getReference(); + doReturn(ConfigurationEvent.CM_UPDATED).when(configEvent).getType(); + + configListener.getValue().configurationEvent(configEvent); verify(mockIntrospector).update(properties); - verify(mockBundleContext).ungetService(mockServiceRef); + verify(mockListener).onDatastoreContextUpdated(context); + + verify(mockBundleContext, times(2)).ungetService(mockConfigAdminServiceRef); + + overlay.close(); + + verify(configListenerServiceReg).unregister(); + } + + @Test + public void testConfigurationEventWithDifferentPid() { + DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay( + mockIntrospector, mockBundleContext); + + reset(mockIntrospector); + + ArgumentCaptor configListener = + ArgumentCaptor.forClass(ConfigurationListener.class); + verify(mockBundleContext).registerService(eq(ConfigurationListener.class.getName()), + configListener.capture(), any(Dictionary.class)); + + ConfigurationEvent configEvent = mock(ConfigurationEvent.class); + doReturn("other-pid").when(configEvent).getPid(); + doReturn(mockConfigAdminServiceRef).when(configEvent).getReference(); + doReturn(ConfigurationEvent.CM_UPDATED).when(configEvent).getType(); + + configListener.getValue().configurationEvent(configEvent); + + verify(mockIntrospector, times(0)).update(any(Dictionary.class)); + + overlay.close(); + } + + @Test + public void testConfigurationEventWithNonUpdateEventType() { + DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay( + mockIntrospector, mockBundleContext); + + reset(mockIntrospector); + + ArgumentCaptor configListener = + ArgumentCaptor.forClass(ConfigurationListener.class); + verify(mockBundleContext).registerService(eq(ConfigurationListener.class.getName()), + configListener.capture(), any(Dictionary.class)); + + ConfigurationEvent configEvent = mock(ConfigurationEvent.class); + doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(configEvent).getPid(); + doReturn(mockConfigAdminServiceRef).when(configEvent).getReference(); + doReturn(ConfigurationEvent.CM_DELETED).when(configEvent).getType(); + + configListener.getValue().configurationEvent(configEvent); + + verify(mockIntrospector, times(0)).update(any(Dictionary.class)); + + overlay.close(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index bbbc4db5e3..1ebd1b91dd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -1565,6 +1565,31 @@ public class ShardTest extends AbstractActorTest { } + @Test + public void testOnDatastoreContext() { + new ShardTestKit(getSystem()) {{ + dataStoreContextBuilder.persistent(true); + + TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext"); + + assertEquals("isRecoveryApplicable", true, + shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + + waitUntilLeader(shard); + + shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender()); + + assertEquals("isRecoveryApplicable", false, + shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + + shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender()); + + assertEquals("isRecoveryApplicable", true, + shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } private NormalizedNode readStore(final InMemoryDOMDataStore store) throws ReadFailedException { DOMStoreReadTransaction transaction = store.newReadOnlyTransaction(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 3c6a0cef5c..fd41c49390 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -17,6 +17,7 @@ import com.google.common.base.Optional; import com.typesafe.config.ConfigFactory; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.time.StopWatch; +import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; @@ -339,4 +340,29 @@ public class ActorContextTest extends AbstractActorTest{ } + @Test + public void testSetDatastoreContext() { + new JavaTestKit(getSystem()) {{ + ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class), + mock(Configuration.class), DatastoreContext.newBuilder(). + operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build()); + + assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds()); + assertEquals("getTransactionCommitOperationTimeout", 7, + actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); + + DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6). + shardTransactionCommitTimeoutInSeconds(8).build(); + + actorContext.setDatastoreContext(newContext); + + expectMsgClass(duration("5 seconds"), DatastoreContext.class); + + Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext()); + + assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds()); + assertEquals("getTransactionCommitOperationTimeout", 8, + actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); + }}; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties index 9ed3d276a3..3dd752ec30 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties @@ -3,4 +3,4 @@ org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a org.slf4j.simpleLogger.logFile=System.out org.slf4j.simpleLogger.showShortLogName=true org.slf4j.simpleLogger.levelInBrackets=true -org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=trace \ No newline at end of file +org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug \ No newline at end of file