import akka.actor.ActorRef;
import akka.actor.Props;
+import akka.japi.Creator;
+import akka.pattern.Patterns;
import akka.persistence.RecoveryCompleted;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
-import akka.japi.Creator;
+import akka.util.Timeout;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
import java.net.URI;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- shardManager.tell(new FindPrimary("non-existent").toSerializable(), getRef());
+ shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
expectMsgEquals(duration("5 seconds"),
new PrimaryNotFound("non-existent").toSerializable());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
}};
}
@Test
- public void testOnReceiveFindPrimaryForNotInitialzedShard() throws Exception {
+ public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
}};
}
+ @Test
+ public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ // We're passing waitUntilInitialized = true to FindPrimary so the response should be
+ // delayed until we send ActorInitialized.
+ Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
+ new Timeout(5, TimeUnit.SECONDS));
+
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ Object resp = Await.result(future, duration("5 seconds"));
+ assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
+ }};
+ }
+
@Test
public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- shardManager.tell(new FindLocalShard("non-existent"), getRef());
+ shardManager.tell(new FindLocalShard("non-existent", false), getRef());
LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- //shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
}};
}
+ @Test
+ public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
+ // delayed until we send ActorInitialized.
+ Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
+ new Timeout(5, TimeUnit.SECONDS));
+
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ Object resp = Await.result(future, duration("5 seconds"));
+ assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
+ }};
+ }
+
@Test
public void testOnReceiveMemberUp() throws Exception {
new JavaTestKit(getSystem()) {{
MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
- shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+ shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
PrimaryFound.SERIALIZABLE_CLASS));
MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
- shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+ shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
- shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+ shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
}};
}};
}
+ @Test
+ public void testRecoveryApplicable(){
+ new JavaTestKit(getSystem()) {
+ {
+ final Props persistentProps = ShardManager.props(shardMrgIDSuffix,
+ new MockClusterWrapper(),
+ new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(true).build());
+ final TestActorRef<ShardManager> persistentShardManager =
+ TestActorRef.create(getSystem(), persistentProps);
+
+ DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
+
+ assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
+
+ final Props nonPersistentProps = ShardManager.props(shardMrgIDSuffix,
+ new MockClusterWrapper(),
+ new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(false).build());
+ final TestActorRef<ShardManager> nonPersistentShardManager =
+ TestActorRef.create(getSystem(), nonPersistentProps);
+
+ DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
+
+ assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
+
+
+ }};
+
+ }
+
+ @Test
+ public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
+ throws Exception {
+ final CountDownLatch persistLatch = new CountDownLatch(1);
+ final Creator<ShardManager> creator = new Creator<ShardManager>() {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public ShardManager create() throws Exception {
+ return new ShardManager(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build()) {
+ @Override
+ protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
+ DataPersistenceProviderMonitor dataPersistenceProviderMonitor
+ = new DataPersistenceProviderMonitor();
+ dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+ return dataPersistenceProviderMonitor;
+ }
+ };
+ }
+ };
+
+ new JavaTestKit(getSystem()) {{
+
+ final TestActorRef<ShardManager> shardManager =
+ TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
+
+ ModuleIdentifier foo = mock(ModuleIdentifier.class);
+ when(foo.getNamespace()).thenReturn(new URI("foo"));
+
+ Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
+ moduleIdentifierSet.add(foo);
+
+ SchemaContext schemaContext = mock(SchemaContext.class);
+ when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
+
+ shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertEquals("Persisted", true,
+ Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
+
+ }};
+ }
+
+
private static class TestShardManager extends ShardManager {
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
}
}
+
+ private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
+ private static final long serialVersionUID = 1L;
+ private Creator<ShardManager> delegate;
+
+ public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public ShardManager create() throws Exception {
+ return delegate.create();
+ }
+ }
}