package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
import akka.actor.Props;
-import akka.dispatch.Futures;
-import akka.japi.Procedure;
-import akka.persistence.PersistentConfirmation;
-import akka.persistence.PersistentId;
-import akka.persistence.PersistentImpl;
-import akka.persistence.PersistentRepr;
-import akka.persistence.journal.japi.AsyncWriteJournal;
+import akka.japi.Creator;
+import akka.pattern.Patterns;
+import akka.persistence.RecoveryCompleted;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+import akka.util.Timeout;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
-import org.junit.AfterClass;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
import scala.concurrent.Future;
-import java.net.URI;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
+public class ShardManagerTest extends AbstractActorTest {
+ private static int ID_COUNTER = 1;
-import static junit.framework.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class ShardManagerTest {
- private static ActorSystem system;
- Configuration mockConfig = new MockConfiguration();
- private static ActorRef defaultShardMockActor;
+ private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
+ private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
- @BeforeClass
- public static void setUpClass() {
- Map<String, String> myJournal = new HashMap<>();
- myJournal.put("class", "org.opendaylight.controller.cluster.datastore.ShardManagerTest$MyJournal");
- myJournal.put("plugin-dispatcher", "akka.actor.default-dispatcher");
- Config config = ConfigFactory.load()
- .withValue("akka.persistence.journal.plugin",
- ConfigValueFactory.fromAnyRef("my-journal"))
- .withValue("my-journal", ConfigValueFactory.fromMap(myJournal));
-
- MyJournal.clear();
-
- system = ActorSystem.create("test", config);
-
- String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
- defaultShardMockActor = system.actorOf(Props.create(DoNothingActor.class), name);
+ private static ActorRef mockShardActor;
+ @Before
+ public void setUp() {
+ InMemoryJournal.clear();
+ if(mockShardActor == null) {
+ String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
+ mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
+ }
}
- @AfterClass
- public static void tearDown() {
- JavaTestKit.shutdownActorSystem(system);
- system = null;
+ @After
+ public void tearDown() {
+ InMemoryJournal.clear();
}
- @Before
- public void setUpTest(){
- MyJournal.clear();
+ private Props newShardMgrProps() {
+
+ DatastoreContext.Builder builder = DatastoreContext.newBuilder();
+ builder.dataStoreType(shardMrgIDSuffix);
+ return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), builder.build());
}
@Test
public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
- new JavaTestKit(system) {
- {
- final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), DatastoreContext.newBuilder().build());
-
- final ActorRef subject = getSystem().actorOf(props);
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- subject.tell(new FindPrimary("inventory").toSerializable(), getRef());
+ shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
- expectMsgEquals(duration("2 seconds"),
- new PrimaryNotFound("inventory").toSerializable());
- }};
+ expectMsgEquals(duration("5 seconds"),
+ new PrimaryNotFound("non-existent").toSerializable());
+ }};
}
@Test
public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
- new JavaTestKit(system) {{
- final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), DatastoreContext.newBuilder().build());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
- final ActorRef subject = getSystem().actorOf(props);
+ expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ }};
+ }
- subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- subject.tell(new ActorInitialized(), defaultShardMockActor);
+ @Test
+ public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
- subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
- expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
- }
- };
+ expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
+ }};
}
@Test
- public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
+ public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
- new JavaTestKit(system) {{
- final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), DatastoreContext.newBuilder().build());
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- final ActorRef subject = getSystem().actorOf(props);
+ // We're passing waitUntilInitialized = true to FindPrimary so the response should be
+ // delayed until we send ActorInitialized.
+ Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
+ new Timeout(5, TimeUnit.SECONDS));
- subject.tell(new FindLocalShard("inventory"), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
- final String out = new ExpectMsg<String>(duration("3 seconds"), "find local") {
- @Override
- protected String match(Object in) {
- if (in instanceof LocalShardNotFound) {
- return ((LocalShardNotFound) in).getShardName();
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("inventory", out);
+ Object resp = Await.result(future, duration("5 seconds"));
+ assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
}};
}
@Test
- public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
-
- final MockClusterWrapper mockClusterWrapper = new MockClusterWrapper();
-
- new JavaTestKit(system) {{
- final Props props = ShardManager
- .props("config", mockClusterWrapper,
- new MockConfiguration(), DatastoreContext.newBuilder().build());
-
- final ActorRef subject = getSystem().actorOf(props);
+ public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
- subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- subject.tell(new ActorInitialized(), defaultShardMockActor);
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
+ shardManager.tell(new FindLocalShard("non-existent", false), getRef());
- final ActorRef out = new ExpectMsg<ActorRef>(duration("3 seconds"), "find local") {
- @Override
- protected ActorRef match(Object in) {
- if (in instanceof LocalShardFound) {
- return ((LocalShardFound) in).getPath();
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
+ LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
- assertTrue(out.path().toString(),
- out.path().toString().contains("member-1-shard-default-config"));
+ assertEquals("getShardName", "non-existent", notFound.getShardName());
}};
}
@Test
- public void testOnReceiveMemberUp() throws Exception {
+ public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
- new JavaTestKit(system) {{
- final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), DatastoreContext.newBuilder().build());
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
- final ActorRef subject = getSystem().actorOf(props);
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
- MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
+ LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ assertTrue("Found path contains " + found.getPath().path().toString(),
+ found.getPath().path().toString().contains("member-1-shard-default-config"));
+ }};
+ }
- subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+ @Test
+ public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
- final String out = new ExpectMsg<String>(duration("3 seconds"), "primary found") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
- PrimaryFound f = PrimaryFound.fromSerializable(in);
- return f.getPrimaryPath();
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
- assertTrue(out, out.contains("member-2-shard-astronauts-config"));
+ expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
}};
}
@Test
- public void testOnReceiveMemberDown() throws Exception {
+ public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
- new JavaTestKit(system) {{
- final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), DatastoreContext.newBuilder().build());
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- final ActorRef subject = getSystem().actorOf(props);
+ // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
+ // delayed until we send ActorInitialized.
+ Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
+ new Timeout(5, TimeUnit.SECONDS));
- MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
- subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+ Object resp = Await.result(future, duration("5 seconds"));
+ assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
+ }};
+ }
- expectMsgClass(duration("3 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ @Test
+ public void testOnReceiveMemberUp() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
- MockClusterWrapper.sendMemberRemoved(subject, "member-2", getRef().path().toString());
+ MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
- subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+ shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
- expectMsgClass(duration("1 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
+ PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
+ PrimaryFound.SERIALIZABLE_CLASS));
+ String path = found.getPrimaryPath();
+ assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
}};
}
@Test
- public void testOnRecoveryJournalIsEmptied(){
- MyJournal.addToJournal(1L, new ShardManager.SchemaContextModules(
- ImmutableSet.of("foo")));
+ public void testOnReceiveMemberDown() throws Exception {
+
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
- assertEquals(1, MyJournal.get().size());
+ MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
- new JavaTestKit(system) {{
- final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), DatastoreContext.newBuilder().build());
+ shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
- final ActorRef subject = getSystem().actorOf(props);
+ expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
- // Send message to check that ShardManager is ready
- subject.tell(new FindPrimary("unknown").toSerializable(), getRef());
+ MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
- expectMsgClass(duration("3 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
+ shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
- assertEquals(0, MyJournal.get().size());
+ expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
+ }};
+ }
+
+ @Test
+ public void testOnRecoveryJournalIsCleaned() {
+ InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
+ ImmutableSet.of("foo")));
+ InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
+ ImmutableSet.of("bar")));
+ InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
+
+ new JavaTestKit(getSystem()) {{
+ TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+ Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
+
+ shardManager.underlyingActor().waitForRecoveryComplete();
+ InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
+
+ // Journal entries up to the last one should've been deleted
+ Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
+ synchronized (journal) {
+ assertEquals("Journal size", 1, journal.size());
+ assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
+ }
}};
}
@Test
public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
- new JavaTestKit(system) {{
- final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), DatastoreContext.newBuilder().build());
- final TestActorRef<ShardManager> subject =
- TestActorRef.create(system, props);
+ final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
+ InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
+ persistedModules));
+ new JavaTestKit(getSystem()) {{
+ TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+ Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
- subject.underlyingActor().onReceiveRecover(new ShardManager.SchemaContextModules(ImmutableSet.of("foo")));
+ shardManager.underlyingActor().waitForRecoveryComplete();
- Collection<String> knownModules = subject.underlyingActor().getKnownModules();
+ Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
- assertTrue(knownModules.contains("foo"));
+ assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
}};
}
@Test
public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
throws Exception {
- new JavaTestKit(system) {{
- final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), DatastoreContext.newBuilder().build());
- final TestActorRef<ShardManager> subject =
- TestActorRef.create(system, props);
-
- Collection<String> knownModules = subject.underlyingActor().getKnownModules();
+ new JavaTestKit(getSystem()) {{
+ final TestActorRef<ShardManager> shardManager =
+ TestActorRef.create(getSystem(), newShardMgrProps());
- assertEquals(0, knownModules.size());
-
- SchemaContext schemaContext = mock(SchemaContext.class);
- Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
+ assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
ModuleIdentifier foo = mock(ModuleIdentifier.class);
when(foo.getNamespace()).thenReturn(new URI("foo"));
+ Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
moduleIdentifierSet.add(foo);
+ SchemaContext schemaContext = mock(SchemaContext.class);
when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
- subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
-
- assertTrue(knownModules.contains("foo"));
+ shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
- assertEquals(1, knownModules.size());
+ assertEquals("getKnownModules", Sets.newHashSet("foo"),
+ Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
ModuleIdentifier bar = mock(ModuleIdentifier.class);
when(bar.getNamespace()).thenReturn(new URI("bar"));
moduleIdentifierSet.add(bar);
- subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
-
- assertTrue(knownModules.contains("bar"));
-
- assertEquals(2, knownModules.size());
+ shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+ assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
+ Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
}};
-
}
-
@Test
public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
throws Exception {
- new JavaTestKit(system) {{
- final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), DatastoreContext.newBuilder().build());
- final TestActorRef<ShardManager> subject =
- TestActorRef.create(system, props);
-
- Collection<String> knownModules = subject.underlyingActor().getKnownModules();
-
- assertEquals(0, knownModules.size());
+ new JavaTestKit(getSystem()) {{
+ final TestActorRef<ShardManager> shardManager =
+ TestActorRef.create(getSystem(), newShardMgrProps());
SchemaContext schemaContext = mock(SchemaContext.class);
Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
- subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+ shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
- assertTrue(knownModules.contains("foo"));
-
- assertEquals(1, knownModules.size());
+ assertEquals("getKnownModules", Sets.newHashSet("foo"),
+ Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
//Create a completely different SchemaContext with only the bar module in it
- schemaContext = mock(SchemaContext.class);
- moduleIdentifierSet = new HashSet<>();
+ //schemaContext = mock(SchemaContext.class);
+ moduleIdentifierSet.clear();
ModuleIdentifier bar = mock(ModuleIdentifier.class);
when(bar.getNamespace()).thenReturn(new URI("bar"));
moduleIdentifierSet.add(bar);
- subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
-
- assertFalse(knownModules.contains("bar"));
+ shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
- assertEquals(1, knownModules.size());
+ assertEquals("getKnownModules", Sets.newHashSet("foo"),
+ Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
}};
-
}
+ @Test
+ public void testRecoveryApplicable(){
+ new JavaTestKit(getSystem()) {
+ {
+ final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(true).dataStoreType(shardMrgIDSuffix).build());
+ final TestActorRef<ShardManager> persistentShardManager =
+ TestActorRef.create(getSystem(), persistentProps);
+
+ DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
+
+ assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
+
+ final Props nonPersistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(false).dataStoreType(shardMrgIDSuffix).build());
+ final TestActorRef<ShardManager> nonPersistentShardManager =
+ TestActorRef.create(getSystem(), nonPersistentProps);
+
+ DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
+
+ assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
+
+
+ }};
- private void sleep(long period){
- Uninterruptibles.sleepUninterruptibly(period, TimeUnit.MILLISECONDS);
}
- public static class MyJournal extends AsyncWriteJournal {
+ @Test
+ public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
+ throws Exception {
+ final CountDownLatch persistLatch = new CountDownLatch(1);
+ final Creator<ShardManager> creator = new Creator<ShardManager>() {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public ShardManager create() throws Exception {
+ return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()) {
+ @Override
+ protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
+ DataPersistenceProviderMonitor dataPersistenceProviderMonitor
+ = new DataPersistenceProviderMonitor();
+ dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+ return dataPersistenceProviderMonitor;
+ }
+ };
+ }
+ };
- private static Map<Long, Object> journal = Maps.newTreeMap();
+ new JavaTestKit(getSystem()) {{
- public static void addToJournal(Long sequenceNr, Object value){
- journal.put(sequenceNr, value);
- }
+ final TestActorRef<ShardManager> shardManager =
+ TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
- public static Map<Long, Object> get(){
- return journal;
- }
+ ModuleIdentifier foo = mock(ModuleIdentifier.class);
+ when(foo.getNamespace()).thenReturn(new URI("foo"));
+
+ Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
+ moduleIdentifierSet.add(foo);
+
+ SchemaContext schemaContext = mock(SchemaContext.class);
+ when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
+
+ shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertEquals("Persisted", true,
+ Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
- public static void clear(){
- journal.clear();
+ }};
+ }
+
+
+
+ private static class TestShardManager extends ShardManager {
+ private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+
+ TestShardManager(String shardMrgIDSuffix) {
+ super(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build());
}
- @Override public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
- final Procedure<PersistentRepr> replayCallback) {
- if(journal.size() == 0){
- return Futures.successful(null);
- }
- return Futures.future(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- for (Map.Entry<Long, Object> entry : journal.entrySet()) {
- PersistentRepr persistentMessage =
- new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
- false, null, null);
- replayCallback.apply(persistentMessage);
- }
- return null;
+ @Override
+ public void handleRecover(Object message) throws Exception {
+ try {
+ super.handleRecover(message);
+ } finally {
+ if(message instanceof RecoveryCompleted) {
+ recoveryComplete.countDown();
}
- }, context().dispatcher());
+ }
}
- @Override public Future<Long> doAsyncReadHighestSequenceNr(String s, long l) {
- return Futures.successful(-1L);
+ void waitForRecoveryComplete() {
+ assertEquals("Recovery complete", true,
+ Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
}
+ }
- @Override public Future<Void> doAsyncWriteMessages(
- final Iterable<PersistentRepr> persistentReprs) {
- return Futures.future(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- for (PersistentRepr repr : persistentReprs){
- if(repr.payload() instanceof ShardManager.SchemaContextModules) {
- journal.put(repr.sequenceNr(), repr.payload());
- }
- }
- return null;
- }
- }, context().dispatcher());
+ @SuppressWarnings("serial")
+ static class TestShardManagerCreator implements Creator<TestShardManager> {
+ String shardMrgIDSuffix;
+
+ TestShardManagerCreator(String shardMrgIDSuffix) {
+ this.shardMrgIDSuffix = shardMrgIDSuffix;
}
- @Override public Future<Void> doAsyncWriteConfirmations(
- Iterable<PersistentConfirmation> persistentConfirmations) {
- return Futures.successful(null);
+ @Override
+ public TestShardManager create() throws Exception {
+ return new TestShardManager(shardMrgIDSuffix);
}
- @Override public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> persistentIds,
- boolean b) {
- clear();
- return Futures.successful(null);
+ }
+
+ private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
+ private static final long serialVersionUID = 1L;
+ private Creator<ShardManager> delegate;
+
+ public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
+ this.delegate = delegate;
}
- @Override public Future<Void> doAsyncDeleteMessagesTo(String s, long l, boolean b) {
- clear();
- return Futures.successful(null);
+ @Override
+ public ShardManager create() throws Exception {
+ return delegate.create();
}
}
}