import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
+
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION;
+
public class ShardTest extends AbstractActorTest {
@Before
public void setUp() {
- System.setProperty("shard.persistent", "false");
-
InMemorySnapshotStore.clear();
InMemoryJournal.clear();
}
return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
dataStoreContext, SCHEMA_CONTEXT) {
@Override
- public void onReceiveCommand(final Object message) {
+ public void onReceiveCommand(final Object message) throws Exception {
if(message instanceof ElectionTimeout && firstElectionTimeout) {
// Got the first ElectionTimeout. We don't forward it to the
// base Shard yet until we've sent the RegisterChangeListener
}
@Test
- public void testPeerAddressResolved(){
+ public void testPeerAddressResolved() throws Exception {
new ShardTestKit(getSystem()) {{
final CountDownLatch recoveryComplete = new CountDownLatch(1);
class TestShard extends Shard {
}
@Test
- public void testApplySnapshot() throws ExecutionException, InterruptedException {
+ public void testApplySnapshot() throws Exception {
TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
"testApplySnapshot");
@SuppressWarnings({ "unchecked" })
@Test
public void testConcurrentThreePhaseCommits() throws Throwable {
- System.setProperty("shard.persistent", "true");
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
// Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true), getRef());
ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
// Send the ForwardedReadyTransaction for the next 2 Tx's.
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+ cohort3, modification3, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message for the first Tx.
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message.
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message.
@Test
public void testAbortBeforeFinishCommit() throws Throwable {
- System.setProperty("shard.persistent", "true");
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
modification, preCommit);
- shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
// Ready the Tx's
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// canCommit 1st Tx. We don't send the commit so it should timeout.
// Ready the Tx's
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+ cohort3, modification3, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// canCommit 1st Tx.
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message for the first Tx.
@Test
public void testCreateSnapshot() throws IOException, InterruptedException {
+ testCreateSnapshot(true, "testCreateSnapshot");
+ }
+
+ @Test
+ public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
+ testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
+ }
+
+ public void testCreateSnapshot(boolean persistent, final String shardActorName) throws IOException, InterruptedException {
+ final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
+ shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
+
new ShardTestKit(getSystem()) {{
final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
Creator<Shard> creator = new Creator<Shard>() {
return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
dataStoreContext, SCHEMA_CONTEXT) {
@Override
- public void saveSnapshot(Object snapshot) {
- super.saveSnapshot(snapshot);
+ protected void commitSnapshot(long sequenceNumber) {
+ super.commitSnapshot(sequenceNumber);
latch.get().countDown();
}
};
};
TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(creator)), "testCreateSnapshot");
+ Props.create(new DelegatingShardCreator(creator)), shardActorName);
waitUntilLeader(shard);
}
+ @Test
+ public void testRecoveryApplicable(){
+
+ final DatastoreContext persistentContext = DatastoreContext.newBuilder().
+ shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
+
+ final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+ persistentContext, SCHEMA_CONTEXT);
+
+ final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
+ shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
+
+ final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+ nonPersistentContext, SCHEMA_CONTEXT);
+
+ new ShardTestKit(getSystem()) {{
+ TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
+ persistentProps, "testPersistence1");
+
+ assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+ shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+ TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
+ nonPersistentProps, "testPersistence2");
+
+ assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+ shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+ }};
+
+ }
+
+
private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
};
}
- private NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
+ static NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
throws ExecutionException, InterruptedException {
DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();