Bug 4564: Implement GetSnapshot message in ShardManager 73/29173/3
authorTom Pantelis <tpanteli@brocade.com>
Tue, 3 Nov 2015 06:54:46 +0000 (01:54 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 9 Nov 2015 03:36:04 +0000 (22:36 -0500)
Added a serializable DatastoreSnapshot class that stores the serialized
snapshot for each shard.

On GetSnapshot, the ShardManager sends a GetSnapshot message to each
shard and creates a ShardManagerGetSnapshotReplyActor to compile the
replies and return a DatastoreSnapshot instance to the caller.

Change-Id: I11f872aa701f1e51de9cbccdc1a372a76bc45cff
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActorTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java

index 4f3d4aa7f931b1af11e1198de001d02831ac6d63..0804a50e9b1f2b6d5587742e83b8efa2218ddd30 100644 (file)
@@ -75,6 +75,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
@@ -211,12 +212,44 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onAddShardReplica((AddShardReplica)message);
         } else if(message instanceof RemoveShardReplica){
             onRemoveShardReplica((RemoveShardReplica)message);
+        } else if(message instanceof GetSnapshot) {
+            onGetSnapshot();
         } else {
             unknownMessage(message);
         }
 
     }
 
+    private void onGetSnapshot() {
+        LOG.debug("{}: onGetSnapshot", persistenceId());
+
+        List<String> notInitialized = null;
+        for(ShardInformation shardInfo: localShards.values()) {
+            if(!shardInfo.isShardInitialized()) {
+                if(notInitialized == null) {
+                    notInitialized = new ArrayList<>();
+                }
+
+                notInitialized.add(shardInfo.getShardName());
+            }
+        }
+
+        if(notInitialized != null) {
+            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
+                    "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
+            return;
+        }
+
+        byte[] shardManagerSnapshot = null;
+        ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(localShards.size(),
+                type, shardManagerSnapshot , getSender(), persistenceId(),
+                datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
+
+        for(ShardInformation shardInfo: localShards.values()) {
+            shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
+        }
+    }
+
     private void onCreateShard(CreateShard createShard) {
         Object reply;
         try {
@@ -448,8 +481,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 shardInformation.addOnShardInitialized(onShardInitialized);
 
-                LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
-
                 FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
                 if(shardInformation.isShardInitialized()) {
                     // If the shard is already initialized then we'll wait enough time for the shard to
@@ -458,6 +489,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
                 }
 
+                LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
+                        shardInformation.getShardName());
+
                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
                         timeout, getSelf(),
                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActor.java
new file mode 100644 (file)
index 0000000..f77b3cb
--- /dev/null
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.actor.ReceiveTimeout;
+import akka.actor.Status.Failure;
+import akka.actor.UntypedActor;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Temporary actor used by the ShardManager to compile GetSnapshot replies from the Shard actors and return
+ * a DatastoreSnapshot instance reply.
+ *
+ * @author Thomas Pantelis
+ */
+class ShardManagerGetSnapshotReplyActor extends UntypedActor {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardManagerGetSnapshotReplyActor.class);
+
+    private int repliesReceived;
+    private final Params params;
+    private final List<ShardSnapshot> shardSnapshots = new ArrayList<>();
+
+    private ShardManagerGetSnapshotReplyActor(Params params) {
+        this.params = params;
+
+        LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.totalShardCount);
+
+        getContext().setReceiveTimeout(params.receiveTimeout);
+    }
+
+    @Override
+    public void onReceive(Object message) {
+        if(message instanceof GetSnapshotReply) {
+            onGetSnapshotReply((GetSnapshotReply)message);
+        } else if(message instanceof Failure) {
+            LOG.debug("{}: Received {}", params.id, message);
+
+            params.replyToActor.tell(message, getSelf());
+            getSelf().tell(PoisonPill.getInstance(), getSelf());
+        } else if (message instanceof ReceiveTimeout) {
+            LOG.warn("{}: Got ReceiveTimeout for inactivity - expected {} GetSnapshotReply messages within {} ms, received {}",
+                    params.id, params.totalShardCount, params.receiveTimeout.toMillis(), repliesReceived);
+
+            params.replyToActor.tell(new Failure(new TimeoutException(String.format(
+                    "Timed out after %s ms while waiting for snapshot replies from %d shards. Actual replies received was %s",
+                        params.receiveTimeout.toMillis(), params.totalShardCount, repliesReceived))), getSelf());
+            getSelf().tell(PoisonPill.getInstance(), getSelf());
+        }
+    }
+
+    private void onGetSnapshotReply(GetSnapshotReply getSnapshotReply) {
+        LOG.debug("{}: Received {}", params.id, getSnapshotReply);
+
+        ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(getSnapshotReply.getId()).build();
+        shardSnapshots.add(new ShardSnapshot(shardId.getShardName(), getSnapshotReply.getSnapshot()));
+
+        if(++repliesReceived == params.totalShardCount) {
+            LOG.debug("{}: All shard snapshots received", params.id);
+
+            DatastoreSnapshot datastoreSnapshot = new DatastoreSnapshot(params.datastoreType, params.shardManagerSnapshot,
+                    shardSnapshots);
+            params.replyToActor.tell(datastoreSnapshot, getSelf());
+            getSelf().tell(PoisonPill.getInstance(), getSelf());
+        }
+    }
+
+    public static Props props(int totalShardCount, String datastoreType, byte[] shardManagerSnapshot,
+            ActorRef replyToActor, String id, Duration receiveTimeout) {
+        return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(totalShardCount, datastoreType,
+                shardManagerSnapshot, replyToActor, id, receiveTimeout));
+    }
+
+    private static final class Params {
+        final int totalShardCount;
+        final String datastoreType;
+        final byte[] shardManagerSnapshot;
+        final ActorRef replyToActor;
+        final String id;
+        final Duration receiveTimeout;
+
+        Params(int totalShardCount, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor,
+                String id, Duration receiveTimeout) {
+            this.totalShardCount = totalShardCount;
+            this.datastoreType = datastoreType;
+            this.shardManagerSnapshot = shardManagerSnapshot;
+            this.replyToActor = replyToActor;
+            this.id = id;
+            this.receiveTimeout = receiveTimeout;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshot.java
new file mode 100644 (file)
index 0000000..54c0d74
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.List;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * Stores a snapshot of the internal state of a data store.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreSnapshot implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String type;
+    private final byte[] shardManagerSnapshot;
+    private final List<ShardSnapshot> shardSnapshots;
+
+    public DatastoreSnapshot(@Nonnull String type, @Nullable byte[] shardManagerSnapshot,
+            @Nonnull List<ShardSnapshot> shardSnapshots) {
+        this.type = Preconditions.checkNotNull(type);
+        this.shardManagerSnapshot = shardManagerSnapshot;
+        this.shardSnapshots = Preconditions.checkNotNull(shardSnapshots);
+    }
+
+    @Nonnull
+    public String getType() {
+        return type;
+    }
+
+    @Nullable
+    public byte[] getShardManagerSnapshot() {
+        return shardManagerSnapshot;
+    }
+
+    @Nonnull
+    public List<ShardSnapshot> getShardSnapshots() {
+        return shardSnapshots;
+    }
+
+    public static class ShardSnapshot implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final String name;
+        private final byte[] snapshot;
+
+        public ShardSnapshot(@Nonnull String name, @Nonnull byte[] snapshot) {
+            this.name = Preconditions.checkNotNull(name);
+            this.snapshot = Preconditions.checkNotNull(snapshot);
+        }
+
+        @Nonnull
+        public String getName() {
+            return name;
+        }
+
+        @Nonnull
+        public byte[] getSnapshot() {
+            return snapshot;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActorTest.java
new file mode 100644 (file)
index 0000000..3052397
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import akka.actor.ActorRef;
+import akka.actor.Status.Failure;
+import akka.actor.Terminated;
+import akka.testkit.JavaTestKit;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.After;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Unit tests for ShardManagerGetSnapshotReplyActor.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardManagerGetSnapshotReplyActorTest extends AbstractActorTest {
+    private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+
+    @After
+    public void tearDown() {
+        actorFactory.close();
+    }
+
+    @Test
+    public void testSuccess() {
+        JavaTestKit kit = new JavaTestKit(getSystem());
+
+        byte[] shardManagerSnapshot = new byte[]{0,5,9};
+        ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(3, "config",
+                shardManagerSnapshot, kit.getRef(), "shard-manager", Duration.create(100, TimeUnit.SECONDS)),
+                    actorFactory.generateActorId("actor"));
+
+        kit.watch(replyActor);
+
+        byte[] shard1Snapshot = new byte[]{1,2,3};
+        replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName("member-1").type("config").
+                shardName("shard1").build().toString(), shard1Snapshot), ActorRef.noSender());
+
+        byte[] shard2Snapshot = new byte[]{4,5,6};
+        replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName("member-1").type("config").
+                shardName("shard2").build().toString(), shard2Snapshot), ActorRef.noSender());
+
+        kit.expectNoMsg(FiniteDuration.create(500, TimeUnit.MILLISECONDS));
+
+        byte[] shard3Snapshot = new byte[]{7,8,9};
+        replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName("member-1").type("config").
+                shardName("shard3").build().toString(), shard3Snapshot), ActorRef.noSender());
+
+        DatastoreSnapshot datastoreSnapshot = kit.expectMsgClass(DatastoreSnapshot.class);
+
+        assertEquals("getType", "config", datastoreSnapshot.getType());
+        assertArrayEquals("getShardManagerSnapshot", shardManagerSnapshot, datastoreSnapshot.getShardManagerSnapshot());
+        List<ShardSnapshot> shardSnapshots = datastoreSnapshot.getShardSnapshots();
+        assertEquals("ShardSnapshot size", 3, shardSnapshots.size());
+        assertEquals("ShardSnapshot 1 getName", "shard1", shardSnapshots.get(0).getName());
+        assertArrayEquals("ShardSnapshot 1 getSnapshot", shard1Snapshot, shardSnapshots.get(0).getSnapshot());
+        assertEquals("ShardSnapshot 2 getName", "shard2", shardSnapshots.get(1).getName());
+        assertArrayEquals("ShardSnapshot 2 getSnapshot", shard2Snapshot, shardSnapshots.get(1).getSnapshot());
+        assertEquals("ShardSnapshot 3 getName", "shard3", shardSnapshots.get(2).getName());
+        assertArrayEquals("ShardSnapshot 3 getSnapshot", shard3Snapshot, shardSnapshots.get(2).getSnapshot());
+
+        kit.expectMsgClass(Terminated.class);
+    }
+
+    @Test
+    public void testGetSnapshotFailureReply() {
+        JavaTestKit kit = new JavaTestKit(getSystem());
+
+        byte[] shardManagerSnapshot = new byte[]{0,5,9};
+        ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(2, "config",
+                shardManagerSnapshot, kit.getRef(), "shard-manager", Duration.create(100, TimeUnit.SECONDS)),
+                    actorFactory.generateActorId("actor"));
+
+        kit.watch(replyActor);
+
+        replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName("member-1").type("config").
+                shardName("shard1").build().toString(), new byte[]{1,2,3}), ActorRef.noSender());
+
+        replyActor.tell(new Failure(new RuntimeException()), ActorRef.noSender());
+
+        kit.expectMsgClass(Failure.class);
+        kit.expectTerminated(replyActor);
+    }
+
+    @Test
+    public void testGetSnapshotTimeout() {
+        JavaTestKit kit = new JavaTestKit(getSystem());
+
+        byte[] shardManagerSnapshot = new byte[]{0,5,9};
+        ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(1, "config",
+                shardManagerSnapshot, kit.getRef(), "shard-manager", Duration.create(100, TimeUnit.MILLISECONDS)),
+                    actorFactory.generateActorId("actor"));
+
+        kit.watch(replyActor);
+
+        Failure failure = kit.expectMsgClass(Failure.class);
+        assertEquals("Failure cause type", TimeoutException.class, failure.cause().getClass());
+        kit.expectTerminated(replyActor);
+    }
+}
index 6f3946d42219ed963b0ed03da2448a675165cf12..f164446897011ca7190f9e8a02fe9cf0d1127dec 100644 (file)
@@ -23,6 +23,7 @@ import akka.actor.AddressFromURIString;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Status;
+import akka.actor.Status.Failure;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent;
 import akka.dispatch.Dispatchers;
@@ -44,9 +45,11 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
@@ -68,6 +71,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
@@ -90,6 +95,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
@@ -1136,6 +1142,46 @@ public class ShardManagerTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testGetSnapshot() throws Throwable {
+        JavaTestKit kit = new JavaTestKit(getSystem());
+
+        MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+                   put("shard1", Arrays.asList("member-1")).
+                   put("shard2", Arrays.asList("member-1")).build());
+
+        ActorRef shardManager = getSystem().actorOf(newShardMgrProps(mockConfig).withDispatcher(
+                Dispatchers.DefaultDispatcherId()));
+
+        shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
+        Failure failure = kit.expectMsgClass(Failure.class);
+        assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
+
+        kit = new JavaTestKit(getSystem());
+
+        shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
+
+        shardManager.tell(new FindLocalShard("shard1", true), kit.getRef());
+        kit.expectMsgClass(LocalShardFound.class);
+        shardManager.tell(new FindLocalShard("shard2", true), kit.getRef());
+        kit.expectMsgClass(LocalShardFound.class);
+
+        shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
+
+        DatastoreSnapshot datastoreSnapshot = kit.expectMsgClass(DatastoreSnapshot.class);
+
+        assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
+        List<ShardSnapshot> shardSnapshots = datastoreSnapshot.getShardSnapshots();
+        Set<String> actualShardNames = new HashSet<>();
+        for(ShardSnapshot s: shardSnapshots) {
+            actualShardNames.add(s.getName());
+        }
+
+        assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), actualShardNames);
+
+        shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    }
+
     @Test
     public void testAddShardReplicaForNonExistentShard() throws Exception {
         new JavaTestKit(getSystem()) {{