From be4e53698d37883b68b96f45d5a71683ca7fd8e6 Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Wed, 27 Aug 2014 14:12:36 -0700 Subject: [PATCH] Implement creating and applying of snapshot for a shard This commit implements creating and applying of snapshot as per the RaftActor contract. There was an issue related to recovery which was occurring because Shard was created without a schemaContext so there are changes in this commit which ensure that the Shard is not created before a schemaContext is received Change-Id: I45fd64885f09fac57f1f5ff235144064b94ab129 Signed-off-by: Moiz Raja --- .../controller/cluster/datastore/Shard.java | 160 +++++++++++++--- .../cluster/datastore/ShardManager.java | 23 +-- .../datastore/messages/ReadDataReply.java | 6 + .../cluster/datastore/AbstractActorTest.java | 14 +- .../datastore/BasicIntegrationTest.java | 2 +- .../cluster/datastore/ShardManagerTest.java | 6 + .../cluster/datastore/ShardTest.java | 179 +++++++++++++++++- .../ShardTransactionFailureTest.java | 25 ++- .../datastore/ShardTransactionTest.java | 38 ++-- .../ThreePhaseCommitCohortFailureTest.java | 14 +- 10 files changed, 370 insertions(+), 97 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 6a6a181b6c..b40ccd08e8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -10,19 +10,22 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSelection; +import akka.actor.PoisonPill; import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Creator; import akka.persistence.RecoveryFailure; import akka.serialization.Serialization; - +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; @@ -35,26 +38,32 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionR import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; +import org.opendaylight.controller.cluster.datastore.messages.ReadData; +import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; +import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; - import scala.concurrent.duration.FiniteDuration; import java.util.ArrayList; @@ -102,12 +111,15 @@ public class Shard extends RaftActor { private SchemaContext schemaContext; + private ActorRef createSnapshotTransaction; + private Shard(ShardIdentifier name, Map peerAddresses, - DatastoreContext datastoreContext) { + DatastoreContext datastoreContext, SchemaContext schemaContext) { super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams)); this.name = name; this.datastoreContext = datastoreContext; + this.schemaContext = schemaContext; String setting = System.getProperty("shard.persistent"); @@ -118,6 +130,10 @@ public class Shard extends RaftActor { store = InMemoryDOMDataStoreFactory.create(name.toString(), null, datastoreContext.getDataStoreProperties()); + if(schemaContext != null) { + store.onGlobalContextUpdated(schemaContext); + } + shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString()); @@ -137,12 +153,13 @@ public class Shard extends RaftActor { public static Props props(final ShardIdentifier name, final Map peerAddresses, - DatastoreContext datastoreContext) { + DatastoreContext datastoreContext, SchemaContext schemaContext) { Preconditions.checkNotNull(name, "name should not be null"); Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null"); - Preconditions.checkNotNull(datastoreContext, "shardContext should not be null"); + Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null"); + Preconditions.checkNotNull(schemaContext, "schemaContext should not be null"); - return Props.create(new ShardCreator(name, peerAddresses, datastoreContext)); + return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext)); } @Override public void onReceiveRecover(Object message) { @@ -167,6 +184,15 @@ public class Shard extends RaftActor { } else if (getLeader() != null) { getLeader().forward(message, getContext()); } + } else if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { + // This must be for install snapshot. Don't want to open this up and trigger + // deSerialization + self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), self()); + + // Send a PoisonPill instead of sending close transaction because we do not really need + // a response + getSender().tell(PoisonPill.getInstance(), self()); + } else if (message instanceof RegisterChangeListener) { registerChangeListener((RegisterChangeListener) message); } else if (message instanceof UpdateSchemaContext) { @@ -190,9 +216,14 @@ public class Shard extends RaftActor { } private ActorRef createTypedTransactionActor( - CreateTransaction createTransaction, + int transactionType, ShardTransactionIdentifier transactionId) { - if (createTransaction.getTransactionType() + + if(this.schemaContext == null){ + throw new NullPointerException("schemaContext should not be null"); + } + + if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) { shardMBean.incrementReadOnlyTransactionCount(); @@ -201,7 +232,7 @@ public class Shard extends RaftActor { ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(), schemaContext,datastoreContext, name.toString()), transactionId.toString()); - } else if (createTransaction.getTransactionType() + } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) { shardMBean.incrementReadWriteTransactionCount(); @@ -211,7 +242,7 @@ public class Shard extends RaftActor { schemaContext, datastoreContext,name.toString()), transactionId.toString()); - } else if (createTransaction.getTransactionType() + } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) { shardMBean.incrementWriteOnlyTransactionCount(); @@ -222,28 +253,42 @@ public class Shard extends RaftActor { } else { throw new IllegalArgumentException( "Shard="+name + ":CreateTransaction message has unidentified transaction type=" - + createTransaction.getTransactionType()); + + transactionType); } } private void createTransaction(CreateTransaction createTransaction) { + createTransaction(createTransaction.getTransactionType(), + createTransaction.getTransactionId()); + } + + private ActorRef createTransaction(int transactionType, String remoteTransactionId) { ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder() - .remoteTransactionId(createTransaction.getTransactionId()) + .remoteTransactionId(remoteTransactionId) .build(); LOG.debug("Creating transaction : {} ", transactionId); ActorRef transactionActor = - createTypedTransactionActor(createTransaction, transactionId); + createTypedTransactionActor(transactionType, transactionId); getSender() .tell(new CreateTransactionReply( Serialization.serializedActorPath(transactionActor), - createTransaction.getTransactionId()).toSerializable(), - getSelf() - ); + remoteTransactionId).toSerializable(), + getSelf()); + + return transactionActor; } + private void syncCommitTransaction(DOMStoreWriteTransaction transaction) + throws ExecutionException, InterruptedException { + DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); + commitCohort.preCommit().get(); + commitCohort.commit().get(); + } + + private void commit(final ActorRef sender, Object serialized) { Modification modification = MutableCompositeModification .fromSerializable(serialized, schemaContext); @@ -253,16 +298,11 @@ public class Shard extends RaftActor { LOG.debug( "Could not find cohort for modification : {}. Writing modification using a new transaction", modification); - DOMStoreReadWriteTransaction transaction = - store.newReadWriteTransaction(); + DOMStoreWriteTransaction transaction = + store.newWriteOnlyTransaction(); modification.apply(transaction); - DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); - ListenableFuture future = - commitCohort.preCommit(); try { - future.get(); - future = commitCohort.commit(); - future.get(); + syncCommitTransaction(transaction); } catch (InterruptedException | ExecutionException e) { shardMBean.incrementFailedTransactionsCount(); LOG.error("Failed to commit", e); @@ -311,9 +351,14 @@ public class Shard extends RaftActor { private void updateSchemaContext(UpdateSchemaContext message) { this.schemaContext = message.getSchemaContext(); + updateSchemaContext(message.getSchemaContext()); store.onGlobalContextUpdated(message.getSchemaContext()); } + @VisibleForTesting void updateSchemaContext(SchemaContext schemaContext) { + store.onGlobalContextUpdated(schemaContext); + } + private void registerChangeListener( RegisterChangeListener registerChangeListener) { @@ -360,7 +405,7 @@ public class Shard extends RaftActor { ActorRef transactionChain = getContext().actorOf( ShardTransactionChain.props(chain, schemaContext, datastoreContext,name.toString() )); getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(), - getSelf()); + getSelf()); } @Override protected void applyState(ActorRef clientActor, String identifier, @@ -396,11 +441,39 @@ public class Shard extends RaftActor { } @Override protected void createSnapshot() { - throw new UnsupportedOperationException("createSnapshot"); + if (createSnapshotTransaction == null) { + + // Create a transaction. We are really going to treat the transaction as a worker + // so that this actor does not get block building the snapshot + createSnapshotTransaction = createTransaction( + TransactionProxy.TransactionType.READ_ONLY.ordinal(), + "createSnapshot"); + + createSnapshotTransaction.tell( + new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self()); + + } } - @Override protected void applySnapshot(ByteString snapshot) { - throw new UnsupportedOperationException("applySnapshot"); + @VisibleForTesting @Override protected void applySnapshot(ByteString snapshot) { + // Since this will be done only on Recovery or when this actor is a Follower + // we can safely commit everything in here. We not need to worry about event notifications + // as they would have already been disabled on the follower + try { + DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); + NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot); + NormalizedNode node = new NormalizedNodeToNodeCodec(schemaContext) + .decode(YangInstanceIdentifier.builder().build(), serializedNode); + + // delete everything first + transaction.delete(YangInstanceIdentifier.builder().build()); + + // Add everything from the remote node back + transaction.write(YangInstanceIdentifier.builder().build(), node); + syncCommitTransaction(transaction); + } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) { + LOG.error(e, "An exception occurred when applying snapshot"); + } } @Override protected void onStateChanged() { @@ -438,17 +511,42 @@ public class Shard extends RaftActor { final ShardIdentifier name; final Map peerAddresses; final DatastoreContext datastoreContext; + final SchemaContext schemaContext; ShardCreator(ShardIdentifier name, Map peerAddresses, - DatastoreContext datastoreContext) { + DatastoreContext datastoreContext, SchemaContext schemaContext) { this.name = name; this.peerAddresses = peerAddresses; this.datastoreContext = datastoreContext; + this.schemaContext = schemaContext; } @Override public Shard create() throws Exception { - return new Shard(name, peerAddresses, datastoreContext); + return new Shard(name, peerAddresses, datastoreContext, schemaContext); } } + + @VisibleForTesting NormalizedNode readStore() throws ExecutionException, InterruptedException { + DOMStoreReadTransaction transaction = store.newReadOnlyTransaction(); + + CheckedFuture>, ReadFailedException> future = + transaction.read(YangInstanceIdentifier.builder().build()); + + NormalizedNode node = future.get().get(); + + transaction.close(); + + return node; + } + + @VisibleForTesting void writeToStore(YangInstanceIdentifier id, NormalizedNode node) + throws ExecutionException, InterruptedException { + DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); + + transaction.write(id, node); + + syncCommitTransaction(transaction); + } + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index e51d49bff2..adfda1ccbe 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -17,9 +17,7 @@ import akka.actor.SupervisorStrategy; import akka.cluster.ClusterEvent; import akka.japi.Creator; import akka.japi.Function; - import com.google.common.base.Preconditions; - import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; @@ -32,8 +30,8 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; - import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; import java.util.ArrayList; @@ -89,9 +87,7 @@ public class ShardManager extends AbstractUntypedActor { // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); - // Create all the local Shards and make them a child of the ShardManager - // TODO: This may need to be initiated when we first get the schema context - createLocalShards(); + //createLocalShards(null); } public static Props props(final String type, @@ -162,8 +158,14 @@ public class ShardManager extends AbstractUntypedActor { * @param message */ private void updateSchemaContext(Object message) { - for(ShardInformation info : localShards.values()){ - info.getActor().tell(message,getSelf()); + SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext(); + + if(localShards.size() == 0){ + createLocalShards(schemaContext); + } else { + for (ShardInformation info : localShards.values()) { + info.getActor().tell(message, getSelf()); + } } } @@ -235,7 +237,7 @@ public class ShardManager extends AbstractUntypedActor { * runs * */ - private void createLocalShards() { + private void createLocalShards(SchemaContext schemaContext) { String memberName = this.cluster.getCurrentMemberName(); List memberShardNames = this.configuration.getMemberShardNames(memberName); @@ -245,9 +247,8 @@ public class ShardManager extends AbstractUntypedActor { ShardIdentifier shardId = getShardIdentifier(memberName, shardName); Map peerAddresses = getPeerAddresses(shardName); ActorRef actor = getContext() - .actorOf(Shard.props(shardId, peerAddresses, datastoreContext). + .actorOf(Shard.props(shardId, peerAddresses, datastoreContext, schemaContext). withMailbox(ActorContext.MAILBOX), shardId.toString()); - localShardActorNames.add(shardId.toString()); localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses)); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java index c5498ca228..fc6bcff64a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore.messages; +import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -46,4 +47,9 @@ public class ReadDataReply implements SerializableMessage{ ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable; return new ReadDataReply(schemaContext,new NormalizedNodeToNodeCodec(schemaContext).decode(id, o.getNormalizedNode())); } + + public static ByteString getNormalizedNodeByteString(Object serializable){ + ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable; + return ((ShardTransactionMessages.ReadDataReply) serializable).getNormalizedNode().toByteString(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java index e23a76b0b2..4c550a768c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java @@ -22,11 +22,6 @@ public abstract class AbstractActorTest { @BeforeClass public static void setUpClass() throws IOException { - File journal = new File("journal"); - - if(journal.exists()) { - FileUtils.deleteDirectory(journal); - } System.setProperty("shard.persistent", "false"); system = ActorSystem.create("test"); @@ -36,12 +31,21 @@ public abstract class AbstractActorTest { public static void tearDownClass() throws IOException { JavaTestKit.shutdownActorSystem(system); system = null; + } + protected static void deletePersistenceFiles() throws IOException { File journal = new File("journal"); if(journal.exists()) { FileUtils.deleteDirectory(journal); } + + File snapshots = new File("snapshots"); + + if(snapshots.exists()){ + FileUtils.deleteDirectory(snapshots); + } + } protected ActorSystem getSystem() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java index 6f131f301f..50367e66ce 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java @@ -64,7 +64,7 @@ public class BasicIntegrationTest extends AbstractActorTest { final SchemaContext schemaContext = TestModel.createTestContext(); DatastoreContext datastoreContext = new DatastoreContext(); - final Props props = Shard.props(identifier, Collections.EMPTY_MAP, datastoreContext); + final Props props = Shard.props(identifier, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()); final ActorRef shard = getSystem().actorOf(props); new Within(duration("10 seconds")) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 1feefd1c1f..02201f7cd1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -15,8 +15,10 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; +import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import scala.concurrent.duration.Duration; import static junit.framework.Assert.assertEquals; @@ -71,6 +73,8 @@ public class ShardManagerTest { final TestActorRef subject = TestActorRef.create(system, props); + subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + new Within(duration("10 seconds")) { @Override protected void run() { @@ -132,6 +136,8 @@ public class ShardManagerTest { final TestActorRef subject = TestActorRef.create(system, props); + subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + new Within(duration("10 seconds")) { @Override protected void run() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 4466e50f96..766dcb7268 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -1,10 +1,15 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; +import akka.actor.ActorSystem; import akka.actor.Props; import akka.event.Logging; import akka.testkit.JavaTestKit; - +import akka.testkit.TestActorRef; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; @@ -16,21 +21,32 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class ShardTest extends AbstractActorTest { @@ -44,7 +60,7 @@ public class ShardTest extends AbstractActorTest { ShardIdentifier.builder().memberName("member-1") .shardName("inventory").type("config").build(); - final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT); + final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext()); final ActorRef subject = getSystem().actorOf(props, "testCreateTransactionChain"); @@ -103,7 +119,7 @@ public class ShardTest extends AbstractActorTest { ShardIdentifier.builder().memberName("member-1") .shardName("inventory").type("config").build(); - final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT); + final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext()); final ActorRef subject = getSystem().actorOf(props, "testRegisterChangeListener"); @@ -165,7 +181,7 @@ public class ShardTest extends AbstractActorTest { ShardIdentifier.builder().memberName("member-1") .shardName("inventory").type("config").build(); - final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT); + final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext()); final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction"); @@ -227,7 +243,7 @@ public class ShardTest extends AbstractActorTest { .shardName("inventory").type("config").build(); peerAddresses.put(identifier, null); - final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT); + final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext()); final ActorRef subject = getSystem().actorOf(props, "testPeerAddressResolved"); @@ -245,6 +261,157 @@ public class ShardTest extends AbstractActorTest { }}; } + @Test + public void testApplySnapshot() throws ExecutionException, InterruptedException { + Map peerAddresses = new HashMap<>(); + + final ShardIdentifier identifier = + ShardIdentifier.builder().memberName("member-1") + .shardName("inventory").type("config").build(); + + peerAddresses.put(identifier, null); + final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext()); + + TestActorRef ref = TestActorRef.create(getSystem(), props); + + ref.underlyingActor().updateSchemaContext(TestModel.createTestContext()); + + NormalizedNodeToNodeCodec codec = + new NormalizedNodeToNodeCodec(TestModel.createTestContext()); + + ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + NormalizedNode expected = ref.underlyingActor().readStore(); + + NormalizedNodeMessages.Container encode = codec + .encode(YangInstanceIdentifier.builder().build(), expected); + + + ref.underlyingActor().applySnapshot(encode.getNormalizedNode().toByteString()); + + NormalizedNode actual = ref.underlyingActor().readStore(); + + assertEquals(expected, actual); + } + + private static class ShardTestKit extends JavaTestKit { + + private ShardTestKit(ActorSystem actorSystem) { + super(actorSystem); + } + + protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){ + // Wait for a specific log message to show up + final boolean result = + new JavaTestKit.EventFilter(logLevel + ) { + @Override + protected Boolean run() { + return true; + } + }.from(subject.path().toString()) + .message(logMessage) + .occurrences(1).exec(); + + Assert.assertEquals(true, result); + + } + + } + + @Test + public void testCreateSnapshot() throws IOException, InterruptedException { + new ShardTestKit(getSystem()) {{ + final ShardIdentifier identifier = + ShardIdentifier.builder().memberName("member-1") + .shardName("inventory").type("config").build(); + + final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext()); + final ActorRef subject = + getSystem().actorOf(props, "testCreateSnapshot"); + + // Wait for a specific log message to show up + this.waitForLogMessage(Logging.Info.class, subject, "Switching from state Candidate to Leader"); + + + new Within(duration("3 seconds")) { + @Override + protected void run() { + + subject.tell( + new UpdateSchemaContext(TestModel.createTestContext()), + getRef()); + + subject.tell(new CaptureSnapshot(-1,-1,-1,-1), + getRef()); + + waitForLogMessage(Logging.Debug.class, subject, "CaptureSnapshotReply received by actor"); + } + }; + + Thread.sleep(2000); + deletePersistenceFiles(); + }}; + } + + /** + * This test simply verifies that the applySnapShot logic will work + * @throws ReadFailedException + */ + @Test + public void testInMemoryDataStoreRestore() throws ReadFailedException { + InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator( + MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor()); + + store.onGlobalContextUpdated(TestModel.createTestContext()); + + DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction(); + putTransaction.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + commitTransaction(putTransaction); + + + NormalizedNode expected = readStore(store); + + DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction(); + + writeTransaction.delete(YangInstanceIdentifier.builder().build()); + writeTransaction.write(YangInstanceIdentifier.builder().build(), expected); + + commitTransaction(writeTransaction); + + NormalizedNode actual = readStore(store); + + assertEquals(expected, actual); + + } + + private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException { + DOMStoreReadTransaction transaction = store.newReadOnlyTransaction(); + CheckedFuture>, ReadFailedException> read = + transaction.read(YangInstanceIdentifier.builder().build()); + + Optional> optional = read.checkedGet(); + + NormalizedNode normalizedNode = optional.get(); + + transaction.close(); + + return normalizedNode; + } + + private void commitTransaction(DOMStoreWriteTransaction transaction) { + DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); + ListenableFuture future = + commitCohort.preCommit(); + try { + future.get(); + future = commitCohort.commit(); + future.get(); + } catch (InterruptedException | ExecutionException e) { + } + } + private AsyncDataChangeListener> noOpDataChangeListener() { return new AsyncDataChangeListener>() { @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java index 4fe60f6467..4e73c70b9d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java @@ -64,12 +64,15 @@ public class ShardTransactionFailureTest extends AbstractActorTest { store.onGlobalContextUpdated(testSchemaContext); } + private ActorRef createShard(){ + return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext(), TestModel.createTestContext())); + } + @Test(expected = ReadFailedException.class) public void testNegativeReadWithReadOnlyTransactionClosed() throws Throwable { - final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext())); + final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()); @@ -98,8 +101,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { public void testNegativeReadWithReadWriteTransactionClosed() throws Throwable { - final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext())); + final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()); @@ -128,8 +130,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { public void testNegativeExistsWithReadWriteTransactionClosed() throws Throwable { - final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext())); + final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()); @@ -158,8 +159,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { public void testNegativeWriteWithTransactionReady() throws Exception { - final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext())); + final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()); @@ -191,8 +191,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { public void testNegativeReadWriteWithTransactionReady() throws Exception { - final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext())); + final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()); @@ -229,8 +228,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { public void testNegativeMergeTransactionReady() throws Exception { - final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext())); + final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()); @@ -262,8 +260,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { public void testNegativeDeleteDataWhenTransactionReady() throws Exception { - final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext())); + final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index ff2ee08f94..c9e08f9cbb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -67,11 +67,15 @@ public class ShardTransactionTest extends AbstractActorTest { store.onGlobalContextUpdated(testSchemaContext); } + private ActorRef createShard(){ + return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, + Collections.EMPTY_MAP, new DatastoreContext(), TestModel.createTestContext())); + } + @Test public void testOnReceiveReadData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, - Collections.EMPTY_MAP, new DatastoreContext())); + final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()); final ActorRef subject = getSystem().actorOf(props, "testReadData"); @@ -113,8 +117,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveReadDataWhenDataNotFound() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, - Collections.EMPTY_MAP, new DatastoreContext())); + final ActorRef shard = createShard(); final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()); final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound"); @@ -157,8 +160,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveDataExistsPositive() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, - Collections.EMPTY_MAP, new DatastoreContext())); + final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()); final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive"); @@ -200,8 +202,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveDataExistsNegative() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, - Collections.EMPTY_MAP, new DatastoreContext())); + final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()); final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative"); @@ -278,8 +279,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveWriteData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, - Collections.EMPTY_MAP, new DatastoreContext())); + final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()); final ActorRef subject = @@ -319,8 +319,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveMergeData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, - Collections.EMPTY_MAP, new DatastoreContext())); + final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()); final ActorRef subject = @@ -361,8 +360,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveDeleteData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, - Collections.EMPTY_MAP, new DatastoreContext())); + final ActorRef shard = createShard(); final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard, testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()); final ActorRef subject = @@ -401,8 +399,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveReadyTransaction() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, - Collections.EMPTY_MAP, new DatastoreContext())); + final ActorRef shard = createShard(); final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()); final ActorRef subject = @@ -440,8 +437,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveCloseTransaction() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, - Collections.EMPTY_MAP, new DatastoreContext())); + final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()); final ActorRef subject = @@ -491,8 +487,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test(expected=UnknownMessageException.class) public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception { - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, - Collections.EMPTY_MAP, new DatastoreContext())); + final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()); final TestActorRef subject = TestActorRef.apply(props,getSystem()); @@ -507,8 +502,7 @@ public class ShardTransactionTest extends AbstractActorTest { Duration.create(500, TimeUnit.MILLISECONDS)); new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, - Collections.EMPTY_MAP, new DatastoreContext())); + final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()); final ActorRef subject = diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java index e39b9abd65..a0e6b412dd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java @@ -74,12 +74,14 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest { private final FiniteDuration ASK_RESULT_DURATION = Duration.create(5000, TimeUnit.MILLISECONDS); + private ActorRef createShard(){ + return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext())); + } @Test(expected = TestException.class) public void testNegativeAbortResultsInException() throws Exception { - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, - Collections.EMPTY_MAP, datastoreContext)); + final ActorRef shard = createShard(); final DOMStoreThreePhaseCommitCohort mockCohort = Mockito .mock(DOMStoreThreePhaseCommitCohort.class); final CompositeModification mockComposite = @@ -107,8 +109,7 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest { @Test(expected = OptimisticLockFailedException.class) public void testNegativeCanCommitResultsInException() throws Exception { - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, - Collections.EMPTY_MAP, datastoreContext)); + final ActorRef shard = createShard(); final DOMStoreThreePhaseCommitCohort mockCohort = Mockito .mock(DOMStoreThreePhaseCommitCohort.class); final CompositeModification mockComposite = @@ -139,8 +140,7 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest { @Test(expected = TestException.class) public void testNegativePreCommitResultsInException() throws Exception { - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, - Collections.EMPTY_MAP, datastoreContext)); + final ActorRef shard = createShard(); final DOMStoreThreePhaseCommitCohort mockCohort = Mockito .mock(DOMStoreThreePhaseCommitCohort.class); final CompositeModification mockComposite = @@ -170,7 +170,7 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest { public void testNegativeCommitResultsInException() throws Exception { final TestActorRef subject = TestActorRef.create(getSystem(), - Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext), + Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()), "testNegativeCommitResultsInException"); final ActorRef shardTransaction = -- 2.36.6