package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
+
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
/**
- * Contains contextual data for shards.
+ * Contains contextual data for a data store.
*
* @author Thomas Pantelis
*/
private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
private final Duration shardTransactionIdleTimeout;
+ private final int operationTimeoutInSeconds;
+ private final String dataStoreMXBeanType;
public DatastoreContext() {
this.dataStoreProperties = null;
+ this.dataStoreMXBeanType = "DistributedDatastore";
this.shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
+ this.operationTimeoutInSeconds = 5;
}
- public DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties,
- Duration shardTransactionIdleTimeout) {
+ public DatastoreContext(String dataStoreMXBeanType,
+ InMemoryDOMDataStoreConfigProperties dataStoreProperties,
+ Duration shardTransactionIdleTimeout,
+ int operationTimeoutInSeconds) {
+ this.dataStoreMXBeanType = dataStoreMXBeanType;
this.dataStoreProperties = Preconditions.checkNotNull(dataStoreProperties);
- this.shardTransactionIdleTimeout = Preconditions.checkNotNull(shardTransactionIdleTimeout);
+ this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
+ this.operationTimeoutInSeconds = operationTimeoutInSeconds;
}
public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
return shardTransactionIdleTimeout;
}
+ public String getDataStoreMXBeanType() {
+ return dataStoreMXBeanType;
+ }
+ public int getOperationTimeoutInSeconds() {
+ return operationTimeoutInSeconds;
+ }
}
package org.opendaylight.controller.cluster.datastore;
-import java.util.concurrent.TimeUnit;
-
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
-
/**
*
*/
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
private final ActorContext actorContext;
- private final DatastoreContext datastoreContext;
public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster,
- Configuration configuration, DistributedDataStoreProperties dataStoreProperties) {
+ Configuration configuration, DatastoreContext datastoreContext) {
Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
Preconditions.checkNotNull(type, "type should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
-
+ Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
LOG.info("Creating ShardManager : {}", shardManagerId);
- datastoreContext = new DatastoreContext(InMemoryDOMDataStoreConfigProperties.create(
- dataStoreProperties.getMaxShardDataChangeExecutorPoolSize(),
- dataStoreProperties.getMaxShardDataChangeExecutorQueueSize(),
- dataStoreProperties.getMaxShardDataChangeListenerQueueSize()),
- Duration.create(dataStoreProperties.getShardTransactionIdleTimeoutInMinutes(),
- TimeUnit.MINUTES));
+ actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
+ ShardManager.props(type, cluster, configuration, datastoreContext)
+ .withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
- actorContext
- = new ActorContext(
- actorSystem, actorSystem.actorOf(
- ShardManager.props(type, cluster, configuration, datastoreContext).
- withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
-
- actorContext.setOperationTimeout(dataStoreProperties.getOperationTimeoutInSeconds());
+ actorContext.setOperationTimeout(datastoreContext.getOperationTimeoutInSeconds());
}
public DistributedDataStore(ActorContext actorContext) {
this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
- this.datastoreContext = new DatastoreContext();
}
-
@SuppressWarnings("unchecked")
@Override
public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
public class DistributedDataStoreFactory {
public static DistributedDataStore createInstance(String name, SchemaService schemaService,
- DistributedDataStoreProperties dataStoreProperties, BundleContext bundleContext) {
+ DatastoreContext datastoreContext, BundleContext bundleContext) {
ActorSystem actorSystem = ActorSystemFactory.createInstance(bundleContext);
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
final DistributedDataStore dataStore =
new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
- config, dataStoreProperties );
+ config, datastoreContext );
ShardStrategyFactory.setConfiguration(config);
schemaService.registerSchemaContextListener(dataStore);
return dataStore;
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-/**
- * Wrapper class for DistributedDataStore configuration properties.
- *
- * @author Thomas Pantelis
- */
-public class DistributedDataStoreProperties {
- private final int maxShardDataChangeListenerQueueSize;
- private final int maxShardDataChangeExecutorQueueSize;
- private final int maxShardDataChangeExecutorPoolSize;
- private final int shardTransactionIdleTimeoutInMinutes;
- private final int operationTimeoutInSeconds;
-
- public DistributedDataStoreProperties() {
- maxShardDataChangeListenerQueueSize = 1000;
- maxShardDataChangeExecutorQueueSize = 1000;
- maxShardDataChangeExecutorPoolSize = 20;
- shardTransactionIdleTimeoutInMinutes = 10;
- operationTimeoutInSeconds = 5;
- }
-
- public DistributedDataStoreProperties(int maxShardDataChangeListenerQueueSize,
- int maxShardDataChangeExecutorQueueSize, int maxShardDataChangeExecutorPoolSize,
- int shardTransactionIdleTimeoutInMinutes, int operationTimeoutInSeconds) {
- this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
- this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
- this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
- this.shardTransactionIdleTimeoutInMinutes = shardTransactionIdleTimeoutInMinutes;
- this.operationTimeoutInSeconds = operationTimeoutInSeconds;
- }
-
- public int getMaxShardDataChangeListenerQueueSize() {
- return maxShardDataChangeListenerQueueSize;
- }
-
- public int getMaxShardDataChangeExecutorQueueSize() {
- return maxShardDataChangeExecutorQueueSize;
- }
-
- public int getMaxShardDataChangeExecutorPoolSize() {
- return maxShardDataChangeExecutorPoolSize;
- }
-
- public int getShardTransactionIdleTimeoutInMinutes() {
- return shardTransactionIdleTimeoutInMinutes;
- }
-
- public int getOperationTimeoutInSeconds() {
- return operationTimeoutInSeconds;
- }
-}
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
datastoreContext.getDataStoreProperties());
- shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString());
-
+ shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
+ datastoreContext.getDataStoreMXBeanType());
+ shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
+ shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
}
return getContext().actorOf(
ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(),
- schemaContext,datastoreContext, name.toString()), transactionId.toString());
+ schemaContext,datastoreContext, shardMBean), transactionId.toString());
} else if (createTransaction.getTransactionType()
== TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
return getContext().actorOf(
ShardTransaction.props(store.newReadWriteTransaction(), getSelf(),
- schemaContext, datastoreContext,name.toString()), transactionId.toString());
+ schemaContext, datastoreContext, shardMBean), transactionId.toString());
} else if (createTransaction.getTransactionType()
return getContext().actorOf(
ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(),
- schemaContext, datastoreContext, name.toString()), transactionId.toString());
+ schemaContext, datastoreContext, shardMBean), transactionId.toString());
} else {
throw new IllegalArgumentException(
"Shard="+name + ":CreateTransaction message has unidentified transaction type="
public void onSuccess(Void v) {
sender.tell(new CommitTransactionReply().toSerializable(), self);
shardMBean.incrementCommittedTransactionCount();
- shardMBean.setLastCommittedTransactionTime(new Date());
+ shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
}
@Override
private void createTransactionChain() {
DOMStoreTransactionChain chain = store.createTransactionChain();
ActorRef transactionChain = getContext().actorOf(
- ShardTransactionChain.props(chain, schemaContext, datastoreContext,name.toString() ));
+ ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean));
getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
getSelf());
}
localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
}
- mBean = ShardManagerInfo
- .createShardManagerMBean("shard-manager-" + this.type, localShardActorNames);
-
+ mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
+ datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
}
/**
import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
private final DOMStoreReadTransaction transaction;
public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext,String shardName) {
- super(shardActor, schemaContext, shardName);
+ SchemaContext schemaContext, ShardStats shardStats) {
+ super(shardActor, schemaContext, shardStats);
this.transaction = transaction;
}
import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
private final DOMStoreReadWriteTransaction transaction;
public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext,String shardName) {
- super(shardActor, schemaContext, shardName);
+ SchemaContext schemaContext, ShardStats shardStats) {
+ super(shardActor, schemaContext, shardStats);
this.transaction = transaction;
}
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.japi.Creator;
+
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
+
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
private final ActorRef shardActor;
protected final SchemaContext schemaContext;
- private final String shardName;
-
+ private final ShardStats shardStats;
private final MutableCompositeModification modification = new MutableCompositeModification();
protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
- String shardName) {
+ ShardStats shardStats) {
this.shardActor = shardActor;
this.schemaContext = schemaContext;
- this.shardName = shardName;
+ this.shardStats = shardStats;
}
public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext,DatastoreContext datastoreContext, String shardName) {
+ SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats) {
return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
- datastoreContext, shardName));
+ datastoreContext, shardStats));
}
protected abstract DOMStoreTransaction getDOMStoreTransaction();
sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
}
} catch (Exception e) {
- ShardMBeanFactory.getShardStatsMBean(shardName).incrementFailedReadTransactionsCount();
+ shardStats.incrementFailedReadTransactionsCount();
sender.tell(new akka.actor.Status.Failure(e), self);
}
protected void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) {
DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
ActorRef cohortActor = getContext().actorOf(
- ThreePhaseCommitCohort.props(cohort, shardActor, modification, shardName), "cohort");
+ ThreePhaseCommitCohort.props(cohort, shardActor, modification, shardStats), "cohort");
getSender()
.tell(new ReadyTransactionReply(cohortActor.path()).toSerializable(), getSelf());
final ActorRef shardActor;
final SchemaContext schemaContext;
final DatastoreContext datastoreContext;
- final String shardName;
+ final ShardStats shardStats;
ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext, DatastoreContext datastoreContext, String shardName) {
+ SchemaContext schemaContext, DatastoreContext datastoreContext,
+ ShardStats shardStats) {
this.transaction = transaction;
this.shardActor = shardActor;
- this.shardName = shardName;
+ this.shardStats = shardStats;
this.schemaContext = schemaContext;
this.datastoreContext = datastoreContext;
}
ShardTransaction tx;
if(transaction instanceof DOMStoreReadWriteTransaction) {
tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
- shardActor, schemaContext, shardName);
+ shardActor, schemaContext, shardStats);
} else if(transaction instanceof DOMStoreReadTransaction) {
tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
- schemaContext, shardName);
+ schemaContext, shardStats);
} else {
tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
- shardActor, schemaContext, shardName);
+ shardActor, schemaContext, shardStats);
}
tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
import akka.actor.Props;
import akka.japi.Creator;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
private final DOMStoreTransactionChain chain;
private final DatastoreContext datastoreContext;
private final SchemaContext schemaContext;
- private final String shardName;
+ private final ShardStats shardStats;
public ShardTransactionChain(DOMStoreTransactionChain chain, SchemaContext schemaContext,
- DatastoreContext datastoreContext,String shardName) {
+ DatastoreContext datastoreContext, ShardStats shardStats) {
this.chain = chain;
this.datastoreContext = datastoreContext;
this.schemaContext = schemaContext;
- this.shardName = shardName;
+ this.shardStats = shardStats;
}
@Override
TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
return getContext().actorOf(
ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
- schemaContext, datastoreContext,shardName), transactionId);
+ schemaContext, datastoreContext, shardStats), transactionId);
} else if (createTransaction.getTransactionType() ==
TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
return getContext().actorOf(
ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
- schemaContext, datastoreContext,shardName), transactionId);
+ schemaContext, datastoreContext, shardStats), transactionId);
} else if (createTransaction.getTransactionType() ==
TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
return getContext().actorOf(
ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
- schemaContext, datastoreContext,shardName), transactionId);
+ schemaContext, datastoreContext, shardStats), transactionId);
} else {
throw new IllegalArgumentException (
"CreateTransaction message has unidentified transaction type=" +
}
public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext,
- DatastoreContext datastoreContext, String shardName) {
- return Props.create(new ShardTransactionChainCreator(chain, schemaContext, datastoreContext, shardName));
+ DatastoreContext datastoreContext, ShardStats shardStats) {
+ return Props.create(new ShardTransactionChainCreator(chain, schemaContext,
+ datastoreContext, shardStats));
}
private static class ShardTransactionChainCreator implements Creator<ShardTransactionChain> {
final DOMStoreTransactionChain chain;
final DatastoreContext datastoreContext;
final SchemaContext schemaContext;
- final String shardName;
+ final ShardStats shardStats;
ShardTransactionChainCreator(DOMStoreTransactionChain chain, SchemaContext schemaContext,
- DatastoreContext datastoreContext, String shardName) {
+ DatastoreContext datastoreContext, ShardStats shardStats) {
this.chain = chain;
this.datastoreContext = datastoreContext;
this.schemaContext = schemaContext;
- this.shardName = shardName;
+ this.shardStats = shardStats;
}
@Override
public ShardTransactionChain create() throws Exception {
- return new ShardTransactionChain(chain, schemaContext, datastoreContext,shardName);
+ return new ShardTransactionChain(chain, schemaContext, datastoreContext, shardStats);
}
}
}
import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
private final DOMStoreWriteTransaction transaction;
public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext,String shardName) {
- super(shardActor, schemaContext, shardName);
+ SchemaContext schemaContext, ShardStats shardStats) {
+ super(shardActor, schemaContext, shardStats);
this.transaction = transaction;
}
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
private final DOMStoreThreePhaseCommitCohort cohort;
private final ActorRef shardActor;
private final CompositeModification modification;
- private final String shardName;
+ private final ShardStats shardStats;
public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort,
- ActorRef shardActor, CompositeModification modification,String shardName) {
+ ActorRef shardActor, CompositeModification modification, ShardStats shardStats) {
this.cohort = cohort;
this.shardActor = shardActor;
this.modification = modification;
- this.shardName = shardName;
+ this.shardStats = shardStats;
}
private final LoggingAdapter log =
Logging.getLogger(getContext().system(), this);
public static Props props(final DOMStoreThreePhaseCommitCohort cohort,
- final ActorRef shardActor, final CompositeModification modification,
- String shardName) {
+ final ActorRef shardActor, final CompositeModification modification,
+ ShardStats shardStats) {
return Props.create(new ThreePhaseCommitCohortCreator(cohort, shardActor, modification,
- shardName));
+ shardStats));
}
@Override
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
public void onSuccess(Void v) {
- ShardMBeanFactory.getShardStatsMBean(shardName).incrementAbortTransactionsCount();
+ shardStats.incrementAbortTransactionsCount();
sender
.tell(new AbortTransactionReply().toSerializable(),
self);
final DOMStoreThreePhaseCommitCohort cohort;
final ActorRef shardActor;
final CompositeModification modification;
- final String shardName;
+ final ShardStats shardStats;
ThreePhaseCommitCohortCreator(DOMStoreThreePhaseCommitCohort cohort,
- ActorRef shardActor, CompositeModification modification, String shardName) {
+ ActorRef shardActor, CompositeModification modification, ShardStats shardStats) {
this.cohort = cohort;
this.shardActor = shardActor;
this.modification = modification;
- this.shardName = shardName;
+ this.shardStats = shardStats;
}
@Override
public ThreePhaseCommitCohort create() throws Exception {
- return new ThreePhaseCommitCohort(cohort, shardActor, modification, shardName);
+ return new ThreePhaseCommitCohort(cohort, shardActor, modification, shardStats);
}
}
}
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-
-package org.opendaylight.controller.cluster.datastore.jmx.mbeans;
-
-
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanRegistrationException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-
-/**
- * All MBeans should extend this class that help in registering and
- * unregistering the MBeans.
- * @author Basheeruddin <syedbahm@cisco.com>
- */
-
-
-public abstract class AbstractBaseMBean {
-
-
- public static String BASE_JMX_PREFIX = "org.opendaylight.controller:";
- public static String JMX_TYPE_DISTRIBUTED_DATASTORE = "DistributedDatastore";
- public static String JMX_CATEGORY_SHARD = "Shard";
- public static String JMX_CATEGORY_SHARD_MANAGER = "ShardManager";
-
- private static final Logger LOG = LoggerFactory
- .getLogger(AbstractBaseMBean.class);
-
- MBeanServer server = ManagementFactory.getPlatformMBeanServer();
- /**
- * gets the MBean ObjectName
- *
- * @return Object name of the MBean
- * @throws MalformedObjectNameException - The bean name does not have the right format.
- * @throws NullPointerException - The bean name is null
- */
- protected ObjectName getMBeanObjectName()
- throws MalformedObjectNameException, NullPointerException {
- String name = BASE_JMX_PREFIX + "type="+getMBeanType()+",Category="+
- getMBeanCategory() + ",name="+
- getMBeanName();
-
-
- return new ObjectName(name);
- }
-
- public boolean registerMBean() {
- boolean registered = false;
- try {
- // Object to identify MBean
- final ObjectName mbeanName = this.getMBeanObjectName();
-
- Preconditions.checkArgument(mbeanName != null,
- "Object name of the MBean cannot be null");
-
- LOG.debug("Register MBean {}", mbeanName);
-
- // unregistered if already registered
- if (server.isRegistered(mbeanName)) {
-
- LOG.debug("MBean {} found to be already registered", mbeanName);
-
- try {
- unregisterMBean(mbeanName);
- } catch (Exception e) {
-
- LOG.warn("unregister mbean {} resulted in exception {} ", mbeanName,
- e);
- }
- }
- server.registerMBean(this, mbeanName);
-
- LOG.debug("MBean {} registered successfully",
- mbeanName.getCanonicalName());
- registered = true;
- } catch (Exception e) {
-
- LOG.error("registration failed {}", e);
-
- }
- return registered;
- }
-
-
- public boolean unregisterMBean() {
- boolean unregister = false;
- try {
- ObjectName mbeanName = this.getMBeanObjectName();
- unregister = true;
- unregisterMBean(mbeanName);
- } catch (Exception e) {
-
- LOG.error("Failed when unregistering MBean {}", e);
- }
- return unregister;
- }
-
- private void unregisterMBean(ObjectName mbeanName)
- throws MBeanRegistrationException, InstanceNotFoundException {
-
- server.unregisterMBean(mbeanName);
-
- }
-
-
- /**
- * @return name of bean
- */
- protected abstract String getMBeanName();
-
- /**
- * @return type of the MBean
- */
- protected abstract String getMBeanType();
-
-
- /**
- * @return Category name of teh bean
- */
- protected abstract String getMBeanCategory();
-
- //require for test cases
- public MBeanServer getMBeanServer() {
- return server;
- }
-}
*/
package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
/**
* @author Basheeruddin syedbahm@cisco.com
*
*/
public class ShardMBeanFactory {
- private static Map<String, ShardStats> shardMBeans =
- new HashMap<String, ShardStats>();
- public static ShardStats getShardStatsMBean(String shardName) {
- if (shardMBeans.containsKey(shardName)) {
- return shardMBeans.get(shardName);
- } else {
- ShardStats shardStatsMBeanImpl = new ShardStats(shardName);
+ private static final Logger LOG = LoggerFactory.getLogger(ShardMBeanFactory.class);
- if (shardStatsMBeanImpl.registerMBean()) {
- shardMBeans.put(shardName, shardStatsMBeanImpl);
- }
- return shardStatsMBeanImpl;
+ private static Cache<String,ShardStats> shardMBeansCache =
+ CacheBuilder.newBuilder().weakValues().build();
+
+ public static ShardStats getShardStatsMBean(final String shardName, final String mxBeanType) {
+ final String finalMXBeanType = mxBeanType != null ? mxBeanType : "DistDataStore";
+ try {
+ return shardMBeansCache.get(shardName, new Callable<ShardStats>() {
+ @Override
+ public ShardStats call() throws Exception {
+ ShardStats shardStatsMBeanImpl = new ShardStats(shardName, finalMXBeanType);
+ shardStatsMBeanImpl.registerMBean();
+ return shardStatsMBeanImpl;
+ }
+ });
+ } catch(ExecutionException e) {
+ LOG.error(String.format("Could not create MXBean for shard: %s", shardName), e);
+ // Just return an instance that isn't registered.
+ return new ShardStats(shardName, finalMXBeanType);
}
}
-
}
package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl;
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats;
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
+import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
+ * Maintains statistics for a shard.
+ *
* @author Basheeruddin syedbahm@cisco.com
*/
-public class ShardStats extends AbstractBaseMBean implements ShardStatsMBean {
+public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
+ public static String JMX_CATEGORY_SHARD = "Shards";
- private final String shardName;
+ private final AtomicLong committedTransactionsCount = new AtomicLong();
- private long committedTransactionsCount = 0L;
+ private final AtomicLong readOnlyTransactionCount = new AtomicLong();
- private long readOnlyTransactionCount = 0L;
+ private final AtomicLong writeOnlyTransactionCount = new AtomicLong();
- private long writeOnlyTransactionCount = 0L;
-
- private long readWriteTransactionCount = 0L;
+ private final AtomicLong readWriteTransactionCount = new AtomicLong();
private String leader;
private String raftState;
- private long lastLogTerm = -1L;
+ private volatile long lastLogTerm = -1L;
+
+ private volatile long lastLogIndex = -1L;
- private long lastLogIndex = -1L;
+ private volatile long currentTerm = -1L;
- private long currentTerm = -1L;
+ private volatile long commitIndex = -1L;
- private long commitIndex = -1L;
+ private volatile long lastApplied = -1L;
- private long lastApplied = -1L;
+ private volatile long lastCommittedTransactionTime;
- private Date lastCommittedTransactionTime = new Date(0L);
+ private final AtomicLong failedTransactionsCount = new AtomicLong();
- private long failedTransactionsCount = 0L;
+ private final AtomicLong failedReadTransactionsCount = new AtomicLong();
- private long failedReadTransactionsCount = 0L;
+ private final AtomicLong abortTransactionsCount = new AtomicLong();
- private long abortTransactionsCount = 0L;
+ private ThreadExecutorStatsMXBeanImpl notificationExecutorStatsBean;
- private SimpleDateFormat sdf =
+ private ThreadExecutorStatsMXBeanImpl dataStoreExecutorStatsBean;
+
+ private QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
+
+ private final SimpleDateFormat sdf =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
- ShardStats(String shardName) {
- this.shardName = shardName;
+ public ShardStats(String shardName, String mxBeanType) {
+ super(shardName, mxBeanType, JMX_CATEGORY_SHARD);
+ }
+
+ public void setDataStoreExecutor(ExecutorService dsExecutor) {
+ this.dataStoreExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(dsExecutor,
+ "notification-executor", getMBeanType(), getMBeanCategory());
}
+ public void setNotificationManager(QueuedNotificationManager<?, ?> manager) {
+ this.notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
+ "notification-manager", getMBeanType(), getMBeanCategory());
+
+ this.notificationExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(manager.getExecutor(),
+ "data-store-executor", getMBeanType(), getMBeanCategory());
+ }
@Override
public String getShardName() {
- return shardName;
+ return getMBeanName();
}
@Override
public long getCommittedTransactionsCount() {
- return committedTransactionsCount;
+ return committedTransactionsCount.get();
}
- @Override public String getLeader() {
+ @Override
+ public String getLeader() {
return leader;
}
- @Override public String getRaftState() {
+ @Override
+ public String getRaftState() {
return raftState;
}
- @Override public long getReadOnlyTransactionCount() {
- return readOnlyTransactionCount;
+ @Override
+ public long getReadOnlyTransactionCount() {
+ return readOnlyTransactionCount.get();
}
- @Override public long getWriteOnlyTransactionCount() {
- return writeOnlyTransactionCount;
+ @Override
+ public long getWriteOnlyTransactionCount() {
+ return writeOnlyTransactionCount.get();
}
- @Override public long getReadWriteTransactionCount() {
- return readWriteTransactionCount;
+ @Override
+ public long getReadWriteTransactionCount() {
+ return readWriteTransactionCount.get();
}
- @Override public long getLastLogIndex() {
+ @Override
+ public long getLastLogIndex() {
return lastLogIndex;
}
- @Override public long getLastLogTerm() {
+ @Override
+ public long getLastLogTerm() {
return lastLogTerm;
}
- @Override public long getCurrentTerm() {
+ @Override
+ public long getCurrentTerm() {
return currentTerm;
}
- @Override public long getCommitIndex() {
+ @Override
+ public long getCommitIndex() {
return commitIndex;
}
- @Override public long getLastApplied() {
+ @Override
+ public long getLastApplied() {
return lastApplied;
}
@Override
public String getLastCommittedTransactionTime() {
- return sdf.format(lastCommittedTransactionTime);
+ return sdf.format(new Date(lastCommittedTransactionTime));
}
- @Override public long getFailedTransactionsCount() {
- return failedTransactionsCount;
+ @Override
+ public long getFailedTransactionsCount() {
+ return failedTransactionsCount.get();
}
- @Override public long getFailedReadTransactionsCount() {
- return failedReadTransactionsCount;
+ @Override
+ public long getFailedReadTransactionsCount() {
+ return failedReadTransactionsCount.get();
}
- @Override public long getAbortTransactionsCount() {
- return abortTransactionsCount;
+ @Override
+ public long getAbortTransactionsCount() {
+ return abortTransactionsCount.get();
}
public long incrementCommittedTransactionCount() {
- return committedTransactionsCount++;
+ return committedTransactionsCount.incrementAndGet();
}
public long incrementReadOnlyTransactionCount() {
- return readOnlyTransactionCount++;
+ return readOnlyTransactionCount.incrementAndGet();
}
public long incrementWriteOnlyTransactionCount() {
- return writeOnlyTransactionCount++;
+ return writeOnlyTransactionCount.incrementAndGet();
}
public long incrementReadWriteTransactionCount() {
- return readWriteTransactionCount++;
+ return readWriteTransactionCount.incrementAndGet();
}
public long incrementFailedTransactionsCount() {
- return failedTransactionsCount++;
+ return failedTransactionsCount.incrementAndGet();
}
public long incrementFailedReadTransactionsCount() {
- return failedReadTransactionsCount++;
+ return failedReadTransactionsCount.incrementAndGet();
}
- public long incrementAbortTransactionsCount () { return abortTransactionsCount++;}
+ public long incrementAbortTransactionsCount ()
+ {
+ return abortTransactionsCount.incrementAndGet();
+ }
public void setLeader(String leader) {
this.leader = leader;
this.lastApplied = lastApplied;
}
-
- public void setLastCommittedTransactionTime(
- Date lastCommittedTransactionTime) {
+ public void setLastCommittedTransactionTime(long lastCommittedTransactionTime) {
this.lastCommittedTransactionTime = lastCommittedTransactionTime;
}
@Override
- protected String getMBeanName() {
- return shardName;
+ public ThreadExecutorStats getDataStoreExecutorStats() {
+ return dataStoreExecutorStatsBean.toThreadExecutorStats();
+ }
+
+ @Override
+ public ThreadExecutorStats getNotificationMgrExecutorStats() {
+ return notificationExecutorStatsBean.toThreadExecutorStats();
}
@Override
- protected String getMBeanType() {
- return JMX_TYPE_DISTRIBUTED_DATASTORE;
+ public List<ListenerNotificationQueueStats> getCurrentNotificationMgrListenerQueueStats() {
+ return notificationManagerStatsBean.getCurrentListenerQueueStats();
}
@Override
- protected String getMBeanCategory() {
- return JMX_CATEGORY_SHARD;
+ public int getMaxNotificationMgrListenerQueueSize() {
+ return notificationManagerStatsBean.getMaxListenerQueueSize();
}
/**
* resets the counters related to transactions
*/
-
+ @Override
public void resetTransactionCounters(){
- committedTransactionsCount = 0L;
+ committedTransactionsCount.set(0);
- readOnlyTransactionCount = 0L;
+ readOnlyTransactionCount.set(0);
- writeOnlyTransactionCount = 0L;
+ writeOnlyTransactionCount.set(0);
- readWriteTransactionCount = 0L;
+ readWriteTransactionCount.set(0);
- lastCommittedTransactionTime = new Date(0L);
+ lastCommittedTransactionTime = 0;
- failedTransactionsCount = 0L;
+ failedTransactionsCount.set(0);
- failedReadTransactionsCount = 0L;
+ failedReadTransactionsCount.set(0);
- abortTransactionsCount = 0L;
+ abortTransactionsCount.set(0);
}
-
-
}
-package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
-
-/**
- * @author: syedbahm
- */
-public interface ShardStatsMBean {
- String getShardName();
-
- long getCommittedTransactionsCount();
-
- String getLeader();
-
- String getRaftState();
-
- long getReadOnlyTransactionCount();
-
- long getWriteOnlyTransactionCount();
-
- long getReadWriteTransactionCount();
-
- long getLastLogIndex();
-
- long getLastLogTerm();
-
- long getCurrentTerm();
-
- long getCommitIndex();
-
- long getLastApplied();
-
- String getLastCommittedTransactionTime();
-
- long getFailedTransactionsCount();
-
- long getFailedReadTransactionsCount();
-
- long getAbortTransactionsCount();
-
- void resetTransactionCounters();
-
-}
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
+
+import java.util.List;
+
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats;
+import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
+
+/**
+ * @author: syedbahm
+ */
+public interface ShardStatsMXBean {
+
+ String getShardName();
+
+ long getCommittedTransactionsCount();
+
+ long getReadOnlyTransactionCount();
+
+ long getWriteOnlyTransactionCount();
+
+ long getReadWriteTransactionCount();
+
+ long getLastLogIndex();
+
+ long getLastLogTerm();
+
+ long getCurrentTerm();
+
+ long getCommitIndex();
+
+ long getLastApplied();
+
+ String getLastCommittedTransactionTime();
+
+ long getFailedTransactionsCount();
+
+ long getAbortTransactionsCount();
+
+ long getFailedReadTransactionsCount();
+
+ String getLeader();
+
+ String getRaftState();
+
+ ThreadExecutorStats getDataStoreExecutorStats();
+
+ ThreadExecutorStats getNotificationMgrExecutorStats();
+
+ List<ListenerNotificationQueueStats> getCurrentNotificationMgrListenerQueueStats();
+
+ int getMaxNotificationMgrListenerQueueSize();
+
+ void resetTransactionCounters();
+}
package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean;
-
import java.util.List;
-public class ShardManagerInfo extends AbstractBaseMBean implements
- ShardManagerInfoMBean {
-
- private final String name;
- private final List<String> localShards;
-
- public ShardManagerInfo(String name, List<String> localShards) {
- this.name = name;
- this.localShards = localShards;
- }
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfoMBean {
- @Override protected String getMBeanName() {
- return name;
- }
+ public static String JMX_CATEGORY_SHARD_MANAGER = "ShardManager";
- @Override protected String getMBeanType() {
- return JMX_TYPE_DISTRIBUTED_DATASTORE;
- }
+ private final List<String> localShards;
- @Override protected String getMBeanCategory() {
- return JMX_CATEGORY_SHARD_MANAGER;
+ public ShardManagerInfo(String name, String mxBeanType, List<String> localShards) {
+ super(name, mxBeanType, JMX_CATEGORY_SHARD_MANAGER);
+ this.localShards = localShards;
}
- public static ShardManagerInfo createShardManagerMBean(String name, List<String> localShards){
- ShardManagerInfo shardManagerInfo = new ShardManagerInfo(name,
- localShards);
+ public static ShardManagerInfo createShardManagerMBean(String name, String mxBeanType,
+ List<String> localShards){
+ ShardManagerInfo shardManagerInfo = new ShardManagerInfo(name, mxBeanType, localShards);
shardManagerInfo.registerMBean();
return shardManagerInfo;
}
- @Override public List<String> getLocalShards() {
+ @Override
+ public List<String> getLocalShards() {
return localShards;
}
}
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.osgi.framework.BundleContext;
+import scala.concurrent.duration.Duration;
+
public class DistributedConfigDataStoreProviderModule extends
org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModule {
private BundleContext bundleContext;
props = new ConfigProperties();
}
- return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
- new DistributedDataStoreProperties(
+ DatastoreContext datastoreContext = new DatastoreContext("DistributedConfigDatastore",
+ InMemoryDOMDataStoreConfigProperties.create(
props.getMaxShardDataChangeExecutorPoolSize().getValue(),
props.getMaxShardDataChangeExecutorQueueSize().getValue(),
props.getMaxShardDataChangeListenerQueueSize().getValue(),
- props.getShardTransactionIdleTimeoutInMinutes().getValue(),
- props.getOperationTimeoutInSeconds().getValue()), bundleContext);
+ props.getMaxShardDataStoreExecutorQueueSize().getValue()),
+ Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+ TimeUnit.MINUTES),
+ props.getOperationTimeoutInSeconds().getValue());
+
+ return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
+ datastoreContext, bundleContext);
}
public void setBundleContext(BundleContext bundleContext) {
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.osgi.framework.BundleContext;
+import scala.concurrent.duration.Duration;
+
public class DistributedOperationalDataStoreProviderModule extends
org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModule {
private BundleContext bundleContext;
props = new OperationalProperties();
}
- return DistributedDataStoreFactory.createInstance("operational",
- getOperationalSchemaServiceDependency(),
- new DistributedDataStoreProperties(
+ DatastoreContext datastoreContext = new DatastoreContext("DistributedOperationalDatastore",
+ InMemoryDOMDataStoreConfigProperties.create(
props.getMaxShardDataChangeExecutorPoolSize().getValue(),
props.getMaxShardDataChangeExecutorQueueSize().getValue(),
props.getMaxShardDataChangeListenerQueueSize().getValue(),
- props.getShardTransactionIdleTimeoutInMinutes().getValue(),
- props.getOperationTimeoutInSeconds().getValue()), bundleContext);
+ props.getMaxShardDataStoreExecutorQueueSize().getValue()),
+ Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+ TimeUnit.MINUTES),
+ props.getOperationTimeoutInSeconds().getValue());
+
+ return DistributedDataStoreFactory.createInstance("operational",
+ getOperationalSchemaServiceDependency(), datastoreContext, bundleContext);
}
public void setBundleContext(BundleContext bundleContext) {
type non-zero-uint16-type;
description "The maximum queue size for each shard's data store data change listeners.";
}
-
+
+ leaf max-shard-data-store-executor-queue-size {
+ default 5000;
+ type non-zero-uint16-type;
+ description "The maximum queue size for each shard's data store executor.";
+ }
+
leaf shard-transaction-idle-timeout-in-minutes {
default 10;
type non-zero-uint16-type;
final DistributedDataStore distributedDataStore =
new DistributedDataStore(getSystem(), "config",
new MockClusterWrapper(), configuration,
- new DistributedDataStoreProperties());
+ new DatastoreContext());
distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
new DistributedDataStore(actorSystem, "config",
mock(ClusterWrapper.class), mock(Configuration.class),
- new DistributedDataStoreProperties());
+ new DatastoreContext());
verify(actorSystem).actorOf(any(Props.class), eq("shardmanager-config"));
}
import org.junit.BeforeClass;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
private static final String mockShardName = "mockShardName";
+ private final ShardStats shardStats = new ShardStats(mockShardName, "DataStore");
+
@BeforeClass
public static void staticSetup() {
store.onGlobalContextUpdated(testSchemaContext);
public void testOnReceiveCreateTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
final Props props = ShardTransactionChain.props(store.createTransactionChain(),
- testSchemaContext, DATA_STORE_CONTEXT, mockShardName);
+ testSchemaContext, DATA_STORE_CONTEXT, shardStats);
final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction");
new Within(duration("1 seconds")) {
public void testOnReceiveCloseTransactionChain() throws Exception {
new JavaTestKit(getSystem()) {{
final Props props = ShardTransactionChain.props(store.createTransactionChain(),
- testSchemaContext, DATA_STORE_CONTEXT,mockShardName );
+ testSchemaContext, DATA_STORE_CONTEXT, shardStats );
final ActorRef subject = getSystem().actorOf(props, "testCloseTransactionChain");
new Within(duration("1 seconds")) {
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
private final DatastoreContext datastoreContext = new DatastoreContext();
+ private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
+
@BeforeClass
public static void staticSetup() {
store.onGlobalContextUpdated(testSchemaContext);
final ActorRef shard =
getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard =
getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard =
getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard =
getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard =
getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard =
getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props, "testNegativeMergeTransactionReady");
final ActorRef shard =
getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
private DatastoreContext datastoreContext = new DatastoreContext();
+ private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
+
@BeforeClass
public static void staticSetup() {
store.onGlobalContextUpdated(testSchemaContext);
final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
Collections.EMPTY_MAP, new DatastoreContext()));
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject = getSystem().actorOf(props, "testReadData");
new Within(duration("1 seconds")) {
final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
Collections.EMPTY_MAP, new DatastoreContext()));
final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
new Within(duration("1 seconds")) {
final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
Collections.EMPTY_MAP, new DatastoreContext()));
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive");
new Within(duration("1 seconds")) {
final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
Collections.EMPTY_MAP, new DatastoreContext()));
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative");
new Within(duration("1 seconds")) {
final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
Collections.EMPTY_MAP, new DatastoreContext()));
final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject =
getSystem().actorOf(props, "testWriteData");
final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
Collections.EMPTY_MAP, new DatastoreContext()));
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject =
getSystem().actorOf(props, "testMergeData");
final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
Collections.EMPTY_MAP, new DatastoreContext()));
final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject =
getSystem().actorOf(props, "testDeleteData");
final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
Collections.EMPTY_MAP, new DatastoreContext()));
final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject =
getSystem().actorOf(props, "testReadyTransaction");
final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
Collections.EMPTY_MAP, new DatastoreContext()));
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject =
getSystem().actorOf(props, "testCloseTransaction");
final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
Collections.EMPTY_MAP, new DatastoreContext()));
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final TestActorRef subject = TestActorRef.apply(props,getSystem());
subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
@Test
public void testShardTransactionInactivity() {
- datastoreContext = new DatastoreContext(InMemoryDOMDataStoreConfigProperties.getDefault(),
- Duration.create(500, TimeUnit.MILLISECONDS));
+ datastoreContext = new DatastoreContext("Test",
+ InMemoryDOMDataStoreConfigProperties.getDefault(),
+ Duration.create(500, TimeUnit.MILLISECONDS), 5);
new JavaTestKit(getSystem()) {{
final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
Collections.EMPTY_MAP, new DatastoreContext()));
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject =
getSystem().actorOf(props, "testShardTransactionInactivity");
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
private final DatastoreContext datastoreContext = new DatastoreContext();
+ private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
@BeforeClass
public static void staticSetup() {
final CompositeModification mockComposite =
Mockito.mock(CompositeModification.class);
final Props props =
- ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
+ ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
.create(getSystem(), props,
final CompositeModification mockComposite =
Mockito.mock(CompositeModification.class);
final Props props =
- ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
+ ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
.create(getSystem(), props,
final CompositeModification mockComposite =
Mockito.mock(CompositeModification.class);
final Props props =
- ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
+ ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shardTransaction =
getSystem().actorOf(ShardTransaction.props(store.newReadWriteTransaction(), subject,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()));
+ testSchemaContext, datastoreContext, shardStats));
ShardTransactionMessages.WriteData writeData =
ShardTransactionMessages.WriteData.newBuilder()
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean;
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+
+import java.lang.management.ManagementFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
@Before
public void setUp() throws Exception {
- shardStats = new ShardStats("shard-1");
+ shardStats = new ShardStats("shard-1", "DataStore");
shardStats.registerMBean();
- mbeanServer = shardStats.getMBeanServer();
+ mbeanServer = ManagementFactory.getPlatformMBeanServer();
String objectName =
- AbstractBaseMBean.BASE_JMX_PREFIX + "type=" + shardStats
+ AbstractMXBean.BASE_JMX_PREFIX + "type=" + shardStats
.getMBeanType() + ",Category=" +
shardStats.getMBeanCategory() + ",name=" +
shardStats.getMBeanName();
public void testGetShardName() throws Exception {
Object attribute = mbeanServer.getAttribute(testMBeanName, "ShardName");
- Assert.assertEquals((String) attribute, "shard-1");
+ Assert.assertEquals(attribute, "shard-1");
}
//now let us get from MBeanServer what is the transaction count.
Object attribute = mbeanServer.getAttribute(testMBeanName,
"CommittedTransactionsCount");
- Assert.assertEquals((Long) attribute, (Long) 3L);
+ Assert.assertEquals(attribute, 3L);
}
Assert.assertEquals(shardStats.getLastCommittedTransactionTime(),
sdf.format(new Date(0L)));
long millis = System.currentTimeMillis();
- shardStats.setLastCommittedTransactionTime(new Date(millis));
+ shardStats.setLastCommittedTransactionTime(millis);
//now let us get from MBeanServer what is the transaction count.
Object attribute = mbeanServer.getAttribute(testMBeanName,
"LastCommittedTransactionTime");
- Assert.assertEquals((String) attribute, sdf.format(new Date(millis)));
- Assert.assertNotEquals((String) attribute,
+ Assert.assertEquals(attribute, sdf.format(new Date(millis)));
+ Assert.assertNotEquals(attribute,
sdf.format(new Date(millis - 1)));
}
//now let us get from MBeanServer what is the transaction count.
Object attribute =
mbeanServer.getAttribute(testMBeanName, "FailedTransactionsCount");
- Assert.assertEquals((Long) attribute, (Long) 2L);
+ Assert.assertEquals(attribute, 2L);
}
@Test
//now let us get from MBeanServer what is the transaction count.
Object attribute =
mbeanServer.getAttribute(testMBeanName, "AbortTransactionsCount");
- Assert.assertEquals((Long) attribute, (Long) 2L);
+ Assert.assertEquals(attribute, 2L);
}
@Test
//now let us get from MBeanServer what is the transaction count.
Object attribute =
mbeanServer.getAttribute(testMBeanName, "FailedReadTransactionsCount");
- Assert.assertEquals((Long) attribute, (Long) 2L);
+ Assert.assertEquals(attribute, 2L);
}
@Test
//now let us get from MBeanServer what is the transaction count.
Object attribute = mbeanServer.getAttribute(testMBeanName,
"CommittedTransactionsCount");
- Assert.assertEquals((Long) attribute, (Long) 3L);
+ Assert.assertEquals(attribute, 3L);
//let us increment FailedReadTransactions count and then check
shardStats.incrementFailedReadTransactionsCount();
//now let us get from MBeanServer what is the transaction count.
attribute =
mbeanServer.getAttribute(testMBeanName, "FailedReadTransactionsCount");
- Assert.assertEquals((Long) attribute, (Long) 2L);
+ Assert.assertEquals(attribute, 2L);
//here we will reset the counters and check the above ones are 0 after reset
//now let us get from MBeanServer what is the transaction count.
attribute = mbeanServer.getAttribute(testMBeanName,
"CommittedTransactionsCount");
- Assert.assertEquals((Long) attribute, (Long) 0L);
+ Assert.assertEquals(attribute, 0L);
attribute =
mbeanServer.getAttribute(testMBeanName, "FailedReadTransactionsCount");
- Assert.assertEquals((Long) attribute, (Long) 0L);
+ Assert.assertEquals(attribute, 0L);
}