From f1c3050779d7770ef6a12a67a1870765c3dfd9eb Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Fri, 30 Oct 2015 17:42:39 -0400 Subject: [PATCH] Bug 4564: Implement GetSnapshot message in RaftActor Added a new client message, GetSnapshot, to return a serialized Snapshot instance. The implementation just captures the snapshot for return and does not persist it. If data persistence isn't enabled, it does not initiate a capture and returns a serialized Snapshot instance containing just the persistable state, eg election term info. Change-Id: I9ea7fc8e0e60c4d6874f5eb0188543e1d9b51243 Signed-off-by: Tom Pantelis --- .../cluster/raft/GetSnapshotReplyActor.java | 87 ++++++++++++ .../cluster/raft/ImmutableElectionTerm.java | 52 +++++++ .../controller/cluster/raft/RaftActor.java | 2 +- .../raft/RaftActorSnapshotMessageSupport.java | 42 +++++- .../cluster/raft/SnapshotManager.java | 62 ++++---- .../raft/client/messages/GetSnapshot.java | 21 +++ .../client/messages/GetSnapshotReply.java | 41 ++++++ .../cluster/raft/MockRaftActor.java | 9 +- .../RaftActorSnapshotMessageSupportTest.java | 2 +- .../cluster/raft/RaftActorTest.java | 132 ++++++++++++++++-- 10 files changed, 402 insertions(+), 48 deletions(-) create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ImmutableElectionTerm.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetSnapshot.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetSnapshotReply.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java new file mode 100644 index 0000000000..ca09823a12 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import akka.actor.ActorRef; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.actor.ReceiveTimeout; +import akka.actor.UntypedActor; +import com.google.common.base.Preconditions; +import java.util.concurrent.TimeoutException; +import org.apache.commons.lang3.SerializationUtils; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.Duration; + +/** + * Temporary actor used to receive a CaptureSnapshotReply message and return a GetSnapshotReply instance. + * + * @author Thomas Pantelis + */ +class GetSnapshotReplyActor extends UntypedActor { + private static final Logger LOG = LoggerFactory.getLogger(GetSnapshotReplyActor.class); + + private final Params params; + + private GetSnapshotReplyActor(Params params) { + this.params = params; + + getContext().setReceiveTimeout(params.receiveTimeout); + } + + @Override + public void onReceive(Object message) { + if(message instanceof CaptureSnapshotReply) { + Snapshot snapshot = Snapshot.create(((CaptureSnapshotReply)message).getSnapshot(), + params.captureSnapshot.getUnAppliedEntries(), + params.captureSnapshot.getLastIndex(), params.captureSnapshot.getLastTerm(), + params.captureSnapshot.getLastAppliedIndex(), params.captureSnapshot.getLastAppliedTerm(), + params.electionTerm.getCurrentTerm(), params.electionTerm.getVotedFor()); + + LOG.debug("{}: Received CaptureSnapshotReply, sending {}", params.id, snapshot); + + params.replyToActor.tell(new GetSnapshotReply(params.id, SerializationUtils.serialize(snapshot)), getSelf()); + getSelf().tell(PoisonPill.getInstance(), getSelf()); + } else if (message instanceof ReceiveTimeout) { + LOG.warn("{}: Got ReceiveTimeout for inactivity - did not receive CaptureSnapshotReply within {} ms", + params.id, params.receiveTimeout.toMillis()); + + params.replyToActor.tell(new akka.actor.Status.Failure(new TimeoutException(String.format( + "Timed out after %d ms while waiting for CaptureSnapshotReply", + params.receiveTimeout.toMillis()))), getSelf()); + getSelf().tell(PoisonPill.getInstance(), getSelf()); + } + } + + public static Props props(CaptureSnapshot captureSnapshot, ElectionTerm electionTerm, ActorRef replyToActor, + Duration receiveTimeout, String id) { + return Props.create(GetSnapshotReplyActor.class, new Params(captureSnapshot, electionTerm, replyToActor, + receiveTimeout, id)); + } + + private static final class Params { + final CaptureSnapshot captureSnapshot; + final ActorRef replyToActor; + final ElectionTerm electionTerm; + final Duration receiveTimeout; + final String id; + + Params(CaptureSnapshot captureSnapshot, ElectionTerm electionTerm, ActorRef replyToActor, + Duration receiveTimeout, String id) { + this.captureSnapshot = Preconditions.checkNotNull(captureSnapshot); + this.electionTerm = Preconditions.checkNotNull(electionTerm); + this.replyToActor = Preconditions.checkNotNull(replyToActor); + this.receiveTimeout = Preconditions.checkNotNull(receiveTimeout); + this.id = Preconditions.checkNotNull(id); + } + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ImmutableElectionTerm.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ImmutableElectionTerm.java new file mode 100644 index 0000000000..2760b48318 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ImmutableElectionTerm.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +/** + * Immutable implementation of ElectionTerm. + * + * @author Thomas Pantelis + */ +public class ImmutableElectionTerm implements ElectionTerm { + private final long currentTerm; + private final String votedFor; + + private ImmutableElectionTerm(long currentTerm, String votedFor) { + this.currentTerm = currentTerm; + this.votedFor = votedFor; + } + + @Override + public long getCurrentTerm() { + return currentTerm; + } + + @Override + public String getVotedFor() { + return votedFor; + } + + @Override + public void update(long currentTerm, String votedFor) { + throw new UnsupportedOperationException(); + } + + @Override + public void updateAndPersist(long currentTerm, String votedFor) { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() { + return "ImmutableElectionTerm [currentTerm=" + currentTerm + ", votedFor=" + votedFor + "]"; + } + + public static ElectionTerm copyOf(ElectionTerm from) { + return new ImmutableElectionTerm(from.getCurrentTerm(), from.getVotedFor()); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 4eb9ad8359..5f6f3ec24e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -250,7 +250,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { captureSnapshot(); } else if(message instanceof SwitchBehavior){ switchBehavior(((SwitchBehavior) message)); - } else if(!snapshotSupport.handleSnapshotMessage(message)) { + } else if(!snapshotSupport.handleSnapshotMessage(message, getSender())) { switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message)); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java index bf0fc10aad..ec46f30878 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java @@ -7,13 +7,22 @@ */ package org.opendaylight.controller.cluster.raft; +import akka.actor.ActorRef; import akka.japi.Procedure; import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; +import com.google.common.annotations.VisibleForTesting; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; +import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply; import org.slf4j.Logger; +import scala.concurrent.duration.Duration; /** * Handles snapshot related messages for a RaftActor. @@ -42,6 +51,8 @@ class RaftActorSnapshotMessageSupport { } }; + private Duration snapshotReplyActorTimeout = Duration.create(30, TimeUnit.SECONDS); + RaftActorSnapshotMessageSupport(RaftActorContext context, RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort) { this.context = context; @@ -53,7 +64,7 @@ class RaftActorSnapshotMessageSupport { context.getSnapshotManager().setApplySnapshotProcedure(applySnapshotProcedure); } - boolean handleSnapshotMessage(Object message) { + boolean handleSnapshotMessage(Object message, ActorRef sender) { if(message instanceof ApplySnapshot ) { onApplySnapshot((ApplySnapshot) message); return true; @@ -69,6 +80,9 @@ class RaftActorSnapshotMessageSupport { } else if (message.equals(COMMIT_SNAPSHOT)) { context.getSnapshotManager().commit(-1, currentBehavior); return true; + } else if (message instanceof GetSnapshot) { + onGetSnapshot(sender); + return true; } else { return false; } @@ -101,4 +115,30 @@ class RaftActorSnapshotMessageSupport { context.getSnapshotManager().apply(message); } + + private void onGetSnapshot(ActorRef sender) { + log.debug("{}: onGetSnapshot", context.getId()); + + if(context.getPersistenceProvider().isRecoveryApplicable()) { + CaptureSnapshot captureSnapshot = context.getSnapshotManager().newCaptureSnapshot( + context.getReplicatedLog().last(), -1, false); + + ActorRef snapshotReplyActor = context.actorOf(GetSnapshotReplyActor.props(captureSnapshot, + ImmutableElectionTerm.copyOf(context.getTermInformation()), sender, + snapshotReplyActorTimeout, context.getId())); + + cohort.createSnapshot(snapshotReplyActor); + } else { + Snapshot snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1, + context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor()); + + sender.tell(new GetSnapshotReply(context.getId(), SerializationUtils.serialize(snapshot)), + context.getActor()); + } + } + + @VisibleForTesting + void setSnapshotReplyActorTimeout(Duration snapshotReplyActorTimeout) { + this.snapshotReplyActorTimeout = snapshotReplyActorTimeout; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 4dbe9ee9a0..9571173175 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -116,6 +116,37 @@ public class SnapshotManager implements SnapshotState { return context.getId(); } + public CaptureSnapshot newCaptureSnapshot(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, + boolean installSnapshotInitiated) { + TermInformationReader lastAppliedTermInfoReader = + lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(), + lastLogEntry, hasFollowers()); + + long lastAppliedIndex = lastAppliedTermInfoReader.getIndex(); + long lastAppliedTerm = lastAppliedTermInfoReader.getTerm(); + + TermInformationReader replicatedToAllTermInfoReader = + replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex); + + long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex(); + long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm(); + + List unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1); + + long lastLogEntryIndex = lastAppliedIndex; + long lastLogEntryTerm = lastAppliedTerm; + if(lastLogEntry != null) { + lastLogEntryIndex = lastLogEntry.getIndex(); + lastLogEntryTerm = lastLogEntry.getTerm(); + } else { + LOG.warn("Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.", + lastAppliedIndex, lastAppliedTerm); + } + + return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm, + newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, installSnapshotInitiated); + } + private class AbstractSnapshotState implements SnapshotState { @Override @@ -200,36 +231,7 @@ public class SnapshotManager implements SnapshotState { } private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) { - TermInformationReader lastAppliedTermInfoReader = - lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(), - lastLogEntry, hasFollowers()); - - long lastAppliedIndex = lastAppliedTermInfoReader.getIndex(); - long lastAppliedTerm = lastAppliedTermInfoReader.getTerm(); - - TermInformationReader replicatedToAllTermInfoReader = - replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex); - - long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex(); - long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm(); - - // send a CaptureSnapshot to self to make the expensive operation async. - - List unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1); - - long lastLogEntryIndex = lastAppliedIndex; - long lastLogEntryTerm = lastAppliedTerm; - if(lastLogEntry != null) { - lastLogEntryIndex = lastLogEntry.getIndex(); - lastLogEntryTerm = lastLogEntry.getTerm(); - } else { - LOG.warn("Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.", - lastAppliedIndex, lastAppliedTerm); - } - - captureSnapshot = new CaptureSnapshot(lastLogEntryIndex, - lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm, - newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null); + captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, targetFollower != null); if(captureSnapshot.isInstallSnapshotInitiated()) { LOG.info("{}: Initiating snapshot capture {} to install on {}", diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetSnapshot.java new file mode 100644 index 0000000000..f1d05bfca0 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetSnapshot.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.client.messages; + +/** + * Internal client message to get a snapshot of the current state based on whether or not persistence is + * enabled. Returns a GetSnapshotReply instance. + * + * @author Thomas Pantelis + */ +public class GetSnapshot { + public static final GetSnapshot INSTANCE = new GetSnapshot(); + + private GetSnapshot() { + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetSnapshotReply.java new file mode 100644 index 0000000000..f3521deb3b --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetSnapshotReply.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.client.messages; + +import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; + +/** + * Reply to GetSnapshot that returns a serialized Snapshot instance. + * + * @author Thomas Pantelis + */ +public class GetSnapshotReply { + private final String id; + private final byte[] snapshot; + + public GetSnapshotReply(@Nonnull String id, @Nonnull byte[] snapshot) { + this.id = Preconditions.checkNotNull(id); + this.snapshot = Preconditions.checkNotNull(snapshot); + } + + @Nonnull + public String getId() { + return id; + } + + @Nonnull + public byte[] getSnapshot() { + return snapshot; + } + + @Override + public String toString() { + return "GetSnapshotReply [id=" + id + ", snapshot.length=" + snapshot.length + "]"; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java index bb39ed98ba..f56638bc82 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java @@ -34,7 +34,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, final RaftActor actorDelegate; final RaftActorRecoveryCohort recoveryCohortDelegate; - final RaftActorSnapshotCohort snapshotCohortDelegate; + volatile RaftActorSnapshotCohort snapshotCohortDelegate; private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final List state; private ActorRef roleChangeNotifier; @@ -96,7 +96,12 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, @Override protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() { - return snapshotMessageSupport != null ? snapshotMessageSupport : super.newRaftActorSnapshotMessageSupport(); + return snapshotMessageSupport != null ? snapshotMessageSupport : + (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport()); + } + + public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() { + return snapshotMessageSupport; } public void waitForRecoveryComplete() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java index d79a48357a..94000d0712 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java @@ -83,7 +83,7 @@ public class RaftActorSnapshotMessageSupportTest { } private void sendMessageToSupport(Object message, boolean expHandled) { - boolean handled = support.handleSnapshotMessage(message); + boolean handled = support.handleSnapshotMessage(message, mockRaftActorRef); assertEquals("complete", expHandled, handled); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 4b43095316..941deb5843 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.raft; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; @@ -20,11 +21,14 @@ import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.actor.Status.Failure; import akka.actor.Terminated; import akka.dispatch.Dispatchers; import akka.japi.Procedure; @@ -47,9 +51,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.commons.lang3.SerializationUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.PersistentDataProvider; @@ -68,13 +75,17 @@ import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; +import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; public class RaftActorTest extends AbstractActorTest { @@ -340,34 +351,40 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.waitForRecoveryComplete(); ApplySnapshot applySnapshot = new ApplySnapshot(mock(Snapshot.class)); - doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot)); + doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class)); mockRaftActor.handleCommand(applySnapshot); CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null); - doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot)); + doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class)); mockRaftActor.handleCommand(captureSnapshot); CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]); - doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply)); + doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class)); mockRaftActor.handleCommand(captureSnapshotReply); SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class)); - doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess)); + doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class)); mockRaftActor.handleCommand(saveSnapshotSuccess); SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable()); - doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure)); + doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class)); mockRaftActor.handleCommand(saveSnapshotFailure); - doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT)); + doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT), + any(ActorRef.class)); mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT); - verify(mockSupport).handleSnapshotMessage(same(applySnapshot)); - verify(mockSupport).handleSnapshotMessage(same(captureSnapshot)); - verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply)); - verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess)); - verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure)); - verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT)); + doReturn(true).when(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class)); + mockRaftActor.handleCommand(GetSnapshot.INSTANCE); + + verify(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class)); + verify(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class)); + verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class)); + verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class)); + verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class)); + verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT), + any(ActorRef.class)); + verify(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class)); } @Test @@ -1089,4 +1106,93 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("Behavior State", RaftState.Follower, mockRaftActor.getCurrentBehavior().state()); } + + @Test + public void testGetSnapshot() throws Exception { + TEST_LOG.info("testGetSnapshot starting"); + + JavaTestKit kit = new JavaTestKit(getSystem()); + + String persistenceId = factory.generateActorId("test-actor-"); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + long term = 3; + long seqN = 1; + InMemoryJournal.addEntry(persistenceId, seqN++, new UpdateElectionTerm(term, "member-1")); + InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 0, + new MockRaftActorContext.MockPayload("A"))); + InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 1, + new MockRaftActorContext.MockPayload("B"))); + InMemoryJournal.addEntry(persistenceId, seqN++, new ApplyJournalEntries(1)); + InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 2, + new MockRaftActorContext.MockPayload("C"))); + + TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + ImmutableMap.builder().put("member1", "address").build(), Optional.of(config)). + withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId); + MockRaftActor mockRaftActor = raftActorRef.underlyingActor(); + + mockRaftActor.waitForRecoveryComplete(); + + // Wait for snapshot after recovery + verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class)); + + mockRaftActor.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class); + + raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef()); + + ArgumentCaptor replyActor = ArgumentCaptor.forClass(ActorRef.class); + verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture()); + + byte[] stateSnapshot = new byte[]{1,2,3}; + replyActor.getValue().tell(new CaptureSnapshotReply(stateSnapshot), ActorRef.noSender()); + + GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class); + + assertEquals("getId", persistenceId, reply.getId()); + Snapshot replySnapshot = SerializationUtils.deserialize(reply.getSnapshot()); + assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm()); + assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor()); + assertEquals("getLastAppliedIndex", 1L, replySnapshot.getLastAppliedIndex()); + assertEquals("getLastAppliedTerm", term, replySnapshot.getLastAppliedTerm()); + assertEquals("getLastIndex", 2L, replySnapshot.getLastIndex()); + assertEquals("getLastTerm", term, replySnapshot.getLastTerm()); + assertArrayEquals("getState", stateSnapshot, replySnapshot.getState()); + assertEquals("getUnAppliedEntries size", 1, replySnapshot.getUnAppliedEntries().size()); + assertEquals("UnApplied entry index ", 2L, replySnapshot.getUnAppliedEntries().get(0).getIndex()); + + // Test with timeout + + mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(200, TimeUnit.MILLISECONDS)); + reset(mockRaftActor.snapshotCohortDelegate); + + raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef()); + Failure failure = kit.expectMsgClass(akka.actor.Status.Failure.class); + assertEquals("Failure cause type", TimeoutException.class, failure.cause().getClass()); + + mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(30, TimeUnit.SECONDS)); + + // Test with persistence disabled. + + mockRaftActor.setPersistence(false); + reset(mockRaftActor.snapshotCohortDelegate); + + raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef()); + reply = kit.expectMsgClass(GetSnapshotReply.class); + verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(any(ActorRef.class)); + + assertEquals("getId", persistenceId, reply.getId()); + replySnapshot = SerializationUtils.deserialize(reply.getSnapshot()); + assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm()); + assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor()); + assertEquals("getLastAppliedIndex", -1L, replySnapshot.getLastAppliedIndex()); + assertEquals("getLastAppliedTerm", -1L, replySnapshot.getLastAppliedTerm()); + assertEquals("getLastIndex", -1L, replySnapshot.getLastIndex()); + assertEquals("getLastTerm", -1L, replySnapshot.getLastTerm()); + assertEquals("getState length", 0, replySnapshot.getState().length); + assertEquals("getUnAppliedEntries size", 0, replySnapshot.getUnAppliedEntries().size()); + + TEST_LOG.info("testGetSnapshot ending"); + } } -- 2.36.6