<configfile finalname="configuration/initial/akka.conf">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/akkaconf</configfile>
<configfile finalname="configuration/initial/module-shards.conf">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/moduleshardconf</configfile>
<configfile finalname="configuration/initial/modules.conf">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/moduleconf</configfile>
+ <configfile finalname="etc/org.opendaylight.controller.cluster.datastore.cfg">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/cfg/datastore</configfile>
</feature>
<feature name='odl-clustering-test-app' version='${project.version}'>
-->
</feature>
+ <feature name="odl-nsf-service" description="OpenDaylight :: NSF :: Network Service Functions in Controller" version="${project.version}">
+ <feature version="${sal.version}">odl-adsal-all</feature>
+ <feature version="${project.version}">odl-nsf-controller-managers</feature>
+ <feature version="${project.version}">odl-adsal-controller-northbound</feature>
+ </feature>
+
<feature name="odl-nsf-managers" description="OpenDaylight :: AD-SAL :: Network Service Functions" version="${project.version}">
<feature version="${commons.opendaylight.version}">odl-base-all</feature>
<feature version="${sal.version}">odl-adsal-all</feature>
<bundle>mvn:org.opendaylight.controller/routing.dijkstra_implementation/${routing.dijkstra_implementation.version}</bundle>
</feature>
+ <feature name="odl-nsf-controller-managers" description="OpenDaylight :: AD-SAL :: Network Service Functions in Controller" version="${project.version}">
+ <feature version="${commons.opendaylight.version}">odl-base-all</feature>
+ <feature version="${sal.version}">odl-adsal-all</feature>
+ <bundle>mvn:org.opendaylight.controller/usermanager/${usermanager.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/usermanager.implementation/${usermanager.version}</bundle>
+
+ <bundle>mvn:org.opendaylight.controller/appauth/${appauth.version}</bundle>
+
+ <bundle>mvn:org.opendaylight.controller/connectionmanager/${connectionmanager.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/connectionmanager.implementation/${connectionmanager.version}</bundle>
+
+ <bundle>mvn:org.opendaylight.controller/containermanager/${containermanager.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/containermanager.implementation/${containermanager.version}</bundle>
+
+ <bundle>mvn:org.opendaylight.controller/statisticsmanager/${statisticsmanager.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/statisticsmanager.implementation/${statisticsmanager.implementation.version}</bundle>
+
+ <bundle>mvn:org.opendaylight.controller/switchmanager/${switchmanager.api.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/switchmanager.implementation/${switchmanager.implementation.version}</bundle>
+
+ <bundle>mvn:org.opendaylight.controller/forwardingrulesmanager/${forwardingrulesmanager.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/forwardingrulesmanager.implementation/${forwardingrulesmanager.implementation.version}</bundle>
+
+ <bundle>mvn:org.opendaylight.controller/topologymanager/${topologymanager.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/topologymanager.shell/${topologymanager.shell.version}</bundle>
+
+ <bundle>mvn:org.opendaylight.controller/hosttracker/${hosttracker.api.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/hosttracker.implementation/${hosttracker.implementation.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/hosttracker.shell/${hosttracker.shell.version}</bundle>
+
+ <bundle>mvn:org.opendaylight.controller/forwarding.staticrouting/${forwarding.staticrouting}</bundle>
+
+ <bundle>mvn:org.opendaylight.controller.thirdparty/net.sf.jung2/2.0.1</bundle>
+ <bundle>mvn:org.opendaylight.controller/routing.dijkstra_implementation/${routing.dijkstra_implementation.version}</bundle>
+ </feature>
+
<feature name="odl-adsal-northbound" description="OpenDaylight :: AD-SAL :: Northbound APIs" version="${project.version}">
<feature version="${commons.opendaylight.version}">odl-base-all</feature>
<feature version="${project.version}">odl-nsf-managers</feature>
<bundle>mvn:org.opendaylight.controller/topology.northbound/${topology.northbound.version}</bundle>
<bundle>mvn:org.opendaylight.controller/usermanager.northbound/${usermanager.northbound.version}</bundle>
</feature>
+
+ <feature name="odl-adsal-controller-northbound" description="OpenDaylight :: AD-SAL :: Northbound APIs in Controller" version="${project.version}">
+ <feature version="${commons.opendaylight.version}">odl-base-all</feature>
+ <feature version="${project.version}">odl-nsf-managers</feature>
+ <bundle>mvn:org.ow2.asm/asm-all/${asm.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/bundlescanner/${bundlescanner.api.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/bundlescanner.implementation/${bundlescanner.implementation.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/commons.northbound/${northbound.commons.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/connectionmanager.northbound/${connectionmanager.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/flowprogrammer.northbound/${flowprogrammer.northbound.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/hosttracker.northbound/${hosttracker.northbound.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/networkconfig.bridgedomain.northbound/${networkconfig.bridgedomain.northbound.version}</bundle>
+ <bundle>mvn:org.eclipse.persistence/org.eclipse.persistence.antlr/${eclipse.persistence.version}</bundle>
+ <bundle>mvn:org.eclipse.persistence/org.eclipse.persistence.core/${eclipse.persistence.version}</bundle>
+ <bundle>mvn:org.eclipse.persistence/org.eclipse.persistence.moxy/${eclipse.persistence.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/forwarding.staticrouting.northbound/${forwarding.staticrouting.northbound.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/statistics.northbound/${statistics.northbound.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/subnets.northbound/${subnets.northbound.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/switchmanager.northbound/${switchmanager.northbound.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/topology.northbound/${topology.northbound.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/usermanager.northbound/${usermanager.northbound.version}</bundle>
+ </feature>
</features>
<type>xml</type>
<classifier>moduleconf</classifier>
</artifact>
+ <artifact>
+ <file>${project.build.directory}/classes/initial/datastore.cfg</file>
+ <type>cfg</type>
+ <classifier>datastore</classifier>
+ </artifact>
</artifacts>
</configuration>
</execution>
--- /dev/null
+# This file specifies property settings for the clustered data store to control its behavior. A
+# property may be applied to every data store type ("config" and "operational") or can be customized
+# differently for each data store type by prefixing the data store type + '.'. For example, specifying
+# the "shard-election-timeout-factor" property would be applied to both data stores whereas specifying
+# "operational.shard-election-timeout-factor" would only apply to the "operational" data store. Similarly,
+# specifying "config.shard-election-timeout-factor" would only apply to the "config" data store.
+
+# The multiplication factor to be used to determine shard election timeout. The shard election timeout
+# is determined by multiplying shardHeartbeatIntervalInMillis with the shardElectionTimeoutFactor.
+#shard-election-timeout-factor=2
+
+# The interval at which a shard will send a heart beat message to its remote shard.
+#shard-heartbeat-interval-in-millis=500
+
+# The maximum amount of time to wait for a shard to elect a leader before failing an operation (eg transaction create).
+#shard-leader-election-timeout-in-seconds=30
+
+# Enable or disable data persistence.
+#persistent=true
+
+# Disable persistence for the operational data store by default.
+operational.persistent=false
+
+# The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.
+#shard-transaction-idle-timeout-in-minutes=10
+
+# The maximum amount of time a shard transaction three-phase commit can be idle without receiving the
+# next messages before it aborts the transaction.
+#shard-transaction-commit-timeout-in-seconds=30
+
+# The maximum allowed capacity for each shard's transaction commit queue.
+#shard-transaction-commit-queue-capacity=20000
+
+# The maximum amount of time to wait for a shard to initialize from persistence on startup before
+# failing an operation (eg transaction create and change listener registration).
+#shard-initialization-timeout-in-seconds=300
+
+# The maximum number of journal log entries to batch on recovery for applying.
+#shard-journal-recovery-log-batch-size=5000
+
+# The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.
+#shard-snapshot-batch-count=20000
+
+# The percentage of Runtime.totalMemory() used by the in-memory journal log before a snapshot is to be taken.
+#shard-snapshot-data-threshold-percentage=12
+
+# The interval at which the leader of the shard will check if its majority of followers are active and,
+# if not, transition itself to an isolated leader state.
+#shard-isolated-leader-check-interval-in-millis=5000
+
+# The number of transaction modification operations (put, merge, delete) to batch before sending to the
+# shard transaction actor. Batching improves performance as fewer modification messages are sent to the
+# actor and thus lessens the chance that the transaction actor's mailbox queue could get full.
+#shard-batched-modification-count=100
+
+# The maximum amount of time for akka operations (remote or local) to complete before failing.
+#operation-timeout-in-seconds=5
+
+# The initial number of transactions per second that are allowed before the data store should begin
+# applying back pressure. This number is only used as an initial guidance, subsequently the datastore
+# measures the latency for a commit and auto-adjusts the rate limit.
+#transaction-creation-initial-rate-limit=100
+
+# The maximum thread pool size for each shard's data store data change notification executor.
+#max-shard-data-change-executor-pool-size=20
+
+# The maximum queue size for each shard's data store data change notification executor.
+#max-shard-data-change-executor-queue-size=1000
+
+# The maximum queue size for each shard's data store data change listener.
+#max-shard-data-change-listener-queue-size=1000
+
+# The maximum queue size for each shard's data store executor.
+#max-shard-data-store-executor-queue-size=5000
+
public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
+ public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT= 100;
private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
private boolean persistent = DEFAULT_PERSISTENT;
private ConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
- private DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
+ private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
+ private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
- private DatastoreContext(){
+ private DatastoreContext() {
setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
+ setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
+ }
+
+ private DatastoreContext(DatastoreContext other) {
+ this.dataStoreProperties = other.dataStoreProperties;
+ this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
+ this.operationTimeoutInSeconds = other.operationTimeoutInSeconds;
+ this.dataStoreMXBeanType = other.dataStoreMXBeanType;
+ this.shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
+ this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
+ this.shardInitializationTimeout = other.shardInitializationTimeout;
+ this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
+ this.persistent = other.persistent;
+ this.configurationReader = other.configurationReader;
+ this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
+ this.dataStoreType = other.dataStoreType;
+ this.shardBatchedModificationCount = other.shardBatchedModificationCount;
+
+ setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
+ setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
+ setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
+ setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
+ setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
+ setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
}
public static Builder newBuilder() {
- return new Builder();
+ return new Builder(new DatastoreContext());
+ }
+
+ public static Builder newBuilderFrom(DatastoreContext context) {
+ return new Builder(new DatastoreContext(context));
}
public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
}
- private void setSnapshotBatchCount(int shardSnapshotBatchCount) {
+ private void setSnapshotBatchCount(long shardSnapshotBatchCount) {
raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
}
+ public int getShardBatchedModificationCount() {
+ return shardBatchedModificationCount;
+ }
+
public static class Builder {
- private DatastoreContext datastoreContext = new DatastoreContext();
+ private final DatastoreContext datastoreContext;
+ private int maxShardDataChangeExecutorPoolSize =
+ InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE;
+ private int maxShardDataChangeExecutorQueueSize =
+ InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE;
+ private int maxShardDataChangeListenerQueueSize =
+ InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE;
+ private int maxShardDataStoreExecutorQueueSize =
+ InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE;
+
+ private Builder(DatastoreContext datastoreContext) {
+ this.datastoreContext = datastoreContext;
+
+ if(datastoreContext.getDataStoreProperties() != null) {
+ maxShardDataChangeExecutorPoolSize =
+ datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
+ maxShardDataChangeExecutorQueueSize =
+ datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorQueueSize();
+ maxShardDataChangeListenerQueueSize =
+ datastoreContext.getDataStoreProperties().getMaxDataChangeListenerQueueSize();
+ maxShardDataStoreExecutorQueueSize =
+ datastoreContext.getDataStoreProperties().getMaxDataStoreExecutorQueueSize();
+ }
+ }
+
+ public Builder boundedMailboxCapacity(int boundedMailboxCapacity) {
+ // TODO - this is defined in the yang DataStoreProperties but not currently used.
+ return this;
+ }
- public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
- datastoreContext.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
+ public Builder enableMetricCapture(boolean enableMetricCapture) {
+ // TODO - this is defined in the yang DataStoreProperties but not currently used.
return this;
}
+
+ public Builder shardTransactionIdleTimeout(long timeout, TimeUnit unit) {
+ datastoreContext.shardTransactionIdleTimeout = Duration.create(timeout, unit);
+ return this;
+ }
+
+ public Builder shardTransactionIdleTimeoutInMinutes(long timeout) {
+ return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
+ }
+
public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) {
datastoreContext.operationTimeoutInSeconds = operationTimeoutInSeconds;
return this;
return this;
}
- public Builder dataStoreProperties(InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
- datastoreContext.dataStoreProperties = dataStoreProperties;
- return this;
- }
-
public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) {
datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
return this;
return this;
}
+ public Builder shardInitializationTimeoutInSeconds(long timeout) {
+ return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
+ }
+
public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) {
datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
return this;
}
+ public Builder shardLeaderElectionTimeoutInSeconds(long timeout) {
+ return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
+ }
+
public Builder configurationReader(ConfigurationReader configurationReader){
datastoreContext.configurationReader = configurationReader;
return this;
return this;
}
+ public Builder shardBatchedModificationCount(int shardBatchedModificationCount) {
+ datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
+ return this;
+ }
+
+ public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) {
+ this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
+ return this;
+ }
+
+ public Builder maxShardDataChangeExecutorQueueSize(int maxShardDataChangeExecutorQueueSize) {
+ this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
+ return this;
+ }
+
+ public Builder maxShardDataChangeListenerQueueSize(int maxShardDataChangeListenerQueueSize) {
+ this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
+ return this;
+ }
+
+ public Builder maxShardDataStoreExecutorQueueSize(int maxShardDataStoreExecutorQueueSize) {
+ this.maxShardDataStoreExecutorQueueSize = maxShardDataStoreExecutorQueueSize;
+ return this;
+ }
+
public DatastoreContext build() {
+ datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
+ maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
+ maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
return datastoreContext;
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that overlays DatastoreContext settings with settings obtained from an OSGi Config Admin
+ * service.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreContextConfigAdminOverlay implements AutoCloseable {
+ public static final String CONFIG_ID = "org.opendaylight.controller.cluster.datastore";
+
+ private static final Logger LOG = LoggerFactory.getLogger(DatastoreContextConfigAdminOverlay.class);
+
+ private final DatastoreContextIntrospector introspector;
+ private final BundleContext bundleContext;
+
+ public DatastoreContextConfigAdminOverlay(DatastoreContextIntrospector introspector, BundleContext bundleContext) {
+ this.introspector = introspector;
+ this.bundleContext = bundleContext;
+
+ ServiceReference<ConfigurationAdmin> configAdminServiceReference =
+ bundleContext.getServiceReference(ConfigurationAdmin.class);
+ if(configAdminServiceReference == null) {
+ LOG.warn("No ConfigurationAdmin service found");
+ } else {
+ overlaySettings(configAdminServiceReference);
+ }
+ }
+
+ private void overlaySettings(ServiceReference<ConfigurationAdmin> configAdminServiceReference) {
+ try {
+ ConfigurationAdmin configAdmin = bundleContext.getService(configAdminServiceReference);
+
+ Configuration config = configAdmin.getConfiguration(CONFIG_ID);
+ if(config != null) {
+ Dictionary<String, Object> properties = config.getProperties();
+
+ LOG.debug("Overlaying settings: {}", properties);
+
+ introspector.update(properties);
+ } else {
+ LOG.debug("No Configuration found for {}", CONFIG_ID);
+ }
+ } catch (IOException e) {
+ LOG.error("Error obtaining Configuration for pid {}", CONFIG_ID, e);
+ } catch(IllegalStateException e) {
+ // Ignore - indicates the bundleContext has been closed.
+ } finally {
+ try {
+ bundleContext.ungetService(configAdminServiceReference);
+ } catch (Exception e) {
+ LOG.debug("Error from ungetService", e);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.primitives.Primitives;
+import java.beans.BeanInfo;
+import java.beans.ConstructorProperties;
+import java.beans.IntrospectionException;
+import java.beans.Introspector;
+import java.beans.MethodDescriptor;
+import java.beans.PropertyDescriptor;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.text.WordUtils;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Introspects on a DatastoreContext instance to set its properties via reflection.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreContextIntrospector {
+ private static final Logger LOG = LoggerFactory.getLogger(DatastoreContextIntrospector.class);
+
+ private static final Map<String, Class<?>> dataStorePropTypes = new HashMap<>();
+
+ private static final Map<Class<?>, Constructor<?>> constructors = new HashMap<>();
+
+ private static final Map<Class<?>, Method> yangTypeGetters = new HashMap<>();
+
+ private static final Map<String, Method> builderSetters = new HashMap<>();
+
+ static {
+ try {
+ introspectDatastoreContextBuilder();
+ introspectDataStoreProperties();
+ introspectPrimitiveTypes();
+ } catch (IntrospectionException e) {
+ LOG.error("Error initializing DatastoreContextIntrospector", e);
+ }
+ }
+
+ /**
+ * Introspects each primitive wrapper (ie Integer, Long etc) and String type to find the
+ * constructor that takes a single String argument. For primitive wrappers, this constructor
+ * converts from a String representation.
+ */
+ private static void introspectPrimitiveTypes() {
+
+ Set<Class<?>> primitives = ImmutableSet.<Class<?>>builder().addAll(
+ Primitives.allWrapperTypes()).add(String.class).build();
+ for(Class<?> primitive: primitives) {
+ try {
+ processPropertyType(primitive);
+ } catch (Exception e) {
+ // Ignore primitives that can't be constructed from a String, eg Character and Void.
+ }
+ }
+ }
+
+ /**
+ * Introspects the DatastoreContext.Builder class to find all its setter methods that we will
+ * invoke via reflection. We can't use the bean Introspector here as the Builder setters don't
+ * follow the bean property naming convention, ie setter prefixed with "set", so look for all
+ * the methods that return Builder.
+ */
+ private static void introspectDatastoreContextBuilder() {
+ for(Method method: Builder.class.getMethods()) {
+ if(Builder.class.equals(method.getReturnType())) {
+ builderSetters.put(method.getName(), method);
+ }
+ }
+ }
+
+ /**
+ * Introspects the DataStoreProperties interface that is generated from the DataStoreProperties
+ * yang grouping. We use the bean Introspector to find the types of all the properties defined
+ * in the interface (this is the type returned from the getter method). For each type, we find
+ * the appropriate constructor that we will use.
+ */
+ private static void introspectDataStoreProperties() throws IntrospectionException {
+ BeanInfo beanInfo = Introspector.getBeanInfo(DataStoreProperties.class);
+ for(PropertyDescriptor desc: beanInfo.getPropertyDescriptors()) {
+ processDataStoreProperty(desc.getName(), desc.getPropertyType());
+ }
+
+ // Getter methods that return Boolean and start with "is" instead of "get" aren't recognized as
+ // properties and thus aren't returned from getPropertyDescriptors. A getter starting with
+ // "is" is only supported if it returns primitive boolean. So we'll check for these via
+ // getMethodDescriptors.
+ for(MethodDescriptor desc: beanInfo.getMethodDescriptors()) {
+ String methodName = desc.getName();
+ if(Boolean.class.equals(desc.getMethod().getReturnType()) && methodName.startsWith("is")) {
+ String propertyName = WordUtils.uncapitalize(methodName.substring(2));
+ processDataStoreProperty(propertyName, Boolean.class);
+ }
+ }
+ }
+
+ /**
+ * Processes a property defined on the DataStoreProperties interface.
+ */
+ private static void processDataStoreProperty(String name, Class<?> propertyType) {
+ Preconditions.checkArgument(builderSetters.containsKey(name), String.format(
+ "DataStoreProperties property \"%s\" does not have corresponding setter in DatastoreContext.Builder", name));
+ try {
+ processPropertyType(propertyType);
+ dataStorePropTypes.put(name, propertyType);
+ } catch (Exception e) {
+ LOG.error("Error finding constructor for type {}", propertyType, e);
+ }
+ }
+
+ /**
+ * Finds the appropriate constructor for the specified type that we will use to construct
+ * instances.
+ */
+ private static void processPropertyType(Class<?> propertyType) throws Exception {
+ Class<?> wrappedType = Primitives.wrap(propertyType);
+ if(constructors.containsKey(wrappedType)) {
+ return;
+ }
+
+ // If the type is a primitive (or String type), we look for the constructor that takes a
+ // single String argument, which, for primitives, validates and converts from a String
+ // representation which is the form we get on ingress.
+ if(propertyType.isPrimitive() || Primitives.isWrapperType(propertyType) ||
+ propertyType.equals(String.class))
+ {
+ constructors.put(wrappedType, propertyType.getConstructor(String.class));
+ } else {
+ // This must be a yang-defined type. We need to find the constructor that takes a
+ // primitive as the only argument. This will be used to construct instances to perform
+ // validation (eg range checking). The yang-generated types have a couple single-argument
+ // constructors but the one we want has the bean ConstructorProperties annotation.
+ for(Constructor<?> ctor: propertyType.getConstructors()) {
+ ConstructorProperties ctorPropsAnnotation = ctor.getAnnotation(ConstructorProperties.class);
+ if(ctor.getParameterTypes().length == 1 && ctorPropsAnnotation != null) {
+ findYangTypeGetter(propertyType, ctorPropsAnnotation.value()[0]);
+ constructors.put(propertyType, ctor);
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Finds the getter method on a yang-generated type for the specified property name.
+ */
+ private static void findYangTypeGetter(Class<?> type, String propertyName)
+ throws Exception {
+ for(PropertyDescriptor desc: Introspector.getBeanInfo(type).getPropertyDescriptors()) {
+ if(desc.getName().equals(propertyName)) {
+ yangTypeGetters.put(type, desc.getReadMethod());
+ return;
+ }
+ }
+
+ throw new IllegalArgumentException(String.format(
+ "Getter method for constructor property %s not found for YANG type %s",
+ propertyName, type));
+ }
+
+ private DatastoreContext context;
+
+ public DatastoreContextIntrospector(DatastoreContext context) {
+ this.context = context;
+ }
+
+ public DatastoreContext getContext() {
+ return context;
+ }
+
+ /**
+ * Applies the given properties to the cached DatastoreContext and yields a new DatastoreContext
+ * instance which can be obtained via {@link #getContext()}.
+ *
+ * @param properties the properties to apply
+ * @return true if the cached DatastoreContext was updated, false otherwise.
+ */
+ public boolean update(Dictionary<String, Object> properties) {
+ if(properties == null || properties.isEmpty()) {
+ return false;
+ }
+
+ Builder builder = DatastoreContext.newBuilderFrom(context);
+
+ final String dataStoreTypePrefix = context.getDataStoreType() + '.';
+
+ // Sort the property keys by putting the names prefixed with the data store type last. This
+ // is done so data store specific settings are applied after global settings.
+ ArrayList<String> keys = Collections.list(properties.keys());
+ Collections.sort(keys, new Comparator<String>() {
+ @Override
+ public int compare(String key1, String key2) {
+ return key1.startsWith(dataStoreTypePrefix) ? 1 :
+ key2.startsWith(dataStoreTypePrefix) ? -1 : key1.compareTo(key2);
+ }
+ });
+
+ boolean updated = false;
+ for(String key: keys) {
+ Object value = properties.get(key);
+ try {
+ // If the key is prefixed with the data store type, strip it off.
+ if(key.startsWith(dataStoreTypePrefix)) {
+ key = key.replaceFirst(dataStoreTypePrefix, "");
+ }
+
+ key = convertToCamelCase(key);
+
+ // Convert the value to the right type.
+ value = convertValue(key, value);
+ if(value == null) {
+ continue;
+ }
+
+ LOG.debug("Converted value for property {}: {} ({})",
+ key, value, value.getClass().getSimpleName());
+
+ // Call the setter method on the Builder instance.
+ Method setter = builderSetters.get(key);
+ setter.invoke(builder, constructorValueRecursively(
+ Primitives.wrap(setter.getParameterTypes()[0]), value.toString()));
+
+ updated = true;
+
+ } catch (Exception e) {
+ LOG.error("Error converting value ({}) for property {}", value, key, e);
+ }
+ }
+
+ if(updated) {
+ context = builder.build();
+ }
+
+ return updated;
+ }
+
+ private String convertToCamelCase(String inString) {
+ String str = inString.trim();
+ if(StringUtils.contains(str, '-') || StringUtils.contains(str, ' ')) {
+ str = inString.replace('-', ' ');
+ str = WordUtils.capitalizeFully(str);
+ str = StringUtils.deleteWhitespace(str);
+ }
+
+ return StringUtils.uncapitalize(str);
+ }
+
+ private Object convertValue(String name, Object from) throws Exception {
+ Class<?> propertyType = dataStorePropTypes.get(name);
+ if(propertyType == null) {
+ LOG.debug("Property not found for {}", name);
+ return null;
+ }
+
+ LOG.debug("Type for property {}: {}, converting value {} ({})",
+ name, propertyType.getSimpleName(), from, from.getClass().getSimpleName());
+
+ // Recurse the chain of constructors depth-first to get the resulting value. Eg, if the
+ // property type is the yang-generated NonZeroUint32Type, its constructor takes a Long so
+ // we have to first construct a Long instance from the input value.
+ Object converted = constructorValueRecursively(propertyType, from.toString());
+
+ // If the converted type is a yang-generated type, call the getter to obtain the actual value.
+ Method getter = yangTypeGetters.get(converted.getClass());
+ if(getter != null) {
+ converted = getter.invoke(converted);
+ }
+
+ return converted;
+ }
+
+ private Object constructorValueRecursively(Class<?> toType, Object fromValue) throws Exception {
+ LOG.debug("constructorValueRecursively - toType: {}, fromValue {} ({})",
+ toType.getSimpleName(), fromValue, fromValue.getClass().getSimpleName());
+
+ Constructor<?> ctor = constructors.get(toType);
+
+ LOG.debug("Found {}", ctor);
+
+ if(ctor == null) {
+ throw new IllegalArgumentException(String.format("Constructor not found for type %s", toType));
+ }
+
+ Object value = fromValue;
+
+ // Since the original input type is a String, once we find a constructor that takes a String
+ // argument, we're done recursing.
+ if(!ctor.getParameterTypes()[0].equals(String.class)) {
+ value = constructorValueRecursively(ctor.getParameterTypes()[0], fromValue);
+ }
+
+ return ctor.newInstance(value);
+ }
+}
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
private final ActorContext actorContext;
+ private AutoCloseable closeable;
+
+ private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
+
public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
Configuration configuration, DatastoreContext datastoreContext) {
Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
ShardManager.props(cluster, configuration, datastoreContext)
.withDispatcher(shardDispatcher).withMailbox(ActorContext.MAILBOX), shardManagerId ),
cluster, configuration, datastoreContext);
+
+ datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(datastoreContext.getDataStoreMXBeanType());
+ datastoreConfigMXBean.setContext(datastoreContext);
+ datastoreConfigMXBean.registerMBean();
}
public DistributedDataStore(ActorContext actorContext) {
this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
}
+ public void setCloseable(AutoCloseable closeable) {
+ this.closeable = closeable;
+ }
+
@SuppressWarnings("unchecked")
@Override
public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
}
@Override
- public void close() throws Exception {
+ // No longer declares a checked exception: the wrapped closeable's failure is
+ // logged and swallowed so shutdown always proceeds.
+ public void close() {
+ datastoreConfigMXBean.unregisterMBean();
+
+ if(closeable != null) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ // Fixed typo in log message ("insance" -> "instance").
+ LOG.debug("Error closing instance", e);
+ }
+ }
+
actorContext.shutdown();
}
private static volatile ActorSystem persistentActorSystem = null;
public static DistributedDataStore createInstance(SchemaService schemaService,
- DatastoreContext datastoreContext, BundleContext bundleContext) {
+ DatastoreContext datastoreContext, BundleContext bundleContext) {
+
+ DatastoreContextIntrospector introspector = new DatastoreContextIntrospector(datastoreContext);
+ DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+ introspector, bundleContext);
ActorSystem actorSystem = getOrCreateInstance(bundleContext, datastoreContext.getConfigurationReader());
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
- final DistributedDataStore dataStore =
- new DistributedDataStore(actorSystem, new ClusterWrapperImpl(actorSystem),
- config, datastoreContext);
+ final DistributedDataStore dataStore = new DistributedDataStore(actorSystem,
+ new ClusterWrapperImpl(actorSystem), config, introspector.getContext());
ShardStrategyFactory.setConfiguration(config);
schemaService.registerSchemaContextListener(dataStore);
+
+ dataStore.setCloseable(overlay);
return dataStore;
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+/**
+ * Implementation of TransactionContextImpl used when talking to a pre-Lithium controller that doesn't
+ * support the BatchedModifications message.
+ *
+ * @author Thomas Pantelis
+ */
+class LegacyTransactionContextImpl extends TransactionContextImpl {
+
+ LegacyTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
+ ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+ short remoteTransactionVersion, OperationCompleter operationCompleter) {
+ super(transactionPath, actor, identifier, actorContext, schemaContext, isTxActorLocal,
+ remoteTransactionVersion, operationCompleter);
+ }
+
+ // Each override below sends the legacy per-operation message (tagged with the remote
+ // node's version) instead of batching, and records the reply Future.
+ @Override
+ public void deleteData(YangInstanceIdentifier path) {
+ recordedOperationFutures.add(executeOperationAsync(
+ new DeleteData(path, getRemoteTransactionVersion())));
+ }
+
+ @Override
+ public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ recordedOperationFutures.add(executeOperationAsync(
+ new MergeData(path, data, getRemoteTransactionVersion())));
+ }
+
+ @Override
+ public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ recordedOperationFutures.add(executeOperationAsync(
+ new WriteData(path, data, getRemoteTransactionVersion())));
+ }
+}
import akka.dispatch.OnComplete;
import com.google.common.base.Preconditions;
import java.util.concurrent.Semaphore;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
final class OperationCompleter extends OnComplete<Object> {
private final Semaphore operationLimiter;
}
@Override
- public void onComplete(Throwable throwable, Object o){
- this.operationLimiter.release();
+ // Releases permits on the shared operation limiter: one permit per modification for a
+ // BatchedModificationsReply, otherwise a single permit for the completed operation.
+ public void onComplete(Throwable throwable, Object message) {
+ if(message instanceof BatchedModificationsReply) {
+ this.operationLimiter.release(((BatchedModificationsReply)message).getNumBatched());
+ } else {
+ this.operationLimiter.release();
+ }
}
}
\ No newline at end of file
try {
final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future = transaction.read(path);
Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
- ReadDataReply readDataReply = new ReadDataReply(optional.orNull());
+ ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion);
- sender().tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion): readDataReply), self());
+ sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self());
} catch (Exception e) {
LOG.debug(String.format("Unexpected error reading path %s", path), e);
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
*/
public class ShardWriteTransaction extends ShardTransaction {
- private final MutableCompositeModification modification = new MutableCompositeModification();
+ private final MutableCompositeModification compositeModification = new MutableCompositeModification();
private final DOMStoreWriteTransaction transaction;
public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
@Override
public void handleReceive(Object message) throws Exception {
- if (message instanceof WriteData) {
- writeData(transaction, (WriteData) message, !SERIALIZED_REPLY);
-
- } else if (message instanceof MergeData) {
- mergeData(transaction, (MergeData) message, !SERIALIZED_REPLY);
-
- } else if (message instanceof DeleteData) {
- deleteData(transaction, (DeleteData) message, !SERIALIZED_REPLY);
-
+ // BatchedModifications replaces the individual WriteData/MergeData/DeleteData messages;
+ // the serialized legacy forms below are still handled for backwards compatibility with
+ // pre-Lithium senders (see LegacyTransactionContextImpl).
+ if (message instanceof BatchedModifications) {
+ batchedModifications((BatchedModifications)message);
} else if (message instanceof ReadyTransaction) {
readyTransaction(transaction, !SERIALIZED_REPLY);
-
+ } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
+ readyTransaction(transaction, SERIALIZED_REPLY);
} else if(WriteData.isSerializedType(message)) {
writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY);
} else if(DeleteData.isSerializedType(message)) {
deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY);
- } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
- readyTransaction(transaction, SERIALIZED_REPLY);
-
} else if (message instanceof GetCompositedModification) {
// This is here for testing only
- getSender().tell(new GetCompositeModificationReply(modification), getSelf());
+ getSender().tell(new GetCompositeModificationReply(compositeModification), getSelf());
} else {
super.handleReceive(message);
}
}
+ // Applies each modification in the batch to the write transaction, recording it in the
+ // composite modification, then replies with the number applied. On failure a
+ // Status.Failure is sent instead. NOTE(review): modifications applied before a failure
+ // remain recorded/applied -- confirm the sender aborts the transaction on Failure.
+ private void batchedModifications(BatchedModifications batched) {
+ try {
+ for(Modification modification: batched.getModifications()) {
+ compositeModification.addModification(modification);
+ modification.apply(transaction);
+ }
+
+ getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
+ } catch (Exception e) {
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
+ }
+
private void writeData(DOMStoreWriteTransaction transaction, WriteData message,
boolean returnSerialized) {
LOG.debug("writeData at path : {}", message.getPath());
- modification.addModification(
+ compositeModification.addModification(
new WriteModification(message.getPath(), message.getData()));
try {
transaction.write(message.getPath(), message.getData());
boolean returnSerialized) {
LOG.debug("mergeData at path : {}", message.getPath());
- modification.addModification(
+ compositeModification.addModification(
new MergeModification(message.getPath(), message.getData()));
try {
boolean returnSerialized) {
LOG.debug("deleteData at path : {}", message.getPath());
- modification.addModification(new DeleteModification(message.getPath()));
+ compositeModification.addModification(new DeleteModification(message.getPath()));
try {
transaction.delete(message.getPath());
DeleteDataReply deleteDataReply = DeleteDataReply.INSTANCE;
DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
- cohort, modification, returnSerialized), getContext());
+ cohort, compositeModification, returnSerialized), getContext());
// The shard will handle the commit from here so we're no longer needed - self-destruct.
getSelf().tell(PoisonPill.getInstance(), getSelf());
import com.google.common.util.concurrent.SettableFuture;
import java.util.List;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
-import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
-import org.opendaylight.controller.cluster.datastore.messages.VersionedSerializableMessage;
-import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
-final class TransactionContextImpl extends AbstractTransactionContext {
+class TransactionContextImpl extends AbstractTransactionContext {
private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
private final ActorContext actorContext;
private final ActorSelection actor;
private final boolean isTxActorLocal;
private final short remoteTransactionVersion;
- private final OperationCompleter operationCompleter;
+ private final OperationCompleter operationCompleter;
+ private BatchedModifications batchedModifications;
TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
ActorContext actorContext, SchemaContext schemaContext,
return actor;
}
- private Future<Object> executeOperationAsync(SerializableMessage msg) {
- return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
+ protected short getRemoteTransactionVersion() {
+ return remoteTransactionVersion;
}
- private Future<Object> executeOperationAsync(VersionedSerializableMessage msg) {
- return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg :
- msg.toSerializable(remoteTransactionVersion)));
+ protected Future<Object> executeOperationAsync(SerializableMessage msg) {
+ return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
}
@Override
LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
identifier, recordedOperationFutures.size());
+ // Send the remaining batched modifications if any.
+
+ sendBatchedModifications();
+
// Send the ReadyTransaction message to the Tx actor.
final Future<Object> replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
}, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
}
+ // Adds the modification to the pending batch, lazily creating the batch tagged with the
+ // remote transaction version, and flushes the batch once it reaches the configured
+ // shard batched-modification count.
+ private void batchModification(Modification modification) {
+ if(batchedModifications == null) {
+ batchedModifications = new BatchedModifications(remoteTransactionVersion);
+ }
+
+ batchedModifications.addModification(modification);
+
+ if(batchedModifications.getModifications().size() >=
+ actorContext.getDatastoreContext().getShardBatchedModificationCount()) {
+ sendBatchedModifications();
+ }
+ }
+
+ // Sends any pending batch to the transaction actor, recording the reply Future, and
+ // resets the batch. No-op when nothing is pending.
+ private void sendBatchedModifications() {
+ if(batchedModifications != null) {
+ LOG.debug("Tx {} sending {} batched modifications", identifier,
+ batchedModifications.getModifications().size());
+
+ recordedOperationFutures.add(executeOperationAsync(batchedModifications));
+ batchedModifications = null;
+ }
+ }
+
@Override
public void deleteData(YangInstanceIdentifier path) {
LOG.debug("Tx {} deleteData called path = {}", identifier, path);
- recordedOperationFutures.add(executeOperationAsync(new DeleteData(path)));
+ batchModification(new DeleteModification(path));
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} mergeData called path = {}", identifier, path);
- recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data)));
+ batchModification(new MergeModification(path, data));
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} writeData called path = {}", identifier, path);
- recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data)));
+ batchModification(new WriteModification(path, data));
}
@Override
LOG.debug("Tx {} readData called path = {}", identifier, path);
+ // Send the remaining batched modifications if any.
+
+ sendBatchedModifications();
+
// If there were any previous recorded put/merge/delete operation reply Futures then we
// must wait for them to successfully complete. This is necessary to honor the read
// uncommitted semantics of the public API contract. If any one fails then fail the read.
LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+ // Send the remaining batched modifications if any.
+
+ sendBatchedModifications();
+
// If there were any previous recorded put/merge/delete operation reply Futures then we
// must wait for them to successfully complete. This is necessary to honor the read
// uncommitted semantics of the public API contract. If any one fails then fail this
private void throttleOperation(int acquirePermits) {
try {
- if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
+ if(!operationLimiter.tryAcquire(acquirePermits,
+ actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
}
} catch (InterruptedException e) {
private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
String transactionPath = reply.getTransactionPath();
- LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath);
+ LOG.debug("Tx {} Received {}", identifier, reply);
ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
// Check if TxActor is created in the same node
boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
+ // Pre-Lithium remote nodes don't support BatchedModifications, so fall back to the
+ // legacy context that sends individual per-operation messages.
+ if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
+ return new TransactionContextImpl(transactionPath, transactionActor, identifier,
+ actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+ } else {
+ return new LegacyTransactionContextImpl(transactionPath, transactionActor, identifier,
+ actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+ }
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans;
+
+
+/**
+ * MXBean interface for data store configuration.
+ *
+ * @author Thomas Pantelis
+ */
+public interface DatastoreConfigurationMXBean {
+ // Each getter mirrors a DatastoreContext setting so the live configuration can be
+ // inspected read-only via JMX (see DatastoreConfigurationMXBeanImpl).
+ long getShardTransactionIdleTimeoutInSeconds();
+
+ long getOperationTimeoutInSeconds();
+
+ long getShardHeartbeatIntervalInMillis();
+
+ int getShardJournalRecoveryLogBatchSize();
+
+ long getShardIsolatedLeaderCheckIntervalInMillis();
+
+ long getShardElectionTimeoutFactor();
+
+ int getShardSnapshotDataThresholdPercentage();
+
+ long getShardSnapshotBatchCount();
+
+ long getShardTransactionCommitTimeoutInSeconds();
+
+ int getShardTransactionCommitQueueCapacity();
+
+ long getShardInitializationTimeoutInSeconds();
+
+ long getShardLeaderElectionTimeoutInSeconds();
+
+ boolean isPersistent();
+
+ long getTransactionCreationInitialRateLimit();
+
+ int getMaxShardDataChangeExecutorPoolSize();
+
+ int getMaxShardDataChangeExecutorQueueSize();
+
+ int getMaxShardDataChangeListenerQueueSize();
+
+ int getMaxShardDataStoreExecutorQueueSize();
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans;
+
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+
+/**
+ * Implementation of DatastoreConfigurationMXBean.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreConfigurationMXBeanImpl extends AbstractMXBean implements DatastoreConfigurationMXBean {
+ public static final String JMX_CATEGORY_CONFIGURATION = "Configuration";
+
+ // Source of all reported values; set via setContext.
+ private DatastoreContext context;
+
+ public DatastoreConfigurationMXBeanImpl(String mxBeanType) {
+ super("Datastore", mxBeanType, JMX_CATEGORY_CONFIGURATION);
+ }
+
+ // NOTE(review): must be called before any getter is invoked via JMX, otherwise the
+ // getters below NPE on a null context -- callers set it before registerMBean().
+ public void setContext(DatastoreContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public long getShardTransactionIdleTimeoutInSeconds() {
+ return context.getShardTransactionIdleTimeout().toSeconds();
+ }
+
+ @Override
+ public long getOperationTimeoutInSeconds() {
+ return context.getOperationTimeoutInSeconds();
+ }
+
+ @Override
+ public long getShardHeartbeatIntervalInMillis() {
+ return context.getShardRaftConfig().getHeartBeatInterval().toMillis();
+ }
+
+ @Override
+ public int getShardJournalRecoveryLogBatchSize() {
+ return context.getShardRaftConfig().getJournalRecoveryLogBatchSize();
+ }
+
+ @Override
+ public long getShardIsolatedLeaderCheckIntervalInMillis() {
+ return context.getShardRaftConfig().getIsolatedCheckIntervalInMillis();
+ }
+
+ @Override
+ public long getShardElectionTimeoutFactor() {
+ return context.getShardRaftConfig().getElectionTimeoutFactor();
+ }
+
+ @Override
+ public int getShardSnapshotDataThresholdPercentage() {
+ return context.getShardRaftConfig().getSnapshotDataThresholdPercentage();
+ }
+
+ @Override
+ public long getShardSnapshotBatchCount() {
+ return context.getShardRaftConfig().getSnapshotBatchCount();
+ }
+
+ @Override
+ public long getShardTransactionCommitTimeoutInSeconds() {
+ return context.getShardTransactionCommitTimeoutInSeconds();
+ }
+
+ @Override
+ public int getShardTransactionCommitQueueCapacity() {
+ return context.getShardTransactionCommitQueueCapacity();
+ }
+
+ @Override
+ public long getShardInitializationTimeoutInSeconds() {
+ return context.getShardInitializationTimeout().duration().toSeconds();
+ }
+
+ @Override
+ public long getShardLeaderElectionTimeoutInSeconds() {
+ return context.getShardLeaderElectionTimeout().duration().toSeconds();
+ }
+
+ @Override
+ public boolean isPersistent() {
+ return context.isPersistent();
+ }
+
+ @Override
+ public long getTransactionCreationInitialRateLimit() {
+ return context.getTransactionCreationInitialRateLimit();
+ }
+
+ @Override
+ public int getMaxShardDataChangeExecutorPoolSize() {
+ return context.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
+ }
+
+ @Override
+ public int getMaxShardDataChangeExecutorQueueSize() {
+ return context.getDataStoreProperties().getMaxDataChangeExecutorQueueSize();
+ }
+
+ @Override
+ public int getMaxShardDataChangeListenerQueueSize() {
+ return context.getDataStoreProperties().getMaxDataChangeListenerQueueSize();
+ }
+
+ @Override
+ public int getMaxShardDataStoreExecutorQueueSize() {
+ return context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+
+/**
+ * Message used to batch write, merge, delete modification operations to the ShardTransaction actor.
+ *
+ * @author Thomas Pantelis
+ */
+public class BatchedModifications extends MutableCompositeModification implements SerializableMessage {
+ private static final long serialVersionUID = 1L;
+
+ public BatchedModifications() {
+ }
+
+ // The version tags the message for the remote node (see TransactionContextImpl's
+ // batchModification, which passes the remote transaction version).
+ public BatchedModifications(short version) {
+ super(version);
+ }
+
+ @Override
+ public Object toSerializable() {
+ // Self-serializing: the composite modification itself is the wire form.
+ return this;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * The reply for the BatchedModifications message.
+ *
+ * @author Thomas Pantelis
+ */
+public class BatchedModificationsReply extends VersionedExternalizableMessage {
+ private static final long serialVersionUID = 1L;
+
+ // Number of modifications applied from the corresponding BatchedModifications message;
+ // used to release the matching number of operation-limiter permits.
+ private int numBatched;
+
+ public BatchedModificationsReply() {
+ }
+
+ public BatchedModificationsReply(int numBatched) {
+ this.numBatched = numBatched;
+ }
+
+
+ public int getNumBatched() {
+ return numBatched;
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+ numBatched = in.readInt();
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+ out.writeInt(numBatched);
+ }
+
+ @Override
+ public Object toSerializable() {
+ // Self-serializing via Externalizable.
+ return this;
+ }
+}
(short)o.getMessageVersion());
}
+ @Override
+ public String toString() {
+ // Includes the transaction path, id and negotiated version for diagnostics.
+ StringBuilder builder = new StringBuilder();
+ builder.append("CreateTransactionReply [transactionPath=").append(transactionPath).append(", transactionId=")
+ .append(transactionId).append(", version=").append(version).append("]");
+ return builder.toString();
+ }
}
package org.opendaylight.controller.cluster.datastore.messages;
-import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-public class DeleteData implements VersionedSerializableMessage, Externalizable {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public class DeleteData extends VersionedExternalizableMessage {
private static final long serialVersionUID = 1L;
public static final Class<DeleteData> SERIALIZABLE_CLASS = DeleteData.class;
private YangInstanceIdentifier path;
- private short version;
public DeleteData() {
}
- public DeleteData(final YangInstanceIdentifier path) {
+ public DeleteData(final YangInstanceIdentifier path, short version) {
+ super(version);
this.path = path;
}
return path;
}
- public short getVersion() {
- return version;
- }
-
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- version = in.readShort(); // Read the version - don't need to do anything with it now
+ super.readExternal(in);
path = SerializationUtils.deserializePath(in);
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
- out.writeShort(version);
+ super.writeExternal(out);
SerializationUtils.serializePath(path, out);
}
@Override
- public Object toSerializable(short toVersion) {
- if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
- version = toVersion;
+ public Object toSerializable() {
+ if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
return this;
} else {
// To base or R1 Helium version
} else {
// From base or R1 Helium version
ShardTransactionMessages.DeleteData o = (ShardTransactionMessages.DeleteData) serializable;
- return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
+ return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()),
+ DataStoreVersions.HELIUM_2_VERSION);
}
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+/**
+ * @deprecated Replaced by BatchedModificationsReply.
+ */
+@Deprecated
public class DeleteDataReply extends EmptyReply {
+ private static final long serialVersionUID = 1L;
private static final Object LEGACY_SERIALIZED_INSTANCE =
ShardTransactionMessages.DeleteDataReply.newBuilder().build();
*
* @author Thomas Pantelis
*/
-public abstract class EmptyReply extends EmptyExternalizable implements VersionedSerializableMessage {
+public abstract class EmptyReply extends EmptyExternalizable {
private final Object legacySerializedInstance;
this.legacySerializedInstance = legacySerializedInstance;
}
- @Override
public Object toSerializable(short toVersion) {
return toVersion >= DataStoreVersions.LITHIUM_VERSION ? this : legacySerializedInstance;
}
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-public class MergeData extends ModifyData implements VersionedSerializableMessage {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public class MergeData extends ModifyData {
private static final long serialVersionUID = 1L;
public static final Class<MergeData> SERIALIZABLE_CLASS = MergeData.class;
public MergeData() {
}
- public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- super(path, data);
+ public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, short version) {
+ super(path, data, version);
}
@Override
- public Object toSerializable(short toVersion) {
- if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
- setVersion(toVersion);
+ public Object toSerializable() {
+ if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
return this;
} else {
// To base or R1 Helium version
ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable;
Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
- return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode());
+ return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode(),
+ DataStoreVersions.HELIUM_2_VERSION);
}
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+/**
+ * @deprecated Replaced by BatchedModificationsReply.
+ */
+@Deprecated
public class MergeDataReply extends EmptyReply {
private static final long serialVersionUID = 1L;
package org.opendaylight.controller.cluster.datastore.messages;
-import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-public abstract class ModifyData implements Externalizable {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public abstract class ModifyData extends VersionedExternalizableMessage {
private static final long serialVersionUID = 1L;
private YangInstanceIdentifier path;
private NormalizedNode<?, ?> data;
- private short version;
protected ModifyData() {
}
- protected ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ protected ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, short version) {
+ super(version);
this.path = path;
this.data = data;
}
return data;
}
- public short getVersion() {
- return version;
- }
-
- protected void setVersion(short version) {
- this.version = version;
- }
-
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- version = in.readShort();
+ super.readExternal(in);
SerializationUtils.deserializePathAndNode(in, this, APPLIER);
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
- out.writeShort(version);
+ super.writeExternal(out);
SerializationUtils.serializePathAndNode(path, data, out);
}
package org.opendaylight.controller.cluster.datastore.messages;
import com.google.protobuf.ByteString;
-import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-public class ReadDataReply implements VersionedSerializableMessage, Externalizable {
+public class ReadDataReply extends VersionedExternalizableMessage {
private static final long serialVersionUID = 1L;
public static final Class<ReadDataReply> SERIALIZABLE_CLASS = ReadDataReply.class;
private NormalizedNode<?, ?> normalizedNode;
- private short version;
public ReadDataReply() {
}
- public ReadDataReply(NormalizedNode<?, ?> normalizedNode) {
+ public ReadDataReply(NormalizedNode<?, ?> normalizedNode, short version) {
+ super(version);
this.normalizedNode = normalizedNode;
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- version = in.readShort();
+ super.readExternal(in);
normalizedNode = SerializationUtils.deserializeNormalizedNode(in);
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
- out.writeShort(version);
+ super.writeExternal(out);
SerializationUtils.serializeNormalizedNode(normalizedNode, out);
}
@Override
- public Object toSerializable(short toVersion) {
- if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
- version = toVersion;
+ public Object toSerializable() {
+ if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
return this;
} else {
return toSerializableReadDataReply(normalizedNode);
} else {
ShardTransactionMessages.ReadDataReply o =
(ShardTransactionMessages.ReadDataReply) serializable;
- return new ReadDataReply(new NormalizedNodeToNodeCodec(null).decode(o.getNormalizedNode()));
+ return new ReadDataReply(new NormalizedNodeToNodeCodec(null).decode(o.getNormalizedNode()),
+ DataStoreVersions.HELIUM_2_VERSION);
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Abstract base class for a versioned Externalizable message.
+ *
+ * <p>The version is serialized as the first field of the external form.
+ * Subclasses that override {@link #readExternal} / {@link #writeExternal}
+ * must invoke the super implementation first so the version is read/written
+ * before their own payload fields.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class VersionedExternalizableMessage implements Externalizable, SerializableMessage {
+ private static final long serialVersionUID = 1L;
+
+ // Message protocol version (presumably a DataStoreVersions constant — see callers);
+ // defaults to 0 until set by the constructor or restored by readExternal.
+ private short version;
+
+ // Public no-arg constructor required by the Externalizable contract for deserialization.
+ public VersionedExternalizableMessage() {
+ }
+
+ public VersionedExternalizableMessage(short version) {
+ this.version = version;
+ }
+
+ // Returns the version this message was created with or deserialized from.
+ public short getVersion() {
+ return version;
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ version = in.readShort();
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeShort(version);
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.messages;
-
-/**
- * Interface for a Serializable message with versioning.
- *
- * @author Thomas Pantelis
- */
-public interface VersionedSerializableMessage {
- Object toSerializable(short toVersion);
-}
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-public class WriteData extends ModifyData implements VersionedSerializableMessage {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public class WriteData extends ModifyData {
private static final long serialVersionUID = 1L;
public static final Class<WriteData> SERIALIZABLE_CLASS = WriteData.class;
public WriteData() {
}
- public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- super(path, data);
+ public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, short version) {
+ super(path, data, version);
}
@Override
- public Object toSerializable(short toVersion) {
- if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
- setVersion(toVersion);
+ public Object toSerializable() {
+ if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
return this;
} else {
// To base or R1 Helium version
ShardTransactionMessages.WriteData o = (ShardTransactionMessages.WriteData) serializable;
Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
- return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode());
+ return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode(),
+ DataStoreVersions.HELIUM_2_VERSION);
}
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+/**
+ * @deprecated Replaced by BatchedModificationsReply.
+ */
+@Deprecated
public class WriteDataReply extends EmptyReply {
private static final long serialVersionUID = 1L;
public abstract class AbstractModification implements Modification {
private YangInstanceIdentifier path;
+ private short version;
- protected AbstractModification() {
+ protected AbstractModification(short version) {
+ this.version = version;
}
protected AbstractModification(YangInstanceIdentifier path) {
public YangInstanceIdentifier getPath() {
return path;
}
+
+ public short getVersion() {
+ return version;
+ }
}
private static final long serialVersionUID = 1L;
public DeleteModification() {
+ this(DataStoreVersions.CURRENT_VERSION);
+ }
+
+ public DeleteModification(short version) {
+ super(version);
}
public DeleteModification(YangInstanceIdentifier path) {
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- in.readShort();
setPath(SerializationUtils.deserializePath(in));
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
- out.writeShort(DataStoreVersions.CURRENT_VERSION);
SerializationUtils.serializePath(getPath(), out);
}
return new DeleteModification(InstanceIdentifierUtils.fromSerializable(o.getPath()));
}
- public static DeleteModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
- DeleteModification mod = new DeleteModification();
+ public static DeleteModification fromStream(ObjectInput in, short version)
+ throws ClassNotFoundException, IOException {
+ DeleteModification mod = new DeleteModification(version);
mod.readExternal(in);
return mod;
}
import java.io.IOException;
import java.io.ObjectInput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded;
import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
private static final long serialVersionUID = 1L;
public MergeModification() {
+ this(DataStoreVersions.CURRENT_VERSION);
+ }
+
+ public MergeModification(short version) {
+ super(version);
}
public MergeModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
return new MergeModification(decoded.getDecodedPath(), decoded.getDecodedNode());
}
- public static MergeModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
- MergeModification mod = new MergeModification();
+ public static MergeModification fromStream(ObjectInput in, short version)
+ throws ClassNotFoundException, IOException {
+ MergeModification mod = new MergeModification(version);
mod.readExternal(in);
return mod;
}
public class MutableCompositeModification implements CompositeModification {
private static final long serialVersionUID = 1L;
- private final List<Modification> modifications;
+ private final List<Modification> modifications = new ArrayList<>();
+ private short version;
public MutableCompositeModification() {
- modifications = new ArrayList<>();
+ this(DataStoreVersions.CURRENT_VERSION);
+ }
+
+ public MutableCompositeModification(short version) {
+ this.version = version;
}
@Override
return COMPOSITE;
}
+ public short getVersion() {
+ return version;
+ }
+
+ public void setVersion(short version) {
+ this.version = version;
+ }
+
/**
* Add a new Modification to the list of Modifications represented by this
* composite
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- in.readShort();
+ version = in.readShort();
int size = in.readInt();
byte type = in.readByte();
switch(type) {
case Modification.WRITE:
- modifications.add(WriteModification.fromStream(in));
+ modifications.add(WriteModification.fromStream(in, version));
break;
case Modification.MERGE:
- modifications.add(MergeModification.fromStream(in));
+ modifications.add(MergeModification.fromStream(in, version));
break;
case Modification.DELETE:
- modifications.add(DeleteModification.fromStream(in));
+ modifications.add(DeleteModification.fromStream(in, version));
break;
}
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
- out.writeShort(DataStoreVersions.CURRENT_VERSION);
+ out.writeShort(version);
out.writeInt(modifications.size());
builder.setTimeStamp(System.nanoTime());
for (Modification m : modifications) {
- builder.addModification(
- (PersistentMessages.Modification) m.toSerializable());
+ builder.addModification((PersistentMessages.Modification) m.toSerializable());
}
return builder.build();
private NormalizedNode<?, ?> data;
public WriteModification() {
+ this(DataStoreVersions.CURRENT_VERSION);
+ }
+
+ public WriteModification(short version) {
+ super(version);
}
public WriteModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- in.readShort(); // version
-
SerializationUtils.deserializePathAndNode(in, this, APPLIER);
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
- out.writeShort(DataStoreVersions.CURRENT_VERSION);
SerializationUtils.serializePathAndNode(getPath(), data, out);
}
return new WriteModification(decoded.getDecodedPath(), decoded.getDecodedNode());
}
- public static WriteModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
- WriteModification mod = new WriteModification();
+ public static WriteModification fromStream(ObjectInput in, short version)
+ throws ClassNotFoundException, IOException {
+ WriteModification mod = new WriteModification(version);
mod.readExternal(in);
return mod;
}
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
-import java.util.concurrent.TimeUnit;
-
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.osgi.framework.BundleContext;
-import scala.concurrent.duration.Duration;
-
public class DistributedConfigDataStoreProviderModule extends
org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModule {
private BundleContext bundleContext;
DatastoreContext datastoreContext = DatastoreContext.newBuilder()
.dataStoreType("config")
- .dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create(
- props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
- props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
- props.getMaxShardDataChangeListenerQueueSize().getValue().intValue(),
- props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue()))
- .shardTransactionIdleTimeout(Duration.create(
- props.getShardTransactionIdleTimeoutInMinutes().getValue(), TimeUnit.MINUTES))
+ .maxShardDataChangeExecutorPoolSize(props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue())
+ .maxShardDataChangeExecutorQueueSize(props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue())
+ .maxShardDataChangeListenerQueueSize(props.getMaxShardDataChangeListenerQueueSize().getValue().intValue())
+ .maxShardDataStoreExecutorQueueSize(props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue())
+ .shardTransactionIdleTimeoutInMinutes(props.getShardTransactionIdleTimeoutInMinutes().getValue())
.operationTimeoutInSeconds(props.getOperationTimeoutInSeconds().getValue())
.shardJournalRecoveryLogBatchSize(props.getShardJournalRecoveryLogBatchSize().
getValue().intValue())
.shardSnapshotBatchCount(props.getShardSnapshotBatchCount().getValue().intValue())
.shardSnapshotDataThresholdPercentage(props.getShardSnapshotDataThresholdPercentage().getValue().intValue())
- .shardHeartbeatIntervalInMillis(props.getShardHearbeatIntervalInMillis().getValue())
- .shardInitializationTimeout(props.getShardInitializationTimeoutInSeconds().getValue(),
- TimeUnit.SECONDS)
- .shardLeaderElectionTimeout(props.getShardLeaderElectionTimeoutInSeconds().getValue(),
- TimeUnit.SECONDS)
+ .shardHeartbeatIntervalInMillis(props.getShardHeartbeatIntervalInMillis().getValue())
+ .shardInitializationTimeoutInSeconds(props.getShardInitializationTimeoutInSeconds().getValue())
+ .shardLeaderElectionTimeoutInSeconds(props.getShardLeaderElectionTimeoutInSeconds().getValue())
.shardTransactionCommitTimeoutInSeconds(
props.getShardTransactionCommitTimeoutInSeconds().getValue().intValue())
.shardTransactionCommitQueueCapacity(
.shardIsolatedLeaderCheckIntervalInMillis(
props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
.shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
- .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
+ .transactionCreationInitialRateLimit(props.getTransactionCreationInitialRateLimit().getValue())
+ .shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue())
.build();
return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(),
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
-import java.util.concurrent.TimeUnit;
-
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.osgi.framework.BundleContext;
-import scala.concurrent.duration.Duration;
-
public class DistributedOperationalDataStoreProviderModule extends
org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModule {
private BundleContext bundleContext;
DatastoreContext datastoreContext = DatastoreContext.newBuilder()
.dataStoreType("operational")
- .dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create(
- props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
- props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
- props.getMaxShardDataChangeListenerQueueSize().getValue().intValue(),
- props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue()))
- .shardTransactionIdleTimeout(Duration.create(
- props.getShardTransactionIdleTimeoutInMinutes().getValue(), TimeUnit.MINUTES))
+ .maxShardDataChangeExecutorPoolSize(props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue())
+ .maxShardDataChangeExecutorQueueSize(props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue())
+ .maxShardDataChangeListenerQueueSize(props.getMaxShardDataChangeListenerQueueSize().getValue().intValue())
+ .maxShardDataStoreExecutorQueueSize(props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue())
+ .shardTransactionIdleTimeoutInMinutes(props.getShardTransactionIdleTimeoutInMinutes().getValue())
.operationTimeoutInSeconds(props.getOperationTimeoutInSeconds().getValue())
.shardJournalRecoveryLogBatchSize(props.getShardJournalRecoveryLogBatchSize().
getValue().intValue())
.shardSnapshotBatchCount(props.getShardSnapshotBatchCount().getValue().intValue())
.shardSnapshotDataThresholdPercentage(props.getShardSnapshotDataThresholdPercentage().getValue().intValue())
- .shardHeartbeatIntervalInMillis(props.getShardHearbeatIntervalInMillis().getValue())
- .shardInitializationTimeout(props.getShardInitializationTimeoutInSeconds().getValue(),
- TimeUnit.SECONDS)
- .shardLeaderElectionTimeout(props.getShardLeaderElectionTimeoutInSeconds().getValue(),
- TimeUnit.SECONDS)
+ .shardHeartbeatIntervalInMillis(props.getShardHeartbeatIntervalInMillis().getValue())
+ .shardInitializationTimeoutInSeconds(props.getShardInitializationTimeoutInSeconds().getValue())
+ .shardLeaderElectionTimeoutInSeconds(props.getShardLeaderElectionTimeoutInSeconds().getValue())
.shardTransactionCommitTimeoutInSeconds(
props.getShardTransactionCommitTimeoutInSeconds().getValue().intValue())
.shardTransactionCommitQueueCapacity(
.shardIsolatedLeaderCheckIntervalInMillis(
props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
.shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
- .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
+ .transactionCreationInitialRateLimit(props.getTransactionCreationInitialRateLimit().getValue())
+ .shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue())
.build();
return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(),
}
- leaf shard-hearbeat-interval-in-millis {
+ leaf shard-heartbeat-interval-in-millis {
default 500;
type heartbeat-interval-type;
description "The interval at which a shard will send a heart beat message to its remote shard.";
an operation (eg transaction create).";
}
+ leaf shard-batched-modification-count {
+ default 100;
+ type non-zero-uint32-type;
+ description "The number of transaction modification operations (put, merge, delete) to
+ batch before sending to the shard transaction actor. Batching improves
+ performance as fewer modification messages are sent to the actor and thus
+ lessens the chance that the transaction actor's mailbox queue could get full.";
+ }
+
leaf enable-metric-capture {
default false;
type boolean;
followers are active and term itself as isolated";
}
- leaf tx-creation-initial-rate-limit {
+ leaf transaction-creation-initial-rate-limit {
default 100;
type non-zero-uint32-type;
description "The initial number of transactions per second that are allowed before the data store
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import org.junit.Test;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
+
+/**
+ * Unit tests for DatastoreContextConfigAdminOverlay.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreContextConfigAdminOverlayTest {
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void test() throws IOException {
+ BundleContext mockBundleContext = mock(BundleContext.class);
+ ServiceReference<ConfigurationAdmin> mockServiceRef = mock(ServiceReference.class);
+ ConfigurationAdmin mockConfigAdmin = mock(ConfigurationAdmin.class);
+ Configuration mockConfig = mock(Configuration.class);
+ DatastoreContextIntrospector mockIntrospector = mock(DatastoreContextIntrospector.class);
+
+ // Stub the OSGi lookup chain: BundleContext -> ServiceReference -> ConfigurationAdmin.
+ doReturn(mockServiceRef).when(mockBundleContext).getServiceReference(ConfigurationAdmin.class);
+ doReturn(mockConfigAdmin).when(mockBundleContext).getService(mockServiceRef);
+
+ // Config Admin hands back the Configuration registered under the overlay's well-known PID.
+ doReturn(mockConfig).when(mockConfigAdmin).getConfiguration(DatastoreContextConfigAdminOverlay.CONFIG_ID);
+
+ doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(mockConfig).getPid();
+
+ // Properties the Configuration exposes; these should be forwarded to the introspector.
+ Dictionary<String, Object> properties = new Hashtable<>();
+ properties.put("property", "value");
+ doReturn(properties).when(mockConfig).getProperties();
+
+ // Constructing (and auto-closing) the overlay drives the interactions under test.
+ try(DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+ mockIntrospector, mockBundleContext)) {
+ }
+
+ // The config properties must have been pushed to the introspector...
+ verify(mockIntrospector).update(properties);
+
+ // ...and the ConfigurationAdmin service reference released.
+ verify(mockBundleContext).ungetService(mockServiceRef);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_SECONDS;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+
+/**
+ * Unit tests for DatastoreContextIntrospector.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreContextIntrospectorTest {
+
+ @Test
+ public void testUpdate() {
+ DatastoreContext context = DatastoreContext.newBuilder().dataStoreType("operational").build();
+ DatastoreContextIntrospector introspector = new DatastoreContextIntrospector(context );
+
+ // Keys are given in deliberately varied forms (kebab-case, camelCase, mixed
+ // case with embedded spaces) to exercise the introspector's name normalization.
+ Dictionary<String, Object> properties = new Hashtable<>();
+ properties.put("shard-transaction-idle-timeout-in-minutes", "31");
+ properties.put("operation-timeout-in-seconds", "26");
+ properties.put("shard-transaction-commit-timeout-in-seconds", "100");
+ properties.put("shard-journal-recovery-log-batch-size", "199");
+ properties.put("shard-snapshot-batch-count", "212");
+ properties.put("shard-heartbeat-interval-in-millis", "101");
+ properties.put("shard-transaction-commit-queue-capacity", "567");
+ properties.put("shard-initialization-timeout-in-seconds", "82");
+ properties.put("shard-leader-election-timeout-in-seconds", "66");
+ properties.put("shard-isolated-leader-check-interval-in-millis", "123");
+ properties.put("shard-snapshot-data-threshold-percentage", "100");
+ properties.put("shard-election-timeout-factor", "21");
+ properties.put("shard-batched-modification-count", "901");
+ properties.put("transactionCreationInitialRateLimit", "200");
+ properties.put("MaxShardDataChangeExecutorPoolSize", "41");
+ properties.put("Max-Shard-Data-Change Executor-Queue Size", "1111");
+ properties.put(" max shard data change listener queue size", "2222");
+ properties.put("mAx-shaRd-data-STORE-executor-quEUe-size", "3333");
+ properties.put("persistent", "false");
+
+ boolean updated = introspector.update(properties);
+ assertEquals("updated", true, updated);
+ context = introspector.getContext();
+
+ // Every supplied property should be reflected in the rebuilt context.
+ assertEquals(31, context.getShardTransactionIdleTimeout().toMinutes());
+ assertEquals(26, context.getOperationTimeoutInSeconds());
+ assertEquals(100, context.getShardTransactionCommitTimeoutInSeconds());
+ assertEquals(199, context.getShardRaftConfig().getJournalRecoveryLogBatchSize());
+ assertEquals(212, context.getShardRaftConfig().getSnapshotBatchCount());
+ assertEquals(101, context.getShardRaftConfig().getHeartBeatInterval().length());
+ assertEquals(567, context.getShardTransactionCommitQueueCapacity());
+ assertEquals(82, context.getShardInitializationTimeout().duration().toSeconds());
+ assertEquals(66, context.getShardLeaderElectionTimeout().duration().toSeconds());
+ assertEquals(123, context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
+ assertEquals(100, context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+ assertEquals(21, context.getShardRaftConfig().getElectionTimeoutFactor());
+ assertEquals(901, context.getShardBatchedModificationCount());
+ assertEquals(200, context.getTransactionCreationInitialRateLimit());
+ assertEquals(41, context.getDataStoreProperties().getMaxDataChangeExecutorPoolSize());
+ assertEquals(1111, context.getDataStoreProperties().getMaxDataChangeExecutorQueueSize());
+ assertEquals(2222, context.getDataStoreProperties().getMaxDataChangeListenerQueueSize());
+ assertEquals(3333, context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize());
+ assertEquals(false, context.isPersistent());
+
+ // Second update: only the properties changed here should take new values;
+ // everything else must retain the values applied above.
+ properties.put("shard-transaction-idle-timeout-in-minutes", "32");
+ properties.put("operation-timeout-in-seconds", "27");
+ properties.put("shard-heartbeat-interval-in-millis", "102");
+ properties.put("shard-election-timeout-factor", "22");
+ properties.put("max-shard-data-change-executor-pool-size", "42");
+ properties.put("max-shard-data-store-executor-queue-size", "4444");
+ properties.put("persistent", "true");
+
+ updated = introspector.update(properties);
+ assertEquals("updated", true, updated);
+ context = introspector.getContext();
+
+ assertEquals(32, context.getShardTransactionIdleTimeout().toMinutes());
+ assertEquals(27, context.getOperationTimeoutInSeconds());
+ assertEquals(100, context.getShardTransactionCommitTimeoutInSeconds());
+ assertEquals(199, context.getShardRaftConfig().getJournalRecoveryLogBatchSize());
+ assertEquals(212, context.getShardRaftConfig().getSnapshotBatchCount());
+ assertEquals(102, context.getShardRaftConfig().getHeartBeatInterval().length());
+ assertEquals(567, context.getShardTransactionCommitQueueCapacity());
+ assertEquals(82, context.getShardInitializationTimeout().duration().toSeconds());
+ assertEquals(66, context.getShardLeaderElectionTimeout().duration().toSeconds());
+ assertEquals(123, context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
+ assertEquals(100, context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+ assertEquals(22, context.getShardRaftConfig().getElectionTimeoutFactor());
+ assertEquals(200, context.getTransactionCreationInitialRateLimit());
+ assertEquals(42, context.getDataStoreProperties().getMaxDataChangeExecutorPoolSize());
+ assertEquals(1111, context.getDataStoreProperties().getMaxDataChangeExecutorQueueSize());
+ assertEquals(2222, context.getDataStoreProperties().getMaxDataChangeListenerQueueSize());
+ assertEquals(4444, context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize());
+ assertEquals(true, context.isPersistent());
+
+ // Null or empty property sets should report that nothing was updated.
+ updated = introspector.update(null);
+ assertEquals("updated", false, updated);
+
+ updated = introspector.update(new Hashtable<String, Object>());
+ assertEquals("updated", false, updated);
+ }
+
+
+ @Test
+ public void testUpdateWithInvalidValues() {
+ DatastoreContext context = DatastoreContext.newBuilder().dataStoreType("operational").build();
+ DatastoreContextIntrospector introspector = new DatastoreContextIntrospector(context );
+
+ Dictionary<String, Object> properties = new Hashtable<>();
+ properties.put("shard-transaction-idle-timeout-in-minutes", "0"); // bad - must be > 0
+ properties.put("shard-journal-recovery-log-batch-size", "199");
+ properties.put("shard-transaction-commit-timeout-in-seconds", "bogus"); // bad - NaN
+ properties.put("shard-snapshot-batch-count", "212"); // good
+ properties.put("operation-timeout-in-seconds", "4"); // bad - must be >= 5
+ properties.put("shard-heartbeat-interval-in-millis", "99"); // bad - must be >= 100
+ properties.put("shard-transaction-commit-queue-capacity", "567"); // good
+ properties.put("shard-snapshot-data-threshold-percentage", "101"); // bad - must be 0-100
+ properties.put("shard-initialization-timeout-in-seconds", "-1"); // bad - must be > 0
+ properties.put("max-shard-data-change-executor-pool-size", "bogus"); // bad - NaN
+ properties.put("unknownProperty", "1"); // bad - invalid property name
+
+ // The update as a whole still succeeds; only the invalid entries are rejected.
+ boolean updated = introspector.update(properties);
+ assertEquals("updated", true, updated);
+ context = introspector.getContext();
+
+ // Invalid values must be ignored, leaving the corresponding defaults intact.
+ assertEquals(DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT, context.getShardTransactionIdleTimeout());
+ assertEquals(199, context.getShardRaftConfig().getJournalRecoveryLogBatchSize());
+ assertEquals(DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS, context.getShardTransactionCommitTimeoutInSeconds());
+ assertEquals(212, context.getShardRaftConfig().getSnapshotBatchCount());
+ assertEquals(DEFAULT_OPERATION_TIMEOUT_IN_SECONDS, context.getOperationTimeoutInSeconds());
+ assertEquals(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS, context.getShardRaftConfig().getHeartBeatInterval().length());
+ assertEquals(567, context.getShardTransactionCommitQueueCapacity());
+ assertEquals(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE,
+ context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+ assertEquals(DEFAULT_SHARD_INITIALIZATION_TIMEOUT, context.getShardInitializationTimeout());
+ assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE,
+ context.getDataStoreProperties().getMaxDataChangeExecutorPoolSize());
+ }
+
+ @Test
+ public void testUpdateWithDatastoreTypeSpecificProperties() {
+ // Keys prefixed with the data store type ("operational." / "config.") should
+ // override the corresponding global key, but only for that data store type.
+ Dictionary<String, Object> properties = new Hashtable<>();
+ properties.put("shard-transaction-idle-timeout-in-minutes", "22"); // global setting
+ properties.put("operational.shard-transaction-idle-timeout-in-minutes", "33"); // operational override
+ properties.put("config.shard-transaction-idle-timeout-in-minutes", "44"); // config override
+
+ properties.put("max-shard-data-change-executor-pool-size", "222"); // global setting
+ properties.put("operational.max-shard-data-change-executor-pool-size", "333"); // operational override
+ properties.put("config.max-shard-data-change-executor-pool-size", "444"); // config override
+
+ properties.put("persistent", "false"); // global setting
+ properties.put("operational.Persistent", "true"); // operational override
+
+ DatastoreContext operContext = DatastoreContext.newBuilder().dataStoreType("operational").build();
+ DatastoreContextIntrospector operIntrospector = new DatastoreContextIntrospector(operContext);
+ boolean updated = operIntrospector.update(properties);
+ assertEquals("updated", true, updated);
+ operContext = operIntrospector.getContext();
+
+ // Operational store picks up operational.* overrides.
+ assertEquals(33, operContext.getShardTransactionIdleTimeout().toMinutes());
+ assertEquals(true, operContext.isPersistent());
+ assertEquals(333, operContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize());
+
+ DatastoreContext configContext = DatastoreContext.newBuilder().dataStoreType("config").build();
+ DatastoreContextIntrospector configIntrospector = new DatastoreContextIntrospector(configContext);
+ updated = configIntrospector.update(properties);
+ assertEquals("updated", true, updated);
+ configContext = configIntrospector.getContext();
+
+ // Config store picks up config.* overrides; with no config override for
+ // "persistent", the global setting applies.
+ assertEquals(44, configContext.getShardTransactionIdleTimeout().toMinutes());
+ assertEquals(false, configContext.isPersistent());
+ assertEquals(444, configContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize());
+ }
+}
package org.opendaylight.controller.cluster.datastore;
import static org.junit.Assert.assertEquals;
-import org.junit.Before;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_CONFIGURATION_READER;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_SECONDS;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_PERSISTENT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SNAPSHOT_BATCH_COUNT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
+import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
import org.junit.Test;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
public class DatastoreContextTest {
- private DatastoreContext.Builder builder;
+ @Test
+ public void testNewBuilderWithDefaultSettings() {
+ // A builder with no overrides must yield a context exposing every
+ // documented default value.
+ DatastoreContext context = DatastoreContext.newBuilder().build();
- @Before
- public void setUp(){
- builder = new DatastoreContext.Builder();
+ assertEquals(DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT, context.getShardTransactionIdleTimeout());
+ assertEquals(DEFAULT_OPERATION_TIMEOUT_IN_SECONDS, context.getOperationTimeoutInSeconds());
+ assertEquals(DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS, context.getShardTransactionCommitTimeoutInSeconds());
+ assertEquals(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE, context.getShardRaftConfig().getJournalRecoveryLogBatchSize());
+ assertEquals(DEFAULT_SNAPSHOT_BATCH_COUNT, context.getShardRaftConfig().getSnapshotBatchCount());
+ assertEquals(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS, context.getShardRaftConfig().getHeartBeatInterval().length());
+ assertEquals(DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY, context.getShardTransactionCommitQueueCapacity());
+ assertEquals(DEFAULT_SHARD_INITIALIZATION_TIMEOUT.duration().toMillis(),
+ context.getShardInitializationTimeout().duration().toMillis());
+ assertEquals(DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT.duration().toMillis(),
+ context.getShardLeaderElectionTimeout().duration().toMillis());
+ assertEquals(DEFAULT_PERSISTENT, context.isPersistent());
+ assertEquals(DEFAULT_CONFIGURATION_READER, context.getConfigurationReader());
+ assertEquals(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS, context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
+ assertEquals(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE, context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+ assertEquals(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR, context.getShardRaftConfig().getElectionTimeoutFactor());
+ assertEquals(DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT, context.getTransactionCreationInitialRateLimit());
+ // Use the static import for consistency with every other assertion above.
+ assertEquals(DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT, context.getShardBatchedModificationCount());
+ assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE,
+ context.getDataStoreProperties().getMaxDataChangeExecutorPoolSize());
+ assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE,
+ context.getDataStoreProperties().getMaxDataChangeExecutorQueueSize());
+ assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE,
+ context.getDataStoreProperties().getMaxDataChangeListenerQueueSize());
+ assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE,
+ context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize());
}
@Test
- public void testDefaults(){
- DatastoreContext build = builder.build();
-
- assertEquals(DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT , build.getShardTransactionIdleTimeout());
- assertEquals(DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_SECONDS, build.getOperationTimeoutInSeconds());
- assertEquals(DatastoreContext.DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS, build.getShardTransactionCommitTimeoutInSeconds());
- assertEquals(DatastoreContext.DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE, build.getShardRaftConfig().getJournalRecoveryLogBatchSize());
- assertEquals(DatastoreContext.DEFAULT_SNAPSHOT_BATCH_COUNT, build.getShardRaftConfig().getSnapshotBatchCount());
- assertEquals(DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getHeartBeatInterval().length());
- assertEquals(DatastoreContext.DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY, build.getShardTransactionCommitQueueCapacity());
- assertEquals(DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT, build.getShardInitializationTimeout());
- assertEquals(DatastoreContext.DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT, build.getShardLeaderElectionTimeout());
- assertEquals(DatastoreContext.DEFAULT_PERSISTENT, build.isPersistent());
- assertEquals(DatastoreContext.DEFAULT_CONFIGURATION_READER, build.getConfigurationReader());
- assertEquals(DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
- assertEquals(DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE, build.getShardRaftConfig().getSnapshotDataThresholdPercentage());
- assertEquals(DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR, build.getShardRaftConfig().getElectionTimeoutFactor());
- assertEquals(DatastoreContext.DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT, build.getTransactionCreationInitialRateLimit());
+ public void testNewBuilderWithCustomSettings() {
+ // Bump every setting one past its default, then verify both the built context
+ // and a newBuilderFrom() copy carry the custom values (via verifyCustomSettings).
+ DatastoreContext.Builder builder = DatastoreContext.newBuilder();
+
+ builder.shardTransactionIdleTimeout(DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT.toMillis() + 1,
+ TimeUnit.MILLISECONDS);
+ builder.operationTimeoutInSeconds(DEFAULT_OPERATION_TIMEOUT_IN_SECONDS + 1);
+ builder.shardTransactionCommitTimeoutInSeconds(DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS + 1);
+ builder.shardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE + 1);
+ builder.shardSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT + 1);
+ builder.shardHeartbeatIntervalInMillis(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS + 1);
+ builder.shardTransactionCommitQueueCapacity(DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY + 1);
+ builder.shardInitializationTimeout(DEFAULT_SHARD_INITIALIZATION_TIMEOUT.duration().toMillis() + 1,
+ TimeUnit.MILLISECONDS);
+ builder.shardLeaderElectionTimeout(DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT.duration().toMillis() + 1,
+ TimeUnit.MILLISECONDS);
+ builder.persistent(!DEFAULT_PERSISTENT);
+ builder.shardIsolatedLeaderCheckIntervalInMillis(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS + 1);
+ builder.shardSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE + 1);
+ builder.shardElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR + 1);
+ builder.transactionCreationInitialRateLimit(DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT + 1);
+ builder.shardBatchedModificationCount(DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT + 1);
+ builder.maxShardDataChangeExecutorPoolSize(
+ InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE + 1);
+ builder.maxShardDataChangeExecutorQueueSize(
+ InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE + 1);
+ builder.maxShardDataChangeListenerQueueSize(
+ InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE + 1);
+ builder.maxShardDataStoreExecutorQueueSize(
+ InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE + 1);
+
+ DatastoreContext context = builder.build();
+
+ verifyCustomSettings(context);
+
+ // Copying an existing context must preserve all custom settings in a new instance.
+ builder = DatastoreContext.newBuilderFrom(context);
+
+ DatastoreContext newContext = builder.build();
+
+ verifyCustomSettings(newContext);
+
+ Assert.assertNotSame(context, newContext);
}
-}
\ No newline at end of file
+ /**
+ * Asserts that every setting on the given context equals its default plus one
+ * (or the negated default for the boolean), matching the values applied in
+ * testNewBuilderWithCustomSettings.
+ */
+ private void verifyCustomSettings(DatastoreContext context) {
+ assertEquals(DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT.toMillis() + 1,
+ context.getShardTransactionIdleTimeout().toMillis());
+ assertEquals(DEFAULT_OPERATION_TIMEOUT_IN_SECONDS + 1, context.getOperationTimeoutInSeconds());
+ assertEquals(DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS + 1,
+ context.getShardTransactionCommitTimeoutInSeconds());
+ assertEquals(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE + 1,
+ context.getShardRaftConfig().getJournalRecoveryLogBatchSize());
+ assertEquals(DEFAULT_SNAPSHOT_BATCH_COUNT + 1, context.getShardRaftConfig().getSnapshotBatchCount());
+ assertEquals(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS + 1,
+ context.getShardRaftConfig().getHeartBeatInterval().length());
+ assertEquals(DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY + 1, context.getShardTransactionCommitQueueCapacity());
+ assertEquals(DEFAULT_SHARD_INITIALIZATION_TIMEOUT.duration().toMillis() + 1,
+ context.getShardInitializationTimeout().duration().toMillis());
+ assertEquals(DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT.duration().toMillis() + 1,
+ context.getShardLeaderElectionTimeout().duration().toMillis());
+ assertEquals(!DEFAULT_PERSISTENT, context.isPersistent());
+ assertEquals(DEFAULT_CONFIGURATION_READER, context.getConfigurationReader());
+ assertEquals(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS + 1,
+ context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
+ assertEquals(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE + 1,
+ context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+ assertEquals(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR + 1, context.getShardRaftConfig().getElectionTimeoutFactor());
+ assertEquals(DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT + 1, context.getTransactionCreationInitialRateLimit());
+ // Use the static import for consistency with the other assertions here.
+ assertEquals(DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT + 1,
+ context.getShardBatchedModificationCount());
+ assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE + 1,
+ context.getDataStoreProperties().getMaxDataChangeExecutorPoolSize());
+ assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE + 1,
+ context.getDataStoreProperties().getMaxDataChangeExecutorQueueSize());
+ assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE + 1,
+ context.getDataStoreProperties().getMaxDataChangeListenerQueueSize());
+ assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE + 1,
+ context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize());
+ }
+}
+}
schemaContext = TestModel.createTestContext();
doReturn(schemaContext).when(actorContext).getSchemaContext();
+ doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
}
@Test
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import java.util.concurrent.Semaphore;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
+
+/**
+ * Unit tests for OperationCompleter.
+ *
+ * @author Thomas Pantelis
+ */
+public class OperationCompleterTest {
+
+ @Test
+ public void testOnComplete() throws Exception {
+ // Drain all permits up front so each release performed by the completer
+ // is directly observable via availablePermits().
+ int permits = 10;
+ Semaphore operationLimiter = new Semaphore(permits);
+ operationLimiter.acquire(permits);
+ int availablePermits = 0;
+
+ OperationCompleter completer = new OperationCompleter(operationLimiter );
+
+ // A normal reply releases exactly one permit.
+ completer.onComplete(null, new DataExistsReply(true));
+ assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
+
+ completer.onComplete(null, new DataExistsReply(true));
+ assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
+
+ // A failure (Throwable result) also releases exactly one permit.
+ completer.onComplete(null, new IllegalArgumentException());
+ assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
+
+ // A BatchedModificationsReply releases one permit per batched modification.
+ completer.onComplete(null, new BatchedModificationsReply(4));
+ availablePermits += 4;
+ assertEquals("availablePermits", availablePermits, operationLimiter.availablePermits());
+ }
+}
// Write data to the Tx
txActor.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
- DataStoreVersions.BASE_HELIUM_VERSION), getRef());
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.BASE_HELIUM_VERSION).
+ toSerializable(), getRef());
expectMsgClass(duration, ShardTransactionMessages.WriteDataReply.class);
// Write data to the Tx
txActor.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef());
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+ DataStoreVersions.BASE_HELIUM_VERSION).toSerializable(), getRef());
- expectMsgClass(duration, WriteDataReply.class);
+ expectMsgClass(duration, WriteDataReply.INSTANCE.toSerializable(
+ DataStoreVersions.BASE_HELIUM_VERSION).getClass());
// Ready the Tx
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.duration.Duration;
public class ShardTransactionTest extends AbstractActorTest {
"testOnReceiveWriteData");
transaction.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
- DataStoreVersions.HELIUM_2_VERSION), getRef());
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
+ toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
// unserialized write
transaction.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
getRef());
expectMsgClass(duration("5 seconds"), WriteDataReply.class);
"testMergeData");
transaction.tell(new MergeData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
- DataStoreVersions.HELIUM_2_VERSION), getRef());
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
+ toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
//unserialized merge
transaction.tell(new MergeData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
getRef());
expectMsgClass(duration("5 seconds"), MergeDataReply.class);
final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
"testDeleteData");
- transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(
- DataStoreVersions.HELIUM_2_VERSION), getRef());
+ transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
+ toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
assertModification(transaction, DeleteModification.class);
//unserialized
- transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
+ transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
}};
}
+ @Test
+ public void testOnReceiveBatchedModifications() throws Exception {
+ // Sends one BatchedModifications message containing a write, a merge and a
+ // delete, then verifies the reply count, the recorded CompositeModification,
+ // and that the backing DOMStoreWriteTransaction saw the operations in order.
+ new JavaTestKit(getSystem()) {{
+
+ // Mock the store transaction so the applied operations can be verified.
+ DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
+ final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications");
+
+ YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+ YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
+ NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
+
+ YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
+
+ BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
+ batched.addModification(new WriteModification(writePath, writeData));
+ batched.addModification(new MergeModification(mergePath, mergeData));
+ batched.addModification(new DeleteModification(deletePath));
+
+ transaction.tell(batched, getRef());
+
+ // The reply should report one entry per modification in the batch.
+ BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
+ assertEquals("getNumBatched", 3, reply.getNumBatched());
+
+ // Query the actor for the accumulated composite modification and check
+ // each recorded entry matches what was sent, in the same order.
+ JavaTestKit verification = new JavaTestKit(getSystem());
+ transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
+
+ CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
+ GetCompositeModificationReply.class).getModification();
+
+ assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
+
+ WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
+ assertEquals("getPath", writePath, write.getPath());
+ assertEquals("getData", writeData, write.getData());
+
+ MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
+ assertEquals("getPath", mergePath, merge.getPath());
+ assertEquals("getData", mergeData, merge.getData());
+
+ DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
+ assertEquals("getPath", deletePath, delete.getPath());
+
+ // The mock store transaction must have seen the operations in batch order.
+ InOrder inOrder = Mockito.inOrder(mockWriteTx);
+ inOrder.verify(mockWriteTx).write(writePath, writeData);
+ inOrder.verify(mockWriteTx).merge(mergePath, mergeData);
+ inOrder.verify(mockWriteTx).delete(deletePath);
+ }};
+ }
@Test
public void testOnReceiveReadyTransaction() throws Exception {
DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
- transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(
- DataStoreVersions.CURRENT_VERSION), ActorRef.noSender());
+ transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
+ toSerializable(), ActorRef.noSender());
}
@Test
public void testShardTransactionInactivity() {
datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
- Duration.create(500, TimeUnit.MILLISECONDS)).build();
+ 500, TimeUnit.MILLISECONDS).build();
new JavaTestKit(getSystem()) {{
final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
actorContext.setSchemaContext(schemaContext);
doReturn(schemaContext).when(mockActorContext).getSchemaContext();
+ doReturn(DatastoreContext.newBuilder().build()).when(mockActorContext).getDatastoreContext();
}
@SuppressWarnings("resource")
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
+import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@Mock
private ClusterWrapper mockClusterWrapper;
- String memberName = "mock-member";
+ private final String memberName = "mock-member";
+
+ private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).
+ shardBatchedModificationCount(1);
@BeforeClass
public static void setUpClass() throws IOException {
schemaContext = TestModel.createTestContext();
- DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
-
doReturn(getSystem()).when(mockActorContext).getActorSystem();
doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
doReturn(memberName).when(mockActorContext).getCurrentMemberName();
doReturn(schemaContext).when(mockActorContext).getSchemaContext();
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
- doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
+ doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
ShardStrategyFactory.setConfiguration(configuration);
}
private ReadData eqSerializedReadData() {
+ return eqSerializedReadData(TestModel.TEST_PATH);
+ }
+
+ private ReadData eqSerializedReadData(final YangInstanceIdentifier path) {
ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
@Override
public boolean matches(Object argument) {
return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
- ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
+ ReadData.fromSerializable(argument).getPath().equals(path);
}
};
return argThat(matcher);
}
- private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
- return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
- }
-
- private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite,
- final int transactionVersion) {
+ private WriteData eqLegacyWriteData(final NormalizedNode<?, ?> nodeToWrite) {
ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
@Override
public boolean matches(Object argument) {
- if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
- WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
- (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
- ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) {
-
+ if(ShardTransactionMessages.WriteData.class.equals(argument.getClass())) {
WriteData obj = WriteData.fromSerializable(argument);
- return obj.getPath().equals(TestModel.TEST_PATH) &&
- obj.getData().equals(nodeToWrite);
+ return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
}
return false;
return argThat(matcher);
}
- private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
- ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
- @Override
- public boolean matches(Object argument) {
- if(argument instanceof WriteData) {
- WriteData obj = (WriteData) argument;
- return obj.getPath().equals(TestModel.TEST_PATH) &&
- obj.getData().equals(nodeToWrite);
- }
- return false;
- }
- };
-
- return argThat(matcher);
- }
-
- private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
- return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
- }
-
- private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite,
- final int transactionVersion) {
+ private MergeData eqLegacyMergeData(final NormalizedNode<?, ?> nodeToWrite) {
ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
@Override
public boolean matches(Object argument) {
- if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
- MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
- (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
- ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) {
-
+ if(ShardTransactionMessages.MergeData.class.equals(argument.getClass())) {
MergeData obj = MergeData.fromSerializable(argument);
- return obj.getPath().equals(TestModel.TEST_PATH) &&
- obj.getData().equals(nodeToWrite);
+ return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
}
return false;
return argThat(matcher);
}
- private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
- ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
- @Override
- public boolean matches(Object argument) {
- if(argument instanceof MergeData) {
- MergeData obj = ((MergeData) argument);
- return obj.getPath().equals(TestModel.TEST_PATH) &&
- obj.getData().equals(nodeToWrite);
- }
-
- return false;
- }
- };
-
- return argThat(matcher);
- }
-
- private DeleteData eqSerializedDeleteData() {
- ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
- @Override
- public boolean matches(Object argument) {
- return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
- DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
- }
- };
-
- return argThat(matcher);
- }
-
- private DeleteData eqDeleteData() {
+ private DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) {
ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
@Override
public boolean matches(Object argument) {
- return argument instanceof DeleteData &&
- ((DeleteData)argument).getPath().equals(TestModel.TEST_PATH);
+ return ShardTransactionMessages.DeleteData.class.equals(argument.getClass()) &&
+ DeleteData.fromSerializable(argument).getPath().equals(expPath);
}
};
private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
short transactionVersion) {
- return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion));
+ return Futures.successful(new ReadDataReply(data, transactionVersion).toSerializable());
}
private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
}
private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
- return Futures.successful(new ReadDataReply(data));
+ return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION));
}
private Future<Object> dataExistsSerializedReply(boolean exists) {
return Futures.successful(new DataExistsReply(exists));
}
- private Future<Object> writeSerializedDataReply(short version) {
- return Futures.successful(new WriteDataReply().toSerializable(version));
- }
-
- private Future<Object> writeSerializedDataReply() {
- return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
- }
-
- private Future<WriteDataReply> writeDataReply() {
- return Futures.successful(new WriteDataReply());
- }
-
- private Future<Object> mergeSerializedDataReply(short version) {
- return Futures.successful(new MergeDataReply().toSerializable(version));
- }
-
- private Future<Object> mergeSerializedDataReply() {
- return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
+ private Future<BatchedModificationsReply> batchedModificationsReply(int count) {
+ return Futures.successful(new BatchedModificationsReply(count));
}
private Future<Object> incompleteFuture(){
return mock(Future.class);
}
- private Future<MergeDataReply> mergeDataReply() {
- return Futures.successful(new MergeDataReply());
+ private ActorSelection actorSelection(ActorRef actorRef) {
+ return getSystem().actorSelection(actorRef.path());
+ }
+
+ private void expectBatchedModifications(ActorRef actorRef, int count) {
+ doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(BatchedModifications.class));
}
- private Future<Object> deleteSerializedDataReply(short version) {
- return Futures.successful(new DeleteDataReply().toSerializable(version));
+ private void expectBatchedModifications(int count) {
+ doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), isA(BatchedModifications.class));
}
- private Future<Object> deleteSerializedDataReply() {
- return deleteSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
+ private void expectIncompleteBatchedModifications() {
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), isA(BatchedModifications.class));
}
- private Future<DeleteDataReply> deleteDataReply() {
- return Futures.successful(new DeleteDataReply());
+ private void expectReadyTransaction(ActorRef actorRef) {
+ doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
}
- private ActorSelection actorSelection(ActorRef actorRef) {
- return getSystem().actorSelection(actorRef.path());
+ private void expectFailedBatchedModifications(ActorRef actorRef) {
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(BatchedModifications.class));
}
private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
public void testRead() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
doReturn(Futures.successful(new Object())).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
}
doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
}
@Test(expected = TestException.class)
public void testReadWithPriorRecordingOperationFailure() throws Throwable {
+ doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
+ when(mockActorContext).getDatastoreContext();
+
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
-
- doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
+ expectFailedBatchedModifications(actorRef);
doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(expectedNode));
+ expectBatchedModifications(actorRef, 1);
doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
transactionProxy.write(TestModel.TEST_PATH, expectedNode);
TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
-
assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
+
+ InOrder inOrder = Mockito.inOrder(mockActorContext);
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData());
}
@Test(expected=IllegalStateException.class)
public void testReadPreConditionCheck() {
-
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
-
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.read(TestModel.TEST_PATH);
}
public void testExists() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedDataExists());
doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
}
@Test(expected = TestException.class)
public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
+ doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
+ when(mockActorContext).getDatastoreContext();
+
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
-
- doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
+ expectFailedBatchedModifications(actorRef);
doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedDataExists());
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+ expectBatchedModifications(actorRef, 1);
doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedDataExists());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
assertEquals("Exists response", true, exists);
+
+ InOrder inOrder = Mockito.inOrder(mockActorContext);
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedDataExists());
}
@Test(expected=IllegalStateException.class)
public void testExistsPreConditionCheck() {
-
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
-
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.exists(TestModel.TEST_PATH);
}
// Expected
}
} else {
- assertEquals("Recording operation Future result type", expResultType,
+ assertEquals(String.format("Recording operation %d Future result type", i + 1), expResultType,
Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
}
}
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+ expectBatchedModifications(actorRef, 1);
+ expectReadyTransaction(actorRef);
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+ // This sends the batched modification.
+ transactionProxy.ready();
+
+ verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- WriteDataReply.class);
+ BatchedModificationsReply.class);
}
@Test
doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
- final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ expectBatchedModifications(actorRef, 1);
+ expectReadyTransaction(actorRef);
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+ final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
throw caughtEx.get();
}
- verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+ // This sends the batched modification.
+ transactionProxy.ready();
+
+ verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- WriteDataReply.class);
+ BatchedModificationsReply.class);
}
@Test(expected=IllegalStateException.class)
public void testWritePreConditionCheck() {
-
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY);
-
- transactionProxy.write(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
+ transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
}
@Test(expected=IllegalStateException.class)
public void testWriteAfterReadyPreConditionCheck() {
-
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.ready();
- transactionProxy.write(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
}
@Test
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+ expectBatchedModifications(actorRef, 1);
+ expectReadyTransaction(actorRef);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
- verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+ // This sends the batched modification.
+ transactionProxy.ready();
+
+ verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- MergeDataReply.class);
+ BatchedModificationsReply.class);
}
@Test
public void testDelete() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
- doReturn(deleteSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedDeleteData());
+ expectBatchedModifications(actorRef, 1);
+ expectReadyTransaction(actorRef);
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.delete(TestModel.TEST_PATH);
- verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedDeleteData());
+ // This sends the batched modification.
+ transactionProxy.ready();
+
+ verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- DeleteDataReply.class);
+ BatchedModificationsReply.class);
}
private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+ expectBatchedModifications(actorRef, 1);
+ expectReadyTransaction(actorRef);
- doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
-
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
transactionProxy.read(TestModel.TEST_PATH);
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- WriteDataReply.class);
+ BatchedModificationsReply.class);
verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+ verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
+ isA(BatchedModifications.class));
}
private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
- doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version));
+ doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
+ executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
- doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version));
+ doReturn(Futures.successful(new MergeDataReply().toSerializable(version))).when(mockActorContext).
+ executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyMergeData(testNode));
- doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+ doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext).
+ executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH));
+
+ expectReadyTransaction(actorRef);
doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
eq(actorRef.path().toString()));
transactionProxy.merge(TestModel.TEST_PATH, testNode);
+ transactionProxy.delete(TestModel.TEST_PATH);
+
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class);
+ ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class,
+ ShardTransactionMessages.DeleteDataReply.class);
verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+ expectFailedBatchedModifications(actorRef);
- doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
-
- doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+ expectReadyTransaction(actorRef);
doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
-
- transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
verifyCohortFutures(proxy, TestException.class);
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- MergeDataReply.class, TestException.class);
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), TestException.class);
}
@Test
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+ expectBatchedModifications(actorRef, 1);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)),
isA(ReadyTransaction.SERIALIZABLE_CLASS));
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- MergeDataReply.class);
+ BatchedModificationsReply.class);
verifyCohortFutures(proxy, TestException.class);
}
doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
mockActorContext).findPrimaryShardAsync(anyString());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+ expectBatchedModifications(actorRef, 1);
doReturn(Futures.successful(new Object())).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)),
isA(ReadyTransaction.SERIALIZABLE_CLASS));
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
transactionProxy.read(TestModel.TEST_PATH);
String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
- .setTransactionId("txn-1")
- .setTransactionActorPath(actorPath)
- .build();
+ .setTransactionId("txn-1").setTransactionActorPath(actorPath).build();
doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
}
@Test
- public void testLocalTxActorWrite() throws Exception {
+ public void testLocalTxActorReady() throws Exception {
ActorSystem actorSystem = getSystem();
ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
- CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
- .setTransactionId("txn-1")
- .setTransactionActorPath(actorPath)
- .build();
+ CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
+ setTransactionId("txn-1").setTransactionActorPath(actorPath).
+ setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
- executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+ executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
eqCreateTransaction(memberName, WRITE_ONLY));
doReturn(true).when(mockActorContext).isPathLocal(actorPath);
- NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqWriteData(nodeToWrite));
+ doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), isA(BatchedModifications.class));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- verify(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqWriteData(nodeToWrite));
-
- //testing local merge
- doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqMergeData(nodeToWrite));
-
- transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
-
- verify(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqMergeData(nodeToWrite));
-
-
- //testing local delete
- doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqDeleteData());
- transactionProxy.delete(TestModel.TEST_PATH);
-
- verify(mockActorContext).executeOperationAsync(any(ActorSelection.class), eqDeleteData());
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- WriteDataReply.class, MergeDataReply.class, DeleteDataReply.class);
+ BatchedModificationsReply.class);
// testing ready
doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
}
String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
- CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
- .setTransactionId("txn-1")
- .setTransactionActorPath(actorPath)
- .build();
+ CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
+ setTransactionId("txn-1").setTransactionActorPath(actorPath).
+ setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
long end = System.nanoTime();
- Assert.assertTrue(String.format("took less time than expected %s was %s",
- TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()),
- (end-start)), (end - start) > TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+ long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
+ Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
+ expected, (end-start)), (end - start) > expected);
}
}
String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
- CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
- .setTransactionId("txn-1")
- .setTransactionActorPath(actorPath)
- .build();
+ CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
+ setTransactionId("txn-1").setTransactionActorPath(actorPath).
+ setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
long end = System.nanoTime();
- Assert.assertTrue(String.format("took more time than expected %s was %s",
- TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()),
- (end-start)), (end - start) <= TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+ long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
+ Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
+ expected, (end-start)), (end - start) <= expected);
}
public void testWriteThrottling(boolean shardFound){
public void run(TransactionProxy transactionProxy) {
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqWriteData(nodeToWrite));
+ expectBatchedModifications(2);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
public void run(TransactionProxy transactionProxy) {
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqWriteData(nodeToWrite));
+ expectIncompleteBatchedModifications();
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
}
});
-
}
@Test
public void run(TransactionProxy transactionProxy) {
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqWriteData(nodeToWrite));
+ expectBatchedModifications(2);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
public void run(TransactionProxy transactionProxy) {
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqSerializedWriteData(nodeToWrite));
+ expectBatchedModifications(2);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
}
});
-
}
@Test
public void run(TransactionProxy transactionProxy) {
NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqMergeData(nodeToMerge));
+ expectIncompleteBatchedModifications();
transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
public void run(TransactionProxy transactionProxy) {
NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqMergeData(nodeToMerge));
+ expectBatchedModifications(2);
transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
public void run(TransactionProxy transactionProxy) {
NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqMergeData(nodeToMerge));
+ expectBatchedModifications(2);
transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
throttleOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqDeleteData());
+ expectIncompleteBatchedModifications();
transactionProxy.delete(TestModel.TEST_PATH);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqDeleteData());
+ expectBatchedModifications(2);
transactionProxy.delete(TestModel.TEST_PATH);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
- doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqDeleteData());
+ expectBatchedModifications(2);
transactionProxy.delete(TestModel.TEST_PATH);
public void run(TransactionProxy transactionProxy) {
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqWriteData(nodeToWrite));
+ expectBatchedModifications(1);
doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
any(ActorSelection.class), any(ReadyTransaction.class));
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqWriteData(nodeToWrite));
-
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqWriteData(carsNode));
+ expectBatchedModifications(2);
doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
any(ActorSelection.class), any(ReadyTransaction.class));
}
}, 2, true);
}
+
+ @Test
+ public void testModificationOperationBatching() throws Throwable {
+ int shardBatchedModificationCount = 3;
+ doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
+ when(mockActorContext).getDatastoreContext();
+
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+ expectBatchedModifications(actorRef, shardBatchedModificationCount);
+
+ expectReadyTransaction(actorRef);
+
+ YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
+ NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+ YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
+ NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
+
+ YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
+ NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+ YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
+ NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
+
+ YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
+ YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+ transactionProxy.write(writePath1, writeNode1);
+ transactionProxy.write(writePath2, writeNode2);
+ transactionProxy.delete(deletePath1);
+ transactionProxy.merge(mergePath1, mergeNode1);
+ transactionProxy.merge(mergePath2, mergeNode2);
+ transactionProxy.write(writePath3, writeNode3);
+ transactionProxy.merge(mergePath3, mergeNode3);
+ transactionProxy.delete(deletePath2);
+
+ // This sends the last batch.
+ transactionProxy.ready();
+
+ List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+ assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
+
+ verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
+ new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
+
+ verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
+ new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
+
+ verifyBatchedModifications(batchedModifications.get(2), new MergeModification(mergePath3, mergeNode3),
+ new DeleteModification(deletePath2));
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
+ }
+
+ @Test
+ public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
+ int shardBatchedModificationCount = 10;
+ doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
+ when(mockActorContext).getDatastoreContext();
+
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+ expectBatchedModifications(actorRef, shardBatchedModificationCount);
+
+ YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
+ NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+ YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
+ NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
+
+ YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
+
+ doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
+
+ doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
+
+ doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedDataExists());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+ transactionProxy.write(writePath1, writeNode1);
+ transactionProxy.write(writePath2, writeNode2);
+
+ Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
+ get(5, TimeUnit.SECONDS);
+
+ assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+ assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
+
+ transactionProxy.merge(mergePath1, mergeNode1);
+ transactionProxy.merge(mergePath2, mergeNode2);
+
+ readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
+
+ transactionProxy.delete(deletePath);
+
+ Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
+ assertEquals("Exists response", true, exists);
+
+ assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+ assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
+
+ List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+ assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
+
+ verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
+ new WriteModification(writePath2, writeNode2));
+
+ verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
+ new MergeModification(mergePath2, mergeNode2));
+
+ verifyBatchedModifications(batchedModifications.get(2), new DeleteModification(deletePath));
+
+ InOrder inOrder = Mockito.inOrder(mockActorContext);
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
+
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
+
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedDataExists());
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
+ }
+
+ private List<BatchedModifications> captureBatchedModifications(ActorRef actorRef) {
+ ArgumentCaptor<BatchedModifications> batchedModificationsCaptor =
+ ArgumentCaptor.forClass(BatchedModifications.class);
+ verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync(
+ eq(actorSelection(actorRef)), batchedModificationsCaptor.capture());
+
+ List<BatchedModifications> batchedModifications = filterCaptured(
+ batchedModificationsCaptor, BatchedModifications.class);
+ return batchedModifications;
+ }
+
+ private <T> List<T> filterCaptured(ArgumentCaptor<T> captor, Class<T> type) {
+ List<T> captured = new ArrayList<>();
+ for(T c: captor.getAllValues()) {
+ if(type.isInstance(c)) {
+ captured.add(c);
+ }
+ }
+
+ return captured;
+ }
+
+ private void verifyOneBatchedModification(ActorRef actorRef, Modification expected) {
+ List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+ assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+ verifyBatchedModifications(batchedModifications.get(0), expected);
+ }
+
+ private void verifyBatchedModifications(Object message, Modification... expected) {
+ assertEquals("Message type", BatchedModifications.class, message.getClass());
+ BatchedModifications batchedModifications = (BatchedModifications)message;
+ assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
+ for(int i = 0; i < batchedModifications.getModifications().size(); i++) {
+ Modification actual = batchedModifications.getModifications().get(i);
+ assertEquals("Modification type", expected[i].getClass(), actual.getClass());
+ assertEquals("getPath", ((AbstractModification)expected[i]).getPath(),
+ ((AbstractModification)actual).getPath());
+ if(actual instanceof WriteModification) {
+ assertEquals("getData", ((WriteModification)expected[i]).getData(),
+ ((WriteModification)actual).getData());
+ }
+ }
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for BatchedModifications.
+ *
+ * @author Thomas Pantelis
+ */
+public class BatchedModificationsTest {
+
+ @Test
+ public void testSerialization() {
+ YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+ YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
+ NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
+
+ YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
+
+ BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
+ batched.addModification(new WriteModification(writePath, writeData));
+ batched.addModification(new MergeModification(mergePath, mergeData));
+ batched.addModification(new DeleteModification(deletePath));
+
+ BatchedModifications clone = (BatchedModifications) SerializationUtils.clone(
+ (Serializable) batched.toSerializable());
+
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
+
+ assertEquals("getModifications size", 3, clone.getModifications().size());
+
+ WriteModification write = (WriteModification)clone.getModifications().get(0);
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, write.getVersion());
+ assertEquals("getPath", writePath, write.getPath());
+ assertEquals("getData", writeData, write.getData());
+
+ MergeModification merge = (MergeModification)clone.getModifications().get(1);
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, merge.getVersion());
+ assertEquals("getPath", mergePath, merge.getPath());
+ assertEquals("getData", mergeData, merge.getData());
+
+ DeleteModification delete = (DeleteModification)clone.getModifications().get(2);
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion());
+ assertEquals("getPath", deletePath, delete.getPath());
+ }
+
+ @Test
+ public void testBatchedModificationsReplySerialization() {
+ BatchedModificationsReply clone = (BatchedModificationsReply) SerializationUtils.clone(
+ (Serializable) new BatchedModificationsReply(100).toSerializable());
+ assertEquals("getNumBatched", 100, clone.getNumBatched());
+ }
+}
*
* @author Thomas Pantelis
*/
+@Deprecated
public class DeleteDataTest {
@Test
public void testSerialization() {
YangInstanceIdentifier path = TestModel.TEST_PATH;
- DeleteData expected = new DeleteData(path);
+ DeleteData expected = new DeleteData(path, DataStoreVersions.CURRENT_VERSION);
- Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+ Object serialized = expected.toSerializable();
assertEquals("Serialized type", DeleteData.class, serialized.getClass());
assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((DeleteData)serialized).getVersion());
Object clone = SerializationUtils.clone((Serializable) serialized);
- assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((DeleteData)clone).getVersion());
DeleteData actual = DeleteData.fromSerializable(clone);
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
assertEquals("getPath", expected.getPath(), actual.getPath());
}
public void testSerializationWithHeliumR1Version() throws Exception {
YangInstanceIdentifier path = TestModel.TEST_PATH;
- DeleteData expected = new DeleteData(path);
+ DeleteData expected = new DeleteData(path, DataStoreVersions.HELIUM_1_VERSION);
- Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+ Object serialized = expected.toSerializable();
assertEquals("Serialized type", ShardTransactionMessages.DeleteData.class, serialized.getClass());
DeleteData actual = DeleteData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+@Deprecated
public class MergeDataTest {
@Test
new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- MergeData expected = new MergeData(path, data);
+ MergeData expected = new MergeData(path, data, DataStoreVersions.CURRENT_VERSION);
- Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+ Object serialized = expected.toSerializable();
assertEquals("Serialized type", MergeData.class, serialized.getClass());
assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)serialized).getVersion());
Object clone = SerializationUtils.clone((Serializable) serialized);
- assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)clone).getVersion());
MergeData actual = MergeData.fromSerializable(clone);
+ assertEquals("Version", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
assertEquals("getPath", expected.getPath(), actual.getPath());
assertEquals("getData", expected.getData(), actual.getData());
}
new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- MergeData expected = new MergeData(path, data);
+ MergeData expected = new MergeData(path, data, DataStoreVersions.HELIUM_1_VERSION);
- Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+ Object serialized = expected.toSerializable();
assertEquals("Serialized type", ShardTransactionMessages.MergeData.class, serialized.getClass());
MergeData actual = MergeData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- ReadDataReply expected = new ReadDataReply(data);
+ ReadDataReply expected = new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION);
- Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+ Object serialized = expected.toSerializable();
assertEquals("Serialized type", ReadDataReply.class, serialized.getClass());
ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
(Serializable) serialized));
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
assertEquals("getNormalizedNode", expected.getNormalizedNode(), actual.getNormalizedNode());
}
new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- ReadDataReply expected = new ReadDataReply(data);
+ ReadDataReply expected = new ReadDataReply(data, DataStoreVersions.HELIUM_1_VERSION);
- Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+ Object serialized = expected.toSerializable();
assertEquals("Serialized type", ShardTransactionMessages.ReadDataReply.class, serialized.getClass());
ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
*
* @author Thomas Pantelis
*/
+@Deprecated
public class WriteDataTest {
@Test
new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- WriteData expected = new WriteData(path, data);
+ WriteData expected = new WriteData(path, data, DataStoreVersions.CURRENT_VERSION);
- Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+ Object serialized = expected.toSerializable();
assertEquals("Serialized type", WriteData.class, serialized.getClass());
assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)serialized).getVersion());
Object clone = SerializationUtils.clone((Serializable) serialized);
- assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)clone).getVersion());
WriteData actual = WriteData.fromSerializable(clone);
+ assertEquals("Version", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
assertEquals("getPath", expected.getPath(), actual.getPath());
assertEquals("getData", expected.getData(), actual.getData());
}
new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- WriteData expected = new WriteData(path, data);
+ WriteData expected = new WriteData(path, data, DataStoreVersions.HELIUM_1_VERSION);
- Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+ Object serialized = expected.toSerializable();
assertEquals("Serialized type", ShardTransactionMessages.WriteData.class, serialized.getClass());
WriteData actual = WriteData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
import org.apache.commons.lang.SerializationUtils;
import org.junit.Ignore;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
MutableCompositeModification clone = (MutableCompositeModification) SerializationUtils.clone(compositeModification);
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
+
assertEquals("getModifications size", 3, clone.getModifications().size());
WriteModification write = (WriteModification)clone.getModifications().get(0);
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, write.getVersion());
assertEquals("getPath", writePath, write.getPath());
assertEquals("getData", writeData, write.getData());
MergeModification merge = (MergeModification)clone.getModifications().get(1);
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, merge.getVersion());
assertEquals("getPath", mergePath, merge.getPath());
assertEquals("getData", mergeData, merge.getData());
DeleteModification delete = (DeleteModification)clone.getModifications().get(2);
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion());
assertEquals("getPath", deletePath, delete.getPath());
}
private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
- public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).node(OUTER_LIST_QNAME).build();
+ public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
+ node(OUTER_LIST_QNAME).build();
+ public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
+ node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).build();
public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two");
public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three");
final MediaType mediaType, final MultivaluedMap<String, Object> httpHeaders, final OutputStream entityStream)
throws IOException, WebApplicationException {
NormalizedNode<?, ?> data = t.getData();
- InstanceIdentifierContext context = t.getInstanceIdentifierContext();
- DataSchemaNode schema = context.getSchemaNode();
+ final InstanceIdentifierContext context = t.getInstanceIdentifierContext();
+ final DataSchemaNode schema = context.getSchemaNode();
SchemaPath path = context.getSchemaNode().getPath();
- OutputStreamWriter outputWriter = new OutputStreamWriter(entityStream, Charsets.UTF_8);
+ final OutputStreamWriter outputWriter = new OutputStreamWriter(entityStream, Charsets.UTF_8);
if (data == null) {
throw new RestconfDocumentedException(Response.Status.NOT_FOUND);
}
boolean isDataRoot = false;
URI initialNs = null;
- outputWriter.write('{');
if (SchemaPath.ROOT.equals(path)) {
isDataRoot = true;
} else {
if(!schema.isAugmenting() && !(schema instanceof SchemaContext)) {
initialNs = schema.getQName().getNamespace();
}
- NormalizedNodeStreamWriter jsonWriter = JSONNormalizedNodeStreamWriter.create(context.getSchemaContext(),path,initialNs,outputWriter);
- NormalizedNodeWriter nnWriter = NormalizedNodeWriter.forStreamWriter(jsonWriter);
+ final NormalizedNodeStreamWriter jsonWriter = JSONNormalizedNodeStreamWriter.create(context.getSchemaContext(),path,initialNs,outputWriter);
+ final NormalizedNodeWriter nnWriter = NormalizedNodeWriter.forStreamWriter(jsonWriter);
if(isDataRoot) {
writeDataRoot(outputWriter,nnWriter,(ContainerNode) data);
} else {
nnWriter.write(data);
}
nnWriter.flush();
- outputWriter.write('}');
outputWriter.flush();
}
- private void writeDataRoot(OutputStreamWriter outputWriter, NormalizedNodeWriter nnWriter, ContainerNode data) throws IOException {
- Iterator<DataContainerChild<? extends PathArgument, ?>> iterator = data.getValue().iterator();
+ private void writeDataRoot(final OutputStreamWriter outputWriter, final NormalizedNodeWriter nnWriter, final ContainerNode data) throws IOException {
+ final Iterator<DataContainerChild<? extends PathArgument, ?>> iterator = data.getValue().iterator();
while(iterator.hasNext()) {
- DataContainerChild<? extends PathArgument, ?> child = iterator.next();
+ final DataContainerChild<? extends PathArgument, ?> child = iterator.next();
nnWriter.write(child);
nnWriter.flush();
}
import java.util.List;
+/**
+ * This interface defines the methods for Neutron Requests
+ *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.northbound.api.INeutronRequest}
+ */
+@Deprecated
public interface INeutronRequest<T extends INeutronObject> {
public T getSingleton();
public boolean isSingleton();
/**
* This interface defines the methods a service that wishes to be aware of Firewall Rules needs to implement
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronFirewallAware}
*/
+@Deprecated
public interface INeutronFirewallAware {
/**
/**
* This interface defines the methods for CRUD of NB OpenStack Firewall objects
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronFirewallCRUD}
*/
+@Deprecated
public interface INeutronFirewallCRUD {
/**
* Applications call this interface method to determine if a particular
/**
* This interface defines the methods a service that wishes to be aware of Firewall Policys needs to implement
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronFirewallPolicyAware}
*/
+@Deprecated
public interface INeutronFirewallPolicyAware {
/**
/**
* This interface defines the methods for CRUD of NB OpenStack Firewall Policy objects
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronFirewallPolicyCRUD}
*/
+@Deprecated
public interface INeutronFirewallPolicyCRUD {
/**
* Applications call this interface method to determine if a particular
/**
* This interface defines the methods a service that wishes to be aware of Firewall Rules needs to implement
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronFirewallRuleAware}
*/
+@Deprecated
public interface INeutronFirewallRuleAware {
/**
/**
* This interface defines the methods for CRUD of NB OpenStack Firewall Rule objects
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronFirewallRuleCRUD}
*/
+@Deprecated
public interface INeutronFirewallRuleCRUD {
/**
* Applications call this interface method to determine if a particular
/**
* This interface defines the methods a service that wishes to be aware of Neutron FloatingIPs needs to implement
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronFloatingIPAware}
*/
+@Deprecated
public interface INeutronFloatingIPAware {
/**
/**
* This interface defines the methods for CRUD of NB FloatingIP objects
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronFloatingIPCRUD}
*/
+@Deprecated
public interface INeutronFloatingIPCRUD {
/**
* Applications call this interface method to determine if a particular
/**
* This interface defines the methods a service that wishes to be aware of LoadBalancer Rules needs to implement
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronLoadBalancerAware}
*/
+@Deprecated
public interface INeutronLoadBalancerAware {
/**
/**
* This interface defines the methods for CRUD of NB OpenStack LoadBalancer objects
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronLoadBalancerCRUD}
*/
+@Deprecated
public interface INeutronLoadBalancerCRUD {
/**
* Applications call this interface method to determine if a particular
/**
* This interface defines the methods a service that wishes to be aware of LoadBalancerHealthMonitor Rules needs to implement
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronLoadBalancerHealthMonitorAware}
*/
+@Deprecated
public interface INeutronLoadBalancerHealthMonitorAware {
/**
/**
* This interface defines the methods for CRUD of NB OpenStack LoadBalancerHealthMonitor objects
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronLoadBalancerHealthMonitorCRUD}
*/
+@Deprecated
public interface INeutronLoadBalancerHealthMonitorCRUD {
/**
* Applications call this interface method to determine if a particular
/**
* This interface defines the methods a service that wishes to be aware of LoadBalancerListener Rules needs to implement
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronLoadBalancerListenerAware}
*/
+@Deprecated
public interface INeutronLoadBalancerListenerAware {
/**
/**
* This interface defines the methods for CRUD of NB OpenStack LoadBalancerListener objects
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronLoadBalancerListenerCRUD}
*/
+@Deprecated
public interface INeutronLoadBalancerListenerCRUD {
/**
* Applications call this interface method to determine if a particular
/**
* This interface defines the methods a service that wishes to be aware of LoadBalancerPool Rules needs to implement
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronLoadBalancerPoolAware}
*/
+@Deprecated
public interface INeutronLoadBalancerPoolAware {
/**
/**
* This interface defines the methods for CRUD of NB OpenStack LoadBalancerPool objects
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronLoadBalancerPoolCRUD}
*/
+@Deprecated
public interface INeutronLoadBalancerPoolCRUD {
/**
* Applications call this interface method to determine if a particular
*/
package org.opendaylight.controller.networkconfig.neutron;
+/**
+ * This interface defines the methods a service that wishes to be aware of LoadBalancerPoolMember objects needs to implement
+ *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronLoadBalancerPoolMemberAware}
+ */
+
+@Deprecated
public interface INeutronLoadBalancerPoolMemberAware {
import java.util.List;
+/**
+ * This interface defines the methods for CRUD of NB OpenStack LoadBalancerPoolMember objects
+ *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronLoadBalancerPoolMemberCRUD}
+ */
+
+@Deprecated
public interface INeutronLoadBalancerPoolMemberCRUD {
/**
/**
* This interface defines the methods a service that wishes to be aware of Neutron Networks needs to implement
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronNetworkAware}
*/
+@Deprecated
public interface INeutronNetworkAware {
/**
/**
* This interface defines the methods for CRUD of NB network objects
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronNetworkCRUD}
*/
+@Deprecated
public interface INeutronNetworkCRUD {
/**
* Applications call this interface method to determine if a particular
/**
* This class contains behaviour common to Neutron configuration objects
+ *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronObject}
*/
+@Deprecated
public interface INeutronObject {
public String getID();
public void setID(String id);
/**
* This interface defines the methods a service that wishes to be aware of Neutron Ports needs to implement
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronPortAware}
*/
+@Deprecated
public interface INeutronPortAware {
/**
/**
* This interface defines the methods for CRUD of NB Port objects
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronPortCRUD}
*/
+@Deprecated
public interface INeutronPortCRUD {
/**
* Applications call this interface method to determine if a particular
/**
* This interface defines the methods a service that wishes to be aware of Neutron Routers needs to implement
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronRouterAware}
*/
+@Deprecated
public interface INeutronRouterAware {
/**
/**
* This interface defines the methods for CRUD of NB Router objects
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronRouterCRUD}
*/
+@Deprecated
public interface INeutronRouterCRUD {
/**
* Applications call this interface method to determine if a particular
/**
* This interface defines the methods a service that wishes to be aware of Neutron Security Groups needs to implement
+ *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronSecurityGroupAware}
*/
+@Deprecated
public interface INeutronSecurityGroupAware {
/**
/**
* This interface defines the methods for CRUD of NB OpenStack Security Group objects
+ *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronSecurityGroupCRUD}
*/
+@Deprecated
public interface INeutronSecurityGroupCRUD {
/**
* Applications call this interface method to determine if a particular
/**
* This interface defines the methods required to be aware of Neutron Security Rules
+ *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronSecurityRuleAware}
*/
+@Deprecated
public interface INeutronSecurityRuleAware {
/**
/**
* This interface defines the methods for CRUD of NB OpenStack Security Rule objects
+ *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronSecurityRuleCRUD}
*/
+@Deprecated
public interface INeutronSecurityRuleCRUD {
/**
* Applications call this interface method to determine if a particular
/**
* This interface defines the methods a service that wishes to be aware of Neutron Subnets needs to implement
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronSubnetAware}
*/
+@Deprecated
public interface INeutronSubnetAware {
/**
/**
* This interface defines the methods for CRUD of NB Subnet objects
*
+ * @deprecated Replaced by {@link org.opendaylight.neutron.spi.INeutronSubnetCRUD}
*/
+@Deprecated
public interface INeutronSubnetCRUD {
/**
* Applications call this interface method to determine if a particular