import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
+
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+
public class ShardTest extends AbstractActorTest {
@Before
public void setUp() {
- System.setProperty("shard.persistent", "false");
-
InMemorySnapshotStore.clear();
InMemoryJournal.clear();
}
return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
dataStoreContext, SCHEMA_CONTEXT) {
@Override
- public void onReceiveCommand(final Object message) {
+ public void onReceiveCommand(final Object message) throws Exception {
if(message instanceof ElectionTimeout && firstElectionTimeout) {
// Got the first ElectionTimeout. We don't forward it to the
// base Shard yet until we've sent the RegisterChangeListener
}
@Test
- public void testPeerAddressResolved(){
+ public void testPeerAddressResolved() throws Exception {
new ShardTestKit(getSystem()) {{
final CountDownLatch recoveryComplete = new CountDownLatch(1);
class TestShard extends Shard {
}
@Test
- public void testApplySnapshot() throws ExecutionException, InterruptedException {
+ public void testApplySnapshot() throws Exception {
TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
"testApplySnapshot");
@SuppressWarnings({ "unchecked" })
@Test
public void testConcurrentThreePhaseCommits() throws Throwable {
- System.setProperty("shard.persistent", "true");
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
@Test
public void testAbortBeforeFinishCommit() throws Throwable {
- System.setProperty("shard.persistent", "true");
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
@Test
public void testCreateSnapshot() throws IOException, InterruptedException {
+ testCreateSnapshot(true, "testCreateSnapshot");
+ }
+
+ @Test
+ public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
+ testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
+ }
+
+ public void testCreateSnapshot(boolean persistent, final String shardActorName) throws IOException, InterruptedException {
+ final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
+ shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
+
new ShardTestKit(getSystem()) {{
final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
Creator<Shard> creator = new Creator<Shard>() {
return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
dataStoreContext, SCHEMA_CONTEXT) {
@Override
- public void saveSnapshot(Object snapshot) {
- super.saveSnapshot(snapshot);
+ protected void commitSnapshot(long sequenceNumber) {
+ super.commitSnapshot(sequenceNumber);
latch.get().countDown();
}
};
};
TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(creator)), "testCreateSnapshot");
+ Props.create(new DelegatingShardCreator(creator)), shardActorName);
waitUntilLeader(shard);
}
+ @Test
+ public void testRecoveryApplicable(){
+
+ final DatastoreContext persistentContext = DatastoreContext.newBuilder().
+ shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
+
+ final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+ persistentContext, SCHEMA_CONTEXT);
+
+ final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
+ shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
+
+ final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+ nonPersistentContext, SCHEMA_CONTEXT);
+
+ new ShardTestKit(getSystem()) {{
+ TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
+ persistentProps, "testPersistence1");
+
+ assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+ shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+ TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
+ nonPersistentProps, "testPersistence2");
+
+ assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+ shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+ }};
+
+ }
+
+
private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =