package org.opendaylight.controller.cluster.raft;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.slf4j.Logger;
/**
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
}
/**
- * @deprecated Deprecated in favor of {@link org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries}
+ * @deprecated Deprecated in favor of {@link org.opendaylight.controller.cluster.raft.persisted.DeleteEntries}
* whose type for fromIndex is long instead of int. This class was kept for backwards
* compatibility with Helium.
*/
public int getFromIndex() {
return fromIndex;
}
+
+ private Object readResolve() {
+ return org.opendaylight.controller.cluster.raft.persisted.DeleteEntries.createMigrated(fromIndex);
+ }
}
/**
- * @deprecated Deprecated in favor of non-inner class {@link org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm}
+ * @deprecated Deprecated in favor of non-inner class {@link org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm}
* which has serialVersionUID set. This class was kept for backwards compatibility with Helium.
*/
// Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility.
public String getVotedFor() {
return votedFor;
}
+
+ private Object readResolve() {
+ return org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm.createMigrated(
+ currentTerm, votedFor);
+ }
}
/**
import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotOffer;
-import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.base.Stopwatch;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import java.util.Collections;
import org.opendaylight.controller.cluster.PersistentDataProvider;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
import org.slf4j.Logger;
-
/**
* Support class that handles persistence recovery for a RaftActor.
*
private int currentRecoveryBatchCount;
private boolean dataRecoveredWithPersistenceDisabled;
private boolean anyDataRecovered;
+ private boolean hasMigratedDataRecovered;
private Stopwatch recoveryTimer;
private final Logger log;
anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted);
+ if(isMigratedSerializable(message)) {
+ hasMigratedDataRecovered = true;
+ }
+
boolean recoveryComplete = false;
- DataPersistenceProvider persistence = context.getPersistenceProvider();
- if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm) {
- // Handle this message for backwards compatibility with pre-Lithium versions.
- org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm update =
- (org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm)message;
- context.getTermInformation().update(update.getCurrentTerm(), update.getVotedFor());
- } else if (message instanceof UpdateElectionTerm) {
+ if (message instanceof UpdateElectionTerm) {
context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
((UpdateElectionTerm) message).getVotedFor());
- } else if(persistence.isRecoveryApplicable()) {
- if (message instanceof SnapshotOffer) {
- onRecoveredSnapshot((SnapshotOffer) message);
- } else if (message instanceof ReplicatedLogEntry) {
- onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
- } else if (message instanceof ApplyJournalEntries) {
- onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
- } else if (message instanceof DeleteEntries) {
- replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex());
- } else if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) {
- // Handle this message for backwards compatibility with pre-Lithium versions.
- replicatedLog().removeFrom(((org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) message).getFromIndex());
- } else if (message instanceof RecoveryCompleted) {
- onRecoveryCompletedMessage();
- possiblyRestoreFromSnapshot();
- recoveryComplete = true;
- }
+ } else if (message instanceof SnapshotOffer) {
+ onRecoveredSnapshot((SnapshotOffer) message);
+ } else if (message instanceof ReplicatedLogEntry) {
+ onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
+ } else if (message instanceof ApplyJournalEntries) {
+ onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
+ } else if (message instanceof DeleteEntries) {
+ onDeleteEntries((DeleteEntries) message);
} else if (message instanceof RecoveryCompleted) {
recoveryComplete = true;
-
- if(dataRecoveredWithPersistenceDisabled) {
- // Data persistence is disabled but we recovered some data entries so we must have just
- // transitioned to disabled or a persistence backup was restored. Either way, delete all the
- // messages from the akka journal for efficiency and so that we do not end up with consistency
- // issues in case persistence is -re-enabled.
- persistentProvider.deleteMessages(persistentProvider.getLastSequenceNumber());
-
- // Delete all the akka snapshots as they will not be needed
- persistentProvider.deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(),
- scala.Long.MaxValue(), 0L, 0L));
-
- // Since we cleaned out the journal, we need to re-write the current election info.
- context.getTermInformation().updateAndPersist(context.getTermInformation().getCurrentTerm(),
- context.getTermInformation().getVotedFor());
- }
-
- onRecoveryCompletedMessage();
- possiblyRestoreFromSnapshot();
- } else {
- boolean isServerConfigPayload = false;
- if(message instanceof ReplicatedLogEntry){
- ReplicatedLogEntry repLogEntry = (ReplicatedLogEntry)message;
- if(isServerConfigurationPayload(repLogEntry)){
- isServerConfigPayload = true;
- context.updatePeerIds((ServerConfigurationPayload)repLogEntry.getData());
- }
- }
-
- if(!isServerConfigPayload){
- dataRecoveredWithPersistenceDisabled = true;
- }
+ onRecoveryCompletedMessage(persistentProvider);
}
return recoveryComplete;
Snapshot snapshot = (Snapshot) offer.snapshot();
+ for(ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) {
+ if(isMigratedPayload(entry)) {
+ hasMigratedDataRecovered = true;
+ }
+ }
+
+ if(!context.getPersistenceProvider().isRecoveryApplicable()) {
+ // We may have just transitioned to disabled and have a snapshot containing state data and/or log
+ // entries - we don't want to preserve these, only the server config and election term info.
+
+ snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1,
+ snapshot.getElectionTerm(), snapshot.getElectionVotedFor(), snapshot.getServerConfiguration());
+ }
+
// Create a replicated log with the snapshot information
// The replicated log can be used later on to retrieve this snapshot
// when we need to install it on a peer
if (snapshot.getServerConfiguration() != null) {
context.updatePeerIds(snapshot.getServerConfiguration());
+
+ if(isMigratedSerializable(snapshot.getServerConfiguration())) {
+ hasMigratedDataRecovered = true;
+ }
}
timer.stop();
if(isServerConfigurationPayload(logEntry)){
context.updatePeerIds((ServerConfigurationPayload)logEntry.getData());
}
- replicatedLog().append(logEntry);
+
+ if(isMigratedPayload(logEntry)) {
+ hasMigratedDataRecovered = true;
+ }
+
+ if(context.getPersistenceProvider().isRecoveryApplicable()) {
+ replicatedLog().append(logEntry);
+ } else if(!isPersistentPayload(logEntry)) {
+ dataRecoveredWithPersistenceDisabled = true;
+ }
}
private void onRecoveredApplyLogEntries(long toIndex) {
+ if(!context.getPersistenceProvider().isRecoveryApplicable()) {
+ dataRecoveredWithPersistenceDisabled = true;
+ return;
+ }
+
long lastUnappliedIndex = context.getLastApplied() + 1;
if(log.isDebugEnabled()) {
context.setCommitIndex(lastApplied);
}
+ private void onDeleteEntries(DeleteEntries deleteEntries) {
+ if(context.getPersistenceProvider().isRecoveryApplicable()) {
+ replicatedLog().removeFrom(deleteEntries.getFromIndex());
+ } else {
+ dataRecoveredWithPersistenceDisabled = true;
+ }
+ }
+
private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
initRecoveryTimer();
currentRecoveryBatchCount = 0;
}
- private void onRecoveryCompletedMessage() {
+ private void onRecoveryCompletedMessage(PersistentDataProvider persistentProvider) {
if(currentRecoveryBatchCount > 0) {
endCurrentLogRecoveryBatch();
}
" Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, " +
"journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
replicatedLog().getSnapshotTerm(), replicatedLog().size());
+
+ if(dataRecoveredWithPersistenceDisabled ||
+ (hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable())) {
+ if(hasMigratedDataRecovered) {
+ log.info("{}: Saving snapshot after recovery due to migrated messages", context.getId());
+ } else {
+ log.info("{}: Saving snapshot after recovery due to data persistence disabled", context.getId());
+ }
+
+ // Either data persistence is disabled and we recovered some data entries (ie we must have just
+ // transitioned to disabled or a persistence backup was restored) or we recovered migrated
+ // messages. Either way, we persist a snapshot and delete all the messages from the akka journal
+ // to clean out unwanted messages.
+
+ Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(), -1, -1, -1, -1,
+ context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
+ context.getPeerServerInfo(true));
+
+ persistentProvider.saveSnapshot(snapshot);
+
+ persistentProvider.deleteMessages(persistentProvider.getLastSequenceNumber());
+ } else if(hasMigratedDataRecovered) {
+ log.info("{}: Snapshot capture initiated after recovery due to migrated messages", context.getId());
+
+ context.getSnapshotManager().capture(replicatedLog().last(), -1);
+ } else {
+ possiblyRestoreFromSnapshot();
+ }
}
private static boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry){
- return (repLogEntry.getData() instanceof ServerConfigurationPayload);
+ return repLogEntry.getData() instanceof ServerConfigurationPayload;
+ }
+
+ private static boolean isPersistentPayload(ReplicatedLogEntry repLogEntry){
+ return repLogEntry.getData() instanceof PersistentPayload;
+ }
+
+ private static boolean isMigratedPayload(ReplicatedLogEntry repLogEntry){
+ return isMigratedSerializable(repLogEntry.getData());
+ }
+
+ private static boolean isMigratedSerializable(Object message){
+ return message instanceof MigratedSerializable && ((MigratedSerializable)message).isMigrated();
}
}
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
-import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
/**
* Implementation of ReplicatedLog used by the RaftActor.
* behavior to the RaftActor to persist.
*
* @author Thomas Pantelis
+ *
+ * @deprecated Use {@link org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries} instead.
*/
+@Deprecated
public class ApplyJournalEntries implements Serializable {
private static final long serialVersionUID = 1L;
return toIndex;
}
+ private Object readResolve() {
+ return org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries.createMigrated(toIndex);
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
* Internal message that is stored in the akka's persistent journal to delete journal entries.
*
* @author Thomas Pantelis
+ *
+ * @deprecated Use {@link org.opendaylight.controller.cluster.raft.persisted.DeleteEntries} instead.
*/
+@Deprecated
public class DeleteEntries implements Serializable {
private static final long serialVersionUID = 1L;
return fromIndex;
}
+ private Object readResolve() {
+ return org.opendaylight.controller.cluster.raft.persisted.DeleteEntries.createMigrated(fromIndex);
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
/**
* Message class to persist election term information.
+ *
+ * @deprecated Use {@link org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm} instead.
*/
+@Deprecated
public class UpdateElectionTerm implements Serializable {
private static final long serialVersionUID = 1L;
return votedFor;
}
+ private Object readResolve() {
+ return org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm.createMigrated(
+ currentTerm, votedFor);
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.slf4j.Logger;
import scala.concurrent.duration.FiniteDuration;
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+
+/**
+ * This is an internal message that is stored in the akka's persistent journal. During recovery, this
+ * message is used to apply recovered journal entries to the state whose indexes range from the context's
+ * current lastApplied index to "toIndex" contained in the message. This message is sent internally from a
+ * behavior to the RaftActor to persist.
+ *
+ * @author Thomas Pantelis
+ */
+public class ApplyJournalEntries implements Serializable, MigratedSerializable {
+ private static final class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private ApplyJournalEntries applyEntries;
+
+ public Proxy() {
+ // For Externalizable
+ }
+
+ Proxy(final ApplyJournalEntries applyEntries) {
+ this.applyEntries = applyEntries;
+ }
+
+ @Override
+ public void writeExternal(final ObjectOutput out) throws IOException {
+ out.writeLong(applyEntries.toIndex);
+ }
+
+ @Override
+ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ applyEntries = new ApplyJournalEntries(in.readLong());
+ }
+
+ private Object readResolve() {
+ return applyEntries;
+ }
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ private final long toIndex;
+ private final boolean migrated;
+
+ private ApplyJournalEntries(final long toIndex, final boolean migrated) {
+ this.toIndex = toIndex;
+ this.migrated = migrated;
+ }
+
+ public ApplyJournalEntries(final long toIndex) {
+ this(toIndex, false);
+ }
+
+ public long getToIndex() {
+ return toIndex;
+ }
+
+ @Override
+ public boolean isMigrated() {
+ return migrated;
+ }
+
+ @Override
+ public Object writeReplace() {
+ return new Proxy(this);
+ }
+
+ @Deprecated
+ public static ApplyJournalEntries createMigrated(final long fromIndex) {
+ return new ApplyJournalEntries(fromIndex, true);
+ }
+
+ @Override
+ public String toString() {
+ return "ApplyJournalEntries [toIndex=" + toIndex + "]";
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+
+/**
+ * Internal message that is stored in the akka's persistent journal to delete journal entries.
+ *
+ * @author Thomas Pantelis
+ */
+public class DeleteEntries implements Serializable, MigratedSerializable {
+ private static final class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private DeleteEntries deleteEntries;
+
+ public Proxy() {
+ // For Externalizable
+ }
+
+ Proxy(final DeleteEntries deleteEntries) {
+ this.deleteEntries = deleteEntries;
+ }
+
+ @Override
+ public void writeExternal(final ObjectOutput out) throws IOException {
+ out.writeLong(deleteEntries.fromIndex);
+ }
+
+ @Override
+ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ deleteEntries = new DeleteEntries(in.readLong());
+ }
+
+ private Object readResolve() {
+ return deleteEntries;
+ }
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ private final long fromIndex;
+ private final boolean migrated;
+
+ private DeleteEntries(final long fromIndex, final boolean migrated) {
+ this.fromIndex = fromIndex;
+ this.migrated = migrated;
+ }
+
+ public DeleteEntries(final long fromIndex) {
+ this(fromIndex, false);
+ }
+
+ public long getFromIndex() {
+ return fromIndex;
+ }
+
+ @Override
+ public boolean isMigrated() {
+ return migrated;
+ }
+
+ @Override
+ public Object writeReplace() {
+ return new Proxy(this);
+ }
+
+ @Deprecated
+ public static DeleteEntries createMigrated(final long fromIndex) {
+ return new DeleteEntries(fromIndex, true);
+ }
+
+ @Override
+ public String toString() {
+ return "DeleteEntries [fromIndex=" + fromIndex + "]";
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+
+/**
+ * Message class to persist election term information.
+ */
+public class UpdateElectionTerm implements Serializable, MigratedSerializable {
+ private static final class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private UpdateElectionTerm updateElectionTerm;
+
+ public Proxy() {
+ // For Externalizable
+ }
+
+ Proxy(final UpdateElectionTerm updateElectionTerm) {
+ this.updateElectionTerm = updateElectionTerm;
+ }
+
+ @Override
+ public void writeExternal(final ObjectOutput out) throws IOException {
+ out.writeLong(updateElectionTerm.currentTerm);
+ out.writeObject(updateElectionTerm.votedFor);
+ }
+
+ @Override
+ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ updateElectionTerm = new UpdateElectionTerm(in.readLong(), (String) in.readObject());
+ }
+
+ private Object readResolve() {
+ return updateElectionTerm;
+ }
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ private final long currentTerm;
+ private final String votedFor;
+ private final boolean migrated;
+
+ private UpdateElectionTerm(final long currentTerm, final String votedFor, final boolean migrated) {
+ this.currentTerm = currentTerm;
+ this.votedFor = votedFor;
+ this.migrated = migrated;
+ }
+
+ public UpdateElectionTerm(final long currentTerm, final String votedFor) {
+ this(currentTerm, votedFor, false);
+ }
+
+ public long getCurrentTerm() {
+ return currentTerm;
+ }
+
+ public String getVotedFor() {
+ return votedFor;
+ }
+
+ @Override
+ public boolean isMigrated() {
+ return migrated;
+ }
+
+ @Override
+ public Object writeReplace() {
+ return new Proxy(this);
+ }
+
+ @Deprecated
+ public static UpdateElectionTerm createMigrated(final long currentTerm, final String votedFor) {
+ return new UpdateElectionTerm(currentTerm, votedFor, true);
+ }
+
+ @Override
+ public String toString() {
+ return "UpdateElectionTerm [currentTerm=" + currentTerm + ", votedFor=" + votedFor + "]";
+ }
+}
+
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import akka.actor.ActorRef;
+import akka.dispatch.Dispatchers;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+
+/**
+ * Unit tests for migrated messages on recovery.
+ *
+ * @author Thomas Pantelis
+ */
+public class MigratedMessagesTest extends AbstractActorTest {
+ static final Logger TEST_LOG = LoggerFactory.getLogger(MigratedMessagesTest.class);
+
+ private TestActorFactory factory;
+
+ @Before
+ public void setUp(){
+ factory = new TestActorFactory(getSystem());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ factory.close();
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
+ }
+
+ @Test
+ public void testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled() {
+ TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled starting");
+ doTestSnapshotAfterStartupWithMigratedServerConfigPayload(true);
+ TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled ending");
+ }
+
+ @Test
+ public void testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled() {
+ TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled starting");
+
+ TestActorRef<MockRaftActor> actor = doTestSnapshotAfterStartupWithMigratedServerConfigPayload(false);
+ MockRaftActor mockRaftActor = actor.underlyingActor();
+ String id = mockRaftActor.persistenceId();
+ ConfigParams config = mockRaftActor.getRaftActorContext().getConfigParams();
+
+ factory.killActor(actor, new JavaTestKit(getSystem()));
+
+ actor = factory.createTestActor(MockRaftActor.builder().id(id).config(config).persistent(Optional.of(false)).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+ mockRaftActor = actor.underlyingActor();
+ mockRaftActor.waitForRecoveryComplete();
+
+ assertEquals("electionTerm", 1,
+ mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+ assertEquals("votedFor", id,
+ mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
+
+ TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled ending");
+ }
+
+ @Test
+ public void testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled() {
+ TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled starting");
+
+ String persistenceId = factory.generateActorId("test-actor-");
+
+ org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm updateElectionTerm =
+ new org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm(5, persistenceId);
+
+ InMemoryJournal.addEntry(persistenceId, 1, updateElectionTerm);
+
+ doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
+ assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
+ assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
+ });
+
+ TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled ending");
+ }
+
+ @Test
+ public void testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled() {
+ TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled starting");
+
+ String persistenceId = factory.generateActorId("test-actor-");
+
+ org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm updateElectionTerm =
+ new org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm(5, persistenceId);
+
+ InMemoryJournal.addEntry(persistenceId, 1, updateElectionTerm);
+
+ doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, false, snapshot -> {
+ assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
+ assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
+ });
+
+ TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled ending");
+ }
+
+ @Test
+ public void testSnapshotAfterStartupWithMigratedApplyJournalEntries() {
+ TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries starting");
+
+ String persistenceId = factory.generateActorId("test-actor-");
+
+ InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
+ InMemoryJournal.addEntry(persistenceId, 2, new ReplicatedLogImplEntry(0, 1,
+ new MockRaftActorContext.MockPayload("A")));
+ InMemoryJournal.addEntry(persistenceId, 3,
+ new org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries(0));
+
+
+ doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
+ assertEquals("getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
+ assertEquals("getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
+ assertEquals("getLastIndex", 0, snapshot.getLastIndex());
+ assertEquals("getLastTerm", 1, snapshot.getLastTerm());
+ });
+
+ TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries ending");
+ }
+
+ @Test
+ public void testNoSnapshotAfterStartupWithNoMigratedMessages() {
+ String id = factory.generateActorId("test-actor-");
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+ RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
+ @Override
+ public void createSnapshot(ActorRef actorRef) {
+ actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
+ }
+
+ @Override
+ public void applySnapshot(byte[] snapshotBytes) {
+ }
+ };
+
+ TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id).
+ config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(true)).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+ MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+ mockRaftActor.waitForRecoveryComplete();
+
+ Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
+
+ List<Snapshot> snapshots = InMemorySnapshotStore.getSnapshots(id, Snapshot.class);
+ assertEquals("Snapshots", 0, snapshots.size());
+ }
+
+ private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedServerConfigPayload(boolean persistent) {
+ String persistenceId = factory.generateActorId("test-actor-");
+
+ org.opendaylight.controller.cluster.raft.ServerConfigurationPayload persistedServerConfig =
+ new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload(Arrays.asList(
+ new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo(
+ persistenceId, true),
+ new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo(
+ "downNode", true)));
+
+ ServerConfigurationPayload expectedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo(persistenceId, true), new ServerInfo("downNode", true)));
+
+ InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
+ InMemoryJournal.addEntry(persistenceId, 3, new ReplicatedLogImplEntry(0, 1, persistedServerConfig));
+
+ TestActorRef<MockRaftActor> actor = doTestSnapshotAfterStartupWithMigratedMessage(persistenceId,
+ persistent, snapshot -> {
+ assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
+ assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
+ assertEquals("getServerConfiguration", new HashSet<>(expectedServerConfig.getServerConfig()),
+ new HashSet<>(snapshot.getServerConfiguration().getServerConfig()));
+ });
+
+ return actor;
+ }
+
+
+ private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedMessage(String id, boolean persistent,
+ Consumer<Snapshot> snapshotVerifier) {
+ InMemorySnapshotStore.addSnapshotSavedLatch(id);
+ InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+ RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
+ @Override
+ public void createSnapshot(ActorRef actorRef) {
+ actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
+ }
+
+ @Override
+ public void applySnapshot(byte[] snapshotBytes) {
+ }
+ };
+
+ TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id).
+ config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(persistent)).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+ MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+ mockRaftActor.waitForRecoveryComplete();
+
+ Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
+ snapshotVerifier.accept(snapshot);
+
+ InMemoryJournal.waitForDeleteMessagesComplete(id);
+
+ assertEquals("InMemoryJournal size", 0, InMemoryJournal.get(id).size());
+
+ return raftActorRef;
+ }
+
+}
state = new ArrayList<>();
this.actorDelegate = mock(RaftActor.class);
this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
- this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
+
+ this.snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort :
+ mock(RaftActorSnapshotCohort.class);
if(builder.dataPersistenceProvider == null){
setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
private Optional<Boolean> persistent = Optional.absent();
private final Class<A> actorClass;
private Function<Runnable, Void> pauseLeaderFunction;
+ private RaftActorSnapshotCohort snapshotCohort;
protected AbstractBuilder(Class<A> actorClass) {
this.actorClass = actorClass;
return self();
}
+ public T snapshotCohort(RaftActorSnapshotCohort snapshotCohort) {
+ this.snapshotCohort = snapshotCohort;
+ return self();
+ }
+
public Props props() {
return Props.create(actorClass, this);
}
package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
-import akka.actor.ActorRef;
-import akka.dispatch.Dispatchers;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import akka.actor.ActorRef;
+import akka.dispatch.Dispatchers;
import scala.concurrent.duration.FiniteDuration;
/**
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.AdditionalMatchers.aryEq;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
-import akka.japi.Procedure;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotMetadata;
import akka.persistence.SnapshotOffer;
-import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.Collections;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.PersistentDataProvider;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
verifyNoMoreInteractions(mockCohort);
}
- @Test
- public void testOnDeprecatedDeleteEntries() {
- ReplicatedLog replicatedLog = context.getReplicatedLog();
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 0, new MockRaftActorContext.MockPayload("0")));
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 1, new MockRaftActorContext.MockPayload("1")));
- replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
- 2, new MockRaftActorContext.MockPayload("2")));
-
- sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1));
-
- assertEquals("Journal log size", 1, context.getReplicatedLog().size());
- assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
- }
-
@Test
public void testOnDeleteEntries() {
ReplicatedLog replicatedLog = context.getReplicatedLog();
assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
}
- @Test
- public void testDeprecatedUpdateElectionTerm() {
-
- sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(5, "member2"));
-
- assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
- assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
- }
-
@SuppressWarnings("unchecked")
@Test
public void testDataRecoveredWithPersistenceDisabled() {
+ doNothing().when(mockCohort).applyRecoverySnapshot(aryEq(new byte[0]));
doReturn(false).when(mockPersistence).isRecoveryApplicable();
doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
- sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
-
Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
sendMessageToSupport(snapshotOffer);
+ sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
+
sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
4, new MockRaftActorContext.MockPayload("4")));
sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
sendMessageToSupport(new DeleteEntries(5));
- sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(5));
-
assertEquals("Journal log size", 0, context.getReplicatedLog().size());
assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
assertEquals("Last applied", -1, context.getLastApplied());
sendMessageToSupport(RecoveryCompleted.getInstance(), true);
- verify(mockCohort).getRestoreFromSnapshot();
+ verify(mockCohort).applyRecoverySnapshot(aryEq(new byte[0]));
+ verify(mockCohort, never()).getRestoreFromSnapshot();
verifyNoMoreInteractions(mockCohort);
verify(mockPersistentProvider).deleteMessages(10L);
- verify(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
- verify(mockPersistentProvider).persist(updateElectionTerm(5, "member2"), any(Procedure.class));
}
static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
-import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
DeleteEntries deleteEntries = new DeleteEntries(1);
mockRaftActor.handleRecover(deleteEntries);
- org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries deprecatedDeleteEntries =
- new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1);
- mockRaftActor.handleRecover(deprecatedDeleteEntries);
-
UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
mockRaftActor.handleRecover(updateElectionTerm);
- org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm deprecatedUpdateElectionTerm =
- new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(6, "member3");
- mockRaftActor.handleRecover(deprecatedUpdateElectionTerm);
-
verify(mockSupport).handleRecoveryMessage(same(snapshotOffer), any(PersistentDataProvider.class));
verify(mockSupport).handleRecoveryMessage(same(logEntry), any(PersistentDataProvider.class));
verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries), any(PersistentDataProvider.class));
verify(mockSupport).handleRecoveryMessage(same(deleteEntries), any(PersistentDataProvider.class));
- verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries), any(PersistentDataProvider.class));
verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm), any(PersistentDataProvider.class));
- verify(mockSupport).handleRecoveryMessage(same(deprecatedUpdateElectionTerm), any(PersistentDataProvider.class));
}
@Test
import java.util.List;
import org.junit.Before;
import org.junit.Test;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
-import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
-import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
/**
* Unit tests for ServerConfigurationPayload.
*
* @author Thomas Pantelis
*/
+@Deprecated
public class ServerConfigurationPayloadTest {
@Test
public void testSerialization() {
- ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList(new ServerInfo("1", true),
- new ServerInfo("2", false)));
- ServerConfigurationPayload cloned = (ServerConfigurationPayload) SerializationUtils.clone(expected);
+ ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList(
+ new ServerConfigurationPayload.ServerInfo("1", true),
+ new ServerConfigurationPayload.ServerInfo("2", false)));
+ org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload cloned =
+ (org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload) SerializationUtils.clone(expected);
- assertEquals("getServerConfig", expected.getServerConfig(), cloned.getServerConfig());
- }
-
- @Test
- public void testSize() {
- ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList(new ServerInfo("1", true)));
- assertTrue(expected.size() > 0);
+ assertEquals("getServerConfig", ImmutableSet.of(
+ new org.opendaylight.controller.cluster.raft.persisted.ServerInfo("1", true),
+ new org.opendaylight.controller.cluster.raft.persisted.ServerInfo("2", false)),
+ ImmutableSet.copyOf(cloned.getServerConfig()));
+ assertEquals("isMigrated", true, cloned.isMigrated());
}
}
*
* @author Thomas Pantelis
*/
+@Deprecated
public class DeleteEntriesTest {
@Test
public void testSerialization() {
-
DeleteEntries deleteEntries = new DeleteEntries(11);
-
- DeleteEntries clone = (DeleteEntries) SerializationUtils.clone(deleteEntries);
+ org.opendaylight.controller.cluster.raft.persisted.DeleteEntries clone =
+ (org.opendaylight.controller.cluster.raft.persisted.DeleteEntries) SerializationUtils.clone(deleteEntries);
Assert.assertEquals("getFromIndex", 11, clone.getFromIndex());
+ Assert.assertEquals("isMigrated", true, clone.isMigrated());
}
}
*
* @author Thomas Pantelis
*/
+@Deprecated
public class UpdateElectionTermTest {
@Test
public void testSerialization() {
-
- UpdateElectionTerm deleteEntries = new UpdateElectionTerm(5, "member1");
-
- UpdateElectionTerm clone = (UpdateElectionTerm) SerializationUtils.clone(deleteEntries);
+ UpdateElectionTerm expected = new UpdateElectionTerm(5, "member1");
+ org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm clone =
+ (org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm) SerializationUtils.clone(expected);
Assert.assertEquals("getCurrentTerm", 5, clone.getCurrentTerm());
Assert.assertEquals("getVotedFor", "member1", clone.getVotedFor());
+ Assert.assertEquals("isMigrated", true, clone.isMigrated());
}
}
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.VotingState;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for ApplyJournalEntries.
+ *
+ * @author Thomas Pantelis
+ */
+public class ApplyJournalEntriesTest {
+
+ @Test
+ public void testSerialization() {
+ ApplyJournalEntries expected = new ApplyJournalEntries(5);
+ ApplyJournalEntries cloned = (ApplyJournalEntries) SerializationUtils.clone(expected);
+
+ assertEquals("getFromIndex", expected.getToIndex(), cloned.getToIndex());
+ assertEquals("isMigrated", false, cloned.isMigrated());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for DeleteEntries.
+ *
+ * @author Thomas Pantelis
+ */
+public class DeleteEntriesTest {
+
+ @Test
+ public void testSerialization() {
+ DeleteEntries expected = new DeleteEntries(5);
+ DeleteEntries cloned = (DeleteEntries) SerializationUtils.clone(expected);
+
+ assertEquals("getFromIndex", expected.getFromIndex(), cloned.getFromIndex());
+ assertEquals("isMigrated", false, cloned.isMigrated());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.util.Arrays;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for ServerConfigurationPayload.
+ *
+ * @author Thomas Pantelis
+ */
+public class ServerConfigurationPayloadTest {
+
+ @Test
+ public void testSerialization() {
+ ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList(new ServerInfo("1", true),
+ new ServerInfo("2", false)));
+ ServerConfigurationPayload cloned = (ServerConfigurationPayload) SerializationUtils.clone(expected);
+
+ assertEquals("getServerConfig", expected.getServerConfig(), cloned.getServerConfig());
+ assertEquals("isMigrated", false, cloned.isMigrated());
+ }
+
+ @Test
+ public void testSize() {
+ ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList(new ServerInfo("1", true)));
+ assertTrue(expected.size() > 0);
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for UpdateElectionTerm.
+ *
+ * @author Thomas Pantelis
+ */
+public class UpdateElectionTermTest {
+
+ @Test
+ public void testSerialization() {
+ UpdateElectionTerm expected = new UpdateElectionTerm(5, "leader");
+ UpdateElectionTerm cloned = (UpdateElectionTerm) SerializationUtils.clone(expected);
+
+ assertEquals("getCurrentTerm", expected.getCurrentTerm(), cloned.getCurrentTerm());
+ assertEquals("getVotedFor", expected.getVotedFor(), cloned.getVotedFor());
+ assertEquals("isMigrated", false, cloned.isMigrated());
+ }
+}
} else if (payload instanceof DataTreeCandidatePayload) {
applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate());
} else {
- LOG.warn("{}: ignoring unhandled payload {}", logContext, payload);
+ LOG.debug("{}: ignoring unhandled payload {}", logContext, payload);
}
}
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;