import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
EXECUTOR_MAX_QUEUE_SIZE_PROP,
DEFAULT_EXECUTOR_MAX_QUEUE_SIZE), "DistDataStore"));
- public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, Configuration configuration) {
+ public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster,
+ Configuration configuration, InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
Preconditions.checkNotNull(type, "type should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
LOG.info("Creating ShardManager : {}", shardManagerId);
this.actorContext = new ActorContext(actorSystem, actorSystem
- .actorOf(ShardManager.props(type, cluster, configuration),
+ .actorOf(ShardManager.props(type, cluster, configuration, dataStoreProperties),
shardManagerId ), cluster, configuration);
}
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSystem;
+
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
public class DistributedDataStoreFactory {
- public static DistributedDataStore createInstance(String name, SchemaService schemaService){
+ public static DistributedDataStore createInstance(String name, SchemaService schemaService,
+ InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+
ActorSystem actorSystem = ActorSystemFactory.getInstance();
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
final DistributedDataStore dataStore =
- new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),config );
- ShardStrategyFactory.setConfiguration(config);
- schemaService
- .registerSchemaContextListener(dataStore);
+ new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
+ config, dataStoreProperties );
+ ShardStrategyFactory.setConfiguration(config);
+ schemaService.registerSchemaContextListener(dataStore);
return dataStore;
-
}
}
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
- private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses) {
+ private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
+ InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
this.name = name;
LOG.info("Shard created : {} persistent : {}", name, persistent);
- store = InMemoryDOMDataStoreFactory.create(name.toString(), null);
+ store = InMemoryDOMDataStoreFactory.create(name.toString(), null, dataStoreProperties);
shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString());
return map;
}
-
-
-
public static Props props(final ShardIdentifier name,
- final Map<ShardIdentifier, String> peerAddresses) {
+ final Map<ShardIdentifier, String> peerAddresses,
+ final InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
Preconditions.checkNotNull(name, "name should not be null");
Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
@Override
public Shard create() throws Exception {
- return new Shard(name, peerAddresses);
+ return new Shard(name, peerAddresses, dataStoreProperties);
}
});
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+
import scala.concurrent.duration.Duration;
import java.util.ArrayList;
private ShardManagerInfoMBean mBean;
+ private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
+
/**
* @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
* configuration or operational
*/
- private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
+ private ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
+ InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
this.type = Preconditions.checkNotNull(type, "type should not be null");
this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
+ this.dataStoreProperties = dataStoreProperties;
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
public static Props props(final String type,
final ClusterWrapper cluster,
- final Configuration configuration) {
+ final Configuration configuration,
+ final InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
Preconditions.checkNotNull(type, "type should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
@Override
public ShardManager create() throws Exception {
- return new ShardManager(type, cluster, configuration);
+ return new ShardManager(type, cluster, configuration, dataStoreProperties);
}
});
}
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
ActorRef actor = getContext()
- .actorOf(Shard.props(shardId, peerAddresses),
+ .actorOf(Shard.props(shardId, peerAddresses, dataStoreProperties),
shardId.toString());
localShardActorNames.add(shardId.toString());
localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
public class DistributedConfigDataStoreProviderModule extends
org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModule {
@Override
public java.lang.AutoCloseable createInstance() {
- return DistributedDataStoreFactory
- .createInstance("config", getConfigSchemaServiceDependency());
+ return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
+ InMemoryDOMDataStoreConfigProperties.create(getMaxShardDataChangeExecutorPoolSize(),
+ getMaxShardDataChangeExecutorQueueSize(),
+ getMaxShardDataChangeListenerQueueSize()));
}
}
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
public class DistributedOperationalDataStoreProviderModule extends
org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModule {
@Override
public java.lang.AutoCloseable createInstance() {
- return DistributedDataStoreFactory
- .createInstance("operational", getOperationalSchemaServiceDependency());
+ return DistributedDataStoreFactory.createInstance("operational",
+ getOperationalSchemaServiceDependency(),
+ InMemoryDOMDataStoreConfigProperties.create(getMaxShardDataChangeExecutorPoolSize(),
+ getMaxShardDataChangeExecutorQueueSize(),
+ getMaxShardDataChangeListenerQueueSize()));
}
}
case distributed-config-datastore-provider {
when "/config:modules/config:module/config:type = 'distributed-config-datastore-provider'";
container config-schema-service {
- uses config:service-ref {
- refine type {
- mandatory false;
- config:required-identity sal:schema-service;
- }
- }
- }
+ uses config:service-ref {
+ refine type {
+ mandatory false;
+ config:required-identity sal:schema-service;
+ }
+ }
+ }
+
+ leaf max-shard-data-change-executor-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for each shard's data store data change notification executor.";
+ }
+
+ leaf max-shard-data-change-executor-pool-size {
+ default 20;
+ type uint16;
+ description "The maximum thread pool size for each shard's data store data change notification executor.";
+ }
+
+ leaf max-shard-data-change-listener-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for each shard's data store data change listeners.";
+ }
}
}
// Augments the 'configuration' choice node under modules/module.
- augment "/config:modules/config:module/config:configuration" {
- case distributed-operational-datastore-provider {
- when "/config:modules/config:module/config:type = 'distributed-operational-datastore-provider'";
+ augment "/config:modules/config:module/config:configuration" {
+ case distributed-operational-datastore-provider {
+ when "/config:modules/config:module/config:type = 'distributed-operational-datastore-provider'";
container operational-schema-service {
- uses config:service-ref {
- refine type {
- mandatory false;
- config:required-identity sal:schema-service;
- }
- }
- }
+ uses config:service-ref {
+ refine type {
+ mandatory false;
+ config:required-identity sal:schema-service;
+ }
+ }
+ }
+
+ leaf max-shard-data-change-executor-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for each shard's data store data change notification executor.";
+ }
+
+ leaf max-shard-data-change-executor-pool-size {
+ default 20;
+ type uint16;
+ description "The maximum thread pool size for each shard's data store data change notification executor.";
+ }
+
+ leaf max-shard-data-change-listener-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for each shard's data store data change listeners.";
+ }
}
}
}
ShardIdentifier.builder().memberName("member-1")
.shardName("inventory").type("config").build();
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP);
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
final ActorRef shard = getSystem().actorOf(props);
new Within(duration("5 seconds")) {
protected void run() {
try {
final DistributedDataStore distributedDataStore =
- new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration);
+ new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration, null);
distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
try {
final DistributedDataStore distributedDataStore =
new DistributedDataStore(getSystem(), "config",
- new MockClusterWrapper(), configuration);
+ new MockClusterWrapper(), configuration, null);
distributedDataStore.onGlobalContextUpdated(
SchemaContextHelper.full());
ActorSystem actorSystem = mock(ActorSystem.class);
new DistributedDataStore(actorSystem, "config",
- mock(ClusterWrapper.class), mock(Configuration.class));
+ mock(ClusterWrapper.class), mock(Configuration.class), null);
verify(actorSystem).actorOf(any(Props.class), eq("shardmanager-config"));
}
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", new MockClusterWrapper(),
- new MockConfiguration());
+ new MockConfiguration(), null);
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", new MockClusterWrapper(),
- new MockConfiguration());
+ new MockConfiguration(), null);
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", new MockClusterWrapper(),
- new MockConfiguration());
+ new MockConfiguration(), null);
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", mockClusterWrapper,
- new MockConfiguration());
+ new MockConfiguration(), null);
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", new MockClusterWrapper(),
- new MockConfiguration());
+ new MockConfiguration(), null);
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", new MockClusterWrapper(),
- new MockConfiguration());
+ new MockConfiguration(), null);
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
ShardIdentifier.builder().memberName("member-1")
.shardName("inventory").type("config").build();
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP);
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
final ActorRef subject =
getSystem().actorOf(props, "testCreateTransactionChain");
ShardIdentifier.builder().memberName("member-1")
.shardName("inventory").type("config").build();
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP);
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
final ActorRef subject =
getSystem().actorOf(props, "testRegisterChangeListener");
ShardIdentifier.builder().memberName("member-1")
.shardName("inventory").type("config").build();
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP);
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
final ActorRef subject =
getSystem().actorOf(props, "testCreateTransaction");
.shardName("inventory").type("config").build();
peerAddresses.put(identifier, null);
- final Props props = Shard.props(identifier, peerAddresses);
+ final Props props = Shard.props(identifier, peerAddresses, null);
final ActorRef subject =
getSystem().actorOf(props, "testPeerAddressResolved");
throws Throwable {
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newReadOnlyTransaction(), shard,
TestModel.createTestContext());
throws Throwable {
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard,
TestModel.createTestContext());
throws Throwable {
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard,
TestModel.createTestContext());
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
TestModel.createTestContext());
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard,
TestModel.createTestContext());
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard,
TestModel.createTestContext());
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard,
TestModel.createTestContext());
}
-
-
}
@Test
public void testOnReceiveReadData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext);
final ActorRef subject = getSystem().actorOf(props, "testReadData");
@Test
public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props( store.newReadOnlyTransaction(), shard, testSchemaContext);
final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
@Test
public void testOnReceiveDataExistsPositive() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext);
final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive");
@Test
public void testOnReceiveDataExistsNegative() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+ Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext);
final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative");
@Test
public void testOnReceiveWriteData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newWriteOnlyTransaction(), shard, TestModel.createTestContext());
final ActorRef subject =
@Test
public void testOnReceiveMergeData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext);
final ActorRef subject =
@Test
public void testOnReceiveDeleteData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props( store.newWriteOnlyTransaction(), shard, TestModel.createTestContext());
final ActorRef subject =
@Test
public void testOnReceiveReadyTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props( store.newReadWriteTransaction(), shard, TestModel.createTestContext());
final ActorRef subject =
@Test
public void testOnReceiveCloseTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext());
final ActorRef subject =
public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
try {
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newReadOnlyTransaction(), shard, TestModel.createTestContext());
final TestActorRef subject = TestActorRef.apply(props,getSystem());
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
-import org.opendaylight.yangtools.util.PropertyUtils;
-
import com.google.common.collect.ImmutableMap;
/**
public final class DomInmemoryDataBrokerModule extends
org.opendaylight.controller.config.yang.md.sal.dom.impl.AbstractDomInmemoryDataBrokerModule {
- private static final String FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE_PROP =
- "mdsal.datastore-future-callback-queue.size";
- private static final int DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE = 1000;
-
- private static final String FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE_PROP =
- "mdsal.datastore-future-callback-pool.size";
- private static final int DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE = 20;
- private static final String COMMIT_EXECUTOR_MAX_QUEUE_SIZE_PROP =
- "mdsal.datastore-commit-queue.size";
- private static final int DEFAULT_COMMIT_EXECUTOR_MAX_QUEUE_SIZE = 5000;
-
public DomInmemoryDataBrokerModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier,
final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
* system it's running on.
*/
ExecutorService commitExecutor = SpecialExecutors.newBoundedSingleThreadExecutor(
- PropertyUtils.getIntSystemProperty(
- COMMIT_EXECUTOR_MAX_QUEUE_SIZE_PROP,
- DEFAULT_COMMIT_EXECUTOR_MAX_QUEUE_SIZE), "WriteTxCommit");
+ getMaxDataBrokerCommitQueueSize(), "WriteTxCommit");
/*
* We use an executor for commit ListenableFuture callbacks that favors reusing available
* reached, subsequent submitted tasks will block the caller.
*/
Executor listenableFutureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(
- PropertyUtils.getIntSystemProperty(
- FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE_PROP,
- DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE),
- PropertyUtils.getIntSystemProperty(
- FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE_PROP,
- DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE), "CommitFutures");
+ getMaxDataBrokerFutureCallbackPoolSize(), getMaxDataBrokerFutureCallbackQueueSize(),
+ "CommitFutures");
DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores,
new DeadlockDetectingListeningExecutorService(commitExecutor,
}
}
}
+
+ leaf max-data-broker-future-callback-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for the data broker's commit future callback executor.";
+ }
+
+ leaf max-data-broker-future-callback-pool-size {
+ default 20;
+ type uint16;
+ description "The maximum thread pool size for the data broker's commit future callback executor.";
+ }
+
+ leaf max-data-broker-commit-queue-size {
+ default 5000;
+ type uint16;
+ description "The maximum queue size for the data broker's commit executor.";
+ }
}
}
package org.opendaylight.controller.config.yang.inmemory_datastore_provider;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
public class InMemoryConfigDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryConfigDataStoreProviderModule {
@Override
public java.lang.AutoCloseable createInstance() {
- return InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency());
+ return InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency(),
+ InMemoryDOMDataStoreConfigProperties.create(getMaxDataChangeExecutorPoolSize(),
+ getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize()));
}
}
package org.opendaylight.controller.config.yang.inmemory_datastore_provider;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
public class InMemoryOperationalDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryOperationalDataStoreProviderModule {
@Override
public java.lang.AutoCloseable createInstance() {
- return InMemoryDOMDataStoreFactory.create("DOM-OPER", getOperationalSchemaServiceDependency());
+ return InMemoryDOMDataStoreFactory.create("DOM-OPER", getOperationalSchemaServiceDependency(),
+ InMemoryDOMDataStoreConfigProperties.create(getMaxDataChangeExecutorPoolSize(),
+ getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize()));
}
}
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
import org.opendaylight.yangtools.util.ExecutorServiceUtil;
-import org.opendaylight.yangtools.util.PropertyUtils;
import org.opendaylight.yangtools.util.concurrent.NotificationManager;
import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
}
};
- private static final String DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP =
- "mdsal.datastore-dcl-notification-queue.size";
-
- private static final int DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE = 1000;
-
private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
private final ListenerTree listenerTree = ListenerTree.create();
private final AtomicLong txCounter = new AtomicLong(0);
public InMemoryDOMDataStore(final String name, final ListeningExecutorService listeningExecutor,
final ExecutorService dataChangeListenerExecutor) {
+ this(name, listeningExecutor, dataChangeListenerExecutor,
+ InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE);
+ }
+
+ public InMemoryDOMDataStore(final String name, final ListeningExecutorService listeningExecutor,
+ final ExecutorService dataChangeListenerExecutor, int maxDataChangeListenerQueueSize) {
this.name = Preconditions.checkNotNull(name);
this.listeningExecutor = Preconditions.checkNotNull(listeningExecutor);
this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
- int maxDCLQueueSize = PropertyUtils.getIntSystemProperty(
- DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP, DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE );
-
dataChangeListenerNotificationManager =
new QueuedNotificationManager<>(this.dataChangeListenerExecutor,
- DCL_NOTIFICATION_MGR_INVOKER, maxDCLQueueSize, "DataChangeListenerQueueMgr");
+ DCL_NOTIFICATION_MGR_INVOKER, maxDataChangeListenerQueueSize,
+ "DataChangeListenerQueueMgr");
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+/**
+ * Holds configuration properties when creating an {@link InMemoryDOMDataStore} instance via the
+ * {@link InMemoryDOMDataStoreFactory}
+ *
+ * @author Thomas Pantelis
+ * @see InMemoryDOMDataStoreFactory
+ */
+public class InMemoryDOMDataStoreConfigProperties {
+
+ public static final int DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE = 1000;
+ public static final int DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE = 20;
+ public static final int DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE = 1000;
+
+ private static final InMemoryDOMDataStoreConfigProperties DEFAULT =
+ create(DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE,
+ DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE,
+ DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE);
+
+ private final int maxDataChangeExecutorQueueSize;
+ private final int maxDataChangeExecutorPoolSize;
+ private final int maxDataChangeListenerQueueSize;
+
+ /**
+ * Constructs an instance with the given property values.
+ *
+ * @param maxDataChangeExecutorPoolSize
+ * maximum thread pool size for the data change notification executor.
+ * @param maxDataChangeExecutorQueueSize
+ * maximum queue size for the data change notification executor.
+ * @param maxDataChangeListenerQueueSize
+ * maximum queue size for the data change listeners.
+ */
+ public static InMemoryDOMDataStoreConfigProperties create(int maxDataChangeExecutorPoolSize,
+ int maxDataChangeExecutorQueueSize, int maxDataChangeListenerQueueSize) {
+ return new InMemoryDOMDataStoreConfigProperties(maxDataChangeExecutorPoolSize,
+ maxDataChangeExecutorQueueSize, maxDataChangeListenerQueueSize);
+ }
+
+ /**
+ * Returns the InMemoryDOMDataStoreConfigProperties instance with default values.
+ */
+ public static InMemoryDOMDataStoreConfigProperties getDefault() {
+ return DEFAULT;
+ }
+
+ private InMemoryDOMDataStoreConfigProperties(int maxDataChangeExecutorPoolSize,
+ int maxDataChangeExecutorQueueSize, int maxDataChangeListenerQueueSize) {
+ this.maxDataChangeExecutorQueueSize = maxDataChangeExecutorQueueSize;
+ this.maxDataChangeExecutorPoolSize = maxDataChangeExecutorPoolSize;
+ this.maxDataChangeListenerQueueSize = maxDataChangeListenerQueueSize;
+ }
+
+ /**
+ * Returns the maximum queue size for the data change notification executor.
+ */
+ public int getMaxDataChangeExecutorQueueSize() {
+ return maxDataChangeExecutorQueueSize;
+ }
+
+ /**
+ * Returns the maximum thread pool size for the data change notification executor.
+ */
+ public int getMaxDataChangeExecutorPoolSize() {
+ return maxDataChangeExecutorPoolSize;
+ }
+
+ /**
+ * Returns the maximum queue size for the data change listeners.
+ */
+ public int getMaxDataChangeListenerQueueSize() {
+ return maxDataChangeListenerQueueSize;
+ }
+}
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
-import org.opendaylight.yangtools.util.PropertyUtils;
import com.google.common.util.concurrent.MoreExecutors;
/**
*/
public final class InMemoryDOMDataStoreFactory {
- private static final String DCL_EXECUTOR_MAX_QUEUE_SIZE_PROP =
- "mdsal.datastore-dcl-notification-queue.size";
- private static final int DEFAULT_DCL_EXECUTOR_MAX_QUEUE_SIZE = 1000;
-
- private static final String DCL_EXECUTOR_MAX_POOL_SIZE_PROP =
- "mdsal.datastore-dcl-notification-pool.size";
- private static final int DEFAULT_DCL_EXECUTOR_MAX_POOL_SIZE = 20;
-
private InMemoryDOMDataStoreFactory() {
}
+ public static InMemoryDOMDataStore create(final String name,
+ @Nullable final SchemaService schemaService) {
+ return create(name, schemaService, null);
+ }
+
/**
* Creates an InMemoryDOMDataStore instance.
*
* @param name the name of the data store
* @param schemaService the SchemaService to which to register the data store.
+ * @param properties configuration properties for the InMemoryDOMDataStore instance. If null,
+ * default property values are used.
* @return an InMemoryDOMDataStore instance
*/
public static InMemoryDOMDataStore create(final String name,
- @Nullable final SchemaService schemaService) {
+ @Nullable final SchemaService schemaService,
+ @Nullable final InMemoryDOMDataStoreConfigProperties properties) {
+
+ InMemoryDOMDataStoreConfigProperties actualProperties = properties;
+ if(actualProperties == null) {
+ actualProperties = InMemoryDOMDataStoreConfigProperties.getDefault();
+ }
// For DataChangeListener notifications we use an executor that provides the fastest
// task execution time to get higher throughput as DataChangeListeners typically provide
// much of the business logic for a data model. If the executor queue size limit is reached,
// subsequent submitted notifications will block the calling thread.
- int dclExecutorMaxQueueSize = PropertyUtils.getIntSystemProperty(
- DCL_EXECUTOR_MAX_QUEUE_SIZE_PROP, DEFAULT_DCL_EXECUTOR_MAX_QUEUE_SIZE);
- int dclExecutorMaxPoolSize = PropertyUtils.getIntSystemProperty(
- DCL_EXECUTOR_MAX_POOL_SIZE_PROP, DEFAULT_DCL_EXECUTOR_MAX_POOL_SIZE);
+ int dclExecutorMaxQueueSize = actualProperties.getMaxDataChangeExecutorQueueSize();
+ int dclExecutorMaxPoolSize = actualProperties.getMaxDataChangeExecutorPoolSize();
ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool(
dclExecutorMaxPoolSize, dclExecutorMaxQueueSize, name + "-DCL" );
InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name,
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()),
- dataChangeListenerExecutor);
+ dataChangeListenerExecutor, actualProperties.getMaxDataChangeListenerQueueSize());
if(schemaService != null) {
schemaService.registerSchemaContextListener(dataStore);
when "/config:modules/config:module/config:type = 'inmemory-config-datastore-provider'";
container schema-service {
- uses config:service-ref {
+ uses config:service-ref {
refine type {
mandatory false;
config:required-identity sal:schema-service;
}
- }
+ }
+ }
+
+ leaf max-data-change-executor-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for the data change notification executor.";
+ }
+
+ leaf max-data-change-executor-pool-size {
+ default 20;
+ type uint16;
+ description "The maximum thread pool size for the data change notification executor.";
+ }
+
+ leaf max-data-change-listener-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for the data change listeners.";
}
}
}
+ // Augments the 'configuration' choice node under modules/module.
+ augment "/config:modules/config:module/config:configuration" {
+ case inmemory-operational-datastore-provider {
+ when "/config:modules/config:module/config:type = 'inmemory-operational-datastore-provider'";
+ // Yang does not allow two cases from same namespaces with same children
+ // Schema-service dependency renamed to operational-schema-service
+ // to prevent conflict with schema-service container from inmemory-config-datastore-provider
+ container operational-schema-service {
+ uses config:service-ref {
+ refine type {
+ mandatory false;
+ config:required-identity sal:schema-service;
+ }
+ }
+ }
+
+ leaf max-data-change-executor-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for the data change notification executor.";
+ }
- // Augments the 'configuration' choice node under modules/module.
- augment "/config:modules/config:module/config:configuration" {
- case inmemory-operational-datastore-provider {
- when "/config:modules/config:module/config:type = 'inmemory-operational-datastore-provider'";
+ leaf max-data-change-executor-pool-size {
+ default 20;
+ type uint16;
+ description "The maximum thread pool size for the data change notification executor.";
+ }
- // Yang does not allow two cases from same namespaces with same children
- // Schema-service dependency renamed to operational-schema-service
- // to prevent conflict with schema-service container from inmemory-config-datastore-provider
- container operational-schema-service {
- uses config:service-ref {
- refine type {
- mandatory false;
- config:required-identity sal:schema-service;
- }
- }
- }
+ leaf max-data-change-listener-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for the data change listeners.";
}
}
+ }
}