From 11dadddb4d9ba26ae0b1795921c7a218a6d893c2 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 4 Nov 2015 02:09:45 -0500 Subject: [PATCH] Bug 4564: Implement restore from snapshot in RaftActor The restore snapshot is supplied by the derived actor's RaftActorRecoveryCohort. If one exists the the RaftActorRecoverySupport desrializes and applies the snapshot. I also add a Builder to MockRaftActor to make it easier to pass additional params. Change-Id: Ib52b24331038ed48221cc27086fa3cceafe39fcf Signed-off-by: Tom Pantelis --- .../cluster/example/ExampleActor.java | 5 + .../cluster/raft/RaftActorRecoveryCohort.java | 9 + .../raft/RaftActorRecoverySupport.java | 36 +++- .../cluster/raft/SnapshotManager.java | 6 +- .../AbstractRaftActorIntegrationTest.java | 3 +- .../cluster/raft/MockRaftActor.java | 159 +++++++++------- .../raft/RaftActorRecoverySupportTest.java | 5 +- ...ftActorServerConfigurationSupportTest.java | 7 +- .../cluster/raft/RaftActorTest.java | 174 ++++++++++++++---- .../cluster/raft/RaftActorTestKit.java | 5 +- .../raft/utils/InMemorySnapshotStore.java | 21 +++ .../datastore/ShardRecoveryCoordinator.java | 6 + 12 files changed, 326 insertions(+), 110 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 3ce8364fce..9b7c2e8d4b 100644 --- a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -226,4 +226,9 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() { return this; } + + @Override + public byte[] getRestoreFromSnapshot() { + return null; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java index a9f00aa80b..30e27e17fe 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.raft; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; /** @@ -42,4 +43,12 @@ public interface RaftActorRecoveryCohort { * log entries. This method is called after {@link #appendRecoveredLogEntry}. */ void applyCurrentLogRecoveryBatch(); + + /** + * Returns the state snapshot to restore from on recovery. + * + * @return the snapshot bytes or null if there's no snapshot to restore + */ + @Nullable + byte[] getRestoreFromSnapshot(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java index 05405dc6df..0a37ef7a46 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java @@ -11,10 +11,13 @@ import akka.persistence.RecoveryCompleted; import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; import com.google.common.base.Stopwatch; +import java.io.ByteArrayInputStream; +import java.io.ObjectInputStream; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; +import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries; import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; @@ -32,6 +35,7 @@ class RaftActorRecoverySupport { private int currentRecoveryBatchCount; private boolean dataRecoveredWithPersistenceDisabled; + private boolean anyDataRecovered; private Stopwatch recoveryTimer; private final Logger log; @@ -45,7 +49,9 @@ class RaftActorRecoverySupport { } boolean handleRecoveryMessage(Object message, PersistentDataProvider persistentProvider) { - log.trace("handleRecoveryMessage: {}", message); + log.trace("{}: handleRecoveryMessage: {}", context.getId(), message); + + anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted); boolean recoveryComplete = false; DataPersistenceProvider persistence = context.getPersistenceProvider(); @@ -74,6 +80,7 @@ class RaftActorRecoverySupport { replicatedLog().removeFrom(((org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) message).getFromIndex()); } else if (message instanceof RecoveryCompleted) { onRecoveryCompletedMessage(); + possiblyRestoreFromSnapshot(); recoveryComplete = true; } } else if (message instanceof RecoveryCompleted) { @@ -94,6 +101,8 @@ class RaftActorRecoverySupport { context.getTermInformation().updateAndPersist(context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor()); } + + possiblyRestoreFromSnapshot(); } else { boolean isServerConfigPayload = false; if(message instanceof ReplicatedLogEntry){ @@ -112,6 +121,29 @@ class RaftActorRecoverySupport { return recoveryComplete; } + private void possiblyRestoreFromSnapshot() { + byte[] restoreFromSnapshot = cohort.getRestoreFromSnapshot(); + if(restoreFromSnapshot == null) { + return; + } + + if(anyDataRecovered) { + log.warn("{}: The provided restore snapshot was not applied because the persistence store is not empty", + context.getId()); + return; + } + + try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) { + Snapshot snapshot = (Snapshot) ois.readObject(); + + log.debug("{}: Deserialized restore snapshot: {}", context.getId(), snapshot); + + context.getSnapshotManager().apply(new ApplySnapshot(snapshot)); + } catch(Exception e) { + log.error("{}: Error deserializing snapshot restore", context.getId(), e); + } + } + private ReplicatedLog replicatedLog() { return context.getReplicatedLog(); } @@ -181,7 +213,7 @@ class RaftActorRecoverySupport { batchRecoveredLogEntry(logEntry); } else { // Shouldn't happen but cover it anyway. - log.error("Log entry not found for index {}", i); + log.error("{}: Log entry not found for index {}", context.getId(), i); break; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 9571173175..0d0c910298 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -387,12 +387,16 @@ public class SnapshotManager implements SnapshotState { if(applySnapshot != null) { try { Snapshot snapshot = applySnapshot.getSnapshot(); - applySnapshotProcedure.apply(snapshot.getState()); //clears the followers log, sets the snapshot index to ensure adjusted-index works context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior)); context.setLastApplied(snapshot.getLastAppliedIndex()); context.setCommitIndex(snapshot.getLastAppliedIndex()); + context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor()); + + if(snapshot.getState().length > 0 ) { + applySnapshotProcedure.apply(snapshot.getState()); + } applySnapshot.getCallback().onSuccess(); } catch (Exception e) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index 7cd8936912..30ead98cb4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -17,7 +17,6 @@ import akka.actor.Terminated; import akka.dispatch.Dispatchers; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; -import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; @@ -75,7 +74,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest private TestRaftActor(String id, Map peerAddresses, ConfigParams config, TestActorRef collectorActor) { - super(id, peerAddresses, Optional.of(config), null); + super(builder().id(id).peerAddresses(peerAddresses).config(config)); this.collectorActor = collectorActor; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java index f56638bc82..38650e834f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java @@ -12,13 +12,13 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.Props; -import akka.japi.Creator; import com.google.common.base.Optional; import com.google.common.util.concurrent.Uninterruptibles; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -37,52 +37,29 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, volatile RaftActorSnapshotCohort snapshotCohortDelegate; private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final List state; - private ActorRef roleChangeNotifier; + private final ActorRef roleChangeNotifier; protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1); private RaftActorRecoverySupport raftActorRecoverySupport; private RaftActorSnapshotMessageSupport snapshotMessageSupport; + private final byte[] restoreFromSnapshot; + final CountDownLatch snapshotCommitted = new CountDownLatch(1); - public static final class MockRaftActorCreator implements Creator { - private static final long serialVersionUID = 1L; - private final Map peerAddresses; - private final String id; - private final Optional config; - private final DataPersistenceProvider dataPersistenceProvider; - private final ActorRef roleChangeNotifier; - private RaftActorSnapshotMessageSupport snapshotMessageSupport; - - private MockRaftActorCreator(Map peerAddresses, String id, - Optional config, DataPersistenceProvider dataPersistenceProvider, - ActorRef roleChangeNotifier) { - this.peerAddresses = peerAddresses; - this.id = id; - this.config = config; - this.dataPersistenceProvider = dataPersistenceProvider; - this.roleChangeNotifier = roleChangeNotifier; - } - - @Override - public MockRaftActor create() throws Exception { - MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config, - dataPersistenceProvider); - mockRaftActor.roleChangeNotifier = this.roleChangeNotifier; - mockRaftActor.snapshotMessageSupport = snapshotMessageSupport; - return mockRaftActor; - } - } - - public MockRaftActor(String id, Map peerAddresses, Optional config, - DataPersistenceProvider dataPersistenceProvider) { - super(id, peerAddresses, config, PAYLOAD_VERSION); + protected MockRaftActor(Builder builder) { + super(builder.id, builder.peerAddresses, Optional.fromNullable(builder.config), PAYLOAD_VERSION); state = new ArrayList<>(); this.actorDelegate = mock(RaftActor.class); this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class); this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class); - if(dataPersistenceProvider == null){ - setPersistence(true); + + if(builder.dataPersistenceProvider == null){ + setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true); } else { - setPersistence(dataPersistenceProvider); + setPersistence(builder.dataPersistenceProvider); } + + roleChangeNotifier = builder.roleChangeNotifier; + snapshotMessageSupport = builder.snapshotMessageSupport; + restoreFromSnapshot = builder.restoreFromSnapshot; } public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) { @@ -134,33 +111,6 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, return state; } - public static Props props(final String id, final Map peerAddresses, - Optional config){ - return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null)); - } - - public static Props props(final String id, final Map peerAddresses, - Optional config, RaftActorSnapshotMessageSupport snapshotMessageSupport){ - MockRaftActorCreator creator = new MockRaftActorCreator(peerAddresses, id, config, null, null); - creator.snapshotMessageSupport = snapshotMessageSupport; - return Props.create(creator); - } - - public static Props props(final String id, final Map peerAddresses, - Optional config, DataPersistenceProvider dataPersistenceProvider){ - return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null)); - } - - public static Props props(final String id, final Map peerAddresses, - Optional config, ActorRef roleChangeNotifier){ - return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier)); - } - - public static Props props(final String id, final Map peerAddresses, - Optional config, ActorRef roleChangeNotifier, - DataPersistenceProvider dataPersistenceProvider){ - return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier)); - } @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { actorDelegate.applyState(clientActor, identifier, data); @@ -231,8 +181,8 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, @Override public void applySnapshot(byte [] snapshot) { LOG.info("{}: applySnapshot called", persistenceId()); - snapshotCohortDelegate.applySnapshot(snapshot); applySnapshotBytes(snapshot); + snapshotCohortDelegate.applySnapshot(snapshot); } @Override @@ -259,6 +209,10 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, super.changeCurrentBehavior((RaftActorBehavior)message); } else { super.handleCommand(message); + + if(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) { + snapshotCommitted.countDown(); + } } } @@ -284,4 +238,79 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, public ReplicatedLog getReplicatedLog(){ return this.getRaftActorContext().getReplicatedLog(); } + + @Override + public byte[] getRestoreFromSnapshot() { + return restoreFromSnapshot; + } + + public static Props props(final String id, final Map peerAddresses, + ConfigParams config){ + return builder().id(id).peerAddresses(peerAddresses).config(config).props(); + } + + public static Props props(final String id, final Map peerAddresses, + ConfigParams config, DataPersistenceProvider dataPersistenceProvider){ + return builder().id(id).peerAddresses(peerAddresses).config(config). + dataPersistenceProvider(dataPersistenceProvider).props(); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private Map peerAddresses = Collections.emptyMap(); + private String id; + private ConfigParams config; + private DataPersistenceProvider dataPersistenceProvider; + private ActorRef roleChangeNotifier; + private RaftActorSnapshotMessageSupport snapshotMessageSupport; + private byte[] restoreFromSnapshot; + private Optional persistent = Optional.absent(); + + public Builder id(String id) { + this.id = id; + return this; + } + + public Builder peerAddresses(Map peerAddresses) { + this.peerAddresses = peerAddresses; + return this; + } + + public Builder config(ConfigParams config) { + this.config = config; + return this; + } + + public Builder dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) { + this.dataPersistenceProvider = dataPersistenceProvider; + return this; + } + + public Builder roleChangeNotifier(ActorRef roleChangeNotifier) { + this.roleChangeNotifier = roleChangeNotifier; + return this; + } + + public Builder snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) { + this.snapshotMessageSupport = snapshotMessageSupport; + return this; + } + + public Builder restoreFromSnapshot(byte[] restoreFromSnapshot) { + this.restoreFromSnapshot = restoreFromSnapshot; + return this; + } + + public Builder persistent(Optional persistent) { + this.persistent = persistent; + return this; + } + + public Props props() { + return Props.create(MockRaftActor.class, this); + } + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java index da02e81fa8..ddc8bed42a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java @@ -240,7 +240,7 @@ public class RaftActorRecoverySupportTest { } inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch(); - + inOrder.verify(mockCohort).getRestoreFromSnapshot(); inOrder.verifyNoMoreInteractions(); } @@ -248,6 +248,7 @@ public class RaftActorRecoverySupportTest { public void testOnRecoveryCompletedWithNoRemainingBatch() { sendMessageToSupport(RecoveryCompleted.getInstance(), true); + verify(mockCohort).getRestoreFromSnapshot(); verifyNoMoreInteractions(mockCohort); } @@ -337,6 +338,7 @@ public class RaftActorRecoverySupportTest { sendMessageToSupport(RecoveryCompleted.getInstance(), true); + verify(mockCohort).getRestoreFromSnapshot(); verifyNoMoreInteractions(mockCohort); verify(mockPersistentProvider).deleteMessages(10L); @@ -370,6 +372,7 @@ public class RaftActorRecoverySupportTest { sendMessageToSupport(RecoveryCompleted.getInstance(), true); + verify(mockCohort).getRestoreFromSnapshot(); verifyNoMoreInteractions(mockCohort, mockPersistentProvider); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java index 16acb410fb..df526d8c52 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java @@ -565,7 +565,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { TestActorRef noLeaderActor = actorFactory.createTestActor( MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()), - Optional.of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()), + configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID)); noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete(); @@ -626,7 +626,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { TestActorRef followerRaftActor = actorFactory.createTestActor( MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()), - Optional.of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()), + configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(FOLLOWER_ID)); followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete(); @@ -691,7 +691,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { AbstractMockRaftActor(String id, Map peerAddresses, Optional config, DataPersistenceProvider dataPersistenceProvider, TestActorRef collectorActor) { - super(id, peerAddresses, config, dataPersistenceProvider); + super(builder().id(id).peerAddresses(peerAddresses).config(config.get()). + dataPersistenceProvider(dataPersistenceProvider)); this.collectorActor = collectorActor; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 941deb5843..c2ee4a26d1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -62,6 +62,7 @@ import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; +import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; @@ -134,7 +135,7 @@ public class RaftActorTest extends AbstractActorTest { ImmutableMap peerAddresses = ImmutableMap.builder().put("member1", "address").build(); ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId, - peerAddresses, Optional.of(config)), persistenceId); + peerAddresses, config), persistenceId); watch(followerActor); @@ -187,7 +188,7 @@ public class RaftActorTest extends AbstractActorTest { //reinstate the actor TestActorRef ref = factory.createTestActor( - MockRaftActor.props(persistenceId, peerAddresses, Optional.of(config))); + MockRaftActor.props(persistenceId, peerAddresses, config)); MockRaftActor mockRaftActor = ref.underlyingActor(); @@ -221,7 +222,7 @@ public class RaftActorTest extends AbstractActorTest { TestActorRef ref = factory.createTestActor(MockRaftActor.props(persistenceId, ImmutableMap.builder().put("member1", "address").build(), - Optional.of(config), new NonPersistentDataProvider()), persistenceId); + config, new NonPersistentDataProvider()), persistenceId); MockRaftActor mockRaftActor = ref.underlyingActor(); @@ -245,7 +246,7 @@ public class RaftActorTest extends AbstractActorTest { TestActorRef ref = factory.createTestActor(MockRaftActor.props(persistenceId, ImmutableMap.builder().put("member1", "address").build(), - Optional.of(config), new NonPersistentDataProvider()). + config, new NonPersistentDataProvider()). withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId); InMemoryJournal.waitForWriteMessagesComplete(persistenceId); @@ -257,8 +258,8 @@ public class RaftActorTest extends AbstractActorTest { config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); ref = factory.createTestActor(MockRaftActor.props(persistenceId, - ImmutableMap.builder().put("member1", "address").build(), - Optional.of(config), new NonPersistentDataProvider()). + ImmutableMap.builder().put("member1", "address").build(), config, + new NonPersistentDataProvider()). withDispatcher(Dispatchers.DefaultDispatcherId()), factory.generateActorId("follower-")); @@ -284,7 +285,7 @@ public class RaftActorTest extends AbstractActorTest { config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config)), persistenceId); + Collections.emptyMap(), config), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -342,8 +343,8 @@ public class RaftActorTest extends AbstractActorTest { RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class); - TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), mockSupport), persistenceId); + TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId). + config(config).snapshotMessageSupport(mockSupport).props()); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -400,7 +401,7 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + Collections.emptyMap(), config, dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -431,7 +432,7 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + Collections.emptyMap(), config, dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -462,9 +463,10 @@ public class RaftActorTest extends AbstractActorTest { String persistenceId = factory.generateActorId("notifier-"); - TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), notifierActor, - new NonPersistentDataProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId); + TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId). + config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider( + new NonPersistentDataProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()), + persistenceId); List matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3); @@ -549,8 +551,9 @@ public class RaftActorTest extends AbstractActorTest { String persistenceId = factory.generateActorId("notifier-"); - factory.createActor(MockRaftActor.props(persistenceId, - ImmutableMap.of("leader", "fake/path"), Optional.of(config), notifierActor), persistenceId); + factory.createActor(MockRaftActor.builder().id(persistenceId). + peerAddresses(ImmutableMap.of("leader", "fake/path")). + config(config).roleChangeNotifier(notifierActor).props()); List matches = null; for(int i = 0; i < 5000 / heartBeatInterval; i++) { @@ -600,8 +603,7 @@ public class RaftActorTest extends AbstractActorTest { peerAddresses.put(follower1Id, followerActor1.path().toString()); TestActorRef mockActorRef = factory.createTestActor( - MockRaftActor.props(persistenceId, peerAddresses, - Optional.of(config), dataPersistenceProvider), persistenceId); + MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId); MockRaftActor leaderActor = mockActorRef.underlyingActor(); @@ -698,8 +700,7 @@ public class RaftActorTest extends AbstractActorTest { peerAddresses.put(leaderId, leaderActor1.path().toString()); TestActorRef mockActorRef = factory.createTestActor( - MockRaftActor.props(persistenceId, peerAddresses, - Optional.of(config), dataPersistenceProvider), persistenceId); + MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId); MockRaftActor followerActor = mockActorRef.underlyingActor(); followerActor.getRaftActorContext().setCommitIndex(4); @@ -807,8 +808,7 @@ public class RaftActorTest extends AbstractActorTest { peerAddresses.put(follower2Id, followerActor2.path().toString()); TestActorRef mockActorRef = factory.createTestActor( - MockRaftActor.props(persistenceId, peerAddresses, - Optional.of(config), dataPersistenceProvider), persistenceId); + MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId); MockRaftActor leaderActor = mockActorRef.underlyingActor(); leaderActor.getRaftActorContext().setCommitIndex(9); @@ -885,8 +885,7 @@ public class RaftActorTest extends AbstractActorTest { Map peerAddresses = ImmutableMap.builder().put("member1", "address").build(); TestActorRef mockActorRef = factory.createTestActor( - MockRaftActor.props(persistenceId, peerAddresses, - Optional.of(config), dataPersistenceProvider), persistenceId); + MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId); MockRaftActor leaderActor = mockActorRef.underlyingActor(); leaderActor.getRaftActorContext().setCommitIndex(3); @@ -933,8 +932,7 @@ public class RaftActorTest extends AbstractActorTest { Map peerAddresses = ImmutableMap.builder().put("member1", "address").build(); TestActorRef mockActorRef = factory.createTestActor( - MockRaftActor.props(persistenceId, peerAddresses, - Optional.of(config), dataPersistenceProvider), persistenceId); + MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId); MockRaftActor leaderActor = mockActorRef.underlyingActor(); leaderActor.getRaftActorContext().setCommitIndex(3); @@ -984,7 +982,7 @@ public class RaftActorTest extends AbstractActorTest { InMemoryJournal.addEntry(persistenceId, 1, replLogEntry); TestActorRef ref = factory.createTestActor( - MockRaftActor.props(persistenceId, peerAddresses, Optional.of(config))); + MockRaftActor.props(persistenceId, peerAddresses, config)); MockRaftActor mockRaftActor = ref.underlyingActor(); @@ -1010,8 +1008,7 @@ public class RaftActorTest extends AbstractActorTest { Map peerAddresses = ImmutableMap.builder().build(); TestActorRef mockActorRef = factory.createTestActor( - MockRaftActor.props(persistenceId, peerAddresses, - Optional.of(config), dataPersistenceProvider), persistenceId); + MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId); MockRaftActor leaderActor = mockActorRef.underlyingActor(); @@ -1067,8 +1064,7 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef actorRef = factory.createTestActor( - MockRaftActor.props(persistenceId, peerAddresses, - Optional.of(emptyConfig), dataPersistenceProvider), persistenceId); + MockRaftActor.props(persistenceId, peerAddresses, emptyConfig, dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = actorRef.underlyingActor(); mockRaftActor.waitForInitializeBehaviorComplete(); @@ -1129,7 +1125,7 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("C"))); TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, - ImmutableMap.builder().put("member1", "address").build(), Optional.of(config)). + ImmutableMap.builder().put("member1", "address").build(), config). withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId); MockRaftActor mockRaftActor = raftActorRef.underlyingActor(); @@ -1195,4 +1191,118 @@ public class RaftActorTest extends AbstractActorTest { TEST_LOG.info("testGetSnapshot ending"); } + + @Test + public void testRestoreFromSnapshot() throws Exception { + TEST_LOG.info("testRestoreFromSnapshot starting"); + + String persistenceId = factory.generateActorId("test-actor-"); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + List snapshotUnappliedEntries = new ArrayList<>(); + snapshotUnappliedEntries.add(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, + new MockRaftActorContext.MockPayload("E"))); + + int snapshotLastApplied = 3; + int snapshotLastIndex = 4; + + List state = Arrays.asList( + new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D")); + ByteString stateBytes = fromObject(state); + + Snapshot snapshot = Snapshot.create(stateBytes.toByteArray(), snapshotUnappliedEntries, + snapshotLastIndex, 1, snapshotLastApplied, 1, 1, "member-1"); + + InMemorySnapshotStore.addSnapshotSavedLatch(persistenceId); + + TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId). + config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId); + MockRaftActor mockRaftActor = raftActorRef.underlyingActor(); + + mockRaftActor.waitForRecoveryComplete(); + + Snapshot savedSnapshot = InMemorySnapshotStore.waitForSavedSnapshot(persistenceId, Snapshot.class); + assertEquals("getElectionTerm", snapshot.getElectionTerm(), savedSnapshot.getElectionTerm()); + assertEquals("getElectionVotedFor", snapshot.getElectionVotedFor(), savedSnapshot.getElectionVotedFor()); + assertEquals("getLastAppliedIndex", snapshot.getLastAppliedIndex(), savedSnapshot.getLastAppliedIndex()); + assertEquals("getLastAppliedTerm", snapshot.getLastAppliedTerm(), savedSnapshot.getLastAppliedTerm()); + assertEquals("getLastIndex", snapshot.getLastIndex(), savedSnapshot.getLastIndex()); + assertEquals("getLastTerm", snapshot.getLastTerm(), savedSnapshot.getLastTerm()); + assertArrayEquals("getState", snapshot.getState(), savedSnapshot.getState()); + assertEquals("getUnAppliedEntries", snapshot.getUnAppliedEntries(), savedSnapshot.getUnAppliedEntries()); + + verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).applySnapshot(any(byte[].class)); + + RaftActorContext context = mockRaftActor.getRaftActorContext(); + assertEquals("Journal log size", 1, context.getReplicatedLog().size()); + assertEquals("Last index", snapshotLastIndex, context.getReplicatedLog().lastIndex()); + assertEquals("Last applied", snapshotLastApplied, context.getLastApplied()); + assertEquals("Commit index", snapshotLastApplied, context.getCommitIndex()); + assertEquals("Recovered state", state, mockRaftActor.getState()); + assertEquals("Current term", 1L, context.getTermInformation().getCurrentTerm()); + assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor()); + + // Test with data persistence disabled + + snapshot = Snapshot.create(new byte[0], Collections.emptyList(), + -1, -1, -1, -1, 5, "member-1"); + + persistenceId = factory.generateActorId("test-actor-"); + + raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId). + config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)). + persistent(Optional.of(Boolean.FALSE)).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId); + mockRaftActor = raftActorRef.underlyingActor(); + + mockRaftActor.waitForRecoveryComplete(); + assertEquals("snapshot committed", true, + Uninterruptibles.awaitUninterruptibly(mockRaftActor.snapshotCommitted, 5, TimeUnit.SECONDS)); + + context = mockRaftActor.getRaftActorContext(); + assertEquals("Current term", 5L, context.getTermInformation().getCurrentTerm()); + assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor()); + + TEST_LOG.info("testRestoreFromSnapshot ending"); + } + + @Test + public void testRestoreFromSnapshotWithRecoveredData() throws Exception { + TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData starting"); + + String persistenceId = factory.generateActorId("test-actor-"); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + List state = Arrays.asList(new MockRaftActorContext.MockPayload("A")); + Snapshot snapshot = Snapshot.create(fromObject(state).toByteArray(), Arrays.asList(), + 5, 2, 5, 2, 2, "member-1"); + + InMemoryJournal.addEntry(persistenceId, 1, new MockRaftActorContext.MockReplicatedLogEntry(1, 0, + new MockRaftActorContext.MockPayload("B"))); + + TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId). + config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId); + MockRaftActor mockRaftActor = raftActorRef.underlyingActor(); + + mockRaftActor.waitForRecoveryComplete(); + + verify(mockRaftActor.snapshotCohortDelegate, timeout(500).never()).applySnapshot(any(byte[].class)); + + RaftActorContext context = mockRaftActor.getRaftActorContext(); + assertEquals("Journal log size", 1, context.getReplicatedLog().size()); + assertEquals("Last index", 0, context.getReplicatedLog().lastIndex()); + assertEquals("Last applied", -1, context.getLastApplied()); + assertEquals("Commit index", -1, context.getCommitIndex()); + assertEquals("Current term", 0, context.getTermInformation().getCurrentTerm()); + assertEquals("Voted for", null, context.getTermInformation().getVotedFor()); + + TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData ending"); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTestKit.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTestKit.java index 3e747e387e..8150474234 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTestKit.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTestKit.java @@ -12,9 +12,7 @@ import akka.actor.ActorSystem; import akka.pattern.Patterns; import akka.testkit.JavaTestKit; import akka.util.Timeout; -import com.google.common.base.Optional; import com.google.common.util.concurrent.Uninterruptibles; -import java.util.Collections; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.junit.Assert; @@ -31,8 +29,7 @@ public class RaftActorTestKit extends JavaTestKit { public RaftActorTestKit(ActorSystem actorSystem, String actorName) { super(actorSystem); - raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName, - Collections.emptyMap(), Optional.absent()), actorName); + raftActor = this.getSystem().actorOf(MockRaftActor.builder().id(actorName).props(), actorName); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java index 01f3375675..e16d794974 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java @@ -16,12 +16,15 @@ import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.snapshot.japi.SnapshotStore; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -36,6 +39,7 @@ public class InMemorySnapshotStore extends SnapshotStore { static final Logger LOG = LoggerFactory.getLogger(InMemorySnapshotStore.class); private static Map> snapshots = new ConcurrentHashMap<>(); + private static final Map snapshotSavedLatches = new ConcurrentHashMap<>(); public static void addSnapshot(String persistentId, Object snapshot) { List snapshotList = snapshots.get(persistentId); @@ -75,6 +79,18 @@ public class InMemorySnapshotStore extends SnapshotStore { snapshots.clear(); } + public static void addSnapshotSavedLatch(String persistenceId) { + snapshotSavedLatches.put(persistenceId, new CountDownLatch(1)); + } + + public static T waitForSavedSnapshot(String persistenceId, Class type) { + if(!Uninterruptibles.awaitUninterruptibly(snapshotSavedLatches.get(persistenceId), 5, TimeUnit.SECONDS)) { + throw new AssertionError("Snapshot was not saved"); + } + + return getSnapshots(persistenceId, type).get(0); + } + @Override public Future> doLoadAsync(String s, SnapshotSelectionCriteria snapshotSelectionCriteria) { @@ -101,6 +117,11 @@ public class InMemorySnapshotStore extends SnapshotStore { snapshotList.add(new StoredSnapshot(snapshotMetadata, o)); } + CountDownLatch latch = snapshotSavedLatches.get(snapshotMetadata.persistenceId()); + if(latch != null) { + latch.countDown(); + } + return Futures.successful(null); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java index f8c1db9879..87d591dd22 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java @@ -125,4 +125,10 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { log.error("{}: Failed to apply recovery snapshot", shardName, e); } } + + @Override + public byte[] getRestoreFromSnapshot() { + // TODO Auto-generated method stub + return null; + } } -- 2.36.6