import akka.actor.Props;
import akka.japi.Creator;
+
import com.google.common.base.Preconditions;
+
import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
public class DataChangeListener extends AbstractUntypedActor {
private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
- private final SchemaContext schemaContext;
- private final YangInstanceIdentifier pathId;
- private boolean notificationsEnabled = false;
+ private volatile boolean notificationsEnabled = false;
- public DataChangeListener(SchemaContext schemaContext,
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, YangInstanceIdentifier pathId) {
-
- this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+ public DataChangeListener(AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>> listener) {
this.listener = Preconditions.checkNotNull(listener, "listener should not be null");
- this.pathId = Preconditions.checkNotNull(pathId, "pathId should not be null");
}
@Override public void handleReceive(Object message) throws Exception {
}
}
- public static Props props(final SchemaContext schemaContext, final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, final YangInstanceIdentifier pathId) {
- return Props.create(new Creator<DataChangeListener>() {
- @Override
- public DataChangeListener create() throws Exception {
- return new DataChangeListener(schemaContext,listener,pathId );
- }
+ public static Props props(final AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>> listener) {
+ return Props.create(new DataChangeListenerCreator(listener));
+ }
+
+ private static class DataChangeListenerCreator implements Creator<DataChangeListener> {
+ private static final long serialVersionUID = 1L;
+
+ final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
- });
+ DataChangeListenerCreator(
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
+ this.listener = listener;
+ }
+ @Override
+ public DataChangeListener create() throws Exception {
+ return new DataChangeListener(listener);
+ }
}
}
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSelection;
+
import com.google.common.base.Preconditions;
+
import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
private final ActorSelection dataChangeListenerActor;
private final SchemaContext schemaContext;
- public DataChangeListenerProxy(SchemaContext schemaContext,ActorSelection dataChangeListenerActor) {
+ public DataChangeListenerProxy(SchemaContext schemaContext, ActorSelection dataChangeListenerActor) {
this.dataChangeListenerActor = Preconditions.checkNotNull(dataChangeListenerActor, "dataChangeListenerActor should not be null");
this.schemaContext = schemaContext;
}
@Override public void onDataChanged(
AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- dataChangeListenerActor.tell(new DataChanged(schemaContext,change), null);
+ dataChangeListenerActor.tell(new DataChanged(schemaContext, change), null);
}
}
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.japi.Creator;
+
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
public class DataChangeListenerRegistration extends AbstractUntypedActor {
- private final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+ private final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
registration;
public DataChangeListenerRegistration(
- org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
+ ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
this.registration = registration;
}
}
public static Props props(
- final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
- return Props.create(new Creator<DataChangeListenerRegistration>() {
-
- @Override
- public DataChangeListenerRegistration create() throws Exception {
- return new DataChangeListenerRegistration(registration);
- }
- });
+ final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
+ return Props.create(new DataChangeListenerRegistrationCreator(registration));
}
private void closeListenerRegistration(
.tell(new CloseDataChangeListenerRegistrationReply().toSerializable(), getSelf());
getSelf().tell(PoisonPill.getInstance(), getSelf());
}
+
+ private static class DataChangeListenerRegistrationCreator
+ implements Creator<DataChangeListenerRegistration> {
+ final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>> registration;
+
+ DataChangeListenerRegistrationCreator(
+ ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>> registration) {
+ this.registration = registration;
+ }
+
+ @Override
+ public DataChangeListenerRegistration create() throws Exception {
+ return new DataChangeListenerRegistration(registration);
+ }
+ }
}
package org.opendaylight.controller.cluster.datastore;
+import java.util.concurrent.TimeUnit;
+
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.google.common.base.Preconditions;
+
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
-import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
/**
*
*/
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
private final ActorContext actorContext;
-
- private SchemaContext schemaContext;
+ private final ShardContext shardContext;
public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster,
- Configuration configuration, InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+ Configuration configuration, DistributedDataStoreProperties dataStoreProperties) {
Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
Preconditions.checkNotNull(type, "type should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
LOG.info("Creating ShardManager : {}", shardManagerId);
- this.actorContext = new ActorContext(actorSystem, actorSystem
- .actorOf(ShardManager.props(type, cluster, configuration, dataStoreProperties),
+ shardContext = new ShardContext(InMemoryDOMDataStoreConfigProperties.create(
+ dataStoreProperties.getMaxShardDataChangeExecutorPoolSize(),
+ dataStoreProperties.getMaxShardDataChangeExecutorQueueSize(),
+ dataStoreProperties.getMaxShardDataChangeListenerQueueSize()),
+ Duration.create(dataStoreProperties.getShardTransactionIdleTimeoutInMinutes(),
+ TimeUnit.MINUTES));
+
+ actorContext = new ActorContext(actorSystem, actorSystem
+ .actorOf(ShardManager.props(type, cluster, configuration, shardContext),
shardManagerId ), cluster, configuration);
}
public DistributedDataStore(ActorContext actorContext) {
this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+ this.shardContext = new ShardContext();
}
LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
- DataChangeListener.props(schemaContext,listener,path ));
+ DataChangeListener.props(listener ));
String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
@Override
public DOMStoreTransactionChain createTransactionChain() {
- return new TransactionChainProxy(actorContext, schemaContext);
+ return new TransactionChainProxy(actorContext);
}
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
- return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
- schemaContext);
+ return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY,
- schemaContext);
+ return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE,
- schemaContext);
+ return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
}
- @Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
- this.schemaContext = schemaContext;
- actorContext.getShardManager().tell(
- new UpdateSchemaContext(schemaContext), null);
+ @Override
+ public void onGlobalContextUpdated(SchemaContext schemaContext) {
+ actorContext.setSchemaContext(schemaContext);
}
- @Override public void close() throws Exception {
+ @Override
+ public void close() throws Exception {
actorContext.shutdown();
-
}
}
import akka.actor.ActorSystem;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
public class DistributedDataStoreFactory {
public static DistributedDataStore createInstance(String name, SchemaService schemaService,
- InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+ DistributedDataStoreProperties dataStoreProperties) {
ActorSystem actorSystem = ActorSystemFactory.getInstance();
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Wrapper class for DistributedDataStore configuration properties.
+ *
+ * @author Thomas Pantelis
+ */
+public class DistributedDataStoreProperties {
+ private final int maxShardDataChangeListenerQueueSize;
+ private final int maxShardDataChangeExecutorQueueSize;
+ private final int maxShardDataChangeExecutorPoolSize;
+ private final int shardTransactionIdleTimeoutInMinutes;
+
+ public DistributedDataStoreProperties() {
+ maxShardDataChangeListenerQueueSize = 1000;
+ maxShardDataChangeExecutorQueueSize = 1000;
+ maxShardDataChangeExecutorPoolSize = 20;
+ shardTransactionIdleTimeoutInMinutes = 10;
+ }
+
+ public DistributedDataStoreProperties(int maxShardDataChangeListenerQueueSize,
+ int maxShardDataChangeExecutorQueueSize, int maxShardDataChangeExecutorPoolSize,
+ int shardTransactionIdleTimeoutInMinutes) {
+ this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
+ this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
+ this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
+ this.shardTransactionIdleTimeoutInMinutes = shardTransactionIdleTimeoutInMinutes;
+ }
+
+ public int getMaxShardDataChangeListenerQueueSize() {
+ return maxShardDataChangeListenerQueueSize;
+ }
+
+ public int getMaxShardDataChangeExecutorQueueSize() {
+ return maxShardDataChangeExecutorQueueSize;
+ }
+
+ public int getMaxShardDataChangeExecutorPoolSize() {
+ return maxShardDataChangeExecutorPoolSize;
+ }
+
+ public int getShardTransactionIdleTimeoutInMinutes() {
+ return shardTransactionIdleTimeoutInMinutes;
+ }
+}
import akka.event.LoggingAdapter;
import akka.japi.Creator;
import akka.serialization.Serialization;
+
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
/// The name of this shard
private final ShardIdentifier name;
- private volatile SchemaContext schemaContext;
-
private final ShardStats shardMBean;
private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
+ private final ShardContext shardContext;
+
+ private SchemaContext schemaContext;
+
private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
- InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+ ShardContext shardContext) {
super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
this.name = name;
+ this.shardContext = shardContext;
String setting = System.getProperty("shard.persistent");
LOG.info("Shard created : {} persistent : {}", name, persistent);
- store = InMemoryDOMDataStoreFactory.create(name.toString(), null, dataStoreProperties);
+ store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
+ shardContext.getDataStoreProperties());
shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString());
return map;
}
-
-
-
public static Props props(final ShardIdentifier name,
final Map<ShardIdentifier, String> peerAddresses,
- final InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+ ShardContext shardContext) {
Preconditions.checkNotNull(name, "name should not be null");
- Preconditions
- .checkNotNull(peerAddresses, "peerAddresses should not be null");
+ Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
+ Preconditions.checkNotNull(shardContext, "shardContext should not be null");
- return Props.create(new Creator<Shard>() {
-
- @Override
- public Shard create() throws Exception {
- return new Shard(name, peerAddresses, dataStoreProperties);
- }
-
- });
+ return Props.create(new ShardCreator(name, peerAddresses, shardContext));
}
-
@Override public void onReceiveCommand(Object message) {
LOG.debug("Received message {} from {}", message.getClass().toString(),
getSender());
shardMBean.incrementReadOnlyTransactionCount();
return getContext().actorOf(
- ShardTransaction
- .props(store.newReadOnlyTransaction(), getSelf(),
- schemaContext), transactionId.toString());
+ ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(),
+ schemaContext, shardContext), transactionId.toString());
} else if (createTransaction.getTransactionType()
== TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
shardMBean.incrementReadWriteTransactionCount();
return getContext().actorOf(
- ShardTransaction
- .props(store.newReadWriteTransaction(), getSelf(),
- schemaContext), transactionId.toString());
+ ShardTransaction.props(store.newReadWriteTransaction(), getSelf(),
+ schemaContext, shardContext), transactionId.toString());
} else if (createTransaction.getTransactionType()
shardMBean.incrementWriteOnlyTransactionCount();
return getContext().actorOf(
- ShardTransaction
- .props(store.newWriteOnlyTransaction(), getSelf(),
- schemaContext), transactionId.toString());
+ ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(),
+ schemaContext, shardContext), transactionId.toString());
} else {
throw new IllegalArgumentException(
"Shard="+name + ":CreateTransaction message has unidentified transaction type="
final ActorRef self = getSelf();
Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
public void onSuccess(Void v) {
sender.tell(new CommitTransactionReply().toSerializable(),self);
shardMBean.incrementCommittedTransactionCount();
shardMBean.setLastCommittedTransactionTime(new Date());
}
+ @Override
public void onFailure(Throwable t) {
LOG.error(t, "An exception happened during commit");
shardMBean.incrementFailedTransactionsCount();
dataChangeListeners.add(dataChangeListenerPath);
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
- listener =
- new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
+ listener = new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
- org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- registration =
- store.registerChangeListener(registerChangeListener.getPath(),
+ ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+ registration = store.registerChangeListener(registerChangeListener.getPath(),
listener, registerChangeListener.getScope());
ActorRef listenerRegistration =
getContext().actorOf(
private void createTransactionChain() {
DOMStoreTransactionChain chain = store.createTransactionChain();
- ActorRef transactionChain =
- getContext().actorOf(
- ShardTransactionChain.props(chain, schemaContext));
- getSender()
- .tell(new CreateTransactionChainReply(transactionChain.path())
- .toSerializable(),
+ ActorRef transactionChain = getContext().actorOf(
+ ShardTransactionChain.props(chain, schemaContext, shardContext));
+ getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
getSelf());
}
return HEART_BEAT_INTERVAL;
}
}
+
+ private static class ShardCreator implements Creator<Shard> {
+
+ private static final long serialVersionUID = 1L;
+
+ final ShardIdentifier name;
+ final Map<ShardIdentifier, String> peerAddresses;
+ final ShardContext shardContext;
+
+ ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
+ ShardContext shardContext) {
+ this.name = name;
+ this.peerAddresses = peerAddresses;
+ this.shardContext = shardContext;
+ }
+
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(name, peerAddresses, shardContext);
+ }
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+import com.google.common.base.Preconditions;
+
+import scala.concurrent.duration.Duration;
+
+/**
+ * Contains contextual data for shards.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardContext {
+
+ private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
+ private final Duration shardTransactionIdleTimeout;
+
+ public ShardContext() {
+ this.dataStoreProperties = null;
+ this.shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
+ }
+
+ public ShardContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties,
+ Duration shardTransactionIdleTimeout) {
+ this.dataStoreProperties = Preconditions.checkNotNull(dataStoreProperties);
+ this.shardTransactionIdleTimeout = Preconditions.checkNotNull(shardTransactionIdleTimeout);
+ }
+
+ public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
+ return dataStoreProperties;
+ }
+
+ public Duration getShardTransactionIdleTimeout() {
+ return shardTransactionIdleTimeout;
+ }
+}
import akka.cluster.ClusterEvent;
import akka.japi.Creator;
import akka.japi.Function;
+
import com.google.common.base.Preconditions;
+
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import scala.concurrent.duration.Duration;
private ShardManagerInfoMBean mBean;
- private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
+ private final ShardContext shardContext;
/**
* @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
* configuration or operational
*/
private ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
- InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+ ShardContext shardContext) {
this.type = Preconditions.checkNotNull(type, "type should not be null");
this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
- this.dataStoreProperties = dataStoreProperties;
+ this.shardContext = shardContext;
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
public static Props props(final String type,
final ClusterWrapper cluster,
final Configuration configuration,
- final InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+ final ShardContext shardContext) {
Preconditions.checkNotNull(type, "type should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
- return Props.create(new Creator<ShardManager>() {
-
- @Override
- public ShardManager create() throws Exception {
- return new ShardManager(type, cluster, configuration, dataStoreProperties);
- }
- });
+ return Props.create(new ShardManagerCreator(type, cluster, configuration, shardContext));
}
-
@Override
public void handleReceive(Object message) throws Exception {
if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
ActorRef actor = getContext()
- .actorOf(Shard.props(shardId, peerAddresses, dataStoreProperties),
+ .actorOf(Shard.props(shardId, peerAddresses, shardContext),
shardId.toString());
localShardActorNames.add(shardId.toString());
localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
}
}
}
+
+ private static class ShardManagerCreator implements Creator<ShardManager> {
+ private static final long serialVersionUID = 1L;
+
+ final String type;
+ final ClusterWrapper cluster;
+ final Configuration configuration;
+ final ShardContext shardContext;
+
+ ShardManagerCreator(String type, ClusterWrapper cluster,
+ Configuration configuration, ShardContext shardContext) {
+ this.type = type;
+ this.cluster = cluster;
+ this.configuration = configuration;
+ this.shardContext = shardContext;
+ }
+
+ @Override
+ public ShardManager create() throws Exception {
+ return new ShardManager(type, cluster, configuration, shardContext);
+ }
+ }
}
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
-import akka.actor.PoisonPill;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
* Date: 8/6/14
*/
public class ShardReadTransaction extends ShardTransaction {
- private final DOMStoreReadTransaction transaction;
- private final LoggingAdapter log =
- Logging.getLogger(getContext().system(), this);
-
- public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor, SchemaContext schemaContext) {
- super(shardActor, schemaContext);
- this.transaction = transaction;
-
- }
+ private final DOMStoreReadTransaction transaction;
- public ShardReadTransaction(DOMStoreTransactionChain transactionChain, DOMStoreReadTransaction transaction, ActorRef shardActor, SchemaContext schemaContext) {
- super(transactionChain, shardActor, schemaContext);
- this.transaction = transaction;
- }
-
- @Override
- public void handleReceive(Object message) throws Exception {
- if (ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- readData(transaction, ReadData.fromSerializable(message));
- } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
- dataExists(transaction, DataExists.fromSerializable(message));
- } else {
- super.handleReceive(message);
+ public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
+ SchemaContext schemaContext) {
+ super(shardActor, schemaContext);
+ this.transaction = transaction;
}
- }
- protected void closeTransaction(CloseTransaction message) {
- transaction.close();
- getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
- getSelf().tell(PoisonPill.getInstance(), getSelf());
- }
- //default scope test method to check if we get correct exception
- void forUnitTestOnlyExplicitTransactionClose(){
- transaction.close();
- }
+ @Override
+ public void handleReceive(Object message) throws Exception {
+ if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+ readData(transaction, ReadData.fromSerializable(message));
+ } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
+ dataExists(transaction, DataExists.fromSerializable(message));
+ } else {
+ super.handleReceive(message);
+ }
+ }
+ @Override
+ protected DOMStoreTransaction getDOMStoreTransaction() {
+ return transaction;
+ }
}
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
-import akka.actor.PoisonPill;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
* Date: 8/6/14
*/
public class ShardReadWriteTransaction extends ShardTransaction {
- private final DOMStoreReadWriteTransaction transaction;
- private final LoggingAdapter log =
- Logging.getLogger(getContext().system(), this);
- public ShardReadWriteTransaction(DOMStoreTransactionChain transactionChain, DOMStoreReadWriteTransaction transaction, ActorRef shardActor, SchemaContext schemaContext) {
- super(transactionChain, shardActor, schemaContext);
- this.transaction = transaction;
- }
-
- public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor, SchemaContext schemaContext) {
- super( shardActor, schemaContext);
- this.transaction = transaction;
- }
+ private final DOMStoreReadWriteTransaction transaction;
- @Override
- public void handleReceive(Object message) throws Exception {
- if (ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- readData(transaction,ReadData.fromSerializable(message));
- }else if (WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- writeData(transaction, WriteData.fromSerializable(message, schemaContext));
- } else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
- } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- deleteData(transaction,DeleteData.fromSerializable(message));
- } else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
- readyTransaction(transaction,new ReadyTransaction());
- } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
- dataExists(transaction, DataExists.fromSerializable(message));
- }else {
- super.handleReceive(message);
+ public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
+ SchemaContext schemaContext) {
+ super(shardActor, schemaContext);
+ this.transaction = transaction;
}
- }
- protected void closeTransaction(CloseTransaction message) {
- transaction.close();
- getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
- getSelf().tell(PoisonPill.getInstance(), getSelf());
- }
+ @Override
+ public void handleReceive(Object message) throws Exception {
+ if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+ readData(transaction, ReadData.fromSerializable(message));
+ } else if(WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+ writeData(transaction, WriteData.fromSerializable(message, schemaContext));
+ } else if(MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+ mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
+ } else if(DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+ deleteData(transaction, DeleteData.fromSerializable(message));
+ } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
+ readyTransaction(transaction, new ReadyTransaction());
+ } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
+ dataExists(transaction, DataExists.fromSerializable(message));
+ } else {
+ super.handleReceive(message);
+ }
+ }
- /**
- * The following method is used in unit testing only
- * hence the default scope.
- * This is done to test out failure cases.
- */
- public void forUnitTestOnlyExplicitTransactionClose() {
- transaction.close();
+ @Override
+ protected DOMStoreTransaction getDOMStoreTransaction() {
+ return transaction;
}
}
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
+import akka.actor.ReceiveTimeout;
import akka.japi.Creator;
+
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
+
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
*/
public abstract class ShardTransaction extends AbstractUntypedActor {
- private final ActorRef shardActor;
- protected final SchemaContext schemaContext;
-
- // FIXME : see below
- // If transactionChain is not null then this transaction is part of a
- // transactionChain. Not really clear as to what that buys us
- private final DOMStoreTransactionChain transactionChain;
-
-
- private final MutableCompositeModification modification =
- new MutableCompositeModification();
-
- private final LoggingAdapter log =
- Logging.getLogger(getContext().system(), this);
-
- protected ShardTransaction(
- ActorRef shardActor, SchemaContext schemaContext) {
- this(null, shardActor, schemaContext);
- }
-
- protected ShardTransaction(DOMStoreTransactionChain transactionChain,
- ActorRef shardActor, SchemaContext schemaContext) {
- this.transactionChain = transactionChain;
- this.shardActor = shardActor;
- this.schemaContext = schemaContext;
- }
-
-
-
- public static Props props(final DOMStoreReadTransaction transaction,
- final ActorRef shardActor, final SchemaContext schemaContext) {
- return Props.create(new Creator<ShardTransaction>() {
-
- @Override
- public ShardTransaction create() throws Exception {
- return new ShardReadTransaction(transaction, shardActor, schemaContext);
- }
- });
- }
-
- public static Props props(final DOMStoreTransactionChain transactionChain, final DOMStoreReadTransaction transaction,
- final ActorRef shardActor, final SchemaContext schemaContext) {
- return Props.create(new Creator<ShardTransaction>() {
-
- @Override
- public ShardTransaction create() throws Exception {
- return new ShardReadTransaction(transactionChain, transaction, shardActor, schemaContext);
- }
- });
- }
-
- public static Props props(final DOMStoreReadWriteTransaction transaction,
- final ActorRef shardActor, final SchemaContext schemaContext) {
- return Props.create(new Creator<ShardTransaction>() {
-
- @Override
- public ShardTransaction create() throws Exception {
- return new ShardReadWriteTransaction(transaction, shardActor, schemaContext);
- }
- });
- }
-
- public static Props props(final DOMStoreTransactionChain transactionChain, final DOMStoreReadWriteTransaction transaction,
- final ActorRef shardActor, final SchemaContext schemaContext) {
- return Props.create(new Creator<ShardTransaction>() {
-
- @Override
- public ShardTransaction create() throws Exception {
- return new ShardReadWriteTransaction(transactionChain, transaction, shardActor, schemaContext);
- }
- });
- }
-
-
- public static Props props(final DOMStoreWriteTransaction transaction,
- final ActorRef shardActor, final SchemaContext schemaContext) {
- return Props.create(new Creator<ShardTransaction>() {
-
- @Override
- public ShardTransaction create() throws Exception {
- return new ShardWriteTransaction(transaction, shardActor, schemaContext);
- }
- });
- }
-
- public static Props props(final DOMStoreTransactionChain transactionChain, final DOMStoreWriteTransaction transaction,
- final ActorRef shardActor, final SchemaContext schemaContext) {
- return Props.create(new Creator<ShardTransaction>() {
-
- @Override
- public ShardTransaction create() throws Exception {
- return new ShardWriteTransaction(transactionChain, transaction, shardActor, schemaContext);
- }
- });
- }
-
-
- @Override
- public void handleReceive(Object message) throws Exception {
- if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
- closeTransaction(new CloseTransaction());
- } else if (message instanceof GetCompositedModification) {
- // This is here for testing only
- getSender().tell(new GetCompositeModificationReply(
- new ImmutableCompositeModification(modification)), getSelf());
- }else{
- throw new UnknownMessageException(message);
+ private final ActorRef shardActor;
+ protected final SchemaContext schemaContext;
+
+ private final MutableCompositeModification modification = new MutableCompositeModification();
+
+ protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext) {
+ this.shardActor = shardActor;
+ this.schemaContext = schemaContext;
}
- }
- abstract protected void closeTransaction(CloseTransaction message);
+ public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
+ SchemaContext schemaContext, ShardContext shardContext) {
+ return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
+ shardContext));
+ }
- protected void readData(DOMStoreReadTransaction transaction,ReadData message) {
- final ActorRef sender = getSender();
- final ActorRef self = getSelf();
- final YangInstanceIdentifier path = message.getPath();
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
- transaction.read(path);
+ protected abstract DOMStoreTransaction getDOMStoreTransaction();
+
+ @Override
+ public void handleReceive(Object message) throws Exception {
+ if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
+ closeTransaction(true);
+ } else if (message instanceof GetCompositedModification) {
+ // This is here for testing only
+ getSender().tell(new GetCompositeModificationReply(
+ new ImmutableCompositeModification(modification)), getSelf());
+ } else if (message instanceof ReceiveTimeout) {
+ LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
+ closeTransaction(false);
+ } else {
+ throw new UnknownMessageException(message);
+ }
+ }
- future.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
- if (optional.isPresent()) {
- sender.tell(new ReadDataReply(schemaContext,optional.get()).toSerializable(), self);
- } else {
- sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
- }
- } catch (Exception e) {
- sender.tell(new akka.actor.Status.Failure(e),self);
+ private void closeTransaction(boolean sendReply) {
+ getDOMStoreTransaction().close();
+
+ if(sendReply) {
+ getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
}
- }
- }, getContext().dispatcher());
- }
+ getSelf().tell(PoisonPill.getInstance(), getSelf());
+ }
+
+ protected void readData(DOMStoreReadTransaction transaction,ReadData message) {
+ final ActorRef sender = getSender();
+ final ActorRef self = getSelf();
+ final YangInstanceIdentifier path = message.getPath();
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
+ transaction.read(path);
+
+ future.addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
+ if (optional.isPresent()) {
+ sender.tell(new ReadDataReply(schemaContext,optional.get()).toSerializable(), self);
+ } else {
+ sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
+ }
+ } catch (Exception e) {
+ sender.tell(new akka.actor.Status.Failure(e),self);
+ }
+
+ }
+ }, getContext().dispatcher());
+ }
protected void dataExists(DOMStoreReadTransaction transaction, DataExists message) {
final YangInstanceIdentifier path = message.getPath();
}
- protected void writeData(DOMStoreWriteTransaction transaction, WriteData message) {
- modification.addModification(
- new WriteModification(message.getPath(), message.getData(),schemaContext));
- LOG.debug("writeData at path : " + message.getPath().toString());
+ protected void writeData(DOMStoreWriteTransaction transaction, WriteData message) {
+ modification.addModification(
+ new WriteModification(message.getPath(), message.getData(),schemaContext));
+ LOG.debug("writeData at path : " + message.getPath().toString());
- try {
- transaction.write(message.getPath(), message.getData());
- getSender().tell(new WriteDataReply().toSerializable(), getSelf());
- }catch(Exception e){
- getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ try {
+ transaction.write(message.getPath(), message.getData());
+ getSender().tell(new WriteDataReply().toSerializable(), getSelf());
+ }catch(Exception e){
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
}
- }
-
- protected void mergeData(DOMStoreWriteTransaction transaction, MergeData message) {
- modification.addModification(
- new MergeModification(message.getPath(), message.getData(), schemaContext));
- LOG.debug("mergeData at path : " + message.getPath().toString());
- try {
- transaction.merge(message.getPath(), message.getData());
- getSender().tell(new MergeDataReply().toSerializable(), getSelf());
- }catch(Exception e){
- getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+
+ protected void mergeData(DOMStoreWriteTransaction transaction, MergeData message) {
+ modification.addModification(
+ new MergeModification(message.getPath(), message.getData(), schemaContext));
+ LOG.debug("mergeData at path : " + message.getPath().toString());
+ try {
+ transaction.merge(message.getPath(), message.getData());
+ getSender().tell(new MergeDataReply().toSerializable(), getSelf());
+ }catch(Exception e){
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
}
- }
-
- protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) {
- LOG.debug("deleteData at path : " + message.getPath().toString());
- modification.addModification(new DeleteModification(message.getPath()));
- try {
- transaction.delete(message.getPath());
- getSender().tell(new DeleteDataReply().toSerializable(), getSelf());
- }catch(Exception e){
- getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+
+ protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) {
+ LOG.debug("deleteData at path : " + message.getPath().toString());
+ modification.addModification(new DeleteModification(message.getPath()));
+ try {
+ transaction.delete(message.getPath());
+ getSender().tell(new DeleteDataReply().toSerializable(), getSelf());
+ }catch(Exception e){
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
}
- }
- protected void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) {
- DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
- ActorRef cohortActor = getContext().actorOf(
- ThreePhaseCommitCohort.props(cohort, shardActor, modification), "cohort");
- getSender()
+ protected void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) {
+ DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
+ ActorRef cohortActor = getContext().actorOf(
+ ThreePhaseCommitCohort.props(cohort, shardActor, modification), "cohort");
+ getSender()
.tell(new ReadyTransactionReply(cohortActor.path()).toSerializable(), getSelf());
- }
+ }
+ private static class ShardTransactionCreator implements Creator<ShardTransaction> {
- // These classes are in here for test purposes only
+ private static final long serialVersionUID = 1L;
+ final DOMStoreTransaction transaction;
+ final ActorRef shardActor;
+ final SchemaContext schemaContext;
+ final ShardContext shardContext;
- static class GetCompositedModification {
+ ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
+ SchemaContext schemaContext, ShardContext actorContext) {
+ this.transaction = transaction;
+ this.shardActor = shardActor;
+ this.shardContext = actorContext;
+ this.schemaContext = schemaContext;
+ }
- }
+ @Override
+ public ShardTransaction create() throws Exception {
+ ShardTransaction tx;
+ if(transaction instanceof DOMStoreReadWriteTransaction) {
+ tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
+ shardActor, schemaContext);
+ } else if(transaction instanceof DOMStoreReadTransaction) {
+ tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
+ schemaContext);
+ } else {
+ tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
+ shardActor, schemaContext);
+ }
+
+ tx.getContext().setReceiveTimeout(shardContext.getShardTransactionIdleTimeout());
+ return tx;
+ }
+ }
+ // These classes are in here for test purposes only
- static class GetCompositeModificationReply {
- private final CompositeModification modification;
+ static class GetCompositedModification {
+ }
- GetCompositeModificationReply(CompositeModification modification) {
- this.modification = modification;
- }
+ static class GetCompositeModificationReply {
+ private final CompositeModification modification;
- public CompositeModification getModification() {
- return modification;
+ GetCompositeModificationReply(CompositeModification modification) {
+ this.modification = modification;
+ }
+
+
+ public CompositeModification getModification() {
+ return modification;
+ }
}
- }
}
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.Creator;
+
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
public class ShardTransactionChain extends AbstractUntypedActor {
private final DOMStoreTransactionChain chain;
+ private final ShardContext shardContext;
private final SchemaContext schemaContext;
- public ShardTransactionChain(DOMStoreTransactionChain chain, SchemaContext schemaContext) {
+ public ShardTransactionChain(DOMStoreTransactionChain chain, SchemaContext schemaContext,
+ ShardContext shardContext) {
this.chain = chain;
+ this.shardContext = shardContext;
this.schemaContext = schemaContext;
}
return getContext().parent();
}
- private ActorRef createTypedTransactionActor(CreateTransaction createTransaction,String transactionId){
- if(createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_ONLY.ordinal()){
- return getContext().actorOf(
- ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(), schemaContext), transactionId);
-
- }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_WRITE.ordinal()){
- return getContext().actorOf(
- ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(), schemaContext), transactionId);
-
-
- }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.WRITE_ONLY.ordinal()){
- return getContext().actorOf(
- ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(), schemaContext), transactionId);
- }else{
- throw new IllegalArgumentException ("CreateTransaction message has unidentified transaction type="+createTransaction.getTransactionType()) ;
+ private ActorRef createTypedTransactionActor(CreateTransaction createTransaction,
+ String transactionId) {
+ if(createTransaction.getTransactionType() ==
+ TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
+ return getContext().actorOf(
+ ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
+ schemaContext, shardContext), transactionId);
+ } else if (createTransaction.getTransactionType() ==
+ TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
+ return getContext().actorOf(
+ ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
+ schemaContext, shardContext), transactionId);
+ } else if (createTransaction.getTransactionType() ==
+ TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
+ return getContext().actorOf(
+ ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
+ schemaContext, shardContext), transactionId);
+ } else {
+ throw new IllegalArgumentException (
+ "CreateTransaction message has unidentified transaction type=" +
+ createTransaction.getTransactionType());
+ }
}
- }
private void createTransaction(CreateTransaction createTransaction) {
getSelf());
}
- public static Props props(final DOMStoreTransactionChain chain, final SchemaContext schemaContext) {
- return Props.create(new Creator<ShardTransactionChain>() {
+ public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext,
+ ShardContext shardContext) {
+ return Props.create(new ShardTransactionChainCreator(chain, schemaContext, shardContext));
+ }
- @Override
- public ShardTransactionChain create() throws Exception {
- return new ShardTransactionChain(chain, schemaContext);
- }
- });
+ private static class ShardTransactionChainCreator implements Creator<ShardTransactionChain> {
+ private static final long serialVersionUID = 1L;
+
+ final DOMStoreTransactionChain chain;
+ final ShardContext shardContext;
+ final SchemaContext schemaContext;
+
+ ShardTransactionChainCreator(DOMStoreTransactionChain chain, SchemaContext schemaContext,
+ ShardContext shardContext) {
+ this.chain = chain;
+ this.shardContext = shardContext;
+ this.schemaContext = schemaContext;
+ }
+
+ @Override
+ public ShardTransactionChain create() throws Exception {
+ return new ShardTransactionChain(chain, schemaContext, shardContext);
+ }
}
}
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
-import akka.actor.PoisonPill;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
* Date: 8/6/14
*/
public class ShardWriteTransaction extends ShardTransaction {
- private final DOMStoreWriteTransaction transaction;
- private final LoggingAdapter log =
- Logging.getLogger(getContext().system(), this);
- public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor, SchemaContext schemaContext) {
- super( shardActor, schemaContext);
- this.transaction = transaction;
-
- }
+ private final DOMStoreWriteTransaction transaction;
- public ShardWriteTransaction(DOMStoreTransactionChain transactionChain, DOMStoreWriteTransaction transaction, ActorRef shardActor, SchemaContext schemaContext) {
- super(transactionChain, shardActor, schemaContext);
- this.transaction = transaction;
- }
-
- @Override
- public void handleReceive(Object message) throws Exception {
- if (WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- writeData(transaction, WriteData.fromSerializable(message, schemaContext));
- } else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
- } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- deleteData(transaction,DeleteData.fromSerializable(message));
- } else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
- readyTransaction(transaction,new ReadyTransaction());
- }else {
- super.handleReceive(message);
+ public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
+ SchemaContext schemaContext) {
+ super(shardActor, schemaContext);
+ this.transaction = transaction;
}
- }
- protected void closeTransaction(CloseTransaction message) {
- transaction.close();
- getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
- getSelf().tell(PoisonPill.getInstance(), getSelf());
- }
+ @Override
+ public void handleReceive(Object message) throws Exception {
+ if(WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+ writeData(transaction, WriteData.fromSerializable(message, schemaContext));
+ } else if(MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+ mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
+ } else if(DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+ deleteData(transaction, DeleteData.fromSerializable(message));
+ } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
+ readyTransaction(transaction, new ReadyTransaction());
+ } else {
+ super.handleReceive(message);
+ }
+ }
- /**
- * The following method is used in unit testing only
- * hence the default scope.
- * This is done to test out failure cases.
- */
- public void forUnitTestOnlyExplicitTransactionClose() {
- transaction.close();
+ @Override
+ protected DOMStoreTransaction getDOMStoreTransaction() {
+ return transaction;
}
}
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Creator;
+
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
public static Props props(final DOMStoreThreePhaseCommitCohort cohort,
final ActorRef shardActor, final CompositeModification modification) {
- return Props.create(new Creator<ThreePhaseCommitCohort>() {
- @Override
- public ThreePhaseCommitCohort create() throws Exception {
- return new ThreePhaseCommitCohort(cohort, shardActor,
- modification);
- }
- });
+ return Props.create(new ThreePhaseCommitCohortCreator(cohort, shardActor, modification));
}
-
@Override
public void handleReceive(Object message) throws Exception {
if (message.getClass()
final ActorRef self = getSelf();
Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
public void onSuccess(Void v) {
sender
.tell(new AbortTransactionReply().toSerializable(),
self);
}
+ @Override
public void onFailure(Throwable t) {
LOG.error(t, "An exception happened during abort");
sender
final ActorRef sender = getSender();
final ActorRef self = getSelf();
Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
public void onSuccess(Void v) {
sender
.tell(new PreCommitTransactionReply().toSerializable(),
self);
}
+ @Override
public void onFailure(Throwable t) {
LOG.error(t, "An exception happened during pre-commit");
sender
final ActorRef sender = getSender();
final ActorRef self = getSelf();
Futures.addCallback(future, new FutureCallback<Boolean>() {
+ @Override
public void onSuccess(Boolean canCommit) {
sender.tell(new CanCommitTransactionReply(canCommit)
.toSerializable(), self);
}
+ @Override
public void onFailure(Throwable t) {
LOG.error(t, "An exception happened during canCommit");
sender
.tell(new akka.actor.Status.Failure(t), self);
}
});
+ }
+
+ private static class ThreePhaseCommitCohortCreator implements Creator<ThreePhaseCommitCohort> {
+ final DOMStoreThreePhaseCommitCohort cohort;
+ final ActorRef shardActor;
+ final CompositeModification modification;
+ ThreePhaseCommitCohortCreator(DOMStoreThreePhaseCommitCohort cohort,
+ ActorRef shardActor, CompositeModification modification) {
+ this.cohort = cohort;
+ this.shardActor = shardActor;
+ this.modification = modification;
+ }
+ @Override
+ public ThreePhaseCommitCohort create() throws Exception {
+ return new ThreePhaseCommitCohort(cohort, shardActor, modification);
+ }
}
}
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
* TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
*/
public class TransactionChainProxy implements DOMStoreTransactionChain{
private final ActorContext actorContext;
- private final SchemaContext schemaContext;
- public TransactionChainProxy(ActorContext actorContext, SchemaContext schemaContext) {
+ public TransactionChainProxy(ActorContext actorContext) {
this.actorContext = actorContext;
- this.schemaContext = schemaContext;
}
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, schemaContext);
+ TransactionProxy.TransactionType.READ_ONLY);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_WRITE, schemaContext);
+ TransactionProxy.TransactionType.READ_WRITE);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.WRITE_ONLY, schemaContext);
+ TransactionProxy.TransactionType.WRITE_ONLY);
}
@Override
import akka.dispatch.OnComplete;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.FinalizablePhantomReference;
+import com.google.common.base.FinalizableReferenceQueue;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
LOG = LoggerFactory.getLogger(TransactionProxy.class);
+ /**
+ * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
+ * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
+ * trickery to clean up its internal thread when the bundle is unloaded.
+ */
+ private static final FinalizableReferenceQueue phantomReferenceQueue =
+ new FinalizableReferenceQueue();
+
+ /**
+ * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
+ * necessary because PhantomReferences need a hard reference so they're not garbage collected.
+ * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
+ * and thus becomes eligible for garbage collection.
+ */
+ private static final Map<TransactionProxyCleanupPhantomReference,
+ TransactionProxyCleanupPhantomReference> phantomReferenceCache =
+ new ConcurrentHashMap<>();
+
+ /**
+ * A PhantomReference that closes remote transactions for a TransactionProxy when it's
+ * garbage collected. This is used for read-only transactions as they're not explicitly closed
+ * by clients. So the only way to detect that a transaction is no longer in use and it's safe
+ * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
+ * but TransactionProxy instances should generally be short-lived enough to avoid being moved
+ * to the old generation space and thus should be cleaned up in a timely manner as the GC
+ * runs on the young generation (eden, swap1...) space much more frequently.
+ */
+ private static class TransactionProxyCleanupPhantomReference
+ extends FinalizablePhantomReference<TransactionProxy> {
+
+ private final List<ActorSelection> remoteTransactionActors;
+ private final AtomicBoolean remoteTransactionActorsMB;
+ private final ActorContext actorContext;
+ private final TransactionIdentifier identifier;
+
+ protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
+ super(referent, phantomReferenceQueue);
+
+ // Note we need to cache the relevant fields from the TransactionProxy as we can't
+ // have a hard reference to the TransactionProxy instance itself.
+
+ remoteTransactionActors = referent.remoteTransactionActors;
+ remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
+ actorContext = referent.actorContext;
+ identifier = referent.identifier;
+ }
+
+ @Override
+ public void finalizeReferent() {
+ LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
+ remoteTransactionActors.size(), identifier);
+
+ phantomReferenceCache.remove(this);
+
+ // Access the memory barrier volatile to ensure all previous updates to the
+ // remoteTransactionActors list are visible to this thread.
+
+ if(remoteTransactionActorsMB.get()) {
+ for(ActorSelection actor : remoteTransactionActors) {
+ LOG.trace("Sending CloseTransaction to {}", actor);
+ actorContext.sendRemoteOperationAsync(actor,
+ new CloseTransaction().toSerializable());
+ }
+ }
+ }
+ }
+
+ /**
+ * Stores the remote Tx actors for each requested data store path to be used by the
+ * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
+ * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
+ * remoteTransactionActors list so they will be visible to the thread accessing the
+ * PhantomReference.
+ */
+ private List<ActorSelection> remoteTransactionActors;
+ private AtomicBoolean remoteTransactionActorsMB;
+
+ private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
+
private final TransactionType transactionType;
private final ActorContext actorContext;
- private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
private final TransactionIdentifier identifier;
private final SchemaContext schemaContext;
private boolean inReadyState;
- public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
- SchemaContext schemaContext) {
- this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
- this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
- this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+ public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
+ this.actorContext = Preconditions.checkNotNull(actorContext,
+ "actorContext should not be null");
+ this.transactionType = Preconditions.checkNotNull(transactionType,
+ "transactionType should not be null");
+ this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
+ "schemaContext should not be null");
String memberName = actorContext.getCurrentMemberName();
if(memberName == null){
this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
counter.getAndIncrement()).build();
- LOG.debug("Created txn {} of type {}", identifier, transactionType);
+ if(transactionType == TransactionType.READ_ONLY) {
+ // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
+ // to close the remote Tx's when this instance is no longer in use and is garbage
+ // collected.
+ remoteTransactionActors = Lists.newArrayList();
+ remoteTransactionActorsMB = new AtomicBoolean();
+
+ TransactionProxyCleanupPhantomReference cleanup =
+ new TransactionProxyCleanupPhantomReference(this);
+ phantomReferenceCache.put(cleanup, cleanup);
+ }
+
+ LOG.debug("Created txn {} of type {}", identifier, transactionType);
}
@VisibleForTesting
for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
transactionContext.closeTransaction();
}
+
+ remoteTransactionPaths.clear();
+
+ if(transactionType == TransactionType.READ_ONLY) {
+ remoteTransactionActors.clear();
+ remoteTransactionActorsMB.set(true);
+ }
}
private TransactionContext transactionContext(YangInstanceIdentifier path){
LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
- ActorSelection transactionActor =
- actorContext.actorSelection(transactionPath);
- transactionContext =
- new TransactionContextImpl(shardName, transactionPath,
- transactionActor);
+ ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
+
+ if(transactionType == TransactionType.READ_ONLY) {
+ // Add the actor to the remoteTransactionActors list for access by the
+ // cleanup PhantonReference.
+ remoteTransactionActors.add(transactionActor);
+
+ // Write to the memory barrier volatile to publish the above update to the
+ // remoteTransactionActors list for thread visibility.
+ remoteTransactionActorsMB.set(true);
+ }
+
+ transactionContext = new TransactionContextImpl(shardName, transactionPath,
+ transactionActor, identifier, actorContext, schemaContext);
remoteTransactionPaths.put(shardName, transactionContext);
} else {
}
} catch(Exception e){
LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
- remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e));
+ remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e, identifier));
}
}
List<Future<Object>> getRecordedOperationFutures();
}
- private abstract class AbstractTransactionContext implements TransactionContext {
+ private static abstract class AbstractTransactionContext implements TransactionContext {
+ protected final TransactionIdentifier identifier;
protected final String shardName;
protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
- AbstractTransactionContext(String shardName) {
+ AbstractTransactionContext(String shardName, TransactionIdentifier identifier) {
this.shardName = shardName;
+ this.identifier = identifier;
}
@Override
}
}
- private class TransactionContextImpl extends AbstractTransactionContext {
+ private static class TransactionContextImpl extends AbstractTransactionContext {
private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
+ private final ActorContext actorContext;
+ private final SchemaContext schemaContext;
private final String actorPath;
private final ActorSelection actor;
private TransactionContextImpl(String shardName, String actorPath,
- ActorSelection actor) {
- super(shardName);
+ ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext,
+ SchemaContext schemaContext) {
+ super(shardName, identifier);
this.actorPath = actorPath;
this.actor = actor;
+ this.actorContext = actorContext;
+ this.schemaContext = schemaContext;
}
private ActorSelection getActor() {
}
}
- private class NoOpTransactionContext extends AbstractTransactionContext {
+ private static class NoOpTransactionContext extends AbstractTransactionContext {
private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
private final Exception failure;
- public NoOpTransactionContext(String shardName, Exception failure){
- super(shardName);
+ public NoOpTransactionContext(String shardName, Exception failure,
+ TransactionIdentifier identifier){
+ super(shardName, identifier);
this.failure = failure;
}
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
+ private volatile SchemaContext schemaContext;
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper,
return actorSystem.actorSelection(actorPath);
}
+ public void setSchemaContext(SchemaContext schemaContext) {
+ this.schemaContext = schemaContext;
+
+ if(shardManager != null) {
+ shardManager.tell(new UpdateSchemaContext(schemaContext), null);
+ }
+ }
+
+ public SchemaContext getSchemaContext() {
+ return schemaContext;
+ }
/**
* Finds the primary for a given shard
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties;
public class DistributedConfigDataStoreProviderModule extends
org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModule {
@Override
public java.lang.AutoCloseable createInstance() {
+
+ ConfigProperties props = getConfigProperties();
+ if(props == null) {
+ props = new ConfigProperties();
+ }
+
return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
- InMemoryDOMDataStoreConfigProperties.create(getConfigMaxShardDataChangeExecutorPoolSize(),
- getConfigMaxShardDataChangeExecutorQueueSize(),
- getConfigMaxShardDataChangeListenerQueueSize()));
+ new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(),
+ props.getMaxShardDataChangeExecutorQueueSize(),
+ props.getMaxShardDataChangeListenerQueueSize(),
+ props.getShardTransactionIdleTimeoutInMinutes()));
}
-
}
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties;
public class DistributedOperationalDataStoreProviderModule extends
org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModule {
@Override
public java.lang.AutoCloseable createInstance() {
+
+ OperationalProperties props = getOperationalProperties();
+ if(props == null) {
+ props = new OperationalProperties();
+ }
+
return DistributedDataStoreFactory.createInstance("operational",
getOperationalSchemaServiceDependency(),
- InMemoryDOMDataStoreConfigProperties.create(getOperationalMaxShardDataChangeExecutorPoolSize(),
- getOperationalMaxShardDataChangeExecutorQueueSize(),
- getOperationalMaxShardDataChangeListenerQueueSize()));
+ new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(),
+ props.getMaxShardDataChangeExecutorQueueSize(),
+ props.getMaxShardDataChangeListenerQueueSize(),
+ props.getShardTransactionIdleTimeoutInMinutes()));
}
}
config:java-name-prefix DistributedOperationalDataStoreProvider;
}
+ grouping data-store-properties {
+ leaf max-shard-data-change-executor-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for each shard's data store data change notification executor.";
+ }
+
+ leaf max-shard-data-change-executor-pool-size {
+ default 20;
+ type uint16;
+ description "The maximum thread pool size for each shard's data store data change notification executor.";
+ }
+
+ leaf max-shard-data-change-listener-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for each shard's data store data change listeners.";
+ }
+
+ leaf shard-transaction-idle-timeout-in-minutes {
+ default 10;
+ type uint16;
+ description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.";
+ }
+ }
+
// Augments the 'configuration' choice node under modules/module.
augment "/config:modules/config:module/config:configuration" {
case distributed-config-datastore-provider {
when "/config:modules/config:module/config:type = 'distributed-config-datastore-provider'";
- container config-schema-service {
- uses config:service-ref {
- refine type {
- mandatory false;
- config:required-identity sal:schema-service;
+ container config-schema-service {
+ uses config:service-ref {
+ refine type {
+ mandatory false;
+ config:required-identity sal:schema-service;
+ }
}
}
- }
- leaf config-max-shard-data-change-executor-queue-size {
- default 1000;
- type uint16;
- description "The maximum queue size for each shard's data store data change notification executor.";
- }
-
- leaf config-max-shard-data-change-executor-pool-size {
- default 20;
- type uint16;
- description "The maximum thread pool size for each shard's data store data change notification executor.";
- }
-
- leaf config-max-shard-data-change-listener-queue-size {
- default 1000;
- type uint16;
- description "The maximum queue size for each shard's data store data change listeners.";
- }
+ container config-properties {
+ uses data-store-properties;
+ }
}
}
}
}
- leaf operational-max-shard-data-change-executor-queue-size {
- default 1000;
- type uint16;
- description "The maximum queue size for each shard's data store data change notification executor.";
- }
-
- leaf operational-max-shard-data-change-executor-pool-size {
- default 20;
- type uint16;
- description "The maximum thread pool size for each shard's data store data change notification executor.";
- }
-
- leaf operational-max-shard-data-change-listener-queue-size {
- default 1000;
- type uint16;
- description "The maximum queue size for each shard's data store data change listeners.";
- }
- }
+ container operational-properties {
+ uses data-store-properties;
+ }
}
+ }
}
import akka.actor.Props;
import akka.event.Logging;
import akka.testkit.JavaTestKit;
+
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
ShardIdentifier.builder().memberName("member-1")
.shardName("inventory").type("config").build();
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
+ final SchemaContext schemaContext = TestModel.createTestContext();
+ ShardContext shardContext = new ShardContext();
+
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP, shardContext);
final ActorRef shard = getSystem().actorOf(props);
- new Within(duration("5 seconds")) {
+ new Within(duration("10 seconds")) {
+ @Override
protected void run() {
-
-
- shard.tell(
- new UpdateSchemaContext(TestModel.createTestContext()),
- getRef());
+ shard.tell(new UpdateSchemaContext(schemaContext), getRef());
// Wait for a specific log message to show up
final boolean result =
new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
) {
+ @Override
protected Boolean run() {
return true;
}
shard.tell(new CreateTransactionChain().toSerializable(), getRef());
final ActorSelection transactionChain =
- new ExpectMsg<ActorSelection>(duration("1 seconds"), "CreateTransactionChainReply") {
+ new ExpectMsg<ActorSelection>(duration("3 seconds"), "CreateTransactionChainReply") {
+ @Override
protected ActorSelection match(Object in) {
if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)) {
ActorPath transactionChainPath =
transactionChain.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.WRITE_ONLY.ordinal() ).toSerializable(), getRef());
final ActorSelection transaction =
- new ExpectMsg<ActorSelection>(duration("1 seconds"), "CreateTransactionReply") {
+ new ExpectMsg<ActorSelection>(duration("3 seconds"), "CreateTransactionReply") {
+ @Override
protected ActorSelection match(Object in) {
if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(in.getClass())) {
CreateTransactionReply reply = CreateTransactionReply.fromSerializable(in);
// 3. Write some data
transaction.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext).toSerializable(),
getRef());
- Boolean writeDone = new ExpectMsg<Boolean>(duration("1 seconds"), "WriteDataReply") {
+ Boolean writeDone = new ExpectMsg<Boolean>(duration("3 seconds"), "WriteDataReply") {
+ @Override
protected Boolean match(Object in) {
if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
return true;
transaction.tell(new ReadyTransaction().toSerializable(), getRef());
final ActorSelection cohort =
- new ExpectMsg<ActorSelection>(duration("1 seconds"), "ReadyTransactionReply") {
+ new ExpectMsg<ActorSelection>(duration("3 seconds"), "ReadyTransactionReply") {
+ @Override
protected ActorSelection match(Object in) {
if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
ActorPath cohortPath =
cohort.tell(new PreCommitTransaction().toSerializable(), getRef());
Boolean preCommitDone =
- new ExpectMsg<Boolean>(duration("1 seconds"), "PreCommitTransactionReply") {
+ new ExpectMsg<Boolean>(duration("3 seconds"), "PreCommitTransactionReply") {
+ @Override
protected Boolean match(Object in) {
if (in.getClass().equals(PreCommitTransactionReply.SERIALIZABLE_CLASS)) {
return true;
import akka.actor.ActorRef;
import akka.actor.Props;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@Override
public Set<YangInstanceIdentifier> getRemovedPaths() {
- Set<YangInstanceIdentifier>ids = new HashSet();
+ Set<YangInstanceIdentifier>ids = new HashSet<>();
ids.add( CompositeModel.TEST_PATH);
return ids;
}
final Props props = Props.create(MessageCollectorActor.class);
final ActorRef actorRef = getSystem().actorOf(props);
- DataChangeListenerProxy dataChangeListenerProxy =
- new DataChangeListenerProxy(TestModel.createTestContext(),
- getSystem().actorSelection(actorRef.path()));
+ DataChangeListenerProxy dataChangeListenerProxy = new DataChangeListenerProxy(
+ TestModel.createTestContext(), getSystem().actorSelection(actorRef.path()));
dataChangeListenerProxy.onDataChanged(new MockDataChangedEvent());
public class DataChangeListenerTest extends AbstractActorTest {
private static class MockDataChangedEvent implements AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
- Map<YangInstanceIdentifier,NormalizedNode<?,?>> createdData = new HashMap();
- Map<YangInstanceIdentifier,NormalizedNode<?,?>> updatedData = new HashMap();
- Map<YangInstanceIdentifier,NormalizedNode<?,?>> originalData = new HashMap();
+ Map<YangInstanceIdentifier,NormalizedNode<?,?>> createdData = new HashMap<>();
+ Map<YangInstanceIdentifier,NormalizedNode<?,?>> updatedData = new HashMap<>();
+ Map<YangInstanceIdentifier,NormalizedNode<?,?>> originalData = new HashMap<>();
public void testDataChangedWhenNotificationsAreEnabled(){
new JavaTestKit(getSystem()) {{
final MockDataChangeListener listener = new MockDataChangeListener();
- final Props props = DataChangeListener.props(CompositeModel.createTestContext(),listener,CompositeModel.FAMILY_PATH );
+ final Props props = DataChangeListener.props(listener);
final ActorRef subject =
getSystem().actorOf(props, "testDataChangedNotificationsEnabled");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
// Let the DataChangeListener know that notifications should
final Boolean out = new ExpectMsg<Boolean>(duration("800 millis"), "dataChanged") {
// do not put code outside this method, will run afterwards
+ @Override
protected Boolean match(Object in) {
if (in != null && in.getClass().equals(DataChangedReply.class)) {
expectNoMsg();
}
-
-
};
}};
}
public void testDataChangedWhenNotificationsAreDisabled(){
new JavaTestKit(getSystem()) {{
final MockDataChangeListener listener = new MockDataChangeListener();
- final Props props = DataChangeListener.props(CompositeModel.createTestContext(),listener,CompositeModel.FAMILY_PATH );
+ final Props props = DataChangeListener.props(listener);
final ActorRef subject =
getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(
expectNoMsg();
}
-
-
};
}};
}
import akka.actor.ActorSystem;
import akka.event.Logging;
import akka.testkit.JavaTestKit;
+
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
+
import junit.framework.Assert;
+
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Before;
{
new Within(duration("10 seconds")) {
+ @Override
protected void run() {
try {
final DistributedDataStore distributedDataStore =
- new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration, null);
+ new DistributedDataStore(getSystem(), "config",
+ new MockClusterWrapper(), configuration,
+ new DistributedDataStoreProperties());
distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
final boolean result =
new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
) {
+ @Override
protected Boolean run() {
return true;
}
{
new Within(duration("10 seconds")) {
+ @Override
protected void run() {
try {
final DistributedDataStore distributedDataStore =
new JavaTestKit.EventFilter<Boolean>(
Logging.Info.class
) {
+ @Override
protected Boolean run() {
return true;
}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
}
+ @SuppressWarnings("resource")
@Test
public void testConstructor(){
ActorSystem actorSystem = mock(ActorSystem.class);
new DistributedDataStore(actorSystem, "config",
- mock(ClusterWrapper.class), mock(Configuration.class), null);
+ mock(ClusterWrapper.class), mock(Configuration.class),
+ new DistributedDataStoreProperties());
verify(actorSystem).actorOf(any(Props.class), eq("shardmanager-config"));
}
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", new MockClusterWrapper(),
- new MockConfiguration(), null);
+ new MockConfiguration(), new ShardContext());
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(new FindPrimary("inventory").toSerializable(), getRef());
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", new MockClusterWrapper(),
- new MockConfiguration(), null);
+ new MockConfiguration(), new ShardContext());
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", new MockClusterWrapper(),
- new MockConfiguration(), null);
+ new MockConfiguration(), new ShardContext());
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(new FindLocalShard("inventory"), getRef());
final String out = new ExpectMsg<String>(duration("1 seconds"), "find local") {
+ @Override
protected String match(Object in) {
if (in instanceof LocalShardNotFound) {
return ((LocalShardNotFound) in).getShardName();
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", mockClusterWrapper,
- new MockConfiguration(), null);
+ new MockConfiguration(), new ShardContext());
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
final ActorRef out = new ExpectMsg<ActorRef>(duration("1 seconds"), "find local") {
+ @Override
protected ActorRef match(Object in) {
if (in instanceof LocalShardFound) {
return ((LocalShardFound) in).getPath();
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", new MockClusterWrapper(),
- new MockConfiguration(), null);
+ new MockConfiguration(), new ShardContext());
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
// the run() method needs to finish within 3 seconds
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
final String out = new ExpectMsg<String>(duration("1 seconds"), "primary found") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
PrimaryFound f = PrimaryFound.fromSerializable(in);
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", new MockClusterWrapper(),
- new MockConfiguration(), null);
+ new MockConfiguration(), new ShardContext());
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
// the run() method needs to finish within 3 seconds
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
import akka.actor.Props;
import akka.event.Logging;
import akka.testkit.JavaTestKit;
-import junit.framework.Assert;
+
+import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import java.util.HashMap;
import java.util.Map;
-import static junit.framework.Assert.assertFalse;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ShardTest extends AbstractActorTest {
+
+ private static final ShardContext shardContext = new ShardContext();
+
@Test
public void testOnReceiveCreateTransactionChain() throws Exception {
new JavaTestKit(getSystem()) {{
ShardIdentifier.builder().memberName("member-1")
.shardName("inventory").type("config").build();
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP, shardContext);
final ActorRef subject =
getSystem().actorOf(props, "testCreateTransactionChain");
final boolean result =
new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
) {
+ @Override
protected Boolean run() {
return true;
}
Assert.assertEquals(true, result);
- new Within(duration("1 seconds")) {
+ new Within(duration("3 seconds")) {
+ @Override
protected void run() {
subject.tell(new CreateTransactionChain().toSerializable(), getRef());
- final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+ final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)){
CreateTransactionChainReply reply =
ShardIdentifier.builder().memberName("member-1")
.shardName("inventory").type("config").build();
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP, shardContext);
final ActorRef subject =
getSystem().actorOf(props, "testRegisterChangeListener");
- new Within(duration("1 seconds")) {
+ new Within(duration("3 seconds")) {
+ @Override
protected void run() {
subject.tell(
getRef().path(), AsyncDataBroker.DataChangeScope.BASE),
getRef());
- final Boolean notificationEnabled = new ExpectMsg<Boolean>("enable notification") {
+ final Boolean notificationEnabled = new ExpectMsg<Boolean>(
+ duration("3 seconds"), "enable notification") {
// do not put code outside this method, will run afterwards
+ @Override
protected Boolean match(Object in) {
if(in instanceof EnableNotification){
return ((EnableNotification) in).isEnabled();
assertFalse(notificationEnabled);
- final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+ final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(RegisterChangeListenerReply.class)) {
RegisterChangeListenerReply reply =
ShardIdentifier.builder().memberName("member-1")
.shardName("inventory").type("config").build();
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP, shardContext);
final ActorRef subject =
getSystem().actorOf(props, "testCreateTransaction");
-
// Wait for a specific log message to show up
final boolean result =
new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
) {
+ @Override
protected Boolean run() {
return true;
}
Assert.assertEquals(true, result);
- new Within(duration("1 seconds")) {
+ new Within(duration("3 seconds")) {
+ @Override
protected void run() {
subject.tell(
subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(),
getRef());
- final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+ final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in instanceof CreateTransactionReply) {
CreateTransactionReply reply =
out.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
expectNoMsg();
}
-
-
};
}};
}
.shardName("inventory").type("config").build();
peerAddresses.put(identifier, null);
- final Props props = Shard.props(identifier, peerAddresses, null);
+ final Props props = Shard.props(identifier, peerAddresses, shardContext);
final ActorRef subject =
getSystem().actorOf(props, "testPeerAddressResolved");
- new Within(duration("1 seconds")) {
+ new Within(duration("3 seconds")) {
+ @Override
protected void run() {
subject.tell(
expectNoMsg();
}
-
-
};
}};
}
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
+
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+
+import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import static org.junit.Assert.assertEquals;
public class ShardTransactionChainTest extends AbstractActorTest {
- private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
-
- private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor,
- MoreExecutors.sameThreadExecutor());
-
- static {
- store.onGlobalContextUpdated(TestModel.createTestContext());
- }
- @Test
- public void testOnReceiveCreateTransaction() throws Exception {
- new JavaTestKit(getSystem()) {{
- final Props props = ShardTransactionChain.props(store.createTransactionChain(), TestModel.createTestContext());
- final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction");
-
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
-
- subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
-
- final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
- return CreateTransactionReply.fromSerializable(in).getTransactionPath();
- }else{
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("Unexpected transaction path " + out,
- "akka://test/user/testCreateTransaction/shard-txn-1",
- out);
-
- // Will wait for the rest of the 3 seconds
- expectNoMsg();
- }
-
-
- };
- }};
- }
-
- @Test
- public void testOnReceiveCloseTransactionChain() throws Exception {
- new JavaTestKit(getSystem()) {{
- final Props props = ShardTransactionChain.props(store.createTransactionChain(), TestModel.createTestContext());
- final ActorRef subject = getSystem().actorOf(props, "testCloseTransactionChain");
-
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
-
- subject.tell(new CloseTransactionChain().toSerializable(), getRef());
-
- final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in.getClass().equals(CloseTransactionChainReply.SERIALIZABLE_CLASS)) {
- return "match";
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
- // Will wait for the rest of the 3 seconds
- expectNoMsg();
- }
-
-
- };
- }};
- }
+ private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
+
+ private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor,
+ MoreExecutors.sameThreadExecutor());
+
+ private static final SchemaContext testSchemaContext = TestModel.createTestContext();
+
+ private static final ShardContext shardContext = new ShardContext();
+
+ @BeforeClass
+ public static void staticSetup() {
+ store.onGlobalContextUpdated(testSchemaContext);
+ }
+
+ @Test
+ public void testOnReceiveCreateTransaction() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final Props props = ShardTransactionChain.props(store.createTransactionChain(),
+ testSchemaContext, shardContext);
+ final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction");
+
+ new Within(duration("1 seconds")) {
+ @Override
+ protected void run() {
+
+ subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
+
+ final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+ // do not put code outside this method, will run afterwards
+ @Override
+ protected String match(Object in) {
+ if (in.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
+ return CreateTransactionReply.fromSerializable(in).getTransactionPath();
+ }else{
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("Unexpected transaction path " + out,
+ "akka://test/user/testCreateTransaction/shard-txn-1",
+ out);
+
+ // Will wait for the rest of the 3 seconds
+ expectNoMsg();
+ }
+
+
+ };
+ }};
+ }
+
+ @Test
+ public void testOnReceiveCloseTransactionChain() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final Props props = ShardTransactionChain.props(store.createTransactionChain(),
+ testSchemaContext, shardContext);
+ final ActorRef subject = getSystem().actorOf(props, "testCloseTransactionChain");
+
+ new Within(duration("1 seconds")) {
+ @Override
+ protected void run() {
+
+ subject.tell(new CloseTransactionChain().toSerializable(), getRef());
+
+ final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+ // do not put code outside this method, will run afterwards
+ @Override
+ protected String match(Object in) {
+ if (in.getClass().equals(CloseTransactionChainReply.SERIALIZABLE_CLASS)) {
+ return "match";
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+ // Will wait for the rest of the 3 seconds
+ expectNoMsg();
+ }
+
+
+ };
+ }};
+ }
}
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
+
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+
+import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import java.util.Collections;
-
-import static org.junit.Assert.assertTrue;
+import java.util.concurrent.TimeUnit;
/**
* Covers negative test cases
ShardIdentifier.builder().memberName("member-1")
.shardName("inventory").type("operational").build();
- static {
+ private final ShardContext shardContext = new ShardContext();
+
+ @BeforeClass
+ public static void staticSetup() {
store.onGlobalContextUpdated(testSchemaContext);
}
-
@Test(expected = ReadFailedException.class)
public void testNegativeReadWithReadOnlyTransactionClosed()
throws Throwable {
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
- final Props props =
- ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- TestModel.createTestContext());
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+ final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+ testSchemaContext, shardContext);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
).build();
Future<Object> future =
akka.pattern.Patterns.ask(subject, readData, 3000);
- assertTrue(future.isCompleted());
- Await.result(future, Duration.Zero());
+ Await.result(future, Duration.create(3, TimeUnit.SECONDS));
- ((ShardReadTransaction) subject.underlyingActor())
- .forUnitTestOnlyExplicitTransactionClose();
+ subject.underlyingActor().getDOMStoreTransaction().close();
future = akka.pattern.Patterns.ask(subject, readData, 3000);
- Await.result(future, Duration.Zero());
-
-
+ Await.result(future, Duration.create(3, TimeUnit.SECONDS));
}
@Test(expected = ReadFailedException.class)
- public void testNegativeReadWithReadWriteOnlyTransactionClosed()
+ public void testNegativeReadWithReadWriteTransactionClosed()
throws Throwable {
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
- final Props props =
- ShardTransaction.props(store.newReadWriteTransaction(), shard,
- TestModel.createTestContext());
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ testSchemaContext, shardContext);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
- "testNegativeReadWithReadWriteOnlyTransactionClosed");
+ "testNegativeReadWithReadWriteTransactionClosed");
ShardTransactionMessages.ReadData readData =
ShardTransactionMessages.ReadData.newBuilder()
NormalizedNodeMessages.InstanceIdentifier.newBuilder()
.build()
).build();
+
Future<Object> future =
akka.pattern.Patterns.ask(subject, readData, 3000);
- assertTrue(future.isCompleted());
- Await.result(future, Duration.Zero());
+ Await.result(future, Duration.create(3, TimeUnit.SECONDS));
- ((ShardReadWriteTransaction) subject.underlyingActor())
- .forUnitTestOnlyExplicitTransactionClose();
+ subject.underlyingActor().getDOMStoreTransaction().close();
future = akka.pattern.Patterns.ask(subject, readData, 3000);
- Await.result(future, Duration.Zero());
-
-
+ Await.result(future, Duration.create(3, TimeUnit.SECONDS));
}
@Test(expected = ReadFailedException.class)
- public void testNegativeExistsWithReadWriteOnlyTransactionClosed()
+ public void testNegativeExistsWithReadWriteTransactionClosed()
throws Throwable {
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
- final Props props =
- ShardTransaction.props(store.newReadWriteTransaction(), shard,
- TestModel.createTestContext());
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ testSchemaContext, shardContext);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
- "testNegativeExistsWithReadWriteOnlyTransactionClosed");
+ "testNegativeExistsWithReadWriteTransactionClosed");
ShardTransactionMessages.DataExists dataExists =
ShardTransactionMessages.DataExists.newBuilder()
Future<Object> future =
akka.pattern.Patterns.ask(subject, dataExists, 3000);
- assertTrue(future.isCompleted());
- Await.result(future, Duration.Zero());
+ Await.result(future, Duration.create(3, TimeUnit.SECONDS));
- ((ShardReadWriteTransaction) subject.underlyingActor())
- .forUnitTestOnlyExplicitTransactionClose();
+ subject.underlyingActor().getDOMStoreTransaction().close();
future = akka.pattern.Patterns.ask(subject, dataExists, 3000);
- Await.result(future, Duration.Zero());
-
-
+ Await.result(future, Duration.create(3, TimeUnit.SECONDS));
}
@Test(expected = IllegalStateException.class)
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
- final Props props =
- ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
- TestModel.createTestContext());
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+ final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+ testSchemaContext, shardContext);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
Future<Object> future =
akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
- assertTrue(future.isCompleted());
- Await.result(future, Duration.Zero());
+ Await.result(future, Duration.create(3, TimeUnit.SECONDS));
ShardTransactionMessages.WriteData writeData =
ShardTransactionMessages.WriteData.newBuilder()
).build();
future = akka.pattern.Patterns.ask(subject, writeData, 3000);
- assertTrue(future.isCompleted());
- Await.result(future, Duration.Zero());
-
-
+ Await.result(future, Duration.create(3, TimeUnit.SECONDS));
}
-
@Test(expected = IllegalStateException.class)
public void testNegativeReadWriteWithTransactionReady() throws Exception {
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
- final Props props =
- ShardTransaction.props(store.newReadWriteTransaction(), shard,
- TestModel.createTestContext());
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ testSchemaContext, shardContext);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
Future<Object> future =
akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
- assertTrue(future.isCompleted());
- Await.result(future, Duration.Zero());
+ Await.result(future, Duration.create(3, TimeUnit.SECONDS));
ShardTransactionMessages.WriteData writeData =
ShardTransactionMessages.WriteData.newBuilder()
).build();
future = akka.pattern.Patterns.ask(subject, writeData, 3000);
- assertTrue(future.isCompleted());
- Await.result(future, Duration.Zero());
-
-
+ Await.result(future, Duration.create(3, TimeUnit.SECONDS));
}
@Test(expected = IllegalStateException.class)
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
- final Props props =
- ShardTransaction.props(store.newReadWriteTransaction(), shard,
- TestModel.createTestContext());
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ testSchemaContext, shardContext);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props, "testNegativeMergeTransactionReady");
Future<Object> future =
akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
- assertTrue(future.isCompleted());
- Await.result(future, Duration.Zero());
+ Await.result(future, Duration.create(3, TimeUnit.SECONDS));
ShardTransactionMessages.MergeData mergeData =
ShardTransactionMessages.MergeData.newBuilder()
).build();
future = akka.pattern.Patterns.ask(subject, mergeData, 3000);
- assertTrue(future.isCompleted());
- Await.result(future, Duration.Zero());
-
-
+ Await.result(future, Duration.create(3, TimeUnit.SECONDS));
}
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
- final Props props =
- ShardTransaction.props(store.newReadWriteTransaction(), shard,
- TestModel.createTestContext());
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ testSchemaContext, shardContext);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
Future<Object> future =
akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
- assertTrue(future.isCompleted());
- Await.result(future, Duration.Zero());
+ Await.result(future, Duration.create(3, TimeUnit.SECONDS));
ShardTransactionMessages.DeleteData deleteData =
ShardTransactionMessages.DeleteData.newBuilder()
.build()).build();
future = akka.pattern.Patterns.ask(subject, deleteData, 3000);
- assertTrue(future.isCompleted());
- Await.result(future, Duration.Zero());
-
-
+ Await.result(future, Duration.create(3, TimeUnit.SECONDS));
}
}
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.Assert;
+
+import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.Duration;
+
import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
ShardIdentifier.builder().memberName("member-1")
.shardName("inventory").type("config").build();
+ private ShardContext shardContext = new ShardContext();
- static {
+ @BeforeClass
+ public static void staticSetup() {
store.onGlobalContextUpdated(testSchemaContext);
}
@Test
public void testOnReceiveReadData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
- final Props props =
- ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext);
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+ Collections.EMPTY_MAP, new ShardContext()));
+ final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+ testSchemaContext, shardContext);
final ActorRef subject = getSystem().actorOf(props, "testReadData");
new Within(duration("1 seconds")) {
@Test
public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
- final Props props =
- ShardTransaction.props( store.newReadOnlyTransaction(), shard, testSchemaContext);
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+ Collections.EMPTY_MAP, new ShardContext()));
+ final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
+ testSchemaContext, shardContext);
final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
new Within(duration("1 seconds")) {
@Test
public void testOnReceiveDataExistsPositive() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
- final Props props =
- ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext);
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+ Collections.EMPTY_MAP, new ShardContext()));
+ final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+ testSchemaContext, shardContext);
final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive");
new Within(duration("1 seconds")) {
public void testOnReceiveDataExistsNegative() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
- Collections.EMPTY_MAP, null));
- final Props props =
- ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext);
+ Collections.EMPTY_MAP, new ShardContext()));
+ final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+ testSchemaContext, shardContext);
final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative");
new Within(duration("1 seconds")) {
@Test
public void testOnReceiveWriteData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
- final Props props =
- ShardTransaction.props(store.newWriteOnlyTransaction(), shard, TestModel.createTestContext());
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+ Collections.EMPTY_MAP, new ShardContext()));
+ final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+ testSchemaContext, shardContext);
final ActorRef subject =
getSystem().actorOf(props, "testWriteData");
@Test
public void testOnReceiveMergeData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
- final Props props =
- ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext);
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+ Collections.EMPTY_MAP, new ShardContext()));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ testSchemaContext, shardContext);
final ActorRef subject =
getSystem().actorOf(props, "testMergeData");
@Test
public void testOnReceiveDeleteData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
- final Props props =
- ShardTransaction.props( store.newWriteOnlyTransaction(), shard, TestModel.createTestContext());
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+ Collections.EMPTY_MAP, new ShardContext()));
+ final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
+ testSchemaContext, shardContext);
final ActorRef subject =
getSystem().actorOf(props, "testDeleteData");
@Test
public void testOnReceiveReadyTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
- final Props props =
- ShardTransaction.props( store.newReadWriteTransaction(), shard, TestModel.createTestContext());
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+ Collections.EMPTY_MAP, new ShardContext()));
+ final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
+ testSchemaContext, shardContext);
final ActorRef subject =
getSystem().actorOf(props, "testReadyTransaction");
@Test
public void testOnReceiveCloseTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
- final Props props =
- ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext());
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+ Collections.EMPTY_MAP, new ShardContext()));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ testSchemaContext, shardContext);
final ActorRef subject =
getSystem().actorOf(props, "testCloseTransaction");
watch(subject);
- new Within(duration("2 seconds")) {
+ new Within(duration("6 seconds")) {
@Override
protected void run() {
subject.tell(new CloseTransaction().toSerializable(), getRef());
- final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+ final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
@Override
protected String match(Object in) {
+ System.out.println("!!!IN match 1: "+(in!=null?in.getClass():"NULL"));
if (in.getClass().equals(CloseTransactionReply.SERIALIZABLE_CLASS)) {
return "match";
} else {
assertEquals("match", out);
- final String termination = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+ final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
@Override
protected String match(Object in) {
+ System.out.println("!!!IN match 2: "+(in!=null?in.getClass():"NULL"));
if (in instanceof Terminated) {
return "match";
} else {
}
}.get(); // this extracts the received message
-
- expectNoMsg();
+ assertEquals("match", termination);
}
-
-
};
}};
+ }
+ @Test(expected=UnknownMessageException.class)
+ public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+ Collections.EMPTY_MAP, new ShardContext()));
+ final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+ testSchemaContext, shardContext);
+ final TestActorRef subject = TestActorRef.apply(props,getSystem());
+
+ subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
}
+ @Test
+ public void testShardTransactionInactivity() {
+
+ shardContext = new ShardContext(InMemoryDOMDataStoreConfigProperties.getDefault(),
+ Duration.create(500, TimeUnit.MILLISECONDS));
- @Test
- public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
- try {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+ Collections.EMPTY_MAP, new ShardContext()));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ testSchemaContext, shardContext);
+ final ActorRef subject =
+ getSystem().actorOf(props, "testShardTransactionInactivity");
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
- final Props props =
- ShardTransaction.props(store.newReadOnlyTransaction(), shard, TestModel.createTestContext());
- final TestActorRef subject = TestActorRef.apply(props,getSystem());
+ watch(subject);
- subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
- Assert.assertFalse(true);
+ // The shard Tx actor should receive a ReceiveTimeout message and self-destruct.
+ final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
+ // do not put code outside this method, will run afterwards
+ @Override
+ protected String match(Object in) {
+ if (in instanceof Terminated) {
+ return "match";
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
- } catch (Exception cs) {
- assertEquals(UnknownMessageException.class.getSimpleName(), cs.getClass().getSimpleName());
- assertTrue(cs.getMessage(), cs.getMessage().startsWith("Unknown message received "));
+ assertEquals("match", termination);
+ }};
}
- }
}
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
+
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
ShardIdentifier.builder().memberName("member-1")
.shardName("inventory").type("config").build();
- static {
+ private final ShardContext shardContext = new ShardContext();
+
+ @BeforeClass
+ public static void staticSetup() {
store.onGlobalContextUpdated(testSchemaContext);
}
- private FiniteDuration ASK_RESULT_DURATION = Duration.create(5000, TimeUnit.MILLISECONDS);
+ private final FiniteDuration ASK_RESULT_DURATION = Duration.create(5000, TimeUnit.MILLISECONDS);
@Test(expected = TestException.class)
public void testNegativeAbortResultsInException() throws Exception {
- final ActorRef shard =
- getSystem()
- .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+ Collections.EMPTY_MAP, shardContext));
final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
.mock(DOMStoreThreePhaseCommitCohort.class);
final CompositeModification mockComposite =
assertTrue(future.isCompleted());
Await.result(future, ASK_RESULT_DURATION);
-
-
-
}
@Test(expected = OptimisticLockFailedException.class)
public void testNegativeCanCommitResultsInException() throws Exception {
- final ActorRef shard =
- getSystem()
- .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+ Collections.EMPTY_MAP, shardContext));
final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
.mock(DOMStoreThreePhaseCommitCohort.class);
final CompositeModification mockComposite =
@Test(expected = TestException.class)
public void testNegativePreCommitResultsInException() throws Exception {
- final ActorRef shard =
- getSystem()
- .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+ Collections.EMPTY_MAP, shardContext));
final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
.mock(DOMStoreThreePhaseCommitCohort.class);
final CompositeModification mockComposite =
@Test(expected = TestException.class)
public void testNegativeCommitResultsInException() throws Exception {
- final TestActorRef<Shard> subject = TestActorRef
- .create(getSystem(),
- Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null),
+ final TestActorRef<Shard> subject = TestActorRef.create(getSystem(),
+ Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, shardContext),
"testNegativeCommitResultsInException");
final ActorRef shardTransaction =
- getSystem().actorOf(
- ShardTransaction.props(store.newReadWriteTransaction(), subject,
- TestModel.createTestContext()));
+ getSystem().actorOf(ShardTransaction.props(store.newReadWriteTransaction(), subject,
+ testSchemaContext, shardContext));
ShardTransactionMessages.WriteData writeData =
ShardTransactionMessages.WriteData.newBuilder()
mockForwardCommitTransaction
, 3000);
Await.result(future, ASK_RESULT_DURATION);
-
-
}
private class TestException extends Exception {
}
-
-
}
package org.opendaylight.controller.cluster.datastore;
+import static org.mockito.Mockito.doReturn;
+
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
public class TransactionChainProxyTest {
ActorContext actorContext = Mockito.mock(ActorContext.class);
SchemaContext schemaContext = Mockito.mock(SchemaContext.class);
+
+ @Before
+ public void setUp() {
+ doReturn(schemaContext).when(actorContext).getSchemaContext();
+ }
+
+ @SuppressWarnings("resource")
@Test
public void testNewReadOnlyTransaction() throws Exception {
- DOMStoreTransaction dst = new TransactionChainProxy(actorContext, schemaContext).newReadOnlyTransaction();
+ DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newReadOnlyTransaction();
Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
}
+ @SuppressWarnings("resource")
@Test
public void testNewReadWriteTransaction() throws Exception {
- DOMStoreTransaction dst = new TransactionChainProxy(actorContext, schemaContext).newReadWriteTransaction();
+ DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newReadWriteTransaction();
Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
}
+ @SuppressWarnings("resource")
@Test
public void testNewWriteOnlyTransaction() throws Exception {
- DOMStoreTransaction dst = new TransactionChainProxy(actorContext, schemaContext).newWriteOnlyTransaction();
+ DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newWriteOnlyTransaction();
Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
}
@Test(expected=UnsupportedOperationException.class)
public void testClose() throws Exception {
- new TransactionChainProxy(actorContext, schemaContext).close();
+ new TransactionChainProxy(actorContext).close();
}
}
doReturn(getSystem()).when(mockActorContext).getActorSystem();
doReturn(memberName).when(mockActorContext).getCurrentMemberName();
+ doReturn(schemaContext).when(mockActorContext).getSchemaContext();
ShardStrategyFactory.setConfiguration(configuration);
}
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY, schemaContext);
+ READ_ONLY);
doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
eq(actorSelection(actorRef)), eqReadData(), anyDuration());
executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY, schemaContext);
+ READ_ONLY);
transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
}
executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY, schemaContext);
+ READ_ONLY);
propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
}
anyString(), any(), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY, schemaContext);
+ READ_ONLY);
propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
}
eq(actorSelection(actorRef)), eqReadData(), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE, schemaContext);
+ READ_WRITE);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
eq(actorSelection(actorRef)), eqReadData(), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE, schemaContext);
+ READ_WRITE);
transactionProxy.write(TestModel.TEST_PATH, expectedNode);
public void testReadPreConditionCheck() {
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY, schemaContext);
+ WRITE_ONLY);
transactionProxy.read(TestModel.TEST_PATH);
}
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY, schemaContext);
+ READ_ONLY);
doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY, schemaContext);
+ READ_ONLY);
transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
}
executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY, schemaContext);
+ READ_ONLY);
propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
}
eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE, schemaContext);
+ READ_WRITE);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE, schemaContext);
+ READ_WRITE);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
public void testxistsPreConditionCheck() {
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY, schemaContext);
+ WRITE_ONLY);
transactionProxy.exists(TestModel.TEST_PATH);
}
eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY, schemaContext);
+ WRITE_ONLY);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
public void testWritePreConditionCheck() {
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY, schemaContext);
+ READ_ONLY);
transactionProxy.write(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME));
public void testWriteAfterReadyPreConditionCheck() {
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY, schemaContext);
+ WRITE_ONLY);
transactionProxy.ready();
eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY, schemaContext);
+ WRITE_ONLY);
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY, schemaContext);
+ WRITE_ONLY);
transactionProxy.delete(TestModel.TEST_PATH);
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE, schemaContext);
+ READ_WRITE);
transactionProxy.read(TestModel.TEST_PATH);
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY, schemaContext);
+ WRITE_ONLY);
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY, schemaContext);
+ WRITE_ONLY);
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
anyString(), any(), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY, schemaContext);
+ WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY, schemaContext);
+ WRITE_ONLY);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
public void testGetIdentifier() {
setupActorContextWithInitialCreateTransaction(READ_ONLY);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- TransactionProxy.TransactionType.READ_ONLY, schemaContext);
+ TransactionProxy.TransactionType.READ_ONLY);
Object id = transactionProxy.getIdentifier();
assertNotNull("getIdentifier returned null", id);
eq(actorSelection(actorRef)), eqReadData(), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE, schemaContext);
+ READ_WRITE);
transactionProxy.read(TestModel.TEST_PATH);