<bundle>mvn:org.opendaylight.controller/sal-netconf-connector/${project.version}</bundle>
<bundle>mvn:org.opendaylight.controller.model/model-inventory/${project.version}</bundle>
<bundle>mvn:org.opendaylight.controller/netconf-config-dispatcher/${config.version}</bundle>
+ <configfile finalname='${config.configfile.directory}/${config.netconf.client.configfile}'>mvn:org.opendaylight.controller/netconf-config/${netconf.version}/xml/config</configfile>
+ </feature>
+ <feature name='odl-mdsal-netconf-connector-ssh' version='${project.version}' description="OpenDaylight :: MDSAL :: Netconf Connector + Netconf SSH Server + loopback connection configuration">
+ <feature version='${netconf.version}'>odl-netconf-ssh</feature>
+ <feature version='${project.version}'>odl-mdsal-netconf-connector</feature>
<configfile finalname="${config.configfile.directory}/${config.netconf.connector.configfile}">mvn:org.opendaylight.controller/netconf-connector-config/${netconf.version}/xml/config</configfile>
</feature>
<feature name='odl-restconf' version='${project.version}' description="OpenDaylight :: Restconf">
<groupId>org.opendaylight.controller</groupId>
<artifactId>netconf-auth</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>netconf-tcp</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>netconf-ssh</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcpkix-jdk15on</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk15on</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>ietf-netconf-monitoring</artifactId>
<type>xml</type>
<classifier>config</classifier>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>netconf-connector-config</artifactId>
+ <version>${config.version}</version>
+ <type>xml</type>
+ <classifier>config</classifier>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>netconf-monitoring</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.aaa</groupId>
+ <artifactId>features-aaa</artifactId>
+ <version>${aaa.version}</version>
+ <classifier>features</classifier>
+ <type>xml</type>
+ </dependency>
</dependencies>
<build>
xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.2.0 http://karaf.apache.org/xmlns/features/v1.2.0">
<repository>mvn:org.opendaylight.controller/features-protocol-framework/${protocol-framework.version}/xml/features</repository>
<repository>mvn:org.opendaylight.controller/features-config/${config.version}/xml/features</repository>
+ <repository>mvn:org.opendaylight.aaa/features-aaa/${aaa.version}/xml/features</repository>
+
<feature name='odl-netconf-all' version='${project.version}' description="OpenDaylight :: Netconf :: All">
<feature version='${project.version}'>odl-netconf-api</feature>
<feature version='${project.version}'>odl-netconf-mapping-api</feature>
<feature version='${project.version}'>odl-netconf-util</feature>
<feature version='${project.version}'>odl-netconf-impl</feature>
+ <feature version='${project.version}'>odl-netconf-tcp</feature>
+ <feature version='${project.version}'>odl-netconf-ssh</feature>
<feature version='${project.version}'>odl-config-netconf-connector</feature>
<feature version='${project.version}'>odl-netconf-netty-util</feature>
<feature version='${project.version}'>odl-netconf-client</feature>
<feature version='${project.version}'>odl-netconf-mapping-api</feature>
<feature version='${project.version}'>odl-netconf-util</feature>
<feature version='${project.version}'>odl-netconf-netty-util</feature>
+ <!-- Netconf server without config connector is just an empty shell -->
+ <feature version='${project.version}'>odl-config-netconf-connector</feature>
+ <!-- Netconf will not provide schemas without monitoring -->
+ <feature version='${project.version}'>odl-netconf-monitoring</feature>
<bundle>mvn:org.opendaylight.controller/netconf-impl/${project.version}</bundle>
</feature>
+ <feature name='odl-netconf-ssh' version='${project.version}' description="OpenDaylight :: Netconf :: SSSH">
+ <feature version='${project.version}'>odl-netconf-tcp</feature>
+ <feature version='${aaa.version}'>odl-aaa-authn-plugin</feature>
+ <bundle>mvn:org.opendaylight.controller/netconf-ssh/${project.version}</bundle>
+ <bundle>mvn:org.bouncycastle/bcpkix-jdk15on/${bouncycastle.version}</bundle>
+ <bundle>mvn:org.bouncycastle/bcprov-jdk15on/${bouncycastle.version}</bundle>
+ </feature>
+ <feature name='odl-netconf-tcp' version='${project.version}' description="OpenDaylight :: Netconf :: TCP">
+ <feature version='${project.version}'>odl-netconf-impl</feature>
+ <bundle>mvn:org.opendaylight.controller/netconf-tcp/${project.version}</bundle>
+ </feature>
<feature name='odl-config-netconf-connector' version='${project.version}' description="OpenDaylight :: Netconf :: Connector">
<feature version='${config.version}'>odl-config-manager</feature>
<feature version='${project.version}'>odl-netconf-api</feature>
<feature name='odl-netconf-client' version='${project.version}' description="OpenDaylight :: Netconf :: Client">
<feature version='${project.version}'>odl-netconf-netty-util</feature>
<bundle>mvn:org.opendaylight.controller/netconf-client/${project.version}</bundle>
- <configfile finalname='${config.configfile.directory}/${config.netconf.client.configfile}'>mvn:org.opendaylight.controller/netconf-config/${netconf.version}/xml/config</configfile>
</feature>
<feature name='odl-netconf-monitoring' version='${project.version}' description="OpenDaylight :: Netconf :: Monitoring">
<feature version='${project.version}'>odl-netconf-util</feature>
<concepts.version>0.5.2-SNAPSHOT</concepts.version>
<concurrentlinkedhashmap.version>1.4</concurrentlinkedhashmap.version>
<config.version>0.2.5-SNAPSHOT</config.version>
+ <aaa.version>0.1.0-SNAPSHOT</aaa.version>
<config.configfile.directory>etc/opendaylight/karaf</config.configfile.directory>
<config.clustering.configfile>05-clustering.xml</config.clustering.configfile>
<config.netty.configfile>00-netty.xml</config.netty.configfile>
<artifactId>opendaylight-karaf-resources</artifactId>
<description>Resources for opendaylight-karaf</description>
<packaging>jar</packaging>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.6</version>
+ <executions>
+ <execution>
+ <id>copy</id>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <!-- here the phase you need -->
+ <phase>generate-resources</phase>
+ <configuration>
+ <artifactItems>
+ <!-- Needs to be copied to lib/ext in order to start bouncy provider for mina sshd -->
+ <artifactItem>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk15on</artifactId>
+ <version>${bouncycastle.version}</version>
+ <outputDirectory>target/classes/lib/ext</outputDirectory>
+ <destFileName>bcprov-jdk15on-${bouncycastle.version}.jar</destFileName>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
netconf.ssh.address=0.0.0.0
netconf.ssh.port=1830
netconf.ssh.pk.path = ./configuration/RSA.pk
+# Set security provider to BouncyCastle
+org.apache.karaf.security.providers = org.bouncycastle.jce.provider.BouncyCastleProvider
netconf.config.persister.active=1
<outputDirectory>target/assembly/lib</outputDirectory>
<destFileName>karaf.branding-${branding.version}.jar</destFileName>
</artifactItem>
+ <!-- Needs to be copied to lib/ext in order to start bouncy provider for mina sshd -->
+ <artifactItem>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk15on</artifactId>
+ <version>${bouncycastle.version}</version>
+ <outputDirectory>target/assembly/lib/ext</outputDirectory>
+ <destFileName>bcprov-jdk15on-${bouncycastle.version}.jar</destFileName>
+ </artifactItem>
</artifactItems>
</configuration>
</execution>
final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this);
this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter);
- this.txChain = dataBroker.createTransactionChain(this);
+ this.txChain = dataBroker.createTransactionChain(this);
thread = new Thread(this);
thread.setDaemon(true);
thread.setName("FlowCapableInventoryProvider");
thread.join();
thread = null;
}
- if(txChain != null) {
+ if (txChain != null) {
txChain.close();
txChain = null;
}
@Override
public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
- final Throwable cause) {
- LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.",transaction.getIdentifier(),cause);
+ final Throwable cause) {
+ LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.", transaction.getIdentifier(), cause);
}
import org.opendaylight.yangtools.sal.binding.generator.impl.RuntimeGeneratedMappingServiceImpl;
import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.RpcService;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
/**
*
*/
-public final class RuntimeMappingModule extends
- org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractRuntimeMappingModule {
+public final class RuntimeMappingModule extends AbstractRuntimeMappingModule {
private static final Logger LOG = LoggerFactory.getLogger(RuntimeMappingModule.class);
@Override
public Entry<YangInstanceIdentifier, CompositeNode> toDataDom(
- final Entry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject> entry) {
+ final Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry) {
return delegate.toDataDom(entry);
}
@Override
- public YangInstanceIdentifier toDataDom(
- final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject> path) {
+ public YangInstanceIdentifier toDataDom(final InstanceIdentifier<? extends DataObject> path) {
return delegate.toDataDom(path);
}
@Override
public DataObject dataObjectFromDataDom(
- final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject> path,
+ final InstanceIdentifier<? extends DataObject> path,
final CompositeNode result) throws DeserializationException {
return delegate.dataObjectFromDataDom(path, result);
}
@Override
- public org.opendaylight.yangtools.yang.binding.InstanceIdentifier<?> fromDataDom(final YangInstanceIdentifier entry)
+ public InstanceIdentifier<?> fromDataDom(final YangInstanceIdentifier entry)
throws DeserializationException {
return delegate.fromDataDom(entry);
}
*/
package org.opendaylight.controller.md.sal.binding.impl;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-
-
abstract class AbstractForwardedTransaction<T extends AsyncTransaction<YangInstanceIdentifier, NormalizedNode<?, ?>>>
implements Delegator<T>, Identifiable<Object> {
return codec;
}
- protected final <T extends DataObject> CheckedFuture<Optional<T>,ReadFailedException> doRead(
+ protected final <D extends DataObject> CheckedFuture<Optional<D>,ReadFailedException> doRead(
final DOMDataReadTransaction readTx, final LogicalDatastoreType store,
- final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<T> path) {
+ final InstanceIdentifier<D> path) {
+ Preconditions.checkArgument(!path.isWildcarded(), "Invalid read of wildcarded path %s", path);
return MappingCheckedFuture.create(
Futures.transform(readTx.read(store, codec.toNormalized(path)),
*/
package org.opendaylight.controller.md.sal.binding.impl;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.CheckedFuture;
import java.util.Collections;
import java.util.Map.Entry;
-
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.Identifiable;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.CheckedFuture;
/**
*
public abstract class AbstractWriteTransaction<T extends DOMDataWriteTransaction> extends
AbstractForwardedTransaction<T> {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractWriteTransaction.class);
-
- protected AbstractWriteTransaction(final T delegate,
- final BindingToNormalizedNodeCodec codec) {
+ protected AbstractWriteTransaction(final T delegate, final BindingToNormalizedNodeCodec codec) {
super(delegate, codec);
}
-
public final <U extends DataObject> void put(final LogicalDatastoreType store,
final InstanceIdentifier<U> path, final U data, final boolean createParents) {
- final Entry<org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier, NormalizedNode<?, ?>> normalized = getCodec()
- .toNormalizedNode(path, data);
- if(createParents) {
+ Preconditions.checkArgument(!path.isWildcarded(), "Cannot put data into wildcarded path %s", path);
+
+ final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalized = getCodec().toNormalizedNode(path, data);
+ if (createParents) {
ensureParentsByMerge(store, normalized.getKey(), path);
} else {
ensureListParentIfNeeded(store,path,normalized);
}
+
getDelegate().put(store, normalized.getKey(), normalized.getValue());
}
-
public final <U extends DataObject> void merge(final LogicalDatastoreType store,
final InstanceIdentifier<U> path, final U data,final boolean createParents) {
+ Preconditions.checkArgument(!path.isWildcarded(), "Cannot merge data into wildcarded path %s", path);
- final Entry<org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier, NormalizedNode<?, ?>> normalized = getCodec()
- .toNormalizedNode(path, data);
-
- if(createParents) {
+ final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalized = getCodec().toNormalizedNode(path, data);
+ if (createParents) {
ensureParentsByMerge(store, normalized.getKey(), path);
} else {
ensureListParentIfNeeded(store,path,normalized);
getDelegate().merge(store, normalized.getKey(), normalized.getValue());
}
-
/**
*
* Ensures list parent if item is list, otherwise noop.
* @param normalized Normalized version of data to be written
*/
private void ensureListParentIfNeeded(final LogicalDatastoreType store, final InstanceIdentifier<?> path,
- final Entry<org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier, NormalizedNode<?, ?>> normalized) {
- if(Identifiable.class.isAssignableFrom(path.getTargetType())) {
- org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier parentMapPath = getParent(normalized.getKey()).get();
+ final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalized) {
+ if (Identifiable.class.isAssignableFrom(path.getTargetType())) {
+ YangInstanceIdentifier parentMapPath = getParent(normalized.getKey()).get();
NormalizedNode<?, ?> emptyParent = getCodec().getDefaultNodeFor(parentMapPath);
getDelegate().merge(store, parentMapPath, emptyParent);
}
-
}
// FIXME (should be probaly part of InstanceIdentifier)
- protected static Optional<org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier> getParent(
- final org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier child) {
+ protected static Optional<YangInstanceIdentifier> getParent(
+ final YangInstanceIdentifier child) {
Iterable<PathArgument> mapEntryItemPath = child.getPathArguments();
int parentPathSize = Iterables.size(mapEntryItemPath) - 1;
- if(parentPathSize > 1) {
- return Optional.of(org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.create(Iterables.limit(mapEntryItemPath, parentPathSize)));
+ if (parentPathSize > 1) {
+ return Optional.of(YangInstanceIdentifier.create(Iterables.limit(mapEntryItemPath, parentPathSize)));
} else if(parentPathSize == 0) {
- return Optional.of(org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.create(Collections.<PathArgument>emptyList()));
+ return Optional.of(YangInstanceIdentifier.create(Collections.<PathArgument>emptyList()));
} else {
return Optional.absent();
}
* @param path
*/
protected abstract void ensureParentsByMerge(LogicalDatastoreType store,
- org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier key, InstanceIdentifier<?> path);
+ YangInstanceIdentifier key, InstanceIdentifier<?> path);
protected final void doDelete(final LogicalDatastoreType store,
final InstanceIdentifier<?> path) {
- final org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier normalized = getCodec().toNormalized(path);
+ Preconditions.checkArgument(!path.isWildcarded(), "Cannot delete wildcarded path %s", path);
+
+ final YangInstanceIdentifier normalized = getCodec().toNormalized(path);
getDelegate().delete(store, normalized);
}
*/
package org.opendaylight.controller.sal.binding.impl;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
-
import org.opendaylight.controller.md.sal.binding.util.AbstractBindingSalProviderInstance;
import org.opendaylight.controller.sal.binding.api.mount.MountProviderInstance;
import org.opendaylight.controller.sal.binding.api.mount.MountProviderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
public class MountPointManagerImpl implements MountProviderService {
public final Logger LOG = LoggerFactory.getLogger(MountPointManagerImpl.class);
private final InstanceIdentifier<?> identifier;
- public BindingMountPointImpl(final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<?> identifier,
+ public BindingMountPointImpl(final InstanceIdentifier<?> identifier,
final RpcProviderRegistryImpl rpcRegistry, final NotificationBrokerImpl notificationBroker,
final DataBrokerImpl dataBroker) {
super(rpcRegistry, notificationBroker, dataBroker);
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
+
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
/**
- * Contains contextual data for shards.
+ * Contains contextual data for a data store.
*
* @author Thomas Pantelis
*/
private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
private final Duration shardTransactionIdleTimeout;
+ private final int operationTimeoutInSeconds;
+ private final String dataStoreMXBeanType;
public DatastoreContext() {
this.dataStoreProperties = null;
+ this.dataStoreMXBeanType = "DistributedDatastore";
this.shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
+ this.operationTimeoutInSeconds = 5;
}
- public DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties,
- Duration shardTransactionIdleTimeout) {
+ public DatastoreContext(String dataStoreMXBeanType,
+ InMemoryDOMDataStoreConfigProperties dataStoreProperties,
+ Duration shardTransactionIdleTimeout,
+ int operationTimeoutInSeconds) {
+ this.dataStoreMXBeanType = dataStoreMXBeanType;
this.dataStoreProperties = Preconditions.checkNotNull(dataStoreProperties);
- this.shardTransactionIdleTimeout = Preconditions.checkNotNull(shardTransactionIdleTimeout);
+ this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
+ this.operationTimeoutInSeconds = operationTimeoutInSeconds;
}
public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
return shardTransactionIdleTimeout;
}
+ public String getDataStoreMXBeanType() {
+ return dataStoreMXBeanType;
+ }
+ public int getOperationTimeoutInSeconds() {
+ return operationTimeoutInSeconds;
+ }
}
package org.opendaylight.controller.cluster.datastore;
-import java.util.concurrent.TimeUnit;
-
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
-
/**
*
*/
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
private final ActorContext actorContext;
- private final DatastoreContext datastoreContext;
public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster,
- Configuration configuration, DistributedDataStoreProperties dataStoreProperties) {
+ Configuration configuration, DatastoreContext datastoreContext) {
Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
Preconditions.checkNotNull(type, "type should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
-
+ Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
LOG.info("Creating ShardManager : {}", shardManagerId);
- datastoreContext = new DatastoreContext(InMemoryDOMDataStoreConfigProperties.create(
- dataStoreProperties.getMaxShardDataChangeExecutorPoolSize(),
- dataStoreProperties.getMaxShardDataChangeExecutorQueueSize(),
- dataStoreProperties.getMaxShardDataChangeListenerQueueSize()),
- Duration.create(dataStoreProperties.getShardTransactionIdleTimeoutInMinutes(),
- TimeUnit.MINUTES));
+ actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
+ ShardManager.props(type, cluster, configuration, datastoreContext)
+ .withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
- actorContext
- = new ActorContext(
- actorSystem, actorSystem.actorOf(
- ShardManager.props(type, cluster, configuration, datastoreContext).
- withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
-
- actorContext.setOperationTimeout(dataStoreProperties.getOperationTimeoutInSeconds());
+ actorContext.setOperationTimeout(datastoreContext.getOperationTimeoutInSeconds());
}
public DistributedDataStore(ActorContext actorContext) {
this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
- this.datastoreContext = new DatastoreContext();
}
-
@SuppressWarnings("unchecked")
@Override
public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
public class DistributedDataStoreFactory {
public static DistributedDataStore createInstance(String name, SchemaService schemaService,
- DistributedDataStoreProperties dataStoreProperties, BundleContext bundleContext) {
+ DatastoreContext datastoreContext, BundleContext bundleContext) {
ActorSystem actorSystem = ActorSystemFactory.createInstance(bundleContext);
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
final DistributedDataStore dataStore =
new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
- config, dataStoreProperties );
+ config, datastoreContext );
ShardStrategyFactory.setConfiguration(config);
schemaService.registerSchemaContextListener(dataStore);
return dataStore;
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-/**
- * Wrapper class for DistributedDataStore configuration properties.
- *
- * @author Thomas Pantelis
- */
-public class DistributedDataStoreProperties {
- private final int maxShardDataChangeListenerQueueSize;
- private final int maxShardDataChangeExecutorQueueSize;
- private final int maxShardDataChangeExecutorPoolSize;
- private final int shardTransactionIdleTimeoutInMinutes;
- private final int operationTimeoutInSeconds;
-
- public DistributedDataStoreProperties() {
- maxShardDataChangeListenerQueueSize = 1000;
- maxShardDataChangeExecutorQueueSize = 1000;
- maxShardDataChangeExecutorPoolSize = 20;
- shardTransactionIdleTimeoutInMinutes = 10;
- operationTimeoutInSeconds = 5;
- }
-
- public DistributedDataStoreProperties(int maxShardDataChangeListenerQueueSize,
- int maxShardDataChangeExecutorQueueSize, int maxShardDataChangeExecutorPoolSize,
- int shardTransactionIdleTimeoutInMinutes, int operationTimeoutInSeconds) {
- this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
- this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
- this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
- this.shardTransactionIdleTimeoutInMinutes = shardTransactionIdleTimeoutInMinutes;
- this.operationTimeoutInSeconds = operationTimeoutInSeconds;
- }
-
- public int getMaxShardDataChangeListenerQueueSize() {
- return maxShardDataChangeListenerQueueSize;
- }
-
- public int getMaxShardDataChangeExecutorQueueSize() {
- return maxShardDataChangeExecutorQueueSize;
- }
-
- public int getMaxShardDataChangeExecutorPoolSize() {
- return maxShardDataChangeExecutorPoolSize;
- }
-
- public int getShardTransactionIdleTimeoutInMinutes() {
- return shardTransactionIdleTimeoutInMinutes;
- }
-
- public int getOperationTimeoutInSeconds() {
- return operationTimeoutInSeconds;
- }
-}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Creator;
import akka.persistence.RecoveryFailure;
import akka.serialization.Serialization;
-
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
private SchemaContext schemaContext;
+ private ActorRef createSnapshotTransaction;
+
private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
- DatastoreContext datastoreContext) {
+ DatastoreContext datastoreContext, SchemaContext schemaContext) {
super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
this.name = name;
this.datastoreContext = datastoreContext;
+ this.schemaContext = schemaContext;
String setting = System.getProperty("shard.persistent");
store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
datastoreContext.getDataStoreProperties());
- shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString());
+ if(schemaContext != null) {
+ store.onGlobalContextUpdated(schemaContext);
+ }
+ shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
+ datastoreContext.getDataStoreMXBeanType());
+ shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
+ shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
}
public static Props props(final ShardIdentifier name,
final Map<ShardIdentifier, String> peerAddresses,
- DatastoreContext datastoreContext) {
+ DatastoreContext datastoreContext, SchemaContext schemaContext) {
Preconditions.checkNotNull(name, "name should not be null");
Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
- Preconditions.checkNotNull(datastoreContext, "shardContext should not be null");
+ Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
+ Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
- return Props.create(new ShardCreator(name, peerAddresses, datastoreContext));
+ return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
}
@Override public void onReceiveRecover(Object message) {
} else if (getLeader() != null) {
getLeader().forward(message, getContext());
}
+ } else if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+ // This must be for install snapshot. Don't want to open this up and trigger
+ // deSerialization
+ self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), self());
+
+ // Send a PoisonPill instead of sending close transaction because we do not really need
+ // a response
+ getSender().tell(PoisonPill.getInstance(), self());
+
} else if (message instanceof RegisterChangeListener) {
registerChangeListener((RegisterChangeListener) message);
} else if (message instanceof UpdateSchemaContext) {
}
private ActorRef createTypedTransactionActor(
- CreateTransaction createTransaction,
+ int transactionType,
ShardTransactionIdentifier transactionId) {
- if (createTransaction.getTransactionType()
+
+ if(this.schemaContext == null){
+ throw new NullPointerException("schemaContext should not be null");
+ }
+
+ if (transactionType
== TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
shardMBean.incrementReadOnlyTransactionCount();
return getContext().actorOf(
ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(),
- schemaContext,datastoreContext, name.toString()), transactionId.toString());
+ schemaContext,datastoreContext, shardMBean), transactionId.toString());
- } else if (createTransaction.getTransactionType()
+ } else if (transactionType
== TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
shardMBean.incrementReadWriteTransactionCount();
return getContext().actorOf(
ShardTransaction.props(store.newReadWriteTransaction(), getSelf(),
- schemaContext, datastoreContext,name.toString()), transactionId.toString());
+ schemaContext, datastoreContext, shardMBean), transactionId.toString());
- } else if (createTransaction.getTransactionType()
+ } else if (transactionType
== TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
shardMBean.incrementWriteOnlyTransactionCount();
return getContext().actorOf(
ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(),
- schemaContext, datastoreContext, name.toString()), transactionId.toString());
+ schemaContext, datastoreContext, shardMBean), transactionId.toString());
} else {
throw new IllegalArgumentException(
"Shard="+name + ":CreateTransaction message has unidentified transaction type="
- + createTransaction.getTransactionType());
+ + transactionType);
}
}
private void createTransaction(CreateTransaction createTransaction) {
+ createTransaction(createTransaction.getTransactionType(),
+ createTransaction.getTransactionId());
+ }
+
+ private ActorRef createTransaction(int transactionType, String remoteTransactionId) {
ShardTransactionIdentifier transactionId =
ShardTransactionIdentifier.builder()
- .remoteTransactionId(createTransaction.getTransactionId())
+ .remoteTransactionId(remoteTransactionId)
.build();
LOG.debug("Creating transaction : {} ", transactionId);
ActorRef transactionActor =
- createTypedTransactionActor(createTransaction, transactionId);
+ createTypedTransactionActor(transactionType, transactionId);
getSender()
.tell(new CreateTransactionReply(
Serialization.serializedActorPath(transactionActor),
- createTransaction.getTransactionId()).toSerializable(),
- getSelf()
- );
+ remoteTransactionId).toSerializable(),
+ getSelf());
+
+ return transactionActor;
}
+ private void syncCommitTransaction(DOMStoreWriteTransaction transaction)
+ throws ExecutionException, InterruptedException {
+ DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+ commitCohort.preCommit().get();
+ commitCohort.commit().get();
+ }
+
+
private void commit(final ActorRef sender, Object serialized) {
Modification modification = MutableCompositeModification
.fromSerializable(serialized, schemaContext);
LOG.debug(
"Could not find cohort for modification : {}. Writing modification using a new transaction",
modification);
- DOMStoreReadWriteTransaction transaction =
- store.newReadWriteTransaction();
+ DOMStoreWriteTransaction transaction =
+ store.newWriteOnlyTransaction();
modification.apply(transaction);
- DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
- ListenableFuture<Void> future =
- commitCohort.preCommit();
try {
- future.get();
- future = commitCohort.commit();
- future.get();
+ syncCommitTransaction(transaction);
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
LOG.error("Failed to commit", e);
public void onSuccess(Void v) {
sender.tell(new CommitTransactionReply().toSerializable(), self);
shardMBean.incrementCommittedTransactionCount();
- shardMBean.setLastCommittedTransactionTime(new Date());
+ shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
}
@Override
private void updateSchemaContext(UpdateSchemaContext message) {
this.schemaContext = message.getSchemaContext();
+ updateSchemaContext(message.getSchemaContext());
store.onGlobalContextUpdated(message.getSchemaContext());
}
+ @VisibleForTesting void updateSchemaContext(SchemaContext schemaContext) {
+ store.onGlobalContextUpdated(schemaContext);
+ }
+
private void registerChangeListener(
RegisterChangeListener registerChangeListener) {
private void createTransactionChain() {
DOMStoreTransactionChain chain = store.createTransactionChain();
ActorRef transactionChain = getContext().actorOf(
- ShardTransactionChain.props(chain, schemaContext, datastoreContext,name.toString() ));
+ ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean));
getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
- getSelf());
+ getSelf());
}
@Override protected void applyState(ActorRef clientActor, String identifier,
}
@Override protected void createSnapshot() {
- throw new UnsupportedOperationException("createSnapshot");
+ if (createSnapshotTransaction == null) {
+
+ // Create a transaction. We are really going to treat the transaction as a worker
+ // so that this actor does not get block building the snapshot
+ createSnapshotTransaction = createTransaction(
+ TransactionProxy.TransactionType.READ_ONLY.ordinal(),
+ "createSnapshot");
+
+ createSnapshotTransaction.tell(
+ new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
+
+ }
}
- @Override protected void applySnapshot(ByteString snapshot) {
- throw new UnsupportedOperationException("applySnapshot");
+ @VisibleForTesting @Override protected void applySnapshot(ByteString snapshot) {
+ // Since this will be done only on Recovery or when this actor is a Follower
+ // we can safely commit everything in here. We not need to worry about event notifications
+ // as they would have already been disabled on the follower
+ try {
+ DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+ NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
+ NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext)
+ .decode(YangInstanceIdentifier.builder().build(), serializedNode);
+
+ // delete everything first
+ transaction.delete(YangInstanceIdentifier.builder().build());
+
+ // Add everything from the remote node back
+ transaction.write(YangInstanceIdentifier.builder().build(), node);
+ syncCommitTransaction(transaction);
+ } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
+ LOG.error(e, "An exception occurred when applying snapshot");
+ }
}
@Override protected void onStateChanged() {
final ShardIdentifier name;
final Map<ShardIdentifier, String> peerAddresses;
final DatastoreContext datastoreContext;
+ final SchemaContext schemaContext;
ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
- DatastoreContext datastoreContext) {
+ DatastoreContext datastoreContext, SchemaContext schemaContext) {
this.name = name;
this.peerAddresses = peerAddresses;
this.datastoreContext = datastoreContext;
+ this.schemaContext = schemaContext;
}
@Override
public Shard create() throws Exception {
- return new Shard(name, peerAddresses, datastoreContext);
+ return new Shard(name, peerAddresses, datastoreContext, schemaContext);
}
}
+
+ @VisibleForTesting NormalizedNode readStore() throws ExecutionException, InterruptedException {
+ DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
+
+ CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
+ transaction.read(YangInstanceIdentifier.builder().build());
+
+ NormalizedNode<?, ?> node = future.get().get();
+
+ transaction.close();
+
+ return node;
+ }
+
+ @VisibleForTesting void writeToStore(YangInstanceIdentifier id, NormalizedNode node)
+ throws ExecutionException, InterruptedException {
+ DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+
+ transaction.write(id, node);
+
+ syncCommitTransaction(transaction);
+ }
+
}
import akka.cluster.ClusterEvent;
import akka.japi.Creator;
import akka.japi.Function;
-
import com.google.common.base.Preconditions;
-
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
import java.util.ArrayList;
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
- // Create all the local Shards and make them a child of the ShardManager
- // TODO: This may need to be initiated when we first get the schema context
- createLocalShards();
+ //createLocalShards(null);
}
public static Props props(final String type,
* @param message
*/
private void updateSchemaContext(Object message) {
- for(ShardInformation info : localShards.values()){
- info.getActor().tell(message,getSelf());
+ SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+
+ if(localShards.size() == 0){
+ createLocalShards(schemaContext);
+ } else {
+ for (ShardInformation info : localShards.values()) {
+ info.getActor().tell(message, getSelf());
+ }
}
}
* runs
*
*/
- private void createLocalShards() {
+ private void createLocalShards(SchemaContext schemaContext) {
String memberName = this.cluster.getCurrentMemberName();
List<String> memberShardNames =
this.configuration.getMemberShardNames(memberName);
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
ActorRef actor = getContext()
- .actorOf(Shard.props(shardId, peerAddresses, datastoreContext).
+ .actorOf(Shard.props(shardId, peerAddresses, datastoreContext, schemaContext).
withMailbox(ActorContext.MAILBOX), shardId.toString());
-
localShardActorNames.add(shardId.toString());
localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
}
- mBean = ShardManagerInfo
- .createShardManagerMBean("shard-manager-" + this.type, localShardActorNames);
-
+ mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
+ datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
}
/**
import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
private final DOMStoreReadTransaction transaction;
public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext,String shardName) {
- super(shardActor, schemaContext, shardName);
+ SchemaContext schemaContext, ShardStats shardStats) {
+ super(shardActor, schemaContext, shardStats);
this.transaction = transaction;
}
import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
private final DOMStoreReadWriteTransaction transaction;
public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext,String shardName) {
- super(shardActor, schemaContext, shardName);
+ SchemaContext schemaContext, ShardStats shardStats) {
+ super(shardActor, schemaContext, shardStats);
this.transaction = transaction;
}
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.japi.Creator;
+
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
+
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
private final ActorRef shardActor;
protected final SchemaContext schemaContext;
- private final String shardName;
-
+ private final ShardStats shardStats;
private final MutableCompositeModification modification = new MutableCompositeModification();
protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
- String shardName) {
+ ShardStats shardStats) {
this.shardActor = shardActor;
this.schemaContext = schemaContext;
- this.shardName = shardName;
+ this.shardStats = shardStats;
}
public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext,DatastoreContext datastoreContext, String shardName) {
+ SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats) {
return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
- datastoreContext, shardName));
+ datastoreContext, shardStats));
}
protected abstract DOMStoreTransaction getDOMStoreTransaction();
sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
}
} catch (Exception e) {
- ShardMBeanFactory.getShardStatsMBean(shardName).incrementFailedReadTransactionsCount();
+ shardStats.incrementFailedReadTransactionsCount();
sender.tell(new akka.actor.Status.Failure(e), self);
}
protected void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) {
DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
ActorRef cohortActor = getContext().actorOf(
- ThreePhaseCommitCohort.props(cohort, shardActor, modification, shardName), "cohort");
+ ThreePhaseCommitCohort.props(cohort, shardActor, modification, shardStats), "cohort");
getSender()
.tell(new ReadyTransactionReply(cohortActor.path()).toSerializable(), getSelf());
final ActorRef shardActor;
final SchemaContext schemaContext;
final DatastoreContext datastoreContext;
- final String shardName;
+ final ShardStats shardStats;
ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext, DatastoreContext datastoreContext, String shardName) {
+ SchemaContext schemaContext, DatastoreContext datastoreContext,
+ ShardStats shardStats) {
this.transaction = transaction;
this.shardActor = shardActor;
- this.shardName = shardName;
+ this.shardStats = shardStats;
this.schemaContext = schemaContext;
this.datastoreContext = datastoreContext;
}
ShardTransaction tx;
if(transaction instanceof DOMStoreReadWriteTransaction) {
tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
- shardActor, schemaContext, shardName);
+ shardActor, schemaContext, shardStats);
} else if(transaction instanceof DOMStoreReadTransaction) {
tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
- schemaContext, shardName);
+ schemaContext, shardStats);
} else {
tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
- shardActor, schemaContext, shardName);
+ shardActor, schemaContext, shardStats);
}
tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
import akka.actor.Props;
import akka.japi.Creator;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
private final DOMStoreTransactionChain chain;
private final DatastoreContext datastoreContext;
private final SchemaContext schemaContext;
- private final String shardName;
+ private final ShardStats shardStats;
public ShardTransactionChain(DOMStoreTransactionChain chain, SchemaContext schemaContext,
- DatastoreContext datastoreContext,String shardName) {
+ DatastoreContext datastoreContext, ShardStats shardStats) {
this.chain = chain;
this.datastoreContext = datastoreContext;
this.schemaContext = schemaContext;
- this.shardName = shardName;
+ this.shardStats = shardStats;
}
@Override
TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
return getContext().actorOf(
ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
- schemaContext, datastoreContext,shardName), transactionId);
+ schemaContext, datastoreContext, shardStats), transactionId);
} else if (createTransaction.getTransactionType() ==
TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
return getContext().actorOf(
ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
- schemaContext, datastoreContext,shardName), transactionId);
+ schemaContext, datastoreContext, shardStats), transactionId);
} else if (createTransaction.getTransactionType() ==
TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
return getContext().actorOf(
ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
- schemaContext, datastoreContext,shardName), transactionId);
+ schemaContext, datastoreContext, shardStats), transactionId);
} else {
throw new IllegalArgumentException (
"CreateTransaction message has unidentified transaction type=" +
}
public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext,
- DatastoreContext datastoreContext, String shardName) {
- return Props.create(new ShardTransactionChainCreator(chain, schemaContext, datastoreContext, shardName));
+ DatastoreContext datastoreContext, ShardStats shardStats) {
+ return Props.create(new ShardTransactionChainCreator(chain, schemaContext,
+ datastoreContext, shardStats));
}
private static class ShardTransactionChainCreator implements Creator<ShardTransactionChain> {
final DOMStoreTransactionChain chain;
final DatastoreContext datastoreContext;
final SchemaContext schemaContext;
- final String shardName;
+ final ShardStats shardStats;
ShardTransactionChainCreator(DOMStoreTransactionChain chain, SchemaContext schemaContext,
- DatastoreContext datastoreContext, String shardName) {
+ DatastoreContext datastoreContext, ShardStats shardStats) {
this.chain = chain;
this.datastoreContext = datastoreContext;
this.schemaContext = schemaContext;
- this.shardName = shardName;
+ this.shardStats = shardStats;
}
@Override
public ShardTransactionChain create() throws Exception {
- return new ShardTransactionChain(chain, schemaContext, datastoreContext,shardName);
+ return new ShardTransactionChain(chain, schemaContext, datastoreContext, shardStats);
}
}
}
import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
private final DOMStoreWriteTransaction transaction;
public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext,String shardName) {
- super(shardActor, schemaContext, shardName);
+ SchemaContext schemaContext, ShardStats shardStats) {
+ super(shardActor, schemaContext, shardStats);
this.transaction = transaction;
}
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
private final DOMStoreThreePhaseCommitCohort cohort;
private final ActorRef shardActor;
private final CompositeModification modification;
- private final String shardName;
+ private final ShardStats shardStats;
public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort,
- ActorRef shardActor, CompositeModification modification,String shardName) {
+ ActorRef shardActor, CompositeModification modification, ShardStats shardStats) {
this.cohort = cohort;
this.shardActor = shardActor;
this.modification = modification;
- this.shardName = shardName;
+ this.shardStats = shardStats;
}
private final LoggingAdapter log =
Logging.getLogger(getContext().system(), this);
public static Props props(final DOMStoreThreePhaseCommitCohort cohort,
- final ActorRef shardActor, final CompositeModification modification,
- String shardName) {
+ final ActorRef shardActor, final CompositeModification modification,
+ ShardStats shardStats) {
return Props.create(new ThreePhaseCommitCohortCreator(cohort, shardActor, modification,
- shardName));
+ shardStats));
}
@Override
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
public void onSuccess(Void v) {
- ShardMBeanFactory.getShardStatsMBean(shardName).incrementAbortTransactionsCount();
+ shardStats.incrementAbortTransactionsCount();
sender
.tell(new AbortTransactionReply().toSerializable(),
self);
final DOMStoreThreePhaseCommitCohort cohort;
final ActorRef shardActor;
final CompositeModification modification;
- final String shardName;
+ final ShardStats shardStats;
ThreePhaseCommitCohortCreator(DOMStoreThreePhaseCommitCohort cohort,
- ActorRef shardActor, CompositeModification modification, String shardName) {
+ ActorRef shardActor, CompositeModification modification, ShardStats shardStats) {
this.cohort = cohort;
this.shardActor = shardActor;
this.modification = modification;
- this.shardName = shardName;
+ this.shardStats = shardStats;
}
@Override
public ThreePhaseCommitCohort create() throws Exception {
- return new ThreePhaseCommitCohort(cohort, shardActor, modification, shardName);
+ return new ThreePhaseCommitCohort(cohort, shardActor, modification, shardStats);
}
}
}
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-
-package org.opendaylight.controller.cluster.datastore.jmx.mbeans;
-
-
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanRegistrationException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-
-/**
- * All MBeans should extend this class that help in registering and
- * unregistering the MBeans.
- * @author Basheeruddin <syedbahm@cisco.com>
- */
-
-
-public abstract class AbstractBaseMBean {
-
-
- public static String BASE_JMX_PREFIX = "org.opendaylight.controller:";
- public static String JMX_TYPE_DISTRIBUTED_DATASTORE = "DistributedDatastore";
- public static String JMX_CATEGORY_SHARD = "Shard";
- public static String JMX_CATEGORY_SHARD_MANAGER = "ShardManager";
-
- private static final Logger LOG = LoggerFactory
- .getLogger(AbstractBaseMBean.class);
-
- MBeanServer server = ManagementFactory.getPlatformMBeanServer();
- /**
- * gets the MBean ObjectName
- *
- * @return Object name of the MBean
- * @throws MalformedObjectNameException - The bean name does not have the right format.
- * @throws NullPointerException - The bean name is null
- */
- protected ObjectName getMBeanObjectName()
- throws MalformedObjectNameException, NullPointerException {
- String name = BASE_JMX_PREFIX + "type="+getMBeanType()+",Category="+
- getMBeanCategory() + ",name="+
- getMBeanName();
-
-
- return new ObjectName(name);
- }
-
- public boolean registerMBean() {
- boolean registered = false;
- try {
- // Object to identify MBean
- final ObjectName mbeanName = this.getMBeanObjectName();
-
- Preconditions.checkArgument(mbeanName != null,
- "Object name of the MBean cannot be null");
-
- LOG.debug("Register MBean {}", mbeanName);
-
- // unregistered if already registered
- if (server.isRegistered(mbeanName)) {
-
- LOG.debug("MBean {} found to be already registered", mbeanName);
-
- try {
- unregisterMBean(mbeanName);
- } catch (Exception e) {
-
- LOG.warn("unregister mbean {} resulted in exception {} ", mbeanName,
- e);
- }
- }
- server.registerMBean(this, mbeanName);
-
- LOG.debug("MBean {} registered successfully",
- mbeanName.getCanonicalName());
- registered = true;
- } catch (Exception e) {
-
- LOG.error("registration failed {}", e);
-
- }
- return registered;
- }
-
-
- public boolean unregisterMBean() {
- boolean unregister = false;
- try {
- ObjectName mbeanName = this.getMBeanObjectName();
- unregister = true;
- unregisterMBean(mbeanName);
- } catch (Exception e) {
-
- LOG.error("Failed when unregistering MBean {}", e);
- }
- return unregister;
- }
-
- private void unregisterMBean(ObjectName mbeanName)
- throws MBeanRegistrationException, InstanceNotFoundException {
-
- server.unregisterMBean(mbeanName);
-
- }
-
-
- /**
- * @return name of bean
- */
- protected abstract String getMBeanName();
-
- /**
- * @return type of the MBean
- */
- protected abstract String getMBeanType();
-
-
- /**
- * @return Category name of teh bean
- */
- protected abstract String getMBeanCategory();
-
- //require for test cases
- public MBeanServer getMBeanServer() {
- return server;
- }
-}
*/
package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
/**
* @author Basheeruddin syedbahm@cisco.com
*
*/
public class ShardMBeanFactory {
- private static Map<String, ShardStats> shardMBeans =
- new HashMap<String, ShardStats>();
- public static ShardStats getShardStatsMBean(String shardName) {
- if (shardMBeans.containsKey(shardName)) {
- return shardMBeans.get(shardName);
- } else {
- ShardStats shardStatsMBeanImpl = new ShardStats(shardName);
+ private static final Logger LOG = LoggerFactory.getLogger(ShardMBeanFactory.class);
- if (shardStatsMBeanImpl.registerMBean()) {
- shardMBeans.put(shardName, shardStatsMBeanImpl);
- }
- return shardStatsMBeanImpl;
+ private static Cache<String,ShardStats> shardMBeansCache =
+ CacheBuilder.newBuilder().weakValues().build();
+
+ public static ShardStats getShardStatsMBean(final String shardName, final String mxBeanType) {
+ final String finalMXBeanType = mxBeanType != null ? mxBeanType : "DistDataStore";
+ try {
+ return shardMBeansCache.get(shardName, new Callable<ShardStats>() {
+ @Override
+ public ShardStats call() throws Exception {
+ ShardStats shardStatsMBeanImpl = new ShardStats(shardName, finalMXBeanType);
+ shardStatsMBeanImpl.registerMBean();
+ return shardStatsMBeanImpl;
+ }
+ });
+ } catch(ExecutionException e) {
+ LOG.error(String.format("Could not create MXBean for shard: %s", shardName), e);
+ // Just return an instance that isn't registered.
+ return new ShardStats(shardName, finalMXBeanType);
}
}
-
}
package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl;
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats;
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
+import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
+ * Maintains statistics for a shard.
+ *
* @author Basheeruddin syedbahm@cisco.com
*/
-public class ShardStats extends AbstractBaseMBean implements ShardStatsMBean {
+public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
+ public static String JMX_CATEGORY_SHARD = "Shards";
- private final String shardName;
+ private final AtomicLong committedTransactionsCount = new AtomicLong();
- private long committedTransactionsCount = 0L;
+ private final AtomicLong readOnlyTransactionCount = new AtomicLong();
- private long readOnlyTransactionCount = 0L;
+ private final AtomicLong writeOnlyTransactionCount = new AtomicLong();
- private long writeOnlyTransactionCount = 0L;
-
- private long readWriteTransactionCount = 0L;
+ private final AtomicLong readWriteTransactionCount = new AtomicLong();
private String leader;
private String raftState;
- private long lastLogTerm = -1L;
+ private volatile long lastLogTerm = -1L;
+
+ private volatile long lastLogIndex = -1L;
- private long lastLogIndex = -1L;
+ private volatile long currentTerm = -1L;
- private long currentTerm = -1L;
+ private volatile long commitIndex = -1L;
- private long commitIndex = -1L;
+ private volatile long lastApplied = -1L;
- private long lastApplied = -1L;
+ private volatile long lastCommittedTransactionTime;
- private Date lastCommittedTransactionTime = new Date(0L);
+ private final AtomicLong failedTransactionsCount = new AtomicLong();
- private long failedTransactionsCount = 0L;
+ private final AtomicLong failedReadTransactionsCount = new AtomicLong();
- private long failedReadTransactionsCount = 0L;
+ private final AtomicLong abortTransactionsCount = new AtomicLong();
- private long abortTransactionsCount = 0L;
+ private ThreadExecutorStatsMXBeanImpl notificationExecutorStatsBean;
- private SimpleDateFormat sdf =
+ private ThreadExecutorStatsMXBeanImpl dataStoreExecutorStatsBean;
+
+ private QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
+
+ private final SimpleDateFormat sdf =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
- ShardStats(String shardName) {
- this.shardName = shardName;
+ public ShardStats(String shardName, String mxBeanType) {
+ super(shardName, mxBeanType, JMX_CATEGORY_SHARD);
+ }
+
+ public void setDataStoreExecutor(ExecutorService dsExecutor) {
+ this.dataStoreExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(dsExecutor,
+ "notification-executor", getMBeanType(), getMBeanCategory());
}
+ public void setNotificationManager(QueuedNotificationManager<?, ?> manager) {
+ this.notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
+ "notification-manager", getMBeanType(), getMBeanCategory());
+
+ this.notificationExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(manager.getExecutor(),
+ "data-store-executor", getMBeanType(), getMBeanCategory());
+ }
@Override
public String getShardName() {
- return shardName;
+ return getMBeanName();
}
@Override
public long getCommittedTransactionsCount() {
- return committedTransactionsCount;
+ return committedTransactionsCount.get();
}
- @Override public String getLeader() {
+ @Override
+ public String getLeader() {
return leader;
}
- @Override public String getRaftState() {
+ @Override
+ public String getRaftState() {
return raftState;
}
- @Override public long getReadOnlyTransactionCount() {
- return readOnlyTransactionCount;
+ @Override
+ public long getReadOnlyTransactionCount() {
+ return readOnlyTransactionCount.get();
}
- @Override public long getWriteOnlyTransactionCount() {
- return writeOnlyTransactionCount;
+ @Override
+ public long getWriteOnlyTransactionCount() {
+ return writeOnlyTransactionCount.get();
}
- @Override public long getReadWriteTransactionCount() {
- return readWriteTransactionCount;
+ @Override
+ public long getReadWriteTransactionCount() {
+ return readWriteTransactionCount.get();
}
- @Override public long getLastLogIndex() {
+ @Override
+ public long getLastLogIndex() {
return lastLogIndex;
}
- @Override public long getLastLogTerm() {
+ @Override
+ public long getLastLogTerm() {
return lastLogTerm;
}
- @Override public long getCurrentTerm() {
+ @Override
+ public long getCurrentTerm() {
return currentTerm;
}
- @Override public long getCommitIndex() {
+ @Override
+ public long getCommitIndex() {
return commitIndex;
}
- @Override public long getLastApplied() {
+ @Override
+ public long getLastApplied() {
return lastApplied;
}
@Override
public String getLastCommittedTransactionTime() {
- return sdf.format(lastCommittedTransactionTime);
+ return sdf.format(new Date(lastCommittedTransactionTime));
}
- @Override public long getFailedTransactionsCount() {
- return failedTransactionsCount;
+ @Override
+ public long getFailedTransactionsCount() {
+ return failedTransactionsCount.get();
}
- @Override public long getFailedReadTransactionsCount() {
- return failedReadTransactionsCount;
+ @Override
+ public long getFailedReadTransactionsCount() {
+ return failedReadTransactionsCount.get();
}
- @Override public long getAbortTransactionsCount() {
- return abortTransactionsCount;
+ @Override
+ public long getAbortTransactionsCount() {
+ return abortTransactionsCount.get();
}
public long incrementCommittedTransactionCount() {
- return committedTransactionsCount++;
+ return committedTransactionsCount.incrementAndGet();
}
public long incrementReadOnlyTransactionCount() {
- return readOnlyTransactionCount++;
+ return readOnlyTransactionCount.incrementAndGet();
}
public long incrementWriteOnlyTransactionCount() {
- return writeOnlyTransactionCount++;
+ return writeOnlyTransactionCount.incrementAndGet();
}
public long incrementReadWriteTransactionCount() {
- return readWriteTransactionCount++;
+ return readWriteTransactionCount.incrementAndGet();
}
public long incrementFailedTransactionsCount() {
- return failedTransactionsCount++;
+ return failedTransactionsCount.incrementAndGet();
}
public long incrementFailedReadTransactionsCount() {
- return failedReadTransactionsCount++;
+ return failedReadTransactionsCount.incrementAndGet();
}
- public long incrementAbortTransactionsCount () { return abortTransactionsCount++;}
+ public long incrementAbortTransactionsCount ()
+ {
+ return abortTransactionsCount.incrementAndGet();
+ }
public void setLeader(String leader) {
this.leader = leader;
this.lastApplied = lastApplied;
}
-
- public void setLastCommittedTransactionTime(
- Date lastCommittedTransactionTime) {
+ public void setLastCommittedTransactionTime(long lastCommittedTransactionTime) {
this.lastCommittedTransactionTime = lastCommittedTransactionTime;
}
@Override
- protected String getMBeanName() {
- return shardName;
+ public ThreadExecutorStats getDataStoreExecutorStats() {
+ return dataStoreExecutorStatsBean.toThreadExecutorStats();
+ }
+
+ @Override
+ public ThreadExecutorStats getNotificationMgrExecutorStats() {
+ return notificationExecutorStatsBean.toThreadExecutorStats();
}
@Override
- protected String getMBeanType() {
- return JMX_TYPE_DISTRIBUTED_DATASTORE;
+ public List<ListenerNotificationQueueStats> getCurrentNotificationMgrListenerQueueStats() {
+ return notificationManagerStatsBean.getCurrentListenerQueueStats();
}
@Override
- protected String getMBeanCategory() {
- return JMX_CATEGORY_SHARD;
+ public int getMaxNotificationMgrListenerQueueSize() {
+ return notificationManagerStatsBean.getMaxListenerQueueSize();
}
/**
* resets the counters related to transactions
*/
-
+ @Override
public void resetTransactionCounters(){
- committedTransactionsCount = 0L;
+ committedTransactionsCount.set(0);
- readOnlyTransactionCount = 0L;
+ readOnlyTransactionCount.set(0);
- writeOnlyTransactionCount = 0L;
+ writeOnlyTransactionCount.set(0);
- readWriteTransactionCount = 0L;
+ readWriteTransactionCount.set(0);
- lastCommittedTransactionTime = new Date(0L);
+ lastCommittedTransactionTime = 0;
- failedTransactionsCount = 0L;
+ failedTransactionsCount.set(0);
- failedReadTransactionsCount = 0L;
+ failedReadTransactionsCount.set(0);
- abortTransactionsCount = 0L;
+ abortTransactionsCount.set(0);
}
-
-
}
-package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
-
-/**
- * @author: syedbahm
- */
-public interface ShardStatsMBean {
- String getShardName();
-
- long getCommittedTransactionsCount();
-
- String getLeader();
-
- String getRaftState();
-
- long getReadOnlyTransactionCount();
-
- long getWriteOnlyTransactionCount();
-
- long getReadWriteTransactionCount();
-
- long getLastLogIndex();
-
- long getLastLogTerm();
-
- long getCurrentTerm();
-
- long getCommitIndex();
-
- long getLastApplied();
-
- String getLastCommittedTransactionTime();
-
- long getFailedTransactionsCount();
-
- long getFailedReadTransactionsCount();
-
- long getAbortTransactionsCount();
-
- void resetTransactionCounters();
-
-}
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
+
+import java.util.List;
+
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats;
+import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
+
+/**
+ * @author: syedbahm
+ */
+public interface ShardStatsMXBean {
+
+ String getShardName();
+
+ long getCommittedTransactionsCount();
+
+ long getReadOnlyTransactionCount();
+
+ long getWriteOnlyTransactionCount();
+
+ long getReadWriteTransactionCount();
+
+ long getLastLogIndex();
+
+ long getLastLogTerm();
+
+ long getCurrentTerm();
+
+ long getCommitIndex();
+
+ long getLastApplied();
+
+ String getLastCommittedTransactionTime();
+
+ long getFailedTransactionsCount();
+
+ long getAbortTransactionsCount();
+
+ long getFailedReadTransactionsCount();
+
+ String getLeader();
+
+ String getRaftState();
+
+ ThreadExecutorStats getDataStoreExecutorStats();
+
+ ThreadExecutorStats getNotificationMgrExecutorStats();
+
+ List<ListenerNotificationQueueStats> getCurrentNotificationMgrListenerQueueStats();
+
+ int getMaxNotificationMgrListenerQueueSize();
+
+ void resetTransactionCounters();
+}
package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean;
-
import java.util.List;
-public class ShardManagerInfo extends AbstractBaseMBean implements
- ShardManagerInfoMBean {
-
- private final String name;
- private final List<String> localShards;
-
- public ShardManagerInfo(String name, List<String> localShards) {
- this.name = name;
- this.localShards = localShards;
- }
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfoMBean {
- @Override protected String getMBeanName() {
- return name;
- }
+ public static String JMX_CATEGORY_SHARD_MANAGER = "ShardManager";
- @Override protected String getMBeanType() {
- return JMX_TYPE_DISTRIBUTED_DATASTORE;
- }
+ private final List<String> localShards;
- @Override protected String getMBeanCategory() {
- return JMX_CATEGORY_SHARD_MANAGER;
+ public ShardManagerInfo(String name, String mxBeanType, List<String> localShards) {
+ super(name, mxBeanType, JMX_CATEGORY_SHARD_MANAGER);
+ this.localShards = localShards;
}
- public static ShardManagerInfo createShardManagerMBean(String name, List<String> localShards){
- ShardManagerInfo shardManagerInfo = new ShardManagerInfo(name,
- localShards);
+ public static ShardManagerInfo createShardManagerMBean(String name, String mxBeanType,
+ List<String> localShards){
+ ShardManagerInfo shardManagerInfo = new ShardManagerInfo(name, mxBeanType, localShards);
shardManagerInfo.registerMBean();
return shardManagerInfo;
}
- @Override public List<String> getLocalShards() {
+ @Override
+ public List<String> getLocalShards() {
return localShards;
}
}
package org.opendaylight.controller.cluster.datastore.messages;
+import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable;
return new ReadDataReply(schemaContext,new NormalizedNodeToNodeCodec(schemaContext).decode(id, o.getNormalizedNode()));
}
+
+ public static ByteString getNormalizedNodeByteString(Object serializable){
+ ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable;
+ return ((ShardTransactionMessages.ReadDataReply) serializable).getNormalizedNode().toByteString();
+ }
}
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.osgi.framework.BundleContext;
+import scala.concurrent.duration.Duration;
+
public class DistributedConfigDataStoreProviderModule extends
org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModule {
private BundleContext bundleContext;
props = new ConfigProperties();
}
- return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
- new DistributedDataStoreProperties(
+ DatastoreContext datastoreContext = new DatastoreContext("DistributedConfigDatastore",
+ InMemoryDOMDataStoreConfigProperties.create(
props.getMaxShardDataChangeExecutorPoolSize().getValue(),
props.getMaxShardDataChangeExecutorQueueSize().getValue(),
props.getMaxShardDataChangeListenerQueueSize().getValue(),
- props.getShardTransactionIdleTimeoutInMinutes().getValue(),
- props.getOperationTimeoutInSeconds().getValue()), bundleContext);
+ props.getMaxShardDataStoreExecutorQueueSize().getValue()),
+ Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+ TimeUnit.MINUTES),
+ props.getOperationTimeoutInSeconds().getValue());
+
+ return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
+ datastoreContext, bundleContext);
}
public void setBundleContext(BundleContext bundleContext) {
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.osgi.framework.BundleContext;
+import scala.concurrent.duration.Duration;
+
public class DistributedOperationalDataStoreProviderModule extends
org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModule {
private BundleContext bundleContext;
props = new OperationalProperties();
}
- return DistributedDataStoreFactory.createInstance("operational",
- getOperationalSchemaServiceDependency(),
- new DistributedDataStoreProperties(
+ DatastoreContext datastoreContext = new DatastoreContext("DistributedOperationalDatastore",
+ InMemoryDOMDataStoreConfigProperties.create(
props.getMaxShardDataChangeExecutorPoolSize().getValue(),
props.getMaxShardDataChangeExecutorQueueSize().getValue(),
props.getMaxShardDataChangeListenerQueueSize().getValue(),
- props.getShardTransactionIdleTimeoutInMinutes().getValue(),
- props.getOperationTimeoutInSeconds().getValue()), bundleContext);
+ props.getMaxShardDataStoreExecutorQueueSize().getValue()),
+ Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+ TimeUnit.MINUTES),
+ props.getOperationTimeoutInSeconds().getValue());
+
+ return DistributedDataStoreFactory.createInstance("operational",
+ getOperationalSchemaServiceDependency(), datastoreContext, bundleContext);
}
public void setBundleContext(BundleContext bundleContext) {
type non-zero-uint16-type;
description "The maximum queue size for each shard's data store data change listeners.";
}
-
+
+ leaf max-shard-data-store-executor-queue-size {
+ default 5000;
+ type non-zero-uint16-type;
+ description "The maximum queue size for each shard's data store executor.";
+ }
+
leaf shard-transaction-idle-timeout-in-minutes {
default 10;
type non-zero-uint16-type;
@BeforeClass
public static void setUpClass() throws IOException {
- File journal = new File("journal");
-
- if(journal.exists()) {
- FileUtils.deleteDirectory(journal);
- }
System.setProperty("shard.persistent", "false");
system = ActorSystem.create("test");
public static void tearDownClass() throws IOException {
JavaTestKit.shutdownActorSystem(system);
system = null;
+ }
+ protected static void deletePersistenceFiles() throws IOException {
File journal = new File("journal");
if(journal.exists()) {
FileUtils.deleteDirectory(journal);
}
+
+ File snapshots = new File("snapshots");
+
+ if(snapshots.exists()){
+ FileUtils.deleteDirectory(snapshots);
+ }
+
}
protected ActorSystem getSystem() {
final SchemaContext schemaContext = TestModel.createTestContext();
DatastoreContext datastoreContext = new DatastoreContext();
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP, datastoreContext);
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext());
final ActorRef shard = getSystem().actorOf(props);
new Within(duration("10 seconds")) {
final DistributedDataStore distributedDataStore =
new DistributedDataStore(getSystem(), "config",
new MockClusterWrapper(), configuration,
- new DistributedDataStoreProperties());
+ new DatastoreContext());
distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
new DistributedDataStore(actorSystem, "config",
mock(ClusterWrapper.class), mock(Configuration.class),
- new DistributedDataStoreProperties());
+ new DatastoreContext());
verify(actorSystem).actorOf(any(Props.class), eq("shardmanager-config"));
}
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import scala.concurrent.duration.Duration;
import static junit.framework.Assert.assertEquals;
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
+ subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
new Within(duration("10 seconds")) {
@Override
protected void run() {
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
+ subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
new Within(duration("10 seconds")) {
@Override
protected void run() {
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.event.Logging;
import akka.testkit.JavaTestKit;
-
+import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class ShardTest extends AbstractActorTest {
ShardIdentifier.builder().memberName("member-1")
.shardName("inventory").type("config").build();
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT);
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
final ActorRef subject =
getSystem().actorOf(props, "testCreateTransactionChain");
ShardIdentifier.builder().memberName("member-1")
.shardName("inventory").type("config").build();
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT);
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
final ActorRef subject =
getSystem().actorOf(props, "testRegisterChangeListener");
ShardIdentifier.builder().memberName("member-1")
.shardName("inventory").type("config").build();
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT);
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
final ActorRef subject =
getSystem().actorOf(props, "testCreateTransaction");
.shardName("inventory").type("config").build();
peerAddresses.put(identifier, null);
- final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT);
+ final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
final ActorRef subject =
getSystem().actorOf(props, "testPeerAddressResolved");
}};
}
+ @Test
+ public void testApplySnapshot() throws ExecutionException, InterruptedException {
+ Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
+
+ final ShardIdentifier identifier =
+ ShardIdentifier.builder().memberName("member-1")
+ .shardName("inventory").type("config").build();
+
+ peerAddresses.put(identifier, null);
+ final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
+
+ TestActorRef<Shard> ref = TestActorRef.create(getSystem(), props);
+
+ ref.underlyingActor().updateSchemaContext(TestModel.createTestContext());
+
+ NormalizedNodeToNodeCodec codec =
+ new NormalizedNodeToNodeCodec(TestModel.createTestContext());
+
+ ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ NormalizedNode expected = ref.underlyingActor().readStore();
+
+ NormalizedNodeMessages.Container encode = codec
+ .encode(YangInstanceIdentifier.builder().build(), expected);
+
+
+ ref.underlyingActor().applySnapshot(encode.getNormalizedNode().toByteString());
+
+ NormalizedNode actual = ref.underlyingActor().readStore();
+
+ assertEquals(expected, actual);
+ }
+
+ private static class ShardTestKit extends JavaTestKit {
+
+ private ShardTestKit(ActorSystem actorSystem) {
+ super(actorSystem);
+ }
+
+ protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
+ // Wait for a specific log message to show up
+ final boolean result =
+ new JavaTestKit.EventFilter<Boolean>(logLevel
+ ) {
+ @Override
+ protected Boolean run() {
+ return true;
+ }
+ }.from(subject.path().toString())
+ .message(logMessage)
+ .occurrences(1).exec();
+
+ Assert.assertEquals(true, result);
+
+ }
+
+ }
+
+ @Test
+ public void testCreateSnapshot() throws IOException, InterruptedException {
+ new ShardTestKit(getSystem()) {{
+ final ShardIdentifier identifier =
+ ShardIdentifier.builder().memberName("member-1")
+ .shardName("inventory").type("config").build();
+
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
+ final ActorRef subject =
+ getSystem().actorOf(props, "testCreateSnapshot");
+
+ // Wait for a specific log message to show up
+ this.waitForLogMessage(Logging.Info.class, subject, "Switching from state Candidate to Leader");
+
+
+ new Within(duration("3 seconds")) {
+ @Override
+ protected void run() {
+
+ subject.tell(
+ new UpdateSchemaContext(TestModel.createTestContext()),
+ getRef());
+
+ subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
+ getRef());
+
+ waitForLogMessage(Logging.Debug.class, subject, "CaptureSnapshotReply received by actor");
+ }
+ };
+
+ Thread.sleep(2000);
+ deletePersistenceFiles();
+ }};
+ }
+
+ /**
+ * This test simply verifies that the applySnapShot logic will work
+ * @throws ReadFailedException
+ */
+ @Test
+ public void testInMemoryDataStoreRestore() throws ReadFailedException {
+ InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
+ MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
+
+ store.onGlobalContextUpdated(TestModel.createTestContext());
+
+ DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
+ putTransaction.write(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ commitTransaction(putTransaction);
+
+
+ NormalizedNode expected = readStore(store);
+
+ DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
+
+ writeTransaction.delete(YangInstanceIdentifier.builder().build());
+ writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
+
+ commitTransaction(writeTransaction);
+
+ NormalizedNode actual = readStore(store);
+
+ assertEquals(expected, actual);
+
+ }
+
+ private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
+ DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
+ CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
+ transaction.read(YangInstanceIdentifier.builder().build());
+
+ Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
+
+ NormalizedNode<?, ?> normalizedNode = optional.get();
+
+ transaction.close();
+
+ return normalizedNode;
+ }
+
+ private void commitTransaction(DOMStoreWriteTransaction transaction) {
+ DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+ ListenableFuture<Void> future =
+ commitCohort.preCommit();
+ try {
+ future.get();
+ future = commitCohort.commit();
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ }
+ }
+
private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
@Override
import org.junit.BeforeClass;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
private static final String mockShardName = "mockShardName";
+ private final ShardStats shardStats = new ShardStats(mockShardName, "DataStore");
+
@BeforeClass
public static void staticSetup() {
store.onGlobalContextUpdated(testSchemaContext);
public void testOnReceiveCreateTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
final Props props = ShardTransactionChain.props(store.createTransactionChain(),
- testSchemaContext, DATA_STORE_CONTEXT, mockShardName);
+ testSchemaContext, DATA_STORE_CONTEXT, shardStats);
final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction");
new Within(duration("1 seconds")) {
public void testOnReceiveCloseTransactionChain() throws Exception {
new JavaTestKit(getSystem()) {{
final Props props = ShardTransactionChain.props(store.createTransactionChain(),
- testSchemaContext, DATA_STORE_CONTEXT,mockShardName );
+ testSchemaContext, DATA_STORE_CONTEXT, shardStats );
final ActorRef subject = getSystem().actorOf(props, "testCloseTransactionChain");
new Within(duration("1 seconds")) {
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
private final DatastoreContext datastoreContext = new DatastoreContext();
+ private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
+
@BeforeClass
public static void staticSetup() {
store.onGlobalContextUpdated(testSchemaContext);
}
+ private ActorRef createShard(){
+ return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext(), TestModel.createTestContext()));
+ }
+
@Test(expected = ReadFailedException.class)
public void testNegativeReadWithReadOnlyTransactionClosed()
throws Throwable {
- final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
+ final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
public void testNegativeReadWithReadWriteTransactionClosed()
throws Throwable {
- final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
+ final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
public void testNegativeExistsWithReadWriteTransactionClosed()
throws Throwable {
- final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
+ final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
public void testNegativeWriteWithTransactionReady() throws Exception {
- final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
+ final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
public void testNegativeReadWriteWithTransactionReady() throws Exception {
- final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
+ final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
public void testNegativeMergeTransactionReady() throws Exception {
- final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
+ final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props, "testNegativeMergeTransactionReady");
public void testNegativeDeleteDataWhenTransactionReady() throws Exception {
- final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
+ final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
private DatastoreContext datastoreContext = new DatastoreContext();
+ private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
+
@BeforeClass
public static void staticSetup() {
store.onGlobalContextUpdated(testSchemaContext);
}
+ private ActorRef createShard(){
+ return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+ Collections.EMPTY_MAP, new DatastoreContext(), TestModel.createTestContext()));
+ }
+
@Test
public void testOnReceiveReadData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
- Collections.EMPTY_MAP, new DatastoreContext()));
+ final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject = getSystem().actorOf(props, "testReadData");
new Within(duration("1 seconds")) {
@Test
public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
- Collections.EMPTY_MAP, new DatastoreContext()));
+ final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
new Within(duration("1 seconds")) {
@Test
public void testOnReceiveDataExistsPositive() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
- Collections.EMPTY_MAP, new DatastoreContext()));
+ final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive");
new Within(duration("1 seconds")) {
@Test
public void testOnReceiveDataExistsNegative() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
- Collections.EMPTY_MAP, new DatastoreContext()));
+ final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative");
new Within(duration("1 seconds")) {
@Test
public void testOnReceiveWriteData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
- Collections.EMPTY_MAP, new DatastoreContext()));
+ final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject =
getSystem().actorOf(props, "testWriteData");
@Test
public void testOnReceiveMergeData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
- Collections.EMPTY_MAP, new DatastoreContext()));
+ final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject =
getSystem().actorOf(props, "testMergeData");
@Test
public void testOnReceiveDeleteData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
- Collections.EMPTY_MAP, new DatastoreContext()));
+ final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject =
getSystem().actorOf(props, "testDeleteData");
@Test
public void testOnReceiveReadyTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
- Collections.EMPTY_MAP, new DatastoreContext()));
+ final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject =
getSystem().actorOf(props, "testReadyTransaction");
@Test
public void testOnReceiveCloseTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
- Collections.EMPTY_MAP, new DatastoreContext()));
+ final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject =
getSystem().actorOf(props, "testCloseTransaction");
@Test(expected=UnknownMessageException.class)
public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
- Collections.EMPTY_MAP, new DatastoreContext()));
+ final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final TestActorRef subject = TestActorRef.apply(props,getSystem());
subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
@Test
public void testShardTransactionInactivity() {
- datastoreContext = new DatastoreContext(InMemoryDOMDataStoreConfigProperties.getDefault(),
- Duration.create(500, TimeUnit.MILLISECONDS));
+ datastoreContext = new DatastoreContext("Test",
+ InMemoryDOMDataStoreConfigProperties.getDefault(),
+ Duration.create(500, TimeUnit.MILLISECONDS), 5);
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
- Collections.EMPTY_MAP, new DatastoreContext()));
+ final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+ testSchemaContext, datastoreContext, shardStats);
final ActorRef subject =
getSystem().actorOf(props, "testShardTransactionInactivity");
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
private final DatastoreContext datastoreContext = new DatastoreContext();
+ private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
@BeforeClass
public static void staticSetup() {
private final FiniteDuration ASK_RESULT_DURATION = Duration.create(5000, TimeUnit.MILLISECONDS);
+ private ActorRef createShard(){
+ return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()));
+ }
@Test(expected = TestException.class)
public void testNegativeAbortResultsInException() throws Exception {
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
- Collections.EMPTY_MAP, datastoreContext));
+ final ActorRef shard = createShard();
final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
.mock(DOMStoreThreePhaseCommitCohort.class);
final CompositeModification mockComposite =
Mockito.mock(CompositeModification.class);
final Props props =
- ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
+ ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
.create(getSystem(), props,
@Test(expected = OptimisticLockFailedException.class)
public void testNegativeCanCommitResultsInException() throws Exception {
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
- Collections.EMPTY_MAP, datastoreContext));
+ final ActorRef shard = createShard();
final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
.mock(DOMStoreThreePhaseCommitCohort.class);
final CompositeModification mockComposite =
Mockito.mock(CompositeModification.class);
final Props props =
- ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
+ ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
.create(getSystem(), props,
@Test(expected = TestException.class)
public void testNegativePreCommitResultsInException() throws Exception {
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
- Collections.EMPTY_MAP, datastoreContext));
+ final ActorRef shard = createShard();
final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
.mock(DOMStoreThreePhaseCommitCohort.class);
final CompositeModification mockComposite =
Mockito.mock(CompositeModification.class);
final Props props =
- ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
+ ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
.create(getSystem(), props,
public void testNegativeCommitResultsInException() throws Exception {
final TestActorRef<Shard> subject = TestActorRef.create(getSystem(),
- Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext),
+ Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()),
"testNegativeCommitResultsInException");
final ActorRef shardTransaction =
getSystem().actorOf(ShardTransaction.props(store.newReadWriteTransaction(), subject,
- testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()));
+ testSchemaContext, datastoreContext, shardStats));
ShardTransactionMessages.WriteData writeData =
ShardTransactionMessages.WriteData.newBuilder()
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean;
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+
+import java.lang.management.ManagementFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
@Before
public void setUp() throws Exception {
- shardStats = new ShardStats("shard-1");
+ shardStats = new ShardStats("shard-1", "DataStore");
shardStats.registerMBean();
- mbeanServer = shardStats.getMBeanServer();
+ mbeanServer = ManagementFactory.getPlatformMBeanServer();
String objectName =
- AbstractBaseMBean.BASE_JMX_PREFIX + "type=" + shardStats
+ AbstractMXBean.BASE_JMX_PREFIX + "type=" + shardStats
.getMBeanType() + ",Category=" +
shardStats.getMBeanCategory() + ",name=" +
shardStats.getMBeanName();
public void testGetShardName() throws Exception {
Object attribute = mbeanServer.getAttribute(testMBeanName, "ShardName");
- Assert.assertEquals((String) attribute, "shard-1");
+ Assert.assertEquals(attribute, "shard-1");
}
//now let us get from MBeanServer what is the transaction count.
Object attribute = mbeanServer.getAttribute(testMBeanName,
"CommittedTransactionsCount");
- Assert.assertEquals((Long) attribute, (Long) 3L);
+ Assert.assertEquals(attribute, 3L);
}
Assert.assertEquals(shardStats.getLastCommittedTransactionTime(),
sdf.format(new Date(0L)));
long millis = System.currentTimeMillis();
- shardStats.setLastCommittedTransactionTime(new Date(millis));
+ shardStats.setLastCommittedTransactionTime(millis);
//now let us get from MBeanServer what is the transaction count.
Object attribute = mbeanServer.getAttribute(testMBeanName,
"LastCommittedTransactionTime");
- Assert.assertEquals((String) attribute, sdf.format(new Date(millis)));
- Assert.assertNotEquals((String) attribute,
+ Assert.assertEquals(attribute, sdf.format(new Date(millis)));
+ Assert.assertNotEquals(attribute,
sdf.format(new Date(millis - 1)));
}
//now let us get from MBeanServer what is the transaction count.
Object attribute =
mbeanServer.getAttribute(testMBeanName, "FailedTransactionsCount");
- Assert.assertEquals((Long) attribute, (Long) 2L);
+ Assert.assertEquals(attribute, 2L);
}
@Test
//now let us get from MBeanServer what is the transaction count.
Object attribute =
mbeanServer.getAttribute(testMBeanName, "AbortTransactionsCount");
- Assert.assertEquals((Long) attribute, (Long) 2L);
+ Assert.assertEquals(attribute, 2L);
}
@Test
//now let us get from MBeanServer what is the transaction count.
Object attribute =
mbeanServer.getAttribute(testMBeanName, "FailedReadTransactionsCount");
- Assert.assertEquals((Long) attribute, (Long) 2L);
+ Assert.assertEquals(attribute, 2L);
}
@Test
//now let us get from MBeanServer what is the transaction count.
Object attribute = mbeanServer.getAttribute(testMBeanName,
"CommittedTransactionsCount");
- Assert.assertEquals((Long) attribute, (Long) 3L);
+ Assert.assertEquals(attribute, 3L);
//let us increment FailedReadTransactions count and then check
shardStats.incrementFailedReadTransactionsCount();
//now let us get from MBeanServer what is the transaction count.
attribute =
mbeanServer.getAttribute(testMBeanName, "FailedReadTransactionsCount");
- Assert.assertEquals((Long) attribute, (Long) 2L);
+ Assert.assertEquals(attribute, 2L);
//here we will reset the counters and check the above ones are 0 after reset
//now let us get from MBeanServer what is the transaction count.
attribute = mbeanServer.getAttribute(testMBeanName,
"CommittedTransactionsCount");
- Assert.assertEquals((Long) attribute, (Long) 0L);
+ Assert.assertEquals(attribute, 0L);
attribute =
mbeanServer.getAttribute(testMBeanName, "FailedReadTransactionsCount");
- Assert.assertEquals((Long) attribute, (Long) 0L);
+ Assert.assertEquals(attribute, 0L);
}
@Override
public java.lang.AutoCloseable createInstance() {
-
- InMemoryDOMDataStore dataStore = InMemoryDOMDataStoreFactory.create(
- "DOM-CFG", getSchemaServiceDependency(),
+ InMemoryDOMDataStore dataStore = InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency(),
+ getDebugTransactions(),
InMemoryDOMDataStoreConfigProperties.create(getMaxDataChangeExecutorPoolSize(),
getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize(),
getMaxDataStoreExecutorQueueSize()));
@Override
public java.lang.AutoCloseable createInstance() {
InMemoryDOMDataStore dataStore = InMemoryDOMDataStoreFactory.create("DOM-OPER", getSchemaServiceDependency(),
- InMemoryDOMDataStoreConfigProperties.create(getMaxDataChangeExecutorPoolSize(),
+ getDebugTransactions(), InMemoryDOMDataStoreConfigProperties.create(getMaxDataChangeExecutorPoolSize(),
getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize(),
getMaxDataStoreExecutorQueueSize()));
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-
import com.google.common.base.Objects;
import com.google.common.base.Objects.ToStringHelper;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.slf4j.Logger;
+
/**
* Abstract DOM Store Transaction
*
* Convenience super implementation of DOM Store transaction which provides
* common implementation of {@link #toString()} and {@link #getIdentifier()}.
- *
- *
*/
abstract class AbstractDOMStoreTransaction implements DOMStoreTransaction {
+ private final Throwable debugContext;
private final Object identifier;
- protected AbstractDOMStoreTransaction(final Object identifier) {
- this.identifier = Preconditions.checkNotNull(identifier,"Identifier must not be null.");
+ protected AbstractDOMStoreTransaction(final Object identifier, final boolean debug) {
+ this.identifier = Preconditions.checkNotNull(identifier, "Identifier must not be null.");
+ this.debugContext = debug ? new Throwable().fillInStackTrace() : null;
}
@Override
return identifier;
}
+ protected final void warnDebugContext(final Logger logger) {
+ if (debugContext != null) {
+ logger.warn("Transaction {} has been allocated in the following context", identifier, debugContext);
+ }
+ }
+
@Override
public final String toString() {
return addToStringAttributes(Objects.toStringHelper(this)).toString();
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-
import javax.annotation.concurrent.GuardedBy;
-
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
private final ExecutorService dataChangeListenerExecutor;
private final ExecutorService domStoreExecutor;
-
+ private final boolean debugTransactions;
private final String name;
private volatile AutoCloseable closeable;
public InMemoryDOMDataStore(final String name, final ExecutorService domStoreExecutor,
final ExecutorService dataChangeListenerExecutor) {
this(name, domStoreExecutor, dataChangeListenerExecutor,
- InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE);
+ InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE, false);
}
public InMemoryDOMDataStore(final String name, final ExecutorService domStoreExecutor,
- final ExecutorService dataChangeListenerExecutor, final int maxDataChangeListenerQueueSize) {
+ final ExecutorService dataChangeListenerExecutor, final int maxDataChangeListenerQueueSize,
+ final boolean debugTransactions) {
this.name = Preconditions.checkNotNull(name);
this.domStoreExecutor = Preconditions.checkNotNull(domStoreExecutor);
this.listeningExecutor = MoreExecutors.listeningDecorator(this.domStoreExecutor);
this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
+ this.debugTransactions = debugTransactions;
dataChangeListenerNotificationManager =
new QueuedNotificationManager<>(this.dataChangeListenerExecutor,
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
- return new SnapshotBackedReadTransaction(nextIdentifier(), dataTree.takeSnapshot());
+ return new SnapshotBackedReadTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot());
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- return new SnapshotBackedReadWriteTransaction(nextIdentifier(), dataTree.takeSnapshot(), this);
+ return new SnapshotBackedReadWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- return new SnapshotBackedWriteTransaction(nextIdentifier(), dataTree.takeSnapshot(), this);
+ return new SnapshotBackedWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
}
@Override
}
}
+ boolean getDebugTransactions() {
+ return debugTransactions;
+ }
+
@Override
public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
final YangInstanceIdentifier path, final L listener, final DataChangeScope scope) {
} else {
snapshot = dataTree.takeSnapshot();
}
- return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
+ return new SnapshotBackedReadTransaction(nextIdentifier(), getDebugTransactions(), snapshot);
}
@Override
snapshot = dataTree.takeSnapshot();
}
final SnapshotBackedReadWriteTransaction ret = new SnapshotBackedReadWriteTransaction(nextIdentifier(),
- snapshot, this);
+ getDebugTransactions(), snapshot, this);
latestOutstandingTx = ret;
return ret;
}
} else {
snapshot = dataTree.takeSnapshot();
}
- final SnapshotBackedWriteTransaction ret = new SnapshotBackedWriteTransaction(nextIdentifier(), snapshot,
- this);
+ final SnapshotBackedWriteTransaction ret = new SnapshotBackedWriteTransaction(nextIdentifier(),
+ getDebugTransactions(), snapshot, this);
latestOutstandingTx = ret;
return ret;
}
} catch (ConflictingModificationAppliedException e) {
LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
e.getPath());
+ transaction.warnDebugContext(LOG);
throw new OptimisticLockFailedException("Optimistic lock failed.",e);
} catch (DataValidationFailedException e) {
LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
e.getPath(), e);
+ transaction.warnDebugContext(LOG);
throw new TransactionCommitFailedException("Data did not pass validation.",e);
}
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.md.sal.dom.store.impl;
import java.util.concurrent.ExecutorService;
-
import javax.annotation.Nullable;
-
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
public static InMemoryDOMDataStore create(final String name,
@Nullable final SchemaService schemaService,
@Nullable final InMemoryDOMDataStoreConfigProperties properties) {
+ return create(name, schemaService, false, properties);
+ }
+
+ /**
+ * Creates an InMemoryDOMDataStore instance.
+ *
+ * @param name the name of the data store
+ * @param schemaService the SchemaService to which to register the data store.
+ * @param debugTransactions enable transaction debugging
+ * @param properties configuration properties for the InMemoryDOMDataStore instance. If null,
+ * default property values are used.
+ * @return an InMemoryDOMDataStore instance
+ */
+ public static InMemoryDOMDataStore create(final String name,
+ @Nullable final SchemaService schemaService, final boolean debugTransactions,
+ @Nullable final InMemoryDOMDataStoreConfigProperties properties) {
InMemoryDOMDataStoreConfigProperties actualProperties = properties;
if(actualProperties == null) {
InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name,
domStoreExecutor, dataChangeListenerExecutor,
- actualProperties.getMaxDataChangeListenerQueueSize());
+ actualProperties.getMaxDataChangeListenerQueueSize(), debugTransactions);
if(schemaService != null) {
schemaService.registerSchemaContextListener(dataStore);
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
+
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static com.google.common.base.Preconditions.checkNotNull;
-
/**
*
* Implementation of read-only transaction backed by {@link DataTreeSnapshot}
private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedReadTransaction.class);
private volatile DataTreeSnapshot stableSnapshot;
- public SnapshotBackedReadTransaction(final Object identifier, final DataTreeSnapshot snapshot) {
- super(identifier);
+ public SnapshotBackedReadTransaction(final Object identifier, final boolean debug, final DataTreeSnapshot snapshot) {
+ super(identifier, debug);
this.stableSnapshot = Preconditions.checkNotNull(snapshot);
LOG.debug("ReadOnly Tx: {} allocated with snapshot {}", identifier, snapshot);
}
}
@Override
- public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
+ public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
LOG.debug("Tx: {} Exists: {}", getIdentifier(), path);
checkNotNull(path, "Path must not be null.");
import static com.google.common.base.Preconditions.checkNotNull;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-
/**
* Implementation of Read-Write transaction which is backed by {@link DataTreeSnapshot}
* and executed according to {@link TransactionReadyPrototype}.
* @param snapshot Snapshot which will be modified.
* @param readyImpl Implementation of ready method.
*/
- protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataTreeSnapshot snapshot,
- final TransactionReadyPrototype store) {
- super(identifier, snapshot, store);
+ protected SnapshotBackedReadWriteTransaction(final Object identifier, final boolean debug,
+ final DataTreeSnapshot snapshot, final TransactionReadyPrototype store) {
+ super(identifier, debug, snapshot, store);
}
@Override
}
}
- @Override public CheckedFuture<Boolean, ReadFailedException> exists(
- YangInstanceIdentifier path) {
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
try {
return Futures.immediateCheckedFuture(
read(path).checkedGet().isPresent());
import static com.google.common.base.Preconditions.checkState;
+import com.google.common.base.Objects.ToStringHelper;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Objects.ToStringHelper;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-
/**
* Implementation of Write transaction which is backed by
* {@link DataTreeSnapshot} and executed according to
* @param readyImpl
* Implementation of ready method.
*/
- public SnapshotBackedWriteTransaction(final Object identifier, final DataTreeSnapshot snapshot,
- final TransactionReadyPrototype readyImpl) {
- super(identifier);
+ public SnapshotBackedWriteTransaction(final Object identifier, final boolean debug,
+ final DataTreeSnapshot snapshot, final TransactionReadyPrototype readyImpl) {
+ super(identifier, debug);
mutableTree = snapshot.newModification();
this.readyImpl = Preconditions.checkNotNull(readyImpl, "readyImpl must not be null.");
LOG.debug("Write Tx: {} allocated with snapshot {}", identifier, snapshot);
-
module opendaylight-inmemory-datastore-provider {
yang-version 1;
import config { prefix config; revision-date 2013-04-05; }
import rpc-context { prefix rpcx; revision-date 2013-06-17; }
- import opendaylight-config-dom-datastore {prefix config-dom-store-spi;}
- import opendaylight-operational-dom-datastore {prefix operational-dom-store-spi;}
+ import opendaylight-config-dom-datastore {prefix config-dom-store-spi;}
+ import opendaylight-operational-dom-datastore {prefix operational-dom-store-spi;}
import opendaylight-md-sal-dom {prefix sal;}
description
// This is the definition of the service implementation as a module identity.
- identity inmemory-operational-datastore-provider {
- base config:module-type;
- config:provided-service operational-dom-store-spi:operational-dom-datastore;
- config:java-name-prefix InMemoryOperationalDataStoreProvider;
- }
+ identity inmemory-operational-datastore-provider {
+ base config:module-type;
+ config:provided-service operational-dom-store-spi:operational-dom-datastore;
+ config:java-name-prefix InMemoryOperationalDataStoreProvider;
+ }
grouping datastore-configuration {
leaf max-data-change-executor-queue-size {
type uint16;
description "The maximum queue size for the data change listeners.";
}
-
leaf max-data-store-executor-queue-size {
default 5000;
type uint16;
description "The maximum queue size for the data store executor.";
}
+ leaf debug-transactions {
+ type boolean;
+ default false;
+ description "Enable transaction lifecycle debugging.";
+ }
}
// Augments the 'configuration' choice node under modules/module.
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
+
+import java.util.concurrent.ExecutionException;
+
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import java.util.concurrent.ExecutionException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
public class InMemoryDataStoreTest {
Mockito.doThrow( new RuntimeException( "mock ex" ) ).when( mockSnapshot )
.readNode( Mockito.any( YangInstanceIdentifier.class ) );
- DOMStoreReadTransaction readTx = new SnapshotBackedReadTransaction( "1", mockSnapshot );
+ DOMStoreReadTransaction readTx = new SnapshotBackedReadTransaction("1", true, mockSnapshot);
doReadAndThrowEx( readTx );
}
.readNode( Mockito.any( YangInstanceIdentifier.class ) );
Mockito.doReturn( mockModification ).when( mockSnapshot ).newModification();
TransactionReadyPrototype mockReady = Mockito.mock( TransactionReadyPrototype.class );
- DOMStoreReadTransaction readTx = new SnapshotBackedReadWriteTransaction( "1", mockSnapshot, mockReady );
+ DOMStoreReadTransaction readTx = new SnapshotBackedReadWriteTransaction("1", false, mockSnapshot, mockReady);
doReadAndThrowEx( readTx );
}
- private void doReadAndThrowEx( DOMStoreReadTransaction readTx ) throws Throwable {
+ private void doReadAndThrowEx( final DOMStoreReadTransaction readTx ) throws Throwable {
try {
readTx.read(TestModel.TEST_PATH).get();
import com.google.common.util.concurrent.Futures;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class OperationProcessor implements Runnable {
+final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener {
private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
private static final int MAX_TRANSACTION_OPERATIONS = 100;
private static final int OPERATION_QUEUE_DEPTH = 500;
private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
private final DataBroker dataBroker;
+ private final BindingTransactionChain transactionChain;
OperationProcessor(final DataBroker dataBroker) {
this.dataBroker = Preconditions.checkNotNull(dataBroker);
+ transactionChain = this.dataBroker.createTransactionChain(this);
}
void enqueueOperation(final TopologyOperation task) {
TopologyOperation op = queue.take();
LOG.debug("New operations available, starting transaction");
- final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
+ final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction();
+
int ops = 0;
do {
queue.poll();
}
}
+
+ @Override
+ public void onTransactionChainFailed(TransactionChain<?, ?> chain, AsyncTransaction<?, ?> transaction, Throwable cause) {
+ LOG.error("Failed to export Topology manager operations, Transaction {} failed.", transaction.getIdentifier(), cause);
+ }
+
+ @Override
+ public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
+ //NOOP
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (transactionChain != null) {
+ transactionChain.close();
+ }
+
+ }
}