import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.japi.Procedure;
-import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null);
+ private final PersistentDataProvider persistentProvider;
+
private RaftActorRecoverySupport raftRecovery;
private RaftActorSnapshotMessageSupport snapshotSupport;
public RaftActor(String id, Map<String, String> peerAddresses,
Optional<ConfigParams> configParams, short payloadVersion) {
+ persistentProvider = new PersistentDataProvider(this);
context = new RaftActorContextImpl(this.getSelf(),
- this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG),
+ this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
-1, -1, peerAddresses,
(configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
delegatingPersistenceProvider, LOG);
raftRecovery = newRaftActorRecoverySupport();
}
- boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message);
+ boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message, persistentProvider);
if(recoveryComplete) {
- if(!persistence().isRecoveryApplicable()) {
- // Delete all the messages from the akka journal so that we do not end up with consistency issues
- // Note I am not using the dataPersistenceProvider and directly using the akka api here
- deleteMessages(lastSequenceNr());
-
- // Delete all the akka snapshots as they will not be needed
- deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
- }
-
onRecoveryComplete();
initializeBehavior();
import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.base.Stopwatch;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
private final RaftActorRecoveryCohort cohort;
private int currentRecoveryBatchCount;
+ private boolean dataRecoveredWithPersistenceDisabled;
private Stopwatch recoveryTimer;
private final Logger log;
this.log = context.getLogger();
}
- boolean handleRecoveryMessage(Object message) {
+ boolean handleRecoveryMessage(Object message, PersistentDataProvider persistentProvider) {
+ log.trace("handleRecoveryMessage: {}", message);
+
boolean recoveryComplete = false;
- if(context.getPersistenceProvider().isRecoveryApplicable()) {
+ DataPersistenceProvider persistence = context.getPersistenceProvider();
+ if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm) {
+ // Handle this message for backwards compatibility with pre-Lithium versions.
+ org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm update =
+ (org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm)message;
+ context.getTermInformation().update(update.getCurrentTerm(), update.getVotedFor());
+ } else if (message instanceof UpdateElectionTerm) {
+ context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+ ((UpdateElectionTerm) message).getVotedFor());
+ } else if(persistence.isRecoveryApplicable()) {
if (message instanceof SnapshotOffer) {
onRecoveredSnapshot((SnapshotOffer) message);
} else if (message instanceof ReplicatedLogEntry) {
} else if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) {
// Handle this message for backwards compatibility with pre-Lithium versions.
replicatedLog().removeFrom(((org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) message).getFromIndex());
- } else if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm) {
- // Handle this message for backwards compatibility with pre-Lithium versions.
- org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm update =
- (org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm)message;
- context.getTermInformation().update(update.getCurrentTerm(), update.getVotedFor());
- } else if (message instanceof UpdateElectionTerm) {
- context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
- ((UpdateElectionTerm) message).getVotedFor());
} else if (message instanceof RecoveryCompleted) {
onRecoveryCompletedMessage();
recoveryComplete = true;
}
} else if (message instanceof RecoveryCompleted) {
recoveryComplete = true;
+
+ if(dataRecoveredWithPersistenceDisabled) {
+ // Data persistence is disabled but we recovered some data entries so we must have just
+ // transitioned to disabled or a persistence backup was restored. Either way, delete all the
+ // messages from the akka journal for efficiency and so that we do not end up with consistency
+ // issues in case persistence is -re-enabled.
+ persistentProvider.deleteMessages(persistentProvider.getLastSequenceNumber());
+
+ // Delete all the akka snapshots as they will not be needed
+ persistentProvider.deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(),
+ scala.Long.MaxValue()));
+
+ // Since we cleaned out the journal, we need to re-write the current election info.
+ context.getTermInformation().updateAndPersist(context.getTermInformation().getCurrentTerm(),
+ context.getTermInformation().getVotedFor());
+ }
+ } else {
+ dataRecoveredWithPersistenceDisabled = true;
}
return recoveryComplete;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
return this.getId();
}
+ protected void newBehavior(RaftActorBehavior newBehavior) {
+ self().tell(newBehavior, ActorRef.noSender());
+ }
+
+ @Override
+ public void handleCommand(final Object message) {
+ if(message instanceof RaftActorBehavior) {
+ super.changeCurrentBehavior((RaftActorBehavior)message);
+ } else {
+ super.handleCommand(message);
+ }
+ }
+
public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
Object obj = null;
ByteArrayInputStream bis = null;
package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import akka.japi.Procedure;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotMetadata;
import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
import java.util.Arrays;
import java.util.Collections;
+import org.hamcrest.Description;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentMatcher;
import org.mockito.InOrder;
+import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
@Mock
private RaftActorRecoveryCohort mockCohort;
+ @Mock
+ PersistentDataProvider mockPersistentProvider;
+
private RaftActorRecoverySupport support;
private RaftActorContext context;
public void setup() {
MockitoAnnotations.initMocks(this);
- context = new RaftActorContextImpl(null, null, "test", new ElectionTermImpl(mockPersistence, "test", LOG),
+ context = new RaftActorContextImpl(null, null, "test", new ElectionTermImpl(mockPersistentProvider, "test", LOG),
-1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
support = new RaftActorRecoverySupport(context, mockBehavior , mockCohort);
}
private void sendMessageToSupport(Object message, boolean expComplete) {
- boolean complete = support.handleRecoveryMessage(message);
+ boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
assertEquals("complete", expComplete, complete);
}
assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
}
+ @SuppressWarnings("unchecked")
@Test
- public void testRecoveryWithPersistenceDisabled() {
+ public void testDataRecoveredWithPersistenceDisabled() {
doReturn(false).when(mockPersistence).isRecoveryApplicable();
+ doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
+
+ sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
+ assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
+
+ sendMessageToSupport(RecoveryCompleted.getInstance(), true);
+
+ verifyNoMoreInteractions(mockCohort);
+
+ verify(mockPersistentProvider).deleteMessages(10L);
+ verify(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+ verify(mockPersistentProvider).persist(updateElectionTerm(5, "member2"), any(Procedure.class));
+ }
+
+ static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
+ return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
+ @Override
+ public boolean matches(Object argument) {
+ UpdateElectionTerm other = (UpdateElectionTerm) argument;
+ return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendValue(new UpdateElectionTerm(term, votedFor));
+ }
+ });
+ }
+
+ @Test
+ public void testNoDataRecoveredWithPersistenceDisabled() {
+ doReturn(false).when(mockPersistence).isRecoveryApplicable();
+
sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
- assertEquals("Current term", 0, context.getTermInformation().getCurrentTerm());
- assertEquals("Voted For", null, context.getTermInformation().getVotedFor());
+ assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
+ assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
sendMessageToSupport(RecoveryCompleted.getInstance(), true);
- verifyNoMoreInteractions(mockCohort);
+ verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
}
}
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
+import akka.dispatch.Dispatchers;
import akka.japi.Procedure;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
import org.junit.Test;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
}};
}
+ @Test
+ public void testUpdateElectionTermPersistedWithPersistenceDisabled() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String persistenceId = factory.generateActorId("follower-");
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+ config.setElectionTimeoutFactor(1);
+
+ InMemoryJournal.addWriteMessagesCompleteLatch(persistenceId, 1);
+
+ TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
+ ImmutableMap.<String, String>builder().put("member1", "address").build(),
+ Optional.<ConfigParams>of(config), new NonPersistentDataProvider()).
+ withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+
+ InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
+ List<UpdateElectionTerm> entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
+ assertEquals("UpdateElectionTerm entries", 1, entries.size());
+ UpdateElectionTerm updateEntry = entries.get(0);
+
+ factory.killActor(ref, this);
+
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ ref = factory.createTestActor(MockRaftActor.props(persistenceId,
+ ImmutableMap.<String, String>builder().put("member1", "address").build(),
+ Optional.<ConfigParams>of(config), new NonPersistentDataProvider()).
+ withDispatcher(Dispatchers.DefaultDispatcherId()),
+ factory.generateActorId("follower-"));
+
+ MockRaftActor actor = ref.underlyingActor();
+ actor.waitForRecoveryComplete();
+
+ RaftActorContext newContext = actor.getRaftActorContext();
+ assertEquals("electionTerm", updateEntry.getCurrentTerm(),
+ newContext.getTermInformation().getCurrentTerm());
+ assertEquals("votedFor", updateEntry.getVotedFor(), newContext.getTermInformation().getVotedFor());
+
+ entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
+ assertEquals("UpdateElectionTerm entries", 1, entries.size());
+ }};
+ }
+
@Test
public void testRaftActorForwardsToRaftActorRecoverySupport() {
String persistenceId = factory.generateActorId("leader-");
new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(6, "member3");
mockRaftActor.handleRecover(deprecatedUpdateElectionTerm);
- verify(mockSupport).handleRecoveryMessage(same(snapshotOffer));
- verify(mockSupport).handleRecoveryMessage(same(logEntry));
- verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries));
- verify(mockSupport).handleRecoveryMessage(same(applyLogEntries));
- verify(mockSupport).handleRecoveryMessage(same(deleteEntries));
- verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries));
- verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm));
- verify(mockSupport).handleRecoveryMessage(same(deprecatedUpdateElectionTerm));
+ verify(mockSupport).handleRecoveryMessage(same(snapshotOffer), any(PersistentDataProvider.class));
+ verify(mockSupport).handleRecoveryMessage(same(logEntry), any(PersistentDataProvider.class));
+ verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries), any(PersistentDataProvider.class));
+ verify(mockSupport).handleRecoveryMessage(same(applyLogEntries), any(PersistentDataProvider.class));
+ verify(mockSupport).handleRecoveryMessage(same(deleteEntries), any(PersistentDataProvider.class));
+ verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries), any(PersistentDataProvider.class));
+ verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm), any(PersistentDataProvider.class));
+ verify(mockSupport).handleRecoveryMessage(same(deprecatedUpdateElectionTerm), any(PersistentDataProvider.class));
}
@Test
mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
- verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
+ verify(dataPersistenceProvider).persist(any(ApplyJournalEntries.class), any(Procedure.class));
}
TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
- new NonPersistentDataProvider()), persistenceId);
+ new NonPersistentDataProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
}
};
- raftActor.changeCurrentBehavior(follower);
+ raftActor.newBehavior(follower);
leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
assertEquals(persistenceId, leaderStateChange.getMemberId());
return prefix + actorCount++;
}
+ public void killActor(ActorRef actor, JavaTestKit kit) {
+ killActor(actor, kit, true);
+ }
+
+ private void killActor(ActorRef actor, JavaTestKit kit, boolean remove) {
+ LOG.info("Killing actor {}", actor);
+ kit.watch(actor);
+ actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ kit.expectTerminated(JavaTestKit.duration("5 seconds"), actor);
+
+ if(remove) {
+ createdActors.remove(actor);
+ }
+ }
+
@Override
public void close() {
- new JavaTestKit(system) {{
- for(ActorRef actor : createdActors) {
- watch(actor);
- LOG.info("Killing actor {}", actor);
- actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- expectTerminated(duration("5 seconds"), actor);
- }
- }};
+ JavaTestKit kit = new JavaTestKit(system);
+ for(ActorRef actor : createdActors) {
+ killActor(actor, kit, false);
+ }
}
}
\ No newline at end of file
@Override
public Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) {
// Akka calls this during recovery.
-
Map<Long, Object> journal = journals.get(persistenceId);
if(journal == null) {
- return Futures.successful(-1L);
+ return Futures.successful(fromSequenceNr);
}
synchronized (journal) {
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
-
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
"testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(creator)),
+ Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
"testRegisterChangeListenerWhenNotLeaderInitially");
// Write initial data into the in-memory store.
"testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(creator)),
+ Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
"testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
final YangInstanceIdentifier path = TestModel.TEST_PATH;
"testDataChangeListenerOnFollower-DataChangeListener");
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(creator)),
- "testDataChangeListenerOnFollower");
+ Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()).
+ withDispatcher(Dispatchers.DefaultDispatcherId()),"testDataChangeListenerOnFollower");
assertEquals("Got first ElectionTimeout", true,
onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
member1ShardID.toString());
final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(leaderShardCreator)),
+ Props.create(new DelegatingShardCreator(leaderShardCreator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
member2ShardID.toString());
// Sleep to let election happen
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);