import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.ConfigFactory;
import java.net.URI;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.SerializationUtils;
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.ShardManager.SchemaContextModules;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
private ActorSystem newActorSystem(String config) {
- ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
- actorSystems.add(system);
- return system;
+ return newActorSystem("cluster-test", config);
}
private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
if (system == getSystem()) {
- return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name);
+ return actorFactory.createActor(Props.create(MessageCollectorActor.class), name);
}
- return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
+ return system.actorOf(Props.create(MessageCollectorActor.class), name);
}
private Props newShardMgrProps() {
}
private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) {
- return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor);
+ return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
+ .distributedDataStore(mock(DistributedDataStore.class));
}
}
private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) {
- return newTestShardMgrBuilderWithMockShardActor(shardActor).props();
+ return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId());
}
}
};
- final TestActorRef<MessageCollectorActor> defaultShardActor = actorFactory.createTestActor(
+ final ActorRef defaultShardActor = actorFactory.createActor(
Props.create(MessageCollectorActor.class), actorFactory.generateActorId("default"));
- final TestActorRef<MessageCollectorActor> topologyShardActor = actorFactory.createTestActor(
+ final ActorRef topologyShardActor = actorFactory.createActor(
Props.create(MessageCollectorActor.class), actorFactory.generateActorId("topology"));
final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
ActorRef shardManager = actorFactory
- .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+ .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
+ .withDispatcher(Dispatchers.DefaultDispatcherId()));
SchemaContext schemaContext = TestModel.createTestContext();
shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
ActorRef shardManager = actorFactory
- .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+ .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
+ .withDispatcher(Dispatchers.DefaultDispatcherId()));
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
new JavaTestKit(getSystem()) {
{
ActorRef shardManager = actorFactory
- .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+ .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
+ .withDispatcher(Dispatchers.DefaultDispatcherId()));
Shard.Builder shardBuilder = Shard.builder();
.put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1"))
.put("astronauts", Collections.<String>emptyList()).build());
- TestActorRef<TestShardManager> shardManager = actorFactory
- .createTestActor(newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
+ TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)
+ .withDispatcher(Dispatchers.DefaultDispatcherId()));
shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
Failure failure = kit.expectMsgClass(Failure.class);
assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
- byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
- assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
- ShardManagerSnapshot snapshot = SerializationUtils.deserialize(snapshotBytes);
+ ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
+ assertNotNull("Expected ShardManagerSnapshot", snapshot);
assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
Sets.newHashSet(snapshot.getShardList()));
.put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
.put("astronauts", Collections.<String>emptyList()).build());
- ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
- DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix,
- SerializationUtils.serialize(snapshot), Collections.<ShardSnapshot>emptyList());
+ ShardManagerSnapshot snapshot =
+ new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"), Collections.emptyMap());
+ DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
+ Collections.<ShardSnapshot>emptyList());
TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
.restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
- byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
- assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
- snapshot = SerializationUtils.deserialize(snapshotBytes);
+ assertNotNull("Expected ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
- Sets.newHashSet(snapshot.getShardList()));
+ Sets.newHashSet(datastoreSnapshot.getShardManagerSnapshot().getShardList()));
LOG.info("testRestoreFromSnapshot ending");
}
new JavaTestKit(getSystem()) {
{
ActorRef shardManager = actorFactory
- .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+ .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
+ .withDispatcher(Dispatchers.DefaultDispatcherId()));
shardManager.tell(new AddShardReplica("model-inventory"), getRef());
Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
// Have a dummy snapshot to be overwritten by the new data
// persisted.
String[] restoredShards = { "default", "people" };
- ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
+ ShardManagerSnapshot snapshot =
+ new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
- newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props(), shardMgrID);
+ newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
shardManager.underlyingActor()
.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
.put("astronauts", Arrays.asList("member-2")).build());
final ActorRef newReplicaShardManager = actorFactory
- .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props(), shardMgrID);
+ .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
new JavaTestKit(getSystem()) {
{
ActorRef shardManager = actorFactory
- .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+ .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
+ .withDispatcher(Dispatchers.DefaultDispatcherId()));
shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), getRef());
Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class);
{
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
- final TestActorRef<MockRespondActor> respondActor = actorFactory
- .createTestActor(Props.create(MockRespondActor.class, RemoveServer.class,
- new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
+ final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class,
+ RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
.put("astronauts", Arrays.asList("member-2"))
.put("people", Arrays.asList("member-1", "member-2")).build());
- TestActorRef<TestShardManager> shardManager = actorFactory
- .createTestActor(newShardMgrProps(mockConfig));
+ TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
+ newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
shardManager.underlyingActor().waitForRecoveryComplete();
shardManager.tell(new FindLocalShard("people", false), getRef());
.put("people", Arrays.asList("member-1", "member-2")).build());
String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
- TestActorRef<MessageCollectorActor> shard = actorFactory.createTestActor(MessageCollectorActor.props(),
- shardId);
+ ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId);
TestActorRef<TestShardManager> shardManager = actorFactory
- .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props());
+ .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()));
shardManager.underlyingActor().waitForRecoveryComplete();
LOG.info("testShardPersistenceWithRestoredData starting");
new JavaTestKit(getSystem()) {
{
- MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
- .put("default", Arrays.asList("member-1", "member-2"))
- .put("astronauts", Arrays.asList("member-2"))
- .put("people", Arrays.asList("member-1", "member-2")).build());
- String[] restoredShards = { "default", "astronauts" };
- ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
+ MockConfiguration mockConfig =
+ new MockConfiguration(ImmutableMap.<String, List<String>>builder()
+ .put("default", Arrays.asList("member-1", "member-2"))
+ .put("astronauts", Arrays.asList("member-2"))
+ .put("people", Arrays.asList("member-1", "member-2")).build());
+ String[] restoredShards = {"default", "astronauts"};
+ ShardManagerSnapshot snapshot =
+ new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
// create shardManager to come up with restored data
- TestActorRef<TestShardManager> newRestoredShardManager = actorFactory
- .createTestActor(newShardMgrProps(mockConfig));
+ TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
+ newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
.put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build());
String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
- TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
- MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1);
+ ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1);
String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
- TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
- MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2);
+ ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2);
- TestActorRef<TestShardManager> shardManager = actorFactory
- .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("shard1", shard1)
- .addShardActor("shard2", shard2).props()
- .withDispatcher(Dispatchers.DefaultDispatcherId()));
+ ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig)
+ .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), shard1);
{
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
- TestActorRef<MockRespondActor> respondActor = actorFactory
- .createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
+ ActorRef respondActor = actorFactory
+ .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
{
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
- TestActorRef<MockRespondActor> respondActor = actorFactory
- .createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
+ ActorRef respondActor = actorFactory
+ .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));