<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-data-impl</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+
</dependencies>
<build>
import akka.util.Timeout;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.text.WordUtils;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader;
import org.opendaylight.controller.cluster.datastore.config.FileConfigurationReader;
import org.opendaylight.controller.cluster.raft.ConfigParams;
public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
+ public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
private ConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
private DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
+ private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
private DatastoreContext(){
setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
return raftConfig.getElectionTimeoutFactor();
}
+ public String getDataStoreType(){
+ return dataStoreType;
+ }
public long getTransactionCreationInitialRateLimit() {
return transactionCreationInitialRateLimit;
return this;
}
+ public Builder dataStoreType(String dataStoreType){
+ datastoreContext.dataStoreType = dataStoreType;
+ datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreType) + "Datastore";
+ return this;
+ }
+
public DatastoreContext build() {
return datastoreContext;
}
private final ActorContext actorContext;
- public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster,
+ public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
Configuration configuration, DatastoreContext datastoreContext) {
Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
- Preconditions.checkNotNull(type, "type should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
+ String type = datastoreContext.getDataStoreType();
+
String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
LOG.info("Creating ShardManager : {}", shardManagerId);
actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
- ShardManager.props(type, cluster, configuration, datastoreContext)
+ ShardManager.props(cluster, configuration, datastoreContext)
.withMailbox(ActorContext.MAILBOX), shardManagerId ),
- cluster, configuration, datastoreContext, type);
+ cluster, configuration, datastoreContext);
}
public DistributedDataStore(ActorContext actorContext) {
private static volatile ActorSystem persistentActorSystem = null;
- public static DistributedDataStore createInstance(String name, SchemaService schemaService,
+ public static DistributedDataStore createInstance(SchemaService schemaService,
DatastoreContext datastoreContext, BundleContext bundleContext) {
ActorSystem actorSystem = getOrCreateInstance(bundleContext, datastoreContext.getConfigurationReader());
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
final DistributedDataStore dataStore =
- new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
+ new DistributedDataStore(actorSystem, new ClusterWrapperImpl(actorSystem),
config, datastoreContext);
ShardStrategyFactory.setConfiguration(config);
private final DataPersistenceProvider dataPersistenceProvider;
/**
- * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
- * configuration or operational
*/
- protected ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
+ protected ShardManager(ClusterWrapper cluster, Configuration configuration,
DatastoreContext datastoreContext) {
- this.type = Preconditions.checkNotNull(type, "type should not be null");
this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
this.datastoreContext = datastoreContext;
this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
+ this.type = datastoreContext.getDataStoreType();
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
}
- public static Props props(final String type,
+ public static Props props(
final ClusterWrapper cluster,
final Configuration configuration,
final DatastoreContext datastoreContext) {
- Preconditions.checkNotNull(type, "type should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
- return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext));
+ return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext));
}
@Override
private static class ShardManagerCreator implements Creator<ShardManager> {
private static final long serialVersionUID = 1L;
- final String type;
final ClusterWrapper cluster;
final Configuration configuration;
final DatastoreContext datastoreContext;
- ShardManagerCreator(String type, ClusterWrapper cluster,
+ ShardManagerCreator(ClusterWrapper cluster,
Configuration configuration, DatastoreContext datastoreContext) {
- this.type = type;
this.cluster = cluster;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
@Override
public ShardManager create() throws Exception {
- return new ShardManager(type, cluster, configuration, datastoreContext);
+ return new ShardManager(cluster, configuration, datastoreContext);
}
}
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
private final DatastoreContext datastoreContext;
- private final String dataStoreType;
private final FiniteDuration operationDuration;
private final Timeout operationTimeout;
private final String selfAddressHostPort;
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration) {
this(actorSystem, shardManager, clusterWrapper, configuration,
- DatastoreContext.newBuilder().build(), UNKNOWN_DATA_STORE_TYPE);
+ DatastoreContext.newBuilder().build());
}
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration,
- DatastoreContext datastoreContext, String dataStoreType) {
+ DatastoreContext datastoreContext) {
this.actorSystem = actorSystem;
this.shardManager = shardManager;
this.clusterWrapper = clusterWrapper;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
- this.dataStoreType = dataStoreType;
this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
* @return
*/
public Timer getOperationTimer(String operationName){
- final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType, operationName, METRIC_RATE);
+ final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
return metricRegistry.timer(rate);
}
* @return
*/
public String getDataStoreType() {
- return dataStoreType;
+ return datastoreContext.getDataStoreType();
}
/**
}
DatastoreContext datastoreContext = DatastoreContext.newBuilder()
- .dataStoreMXBeanType("DistributedConfigDatastore")
+ .dataStoreType("config")
.dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create(
props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
.transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
.build();
- return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
+ return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(),
datastoreContext, bundleContext);
}
}
DatastoreContext datastoreContext = DatastoreContext.newBuilder()
- .dataStoreMXBeanType("DistributedOperationalDatastore")
+ .dataStoreType("operational")
.dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create(
props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
.transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
.build();
- return DistributedDataStoreFactory.createInstance("operational",
- getOperationalSchemaServiceDependency(), datastoreContext, bundleContext);
+ return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(),
+ datastoreContext, bundleContext);
}
public void setBundleContext(BundleContext bundleContext) {
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
ShardStrategyFactory.setConfiguration(config);
+ datastoreContextBuilder.dataStoreType(typeName);
+
DatastoreContext datastoreContext = datastoreContextBuilder.build();
- DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster,
+
+ DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
config, datastoreContext);
SchemaContext schemaContext = SchemaContextHelper.full();
package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.Creator;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
-import java.net.URI;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
public class ShardManagerTest extends AbstractActorTest {
private static int ID_COUNTER = 1;
}
private Props newShardMgrProps() {
- return ShardManager.props(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
- DatastoreContext.newBuilder().build());
+
+ DatastoreContext.Builder builder = DatastoreContext.newBuilder();
+ builder.dataStoreType(shardMrgIDSuffix);
+ return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), builder.build());
}
@Test
public void testRecoveryApplicable(){
new JavaTestKit(getSystem()) {
{
- final Props persistentProps = ShardManager.props(shardMrgIDSuffix,
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(true).build());
+ final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(true).dataStoreType(shardMrgIDSuffix).build());
final TestActorRef<ShardManager> persistentShardManager =
TestActorRef.create(getSystem(), persistentProps);
assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
- final Props nonPersistentProps = ShardManager.props(shardMrgIDSuffix,
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(false).build());
+ final Props nonPersistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(false).dataStoreType(shardMrgIDSuffix).build());
final TestActorRef<ShardManager> nonPersistentShardManager =
TestActorRef.create(getSystem(), nonPersistentProps);
private static final long serialVersionUID = 1L;
@Override
public ShardManager create() throws Exception {
- return new ShardManager(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build()) {
+ return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()) {
@Override
protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
DataPersistenceProviderMonitor dataPersistenceProviderMonitor
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
TestShardManager(String shardMrgIDSuffix) {
- super(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
- DatastoreContext.newBuilder().build());
+ super(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build());
}
@Override
DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+ doReturn("config").when(mockDataStoreContext).getDataStoreType();
ActorContext actorContext =
new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext, "config");
+ mock(Configuration.class), mockDataStoreContext);
// Check that the initial value is being picked up from DataStoreContext
assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);