From 357eb2684de759f296d1315e587308db5f1c54d4 Mon Sep 17 00:00:00 2001
From: Tom Pantelis
Date: Tue, 3 Nov 2015 01:54:46 -0500
Subject: [PATCH] Bug 4564: Implement GetSnapshot message in ShardManager

Added a serializable DatastoreSnapshot class that stores the serialized
snapshot for each shard.

On GetSnapshot, the ShardManager sends a GetSnapshot message to each
shard and creates a ShardManagerGetSnapshotReplyActor to compile the
replies and return a DatastoreSnapshot instance to the caller.

Change-Id: I11f872aa701f1e51de9cbccdc1a372a76bc45cff
Signed-off-by: Tom Pantelis
---
 .../cluster/datastore/ShardManager.java       |  38 +++++-
 .../ShardManagerGetSnapshotReplyActor.java    | 108 ++++++++++++++++
 .../datastore/messages/DatastoreSnapshot.java |  71 +++++++++++
 ...ShardManagerGetSnapshotReplyActorTest.java | 118 ++++++++++++++++++
 .../cluster/datastore/ShardManagerTest.java   |  46 +++++++
 5 files changed, 379 insertions(+), 2 deletions(-)
 create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActor.java
 create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshot.java
 create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActorTest.java

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
index 4f3d4aa7f9..0804a50e9b 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
@@ -75,6
+75,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; +import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; @@ -211,12 +212,44 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onAddShardReplica((AddShardReplica)message); } else if(message instanceof RemoveShardReplica){ onRemoveShardReplica((RemoveShardReplica)message); + } else if(message instanceof GetSnapshot) { + onGetSnapshot(); } else { unknownMessage(message); } } + private void onGetSnapshot() { + LOG.debug("{}: onGetSnapshot", persistenceId()); + + List notInitialized = null; + for(ShardInformation shardInfo: localShards.values()) { + if(!shardInfo.isShardInitialized()) { + if(notInitialized == null) { + notInitialized = new ArrayList<>(); + } + + notInitialized.add(shardInfo.getShardName()); + } + } + + if(notInitialized != null) { + getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format( + "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf()); + return; + } + + byte[] shardManagerSnapshot = null; + ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(localShards.size(), + type, shardManagerSnapshot , getSender(), persistenceId(), + datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration())); + + for(ShardInformation shardInfo: localShards.values()) { + shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor); + } + } + private void 
onCreateShard(CreateShard createShard) { Object reply; try { @@ -448,8 +481,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardInformation.addOnShardInitialized(onShardInitialized); - LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName()); - FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration(); if(shardInformation.isShardInitialized()) { // If the shard is already initialized then we'll wait enough time for the shard to @@ -458,6 +489,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS); } + LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(), + shardInformation.getShardName()); + Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce( timeout, getSelf(), new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender), diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActor.java new file mode 100644 index 0000000000..f77b3cbd7b --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActor.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.actor.ReceiveTimeout; +import akka.actor.Status.Failure; +import akka.actor.UntypedActor; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeoutException; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot; +import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.Duration; + +/** + * Temporary actor used by the ShardManager to compile GetSnapshot replies from the Shard actors and return + * a DatastoreSnapshot instance reply. 
+ * + * @author Thomas Pantelis + */ +class ShardManagerGetSnapshotReplyActor extends UntypedActor { + private static final Logger LOG = LoggerFactory.getLogger(ShardManagerGetSnapshotReplyActor.class); + + private int repliesReceived; + private final Params params; + private final List shardSnapshots = new ArrayList<>(); + + private ShardManagerGetSnapshotReplyActor(Params params) { + this.params = params; + + LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.totalShardCount); + + getContext().setReceiveTimeout(params.receiveTimeout); + } + + @Override + public void onReceive(Object message) { + if(message instanceof GetSnapshotReply) { + onGetSnapshotReply((GetSnapshotReply)message); + } else if(message instanceof Failure) { + LOG.debug("{}: Received {}", params.id, message); + + params.replyToActor.tell(message, getSelf()); + getSelf().tell(PoisonPill.getInstance(), getSelf()); + } else if (message instanceof ReceiveTimeout) { + LOG.warn("{}: Got ReceiveTimeout for inactivity - expected {} GetSnapshotReply messages within {} ms, received {}", + params.id, params.totalShardCount, params.receiveTimeout.toMillis(), repliesReceived); + + params.replyToActor.tell(new Failure(new TimeoutException(String.format( + "Timed out after %s ms while waiting for snapshot replies from %d shards. 
Actual replies received was %s", + params.receiveTimeout.toMillis(), params.totalShardCount, repliesReceived))), getSelf()); + getSelf().tell(PoisonPill.getInstance(), getSelf()); + } + } + + private void onGetSnapshotReply(GetSnapshotReply getSnapshotReply) { + LOG.debug("{}: Received {}", params.id, getSnapshotReply); + + ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(getSnapshotReply.getId()).build(); + shardSnapshots.add(new ShardSnapshot(shardId.getShardName(), getSnapshotReply.getSnapshot())); + + if(++repliesReceived == params.totalShardCount) { + LOG.debug("{}: All shard snapshots received", params.id); + + DatastoreSnapshot datastoreSnapshot = new DatastoreSnapshot(params.datastoreType, params.shardManagerSnapshot, + shardSnapshots); + params.replyToActor.tell(datastoreSnapshot, getSelf()); + getSelf().tell(PoisonPill.getInstance(), getSelf()); + } + } + + public static Props props(int totalShardCount, String datastoreType, byte[] shardManagerSnapshot, + ActorRef replyToActor, String id, Duration receiveTimeout) { + return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(totalShardCount, datastoreType, + shardManagerSnapshot, replyToActor, id, receiveTimeout)); + } + + private static final class Params { + final int totalShardCount; + final String datastoreType; + final byte[] shardManagerSnapshot; + final ActorRef replyToActor; + final String id; + final Duration receiveTimeout; + + Params(int totalShardCount, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor, + String id, Duration receiveTimeout) { + this.totalShardCount = totalShardCount; + this.datastoreType = datastoreType; + this.shardManagerSnapshot = shardManagerSnapshot; + this.replyToActor = replyToActor; + this.id = id; + this.receiveTimeout = receiveTimeout; + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshot.java 
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshot.java new file mode 100644 index 0000000000..54c0d7465e --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshot.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import com.google.common.base.Preconditions; +import java.io.Serializable; +import java.util.List; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Stores a snapshot of the internal state of a data store. + * + * @author Thomas Pantelis + */ +public class DatastoreSnapshot implements Serializable { + private static final long serialVersionUID = 1L; + + private final String type; + private final byte[] shardManagerSnapshot; + private final List shardSnapshots; + + public DatastoreSnapshot(@Nonnull String type, @Nullable byte[] shardManagerSnapshot, + @Nonnull List shardSnapshots) { + this.type = Preconditions.checkNotNull(type); + this.shardManagerSnapshot = shardManagerSnapshot; + this.shardSnapshots = Preconditions.checkNotNull(shardSnapshots); + } + + @Nonnull + public String getType() { + return type; + } + + @Nullable + public byte[] getShardManagerSnapshot() { + return shardManagerSnapshot; + } + + @Nonnull + public List getShardSnapshots() { + return shardSnapshots; + } + + public static class ShardSnapshot implements Serializable { + private static final long serialVersionUID = 1L; + + private final String name; + private final byte[] snapshot; + + public ShardSnapshot(@Nonnull 
String name, @Nonnull byte[] snapshot) { + this.name = Preconditions.checkNotNull(name); + this.snapshot = Preconditions.checkNotNull(snapshot); + } + + @Nonnull + public String getName() { + return name; + } + + @Nonnull + public byte[] getSnapshot() { + return snapshot; + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActorTest.java new file mode 100644 index 0000000000..3052397c4f --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActorTest.java @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import akka.actor.ActorRef; +import akka.actor.Status.Failure; +import akka.actor.Terminated; +import akka.testkit.JavaTestKit; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.After; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot; +import org.opendaylight.controller.cluster.raft.TestActorFactory; +import 
org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +/** + * Unit tests for ShardManagerGetSnapshotReplyActor. + * + * @author Thomas Pantelis + */ +public class ShardManagerGetSnapshotReplyActorTest extends AbstractActorTest { + private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); + + @After + public void tearDown() { + actorFactory.close(); + } + + @Test + public void testSuccess() { + JavaTestKit kit = new JavaTestKit(getSystem()); + + byte[] shardManagerSnapshot = new byte[]{0,5,9}; + ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(3, "config", + shardManagerSnapshot, kit.getRef(), "shard-manager", Duration.create(100, TimeUnit.SECONDS)), + actorFactory.generateActorId("actor")); + + kit.watch(replyActor); + + byte[] shard1Snapshot = new byte[]{1,2,3}; + replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName("member-1").type("config"). + shardName("shard1").build().toString(), shard1Snapshot), ActorRef.noSender()); + + byte[] shard2Snapshot = new byte[]{4,5,6}; + replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName("member-1").type("config"). + shardName("shard2").build().toString(), shard2Snapshot), ActorRef.noSender()); + + kit.expectNoMsg(FiniteDuration.create(500, TimeUnit.MILLISECONDS)); + + byte[] shard3Snapshot = new byte[]{7,8,9}; + replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName("member-1").type("config"). 
+ shardName("shard3").build().toString(), shard3Snapshot), ActorRef.noSender()); + + DatastoreSnapshot datastoreSnapshot = kit.expectMsgClass(DatastoreSnapshot.class); + + assertEquals("getType", "config", datastoreSnapshot.getType()); + assertArrayEquals("getShardManagerSnapshot", shardManagerSnapshot, datastoreSnapshot.getShardManagerSnapshot()); + List shardSnapshots = datastoreSnapshot.getShardSnapshots(); + assertEquals("ShardSnapshot size", 3, shardSnapshots.size()); + assertEquals("ShardSnapshot 1 getName", "shard1", shardSnapshots.get(0).getName()); + assertArrayEquals("ShardSnapshot 1 getSnapshot", shard1Snapshot, shardSnapshots.get(0).getSnapshot()); + assertEquals("ShardSnapshot 2 getName", "shard2", shardSnapshots.get(1).getName()); + assertArrayEquals("ShardSnapshot 2 getSnapshot", shard2Snapshot, shardSnapshots.get(1).getSnapshot()); + assertEquals("ShardSnapshot 3 getName", "shard3", shardSnapshots.get(2).getName()); + assertArrayEquals("ShardSnapshot 3 getSnapshot", shard3Snapshot, shardSnapshots.get(2).getSnapshot()); + + kit.expectMsgClass(Terminated.class); + } + + @Test + public void testGetSnapshotFailureReply() { + JavaTestKit kit = new JavaTestKit(getSystem()); + + byte[] shardManagerSnapshot = new byte[]{0,5,9}; + ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(2, "config", + shardManagerSnapshot, kit.getRef(), "shard-manager", Duration.create(100, TimeUnit.SECONDS)), + actorFactory.generateActorId("actor")); + + kit.watch(replyActor); + + replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName("member-1").type("config"). 
+ shardName("shard1").build().toString(), new byte[]{1,2,3}), ActorRef.noSender()); + + replyActor.tell(new Failure(new RuntimeException()), ActorRef.noSender()); + + kit.expectMsgClass(Failure.class); + kit.expectTerminated(replyActor); + } + + @Test + public void testGetSnapshotTimeout() { + JavaTestKit kit = new JavaTestKit(getSystem()); + + byte[] shardManagerSnapshot = new byte[]{0,5,9}; + ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(1, "config", + shardManagerSnapshot, kit.getRef(), "shard-manager", Duration.create(100, TimeUnit.MILLISECONDS)), + actorFactory.generateActorId("actor")); + + kit.watch(replyActor); + + Failure failure = kit.expectMsgClass(Failure.class); + assertEquals("Failure cause type", TimeoutException.class, failure.cause().getClass()); + kit.expectTerminated(replyActor); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 6f3946d422..f164446897 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -23,6 +23,7 @@ import akka.actor.AddressFromURIString; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Status; +import akka.actor.Status.Failure; import akka.cluster.Cluster; import akka.cluster.ClusterEvent; import akka.dispatch.Dispatchers; @@ -44,9 +45,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.TimeUnit; import org.junit.After; @@ -68,6 +71,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; @@ -90,6 +95,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; +import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; @@ -1136,6 +1142,46 @@ public class ShardManagerTest extends AbstractActorTest { }}; } + @Test + public void testGetSnapshot() throws Throwable { + JavaTestKit kit = new JavaTestKit(getSystem()); + + MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). + put("shard1", Arrays.asList("member-1")). 
+ put("shard2", Arrays.asList("member-1")).build()); + + ActorRef shardManager = getSystem().actorOf(newShardMgrProps(mockConfig).withDispatcher( + Dispatchers.DefaultDispatcherId())); + + shardManager.tell(GetSnapshot.INSTANCE, kit.getRef()); + Failure failure = kit.expectMsgClass(Failure.class); + assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass()); + + kit = new JavaTestKit(getSystem()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender()); + + shardManager.tell(new FindLocalShard("shard1", true), kit.getRef()); + kit.expectMsgClass(LocalShardFound.class); + shardManager.tell(new FindLocalShard("shard2", true), kit.getRef()); + kit.expectMsgClass(LocalShardFound.class); + + shardManager.tell(GetSnapshot.INSTANCE, kit.getRef()); + + DatastoreSnapshot datastoreSnapshot = kit.expectMsgClass(DatastoreSnapshot.class); + + assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType()); + List shardSnapshots = datastoreSnapshot.getShardSnapshots(); + Set actualShardNames = new HashSet<>(); + for(ShardSnapshot s: shardSnapshots) { + actualShardNames.add(s.getName()); + } + + assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), actualShardNames); + + shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + @Test public void testAddShardReplicaForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ -- 2.36.6