import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
onAddShardReplica((AddShardReplica)message);
} else if(message instanceof RemoveShardReplica){
onRemoveShardReplica((RemoveShardReplica)message);
+ } else if(message instanceof GetSnapshot) {
+ onGetSnapshot();
} else {
unknownMessage(message);
}
}
+ private void onGetSnapshot() {
+ LOG.debug("{}: onGetSnapshot", persistenceId());
+
+ List<String> notInitialized = null;
+ for(ShardInformation shardInfo: localShards.values()) {
+ if(!shardInfo.isShardInitialized()) {
+ if(notInitialized == null) {
+ notInitialized = new ArrayList<>();
+ }
+
+ notInitialized.add(shardInfo.getShardName());
+ }
+ }
+
+ if(notInitialized != null) {
+ getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
+ "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
+ return;
+ }
+
+ byte[] shardManagerSnapshot = null;
+ ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(localShards.size(),
+ type, shardManagerSnapshot , getSender(), persistenceId(),
+ datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
+
+ for(ShardInformation shardInfo: localShards.values()) {
+ shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
+ }
+ }
+
private void onCreateShard(CreateShard createShard) {
Object reply;
try {
shardInformation.addOnShardInitialized(onShardInitialized);
- LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
-
FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
if(shardInformation.isShardInitialized()) {
// If the shard is already initialized then we'll wait enough time for the shard to
.getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
}
+ LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
+ shardInformation.getShardName());
+
Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
timeout, getSelf(),
new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.actor.ReceiveTimeout;
+import akka.actor.Status.Failure;
+import akka.actor.UntypedActor;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Temporary actor used by the ShardManager to compile GetSnapshot replies from the Shard actors and return
+ * a DatastoreSnapshot instance reply.
+ *
+ * @author Thomas Pantelis
+ */
+class ShardManagerGetSnapshotReplyActor extends UntypedActor {
+ private static final Logger LOG = LoggerFactory.getLogger(ShardManagerGetSnapshotReplyActor.class);
+
+ private int repliesReceived;
+ private final Params params;
+ private final List<ShardSnapshot> shardSnapshots = new ArrayList<>();
+
+ private ShardManagerGetSnapshotReplyActor(Params params) {
+ this.params = params;
+
+ LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.totalShardCount);
+
+ getContext().setReceiveTimeout(params.receiveTimeout);
+ }
+
+ @Override
+ public void onReceive(Object message) {
+ if(message instanceof GetSnapshotReply) {
+ onGetSnapshotReply((GetSnapshotReply)message);
+ } else if(message instanceof Failure) {
+ LOG.debug("{}: Received {}", params.id, message);
+
+ params.replyToActor.tell(message, getSelf());
+ getSelf().tell(PoisonPill.getInstance(), getSelf());
+ } else if (message instanceof ReceiveTimeout) {
+ LOG.warn("{}: Got ReceiveTimeout for inactivity - expected {} GetSnapshotReply messages within {} ms, received {}",
+ params.id, params.totalShardCount, params.receiveTimeout.toMillis(), repliesReceived);
+
+ params.replyToActor.tell(new Failure(new TimeoutException(String.format(
+ "Timed out after %s ms while waiting for snapshot replies from %d shards. Actual replies received was %s",
+ params.receiveTimeout.toMillis(), params.totalShardCount, repliesReceived))), getSelf());
+ getSelf().tell(PoisonPill.getInstance(), getSelf());
+ }
+ }
+
+ private void onGetSnapshotReply(GetSnapshotReply getSnapshotReply) {
+ LOG.debug("{}: Received {}", params.id, getSnapshotReply);
+
+ ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(getSnapshotReply.getId()).build();
+ shardSnapshots.add(new ShardSnapshot(shardId.getShardName(), getSnapshotReply.getSnapshot()));
+
+ if(++repliesReceived == params.totalShardCount) {
+ LOG.debug("{}: All shard snapshots received", params.id);
+
+ DatastoreSnapshot datastoreSnapshot = new DatastoreSnapshot(params.datastoreType, params.shardManagerSnapshot,
+ shardSnapshots);
+ params.replyToActor.tell(datastoreSnapshot, getSelf());
+ getSelf().tell(PoisonPill.getInstance(), getSelf());
+ }
+ }
+
+ public static Props props(int totalShardCount, String datastoreType, byte[] shardManagerSnapshot,
+ ActorRef replyToActor, String id, Duration receiveTimeout) {
+ return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(totalShardCount, datastoreType,
+ shardManagerSnapshot, replyToActor, id, receiveTimeout));
+ }
+
+ private static final class Params {
+ final int totalShardCount;
+ final String datastoreType;
+ final byte[] shardManagerSnapshot;
+ final ActorRef replyToActor;
+ final String id;
+ final Duration receiveTimeout;
+
+ Params(int totalShardCount, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor,
+ String id, Duration receiveTimeout) {
+ this.totalShardCount = totalShardCount;
+ this.datastoreType = datastoreType;
+ this.shardManagerSnapshot = shardManagerSnapshot;
+ this.replyToActor = replyToActor;
+ this.id = id;
+ this.receiveTimeout = receiveTimeout;
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.List;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * Stores a snapshot of the internal state of a data store.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreSnapshot implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String type;
+ private final byte[] shardManagerSnapshot;
+ private final List<ShardSnapshot> shardSnapshots;
+
+ public DatastoreSnapshot(@Nonnull String type, @Nullable byte[] shardManagerSnapshot,
+ @Nonnull List<ShardSnapshot> shardSnapshots) {
+ this.type = Preconditions.checkNotNull(type);
+ this.shardManagerSnapshot = shardManagerSnapshot;
+ this.shardSnapshots = Preconditions.checkNotNull(shardSnapshots);
+ }
+
+ @Nonnull
+ public String getType() {
+ return type;
+ }
+
+ @Nullable
+ public byte[] getShardManagerSnapshot() {
+ return shardManagerSnapshot;
+ }
+
+ @Nonnull
+ public List<ShardSnapshot> getShardSnapshots() {
+ return shardSnapshots;
+ }
+
+ public static class ShardSnapshot implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String name;
+ private final byte[] snapshot;
+
+ public ShardSnapshot(@Nonnull String name, @Nonnull byte[] snapshot) {
+ this.name = Preconditions.checkNotNull(name);
+ this.snapshot = Preconditions.checkNotNull(snapshot);
+ }
+
+ @Nonnull
+ public String getName() {
+ return name;
+ }
+
+ @Nonnull
+ public byte[] getSnapshot() {
+ return snapshot;
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import akka.actor.ActorRef;
+import akka.actor.Status.Failure;
+import akka.actor.Terminated;
+import akka.testkit.JavaTestKit;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.After;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Unit tests for ShardManagerGetSnapshotReplyActor.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardManagerGetSnapshotReplyActorTest extends AbstractActorTest {
+ private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+
+ @After
+ public void tearDown() {
+ actorFactory.close();
+ }
+
+ @Test
+ public void testSuccess() {
+ JavaTestKit kit = new JavaTestKit(getSystem());
+
+ byte[] shardManagerSnapshot = new byte[]{0,5,9};
+ ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(3, "config",
+ shardManagerSnapshot, kit.getRef(), "shard-manager", Duration.create(100, TimeUnit.SECONDS)),
+ actorFactory.generateActorId("actor"));
+
+ kit.watch(replyActor);
+
+ byte[] shard1Snapshot = new byte[]{1,2,3};
+ replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName("member-1").type("config").
+ shardName("shard1").build().toString(), shard1Snapshot), ActorRef.noSender());
+
+ byte[] shard2Snapshot = new byte[]{4,5,6};
+ replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName("member-1").type("config").
+ shardName("shard2").build().toString(), shard2Snapshot), ActorRef.noSender());
+
+ kit.expectNoMsg(FiniteDuration.create(500, TimeUnit.MILLISECONDS));
+
+ byte[] shard3Snapshot = new byte[]{7,8,9};
+ replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName("member-1").type("config").
+ shardName("shard3").build().toString(), shard3Snapshot), ActorRef.noSender());
+
+ DatastoreSnapshot datastoreSnapshot = kit.expectMsgClass(DatastoreSnapshot.class);
+
+ assertEquals("getType", "config", datastoreSnapshot.getType());
+ assertArrayEquals("getShardManagerSnapshot", shardManagerSnapshot, datastoreSnapshot.getShardManagerSnapshot());
+ List<ShardSnapshot> shardSnapshots = datastoreSnapshot.getShardSnapshots();
+ assertEquals("ShardSnapshot size", 3, shardSnapshots.size());
+ assertEquals("ShardSnapshot 1 getName", "shard1", shardSnapshots.get(0).getName());
+ assertArrayEquals("ShardSnapshot 1 getSnapshot", shard1Snapshot, shardSnapshots.get(0).getSnapshot());
+ assertEquals("ShardSnapshot 2 getName", "shard2", shardSnapshots.get(1).getName());
+ assertArrayEquals("ShardSnapshot 2 getSnapshot", shard2Snapshot, shardSnapshots.get(1).getSnapshot());
+ assertEquals("ShardSnapshot 3 getName", "shard3", shardSnapshots.get(2).getName());
+ assertArrayEquals("ShardSnapshot 3 getSnapshot", shard3Snapshot, shardSnapshots.get(2).getSnapshot());
+
+ kit.expectMsgClass(Terminated.class);
+ }
+
+ @Test
+ public void testGetSnapshotFailureReply() {
+ JavaTestKit kit = new JavaTestKit(getSystem());
+
+ byte[] shardManagerSnapshot = new byte[]{0,5,9};
+ ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(2, "config",
+ shardManagerSnapshot, kit.getRef(), "shard-manager", Duration.create(100, TimeUnit.SECONDS)),
+ actorFactory.generateActorId("actor"));
+
+ kit.watch(replyActor);
+
+ replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName("member-1").type("config").
+ shardName("shard1").build().toString(), new byte[]{1,2,3}), ActorRef.noSender());
+
+ replyActor.tell(new Failure(new RuntimeException()), ActorRef.noSender());
+
+ kit.expectMsgClass(Failure.class);
+ kit.expectTerminated(replyActor);
+ }
+
+ @Test
+ public void testGetSnapshotTimeout() {
+ JavaTestKit kit = new JavaTestKit(getSystem());
+
+ byte[] shardManagerSnapshot = new byte[]{0,5,9};
+ ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(1, "config",
+ shardManagerSnapshot, kit.getRef(), "shard-manager", Duration.create(100, TimeUnit.MILLISECONDS)),
+ actorFactory.generateActorId("actor"));
+
+ kit.watch(replyActor);
+
+ Failure failure = kit.expectMsgClass(Failure.class);
+ assertEquals("Failure cause type", TimeoutException.class, failure.cause().getClass());
+ kit.expectTerminated(replyActor);
+ }
+}
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
+import akka.actor.Status.Failure;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.dispatch.Dispatchers;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
}};
}
+ @Test
+ public void testGetSnapshot() throws Throwable {
+ JavaTestKit kit = new JavaTestKit(getSystem());
+
+ MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("shard1", Arrays.asList("member-1")).
+ put("shard2", Arrays.asList("member-1")).build());
+
+ ActorRef shardManager = getSystem().actorOf(newShardMgrProps(mockConfig).withDispatcher(
+ Dispatchers.DefaultDispatcherId()));
+
+ shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
+ Failure failure = kit.expectMsgClass(Failure.class);
+ assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
+
+ kit = new JavaTestKit(getSystem());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
+
+ shardManager.tell(new FindLocalShard("shard1", true), kit.getRef());
+ kit.expectMsgClass(LocalShardFound.class);
+ shardManager.tell(new FindLocalShard("shard2", true), kit.getRef());
+ kit.expectMsgClass(LocalShardFound.class);
+
+ shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
+
+ DatastoreSnapshot datastoreSnapshot = kit.expectMsgClass(DatastoreSnapshot.class);
+
+ assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
+ List<ShardSnapshot> shardSnapshots = datastoreSnapshot.getShardSnapshots();
+ Set<String> actualShardNames = new HashSet<>();
+ for(ShardSnapshot s: shardSnapshots) {
+ actualShardNames.add(s.getName());
+ }
+
+ assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), actualShardNames);
+
+ shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
@Test
public void testAddShardReplicaForNonExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{