From 86a6e60ff59f09684da41067d37457dea1b8f162 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Thu, 19 Nov 2015 16:18:31 -0500 Subject: [PATCH] Bug 4564: Implement backup/restore of ShardManagerSnapshot Change-Id: I939318ac0414dd43c182399bb6dd6f72180f50b9 Signed-off-by: Tom Pantelis --- .../cluster/datastore/ShardManager.java | 61 +++-- .../cluster/datastore/ShardManagerTest.java | 222 ++++++++++++++++-- 2 files changed, 247 insertions(+), 36 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index e33d7cdce6..ce473bbc49 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -33,6 +33,8 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.collect.Sets; +import java.io.ByteArrayInputStream; +import java.io.ObjectInputStream; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -48,6 +50,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; @@ -139,7 +142,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private DatastoreSnapshot restoreFromSnapshot; - private ShardManagerSnapshot recoveredSnapshot; + private ShardManagerSnapshot currentSnapshot; private final Set shardReplicaOperationsInProgress = new HashSet<>(); @@ -285,6 +288,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } byte[] shardManagerSnapshot = null; + if(currentSnapshot != null) { + shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot); + } + ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props( new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(), datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration())); @@ -333,8 +340,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); - boolean shardWasInRecoveredSnapshot = recoveredSnapshot != null && - recoveredSnapshot.getShardList().contains(shardName); + boolean shardWasInRecoveredSnapshot = currentSnapshot != null && + currentSnapshot.getShardList().contains(shardName); Map peerAddresses; boolean isActiveMember; @@ -510,15 +517,34 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override protected void handleRecover(Object message) throws Exception { if (message instanceof RecoveryCompleted) { - LOG.info("Recovery complete : {}", persistenceId()); - - // We no longer persist SchemaContext modules so delete all the prior messages from the akka - // journal on upgrade from Helium. - deleteMessages(lastSequenceNr()); - createLocalShards(); + onRecoveryCompleted(); } else if (message instanceof SnapshotOffer) { - onSnapshotOffer((SnapshotOffer) message); + applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot()); + } + } + + private void onRecoveryCompleted() { + LOG.info("Recovery complete : {}", persistenceId()); + + // We no longer persist SchemaContext modules so delete all the prior messages from the akka + // journal on upgrade from Helium. + deleteMessages(lastSequenceNr()); + + if(currentSnapshot == null && restoreFromSnapshot != null && + restoreFromSnapshot.getShardManagerSnapshot() != null) { + try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream( + restoreFromSnapshot.getShardManagerSnapshot()))) { + ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject(); + + LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot); + + applyShardManagerSnapshot(snapshot); + } catch(Exception e) { + LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e); + } } + + createLocalShards(); } private void findLocalShard(FindLocalShard message) { @@ -1067,18 +1093,23 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList); - saveSnapshot(new ShardManagerSnapshot(shardList)); + saveSnapshot(updateShardManagerSnapshot(shardList)); + } + + private ShardManagerSnapshot updateShardManagerSnapshot(List shardList) { + currentSnapshot = new ShardManagerSnapshot(shardList); + return currentSnapshot; } - private void onSnapshotOffer(SnapshotOffer offer) { - recoveredSnapshot = (ShardManagerSnapshot)offer.snapshot(); + private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) { + currentSnapshot = snapshot; - LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), recoveredSnapshot); + LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot); String currentMember = cluster.getCurrentMemberName(); Set configuredShardList = new HashSet<>(configuration.getMemberShardNames(currentMember)); - for (String shard : recoveredSnapshot.getShardList()) { + for (String shard : currentSnapshot.getShardList()) { if (!configuredShardList.contains(shard)) { // add the current member as a replica for the shard LOG.debug ("{}: adding shard {}", persistenceId(), shard); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 74746ebb0a..2525ba78b9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -37,6 +37,7 @@ import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Function; import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @@ -57,6 +58,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.commons.lang3.SerializationUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -113,11 +115,15 @@ import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; public class ShardManagerTest extends AbstractActorTest { + private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class); + private static int ID_COUNTER = 1; private final String shardMrgIDSuffix = "config" + ID_COUNTER++; @@ -222,8 +228,36 @@ public class ShardManagerTest extends AbstractActorTest { return shardManager; } + private void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) { + AssertionError last = null; + Stopwatch sw = Stopwatch.createStarted(); + while(sw.elapsed(TimeUnit.SECONDS) <= 5) { + try { + shardManager.tell(new FindLocalShard(shardName, true), kit.getRef()); + kit.expectMsgClass(LocalShardFound.class); + return; + } catch(AssertionError e) { + last = e; + } + + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + throw last; + } + + private T expectMsgClassOrFailure(Class msgClass, JavaTestKit kit, String msg) { + Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class); + if(reply instanceof Failure) { + throw new AssertionError(msg + " failed", ((Failure)reply).cause()); + } + + return (T)reply; + } + @Test public void testPerShardDatastoreContext() throws Exception { + LOG.info("testPerShardDatastoreContext starting"); final DatastoreContextFactory mockFactory = newDatastoreContextFactory( datastoreContextBuilder.shardElectionTimeoutFactor(5).build()); @@ -314,6 +348,8 @@ public class ShardManagerTest extends AbstractActorTest { newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class); assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor()); + + LOG.info("testPerShardDatastoreContext ending"); } @Test @@ -331,6 +367,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception { + LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting"); new JavaTestKit(getSystem()) {{ String memberId = "member-1-shard-default-" + shardMrgIDSuffix; @@ -354,10 +391,13 @@ public class ShardManagerTest extends AbstractActorTest { primaryFound.getPrimaryPath().contains("member-1-shard-default")); assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree()); }}; + + LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending"); } @Test public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception { + LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting"); new JavaTestKit(getSystem()) {{ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); @@ -374,10 +414,13 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); }}; + + LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending"); } @Test public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception { + LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting"); new JavaTestKit(getSystem()) {{ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); @@ -401,6 +444,8 @@ public class ShardManagerTest extends AbstractActorTest { primaryFound.getPrimaryPath().contains("member-2-shard-default")); assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion()); }}; + + LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending"); } @Test @@ -430,6 +475,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception { + LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting"); new JavaTestKit(getSystem()) {{ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); @@ -455,10 +501,13 @@ public class ShardManagerTest extends AbstractActorTest { primaryFound.getPrimaryPath().contains("member-1-shard-default")); assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree()); }}; + + LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting"); } @Test public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception { + LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting"); new JavaTestKit(getSystem()) {{ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); @@ -491,10 +540,13 @@ public class ShardManagerTest extends AbstractActorTest { expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); }}; + + LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending"); } @Test public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception { + LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting"); new JavaTestKit(getSystem()) {{ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); @@ -508,10 +560,13 @@ public class ShardManagerTest extends AbstractActorTest { expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); }}; + + LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending"); } @Test public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception { + LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting"); new JavaTestKit(getSystem()) {{ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); @@ -524,10 +579,13 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); }}; + + LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending"); } @Test public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception { + LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting"); new JavaTestKit(getSystem()) {{ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); @@ -540,10 +598,13 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); }}; + + LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending"); } @Test public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception { + LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting"); new JavaTestKit(getSystem()) {{ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); @@ -554,10 +615,13 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); }}; + + LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending"); } @Test public void testOnReceiveFindPrimaryForRemoteShard() throws Exception { + LOG.info("testOnReceiveFindPrimaryForRemoteShard starting"); String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); // Create an ActorSystem ShardManager actor for member-1. @@ -618,10 +682,13 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class); }}; + + LOG.info("testOnReceiveFindPrimaryForRemoteShard ending"); } @Test public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception { + LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting"); String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); // Create an ActorSystem ShardManager actor for member-1. @@ -715,10 +782,13 @@ public class ShardManagerTest extends AbstractActorTest { MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class); }}; + + LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending"); } @Test public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception { + LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting"); String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); // Create an ActorSystem ShardManager actor for member-1. @@ -800,6 +870,8 @@ public class ShardManagerTest extends AbstractActorTest { assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config")); }}; + + LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending"); } @@ -848,6 +920,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception { + LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting"); new JavaTestKit(getSystem()) {{ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); @@ -863,6 +936,8 @@ public class ShardManagerTest extends AbstractActorTest { Object resp = Await.result(future, duration("5 seconds")); assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound); }}; + + LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting"); } @Test @@ -1011,6 +1086,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{ + LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting"); TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() { @Override public List getMemberShardNames(String memberName) { @@ -1050,6 +1126,7 @@ public class ShardManagerTest extends AbstractActorTest { // Sync status is now true assertEquals(true, shardManager.getMBean().getSyncStatus()); + LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending"); } @Test @@ -1071,6 +1148,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnCreateShard() { + LOG.info("testOnCreateShard starting"); new JavaTestKit(getSystem()) {{ datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); @@ -1111,10 +1189,13 @@ public class ShardManagerTest extends AbstractActorTest { Success success = expectMsgClass(duration("5 seconds"), Success.class); assertNotNull("Success status is null", success.status()); }}; + + LOG.info("testOnCreateShard ending"); } @Test public void testOnCreateShardWithLocalMemberNotInShardConfig() { + LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting"); new JavaTestKit(getSystem()) {{ datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); @@ -1137,10 +1218,13 @@ public class ShardManagerTest extends AbstractActorTest { assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder.getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass()); }}; + + LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending"); } @Test public void testOnCreateShardWithNoInitialSchemaContext() { + LOG.info("testOnCreateShardWithNoInitialSchemaContext starting"); new JavaTestKit(getSystem()) {{ ActorRef shardManager = actorFactory.createActor(newShardMgrProps( new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); @@ -1163,35 +1247,35 @@ public class ShardManagerTest extends AbstractActorTest { assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext()); assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext()); }}; + + LOG.info("testOnCreateShardWithNoInitialSchemaContext ending"); } @Test public void testGetSnapshot() throws Throwable { + LOG.info("testGetSnapshot starting"); JavaTestKit kit = new JavaTestKit(getSystem()); MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). put("shard1", Arrays.asList("member-1")). - put("shard2", Arrays.asList("member-1")).build()); + put("shard2", Arrays.asList("member-1")). + put("astronauts", Collections.emptyList()).build()); - ActorRef shardManager = actorFactory.createActor(newShardMgrProps(mockConfig).withDispatcher( - Dispatchers.DefaultDispatcherId())); + TestActorRef shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig). + withDispatcher(Dispatchers.DefaultDispatcherId())); shardManager.tell(GetSnapshot.INSTANCE, kit.getRef()); Failure failure = kit.expectMsgClass(Failure.class); assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass()); - kit = new JavaTestKit(getSystem()); - shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender()); - shardManager.tell(new FindLocalShard("shard1", true), kit.getRef()); - kit.expectMsgClass(LocalShardFound.class); - shardManager.tell(new FindLocalShard("shard2", true), kit.getRef()); - kit.expectMsgClass(LocalShardFound.class); + waitForShardInitialized(shardManager, "shard1", kit); + waitForShardInitialized(shardManager, "shard2", kit); shardManager.tell(GetSnapshot.INSTANCE, kit.getRef()); - DatastoreSnapshot datastoreSnapshot = kit.expectMsgClass(DatastoreSnapshot.class); + DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot"); assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType()); assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot()); @@ -1205,6 +1289,76 @@ public class ShardManagerTest extends AbstractActorTest { assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet( Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer))); + + // Add a new replica + + JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem()); + + TestShardManager shardManagerInstance = shardManager.underlyingActor(); + shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); + + shardManager.tell(new AddShardReplica("astronauts"), kit.getRef()); + mockShardLeaderKit.expectMsgClass(AddServer.class); + mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, "")); + kit.expectMsgClass(Status.Success.class); + waitForShardInitialized(shardManager, "astronauts", kit); + + // Send another GetSnapshot and verify + + shardManager.tell(GetSnapshot.INSTANCE, kit.getRef()); + datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot"); + + assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet( + Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer))); + + byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot(); + assertNotNull("Expected ShardManagerSnapshot", snapshotBytes); + ShardManagerSnapshot snapshot = SerializationUtils.deserialize(snapshotBytes); + assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), + Sets.newHashSet(snapshot.getShardList())); + + LOG.info("testGetSnapshot ending"); + } + + @Test + public void testRestoreFromSnapshot() throws Throwable { + LOG.info("testRestoreFromSnapshot starting"); + + JavaTestKit kit = new JavaTestKit(getSystem()); + + MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). + put("shard1", Collections.emptyList()). + put("shard2", Collections.emptyList()). + put("astronauts", Collections.emptyList()).build()); + + + ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts")); + DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, + SerializationUtils.serialize(snapshot), Collections.emptyList()); + TestActorRef shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig). + restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId())); + + shardManager.underlyingActor().waitForRecoveryComplete(); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender()); + + waitForShardInitialized(shardManager, "shard1", kit); + waitForShardInitialized(shardManager, "shard2", kit); + waitForShardInitialized(shardManager, "astronauts", kit); + + shardManager.tell(GetSnapshot.INSTANCE, kit.getRef()); + + DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot"); + + assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType()); + + byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot(); + assertNotNull("Expected ShardManagerSnapshot", snapshotBytes); + snapshot = SerializationUtils.deserialize(snapshotBytes); + assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), + Sets.newHashSet(snapshot.getShardList())); + + LOG.info("testRestoreFromSnapshot ending"); } @Test @@ -1223,6 +1377,8 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testAddShardReplica() throws Exception { + LOG.info("testAddShardReplica starting"); + MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). put("default", Arrays.asList("member-1", "member-2")). @@ -1278,10 +1434,13 @@ public class ShardManagerTest extends AbstractActorTest { .verifySnapshotPersisted(Sets.newHashSet("default", "astronauts")); expectMsgClass(duration("5 seconds"), Status.Success.class); }}; + + LOG.info("testAddShardReplica ending"); } @Test public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception { + LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting"); new JavaTestKit(getSystem()) {{ TestActorRef shardManager = actorFactory.createTestActor( newPropsShardMgrWithMockShardActor(), shardMgrID); @@ -1329,10 +1488,13 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); expectMsgClass(duration("5 seconds"), LocalShardFound.class); }}; + + LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending"); } @Test public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception { + LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting"); new JavaTestKit(getSystem()) {{ String memberId = "member-1-shard-default-" + shardMrgIDSuffix; ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); @@ -1351,10 +1513,13 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); expectMsgClass(duration("5 seconds"), LocalShardFound.class); }}; + + LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending"); } @Test public void testAddShardReplicaWithAddServerReplyFailure() throws Exception { + LOG.info("testAddShardReplicaWithAddServerReplyFailure starting"); new JavaTestKit(getSystem()) {{ JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem()); @@ -1393,10 +1558,13 @@ public class ShardManagerTest extends AbstractActorTest { failure = expectMsgClass(duration("5 seconds"), Failure.class); assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass()); }}; + + LOG.info("testAddShardReplicaWithAddServerReplyFailure ending"); } @Test public void testAddShardReplicaWithAlreadyInProgress() throws Exception { + LOG.info("testAddShardReplicaWithAlreadyInProgress starting"); new JavaTestKit(getSystem()) {{ JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem()); JavaTestKit secondRequestKit = new JavaTestKit(getSystem()); @@ -1419,10 +1587,13 @@ public class ShardManagerTest extends AbstractActorTest { secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class); }}; + + LOG.info("testAddShardReplicaWithAlreadyInProgress ending"); } @Test public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception { + LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting"); new JavaTestKit(getSystem()) {{ MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). put("astronauts", Arrays.asList("member-2")).build()); @@ -1438,6 +1609,8 @@ public class ShardManagerTest extends AbstractActorTest { assertEquals("Failure obtained", true, (resp.cause() instanceof RuntimeException)); }}; + + LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending"); } @Test @@ -1456,6 +1629,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testServerRemovedShardActorNotRunning() throws Exception { + LOG.info("testServerRemovedShardActorNotRunning starting"); new JavaTestKit(getSystem()) {{ MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). @@ -1466,7 +1640,6 @@ public class ShardManagerTest extends AbstractActorTest { TestActorRef shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)); shardManager.underlyingActor().waitForRecoveryComplete(); - shardManager.tell(new FindLocalShard("people", false), getRef()); expectMsgClass(duration("5 seconds"), NotInitializedException.class); @@ -1475,15 +1648,18 @@ public class ShardManagerTest extends AbstractActorTest { // Removed the default shard replica from member-1 ShardIdentifier.Builder builder = new ShardIdentifier.Builder(); - final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build(); + ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type(shardMrgIDSuffix).build(); shardManager.tell(new ServerRemoved(shardId.toString()), getRef()); shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people")); }}; + + LOG.info("testServerRemovedShardActorNotRunning ending"); } @Test public void testServerRemovedShardActorRunning() throws Exception { + LOG.info("testServerRemovedShardActorRunning starting"); new JavaTestKit(getSystem()) {{ MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). @@ -1491,7 +1667,10 @@ public class ShardManagerTest extends AbstractActorTest { put("astronauts", Arrays.asList("member-2")). put("people", Arrays.asList("member-1", "member-2")).build()); - TestActorRef shard = actorFactory.createTestActor(MessageCollectorActor.props()); + String shardId = ShardIdentifier.builder().shardName("default").memberName("member-1"). + type(shardMrgIDSuffix).build().toString(); + TestActorRef shard = actorFactory.createTestActor( + MessageCollectorActor.props(), shardId); TestActorRef shardManager = actorFactory.createTestActor( newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()); @@ -1501,27 +1680,26 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.underlyingActor().waitForRecoveryComplete(); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), shard); - shardManager.tell(new FindLocalShard("people", false), getRef()); - expectMsgClass(duration("5 seconds"), NotInitializedException.class); - - shardManager.tell(new FindLocalShard("default", false), getRef()); - expectMsgClass(duration("5 seconds"), NotInitializedException.class); + waitForShardInitialized(shardManager, "people", this); + waitForShardInitialized(shardManager, "default", this); // Removed the default shard replica from member-1 - ShardIdentifier.Builder builder = new ShardIdentifier.Builder(); - final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build(); - shardManager.tell(new ServerRemoved(shardId.toString()), getRef()); + shardManager.tell(new ServerRemoved(shardId), getRef()); shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people")); expectMsgClass(duration("5 seconds"), Terminated.class); }}; + + LOG.info("testServerRemovedShardActorRunning ending"); } @Test public void testShardPersistenceWithRestoredData() throws Exception { + LOG.info("testShardPersistenceWithRestoredData starting"); new JavaTestKit(getSystem()) {{ MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). @@ -1551,6 +1729,8 @@ public class ShardManagerTest extends AbstractActorTest { newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef()); expectMsgClass(duration("5 seconds"), NotInitializedException.class); }}; + + LOG.info("testShardPersistenceWithRestoredData ending"); } -- 2.36.6