import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.concepts.Immutable;
*/
protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) {
- ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
+ ReplicatedLogEntry replicatedLogEntry = new SimpleReplicatedLogEntry(
context.getReplicatedLog().lastIndex() + 1,
context.getTermInformation().getCurrentTerm(), data);
replicatedLogEntry.setPersistencePending(true);
/**
* A {@link ReplicatedLogEntry} implementation.
+ *
+ * @deprecated Use {@link org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry} instead.
*/
+@Deprecated
public class ReplicatedLogImplEntry implements ReplicatedLogEntry, Serializable {
private static final long serialVersionUID = -9085798014576489130L;
return getData().size();
}
+ private Object readResolve() {
+ return org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry.createMigrated(
+ index, term, payload);
+ }
+
@Override
public boolean isPersistencePending() {
return persistencePending;
import java.util.ArrayList;
import java.util.List;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
/**
int size = in.readInt();
List<ReplicatedLogEntry> entries = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
- entries.add(new ReplicatedLogImplEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
+ entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
}
appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
+/**
+ * A {@link ReplicatedLogEntry} implementation.
+ *
+ * @author Thomas Pantelis
+ */
+public class SimpleReplicatedLogEntry implements ReplicatedLogEntry, MigratedSerializable {
+ private static final class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private ReplicatedLogEntry replicatedLogEntry;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ // For Externalizable
+ }
+
+ Proxy(final ReplicatedLogEntry replicatedLogEntry) {
+ this.replicatedLogEntry = replicatedLogEntry;
+ }
+
+ @Override
+ public void writeExternal(final ObjectOutput out) throws IOException {
+ out.writeLong(replicatedLogEntry.getIndex());
+ out.writeLong(replicatedLogEntry.getTerm());
+ out.writeObject(replicatedLogEntry.getData());
+ }
+
+ @Override
+ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ replicatedLogEntry = new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject());
+ }
+
+ private Object readResolve() {
+ return replicatedLogEntry;
+ }
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ private final long index;
+ private final long term;
+ private final Payload payload;
+ private boolean persistencePending;
+ private final boolean migrated;
+
+ private SimpleReplicatedLogEntry(long index, long term, Payload payload, boolean migrated) {
+ this.index = index;
+ this.term = term;
+ this.payload = Preconditions.checkNotNull(payload);
+ this.migrated = migrated;
+ }
+
+ /**
+ * Constructs an instance.
+ *
+ * @param index the index
+ * @param term the term
+ * @param payload the payload
+ */
+ public SimpleReplicatedLogEntry(final long index, final long term, final Payload payload) {
+ this(index, term, payload, false);
+ }
+
+ @Deprecated
+ public static ReplicatedLogEntry createMigrated(final long index, final long term, final Payload payload) {
+ return new SimpleReplicatedLogEntry(index, term, payload, true);
+ }
+
+ @Override
+ public Payload getData() {
+ return payload;
+ }
+
+ @Override
+ public long getTerm() {
+ return term;
+ }
+
+ @Override
+ public long getIndex() {
+ return index;
+ }
+
+ @Override
+ public int size() {
+ return getData().size();
+ }
+
+ @Override
+ public boolean isPersistencePending() {
+ return persistencePending;
+ }
+
+ @Override
+ public void setPersistencePending(boolean pending) {
+ persistencePending = pending;
+ }
+
+ @Override
+ public boolean isMigrated() {
+ return migrated;
+ }
+
+ @Override
+ public Object writeReplace() {
+ return new Proxy(this);
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + payload.hashCode();
+ result = prime * result + (int) (index ^ index >>> 32);
+ result = prime * result + (int) (term ^ term >>> 32);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ SimpleReplicatedLogEntry other = (SimpleReplicatedLogEntry) obj;
+ return index == other.index && term == other.term && payload.equals(other.payload);
+ }
+
+ @Override
+ public String toString() {
+ return "SimpleReplicatedLogEntry [index=" + index + ", term=" + term + ", payload=" + payload + "]";
+ }
+}
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Arrays;
import java.util.HashSet;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
String persistenceId = factory.generateActorId("test-actor-");
InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
- InMemoryJournal.addEntry(persistenceId, 2, new ReplicatedLogImplEntry(0, 1,
+ InMemoryJournal.addEntry(persistenceId, 2, new SimpleReplicatedLogEntry(0, 1,
new MockRaftActorContext.MockPayload("A")));
InMemoryJournal.addEntry(persistenceId, 3,
new org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries(0));
@Test
public void testNoSnapshotAfterStartupWithNoMigratedMessages() {
+ TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages starting");
String id = factory.generateActorId("test-actor-");
+
+ InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, id));
+ InMemoryJournal.addEntry(id, 2, new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
+ InMemoryJournal.addEntry(id, 3, new ApplyJournalEntries(0));
+
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
List<Snapshot> snapshots = InMemorySnapshotStore.getSnapshots(id, Snapshot.class);
assertEquals("Snapshots", 0, snapshots.size());
+
+ TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages ending");
+ }
+
+ @Test
+ public void testSnapshotAfterStartupWithMigratedReplicatedLogEntry() {
+ TEST_LOG.info("testSnapshotAfterStartupWithMigratedReplicatedLogEntry starting");
+
+ String persistenceId = factory.generateActorId("test-actor-");
+
+ InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
+ MockRaftActorContext.MockPayload expPayload = new MockRaftActorContext.MockPayload("A");
+ InMemoryJournal.addEntry(persistenceId, 2, new org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry(
+ 0, 1, expPayload));
+
+ doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
+ assertEquals("Unapplied entries size", 1, snapshot.getUnAppliedEntries().size());
+ assertEquals("Unapplied entry term", 1, snapshot.getUnAppliedEntries().get(0).getTerm());
+ assertEquals("Unapplied entry index", 0, snapshot.getUnAppliedEntries().get(0).getIndex());
+ assertEquals("Unapplied entry data", expPayload, snapshot.getUnAppliedEntries().get(0).getData());
+ });
+
+ TEST_LOG.info("testSnapshotAfterStartupWithMigratedReplicatedLogEntry ending");
}
private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedServerConfigPayload(boolean persistent) {
new ServerInfo(persistenceId, true), new ServerInfo("downNode", true)));
InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
- InMemoryJournal.addEntry(persistenceId, 3, new ReplicatedLogImplEntry(0, 1, persistedServerConfig));
+ InMemoryJournal.addEntry(persistenceId, 3, new SimpleReplicatedLogEntry(0, 1, persistedServerConfig));
TestActorRef<MockRaftActor> actor = doTestSnapshotAfterStartupWithMigratedMessage(persistenceId,
persistent, snapshot -> {
};
TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id)
- .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(persistent)).props()
+ .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(persistent))
+ .peerAddresses(ImmutableMap.of("peer", "")).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), id);
MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
import java.util.Map;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
}
// TODO - this class can be removed and use ReplicatedLogImplEntry directly.
- public static class MockReplicatedLogEntry extends ReplicatedLogImplEntry {
+ public static class MockReplicatedLogEntry extends SimpleReplicatedLogEntry {
private static final long serialVersionUID = 1L;
public MockReplicatedLogEntry(long term, long index, Payload data) {
public MockReplicatedLogBuilder createEntries(int start, int end, int term) {
for (int i = start; i < end; i++) {
- this.mockLog.append(new ReplicatedLogImplEntry(i, term,
+ this.mockLog.append(new SimpleReplicatedLogEntry(i, term,
new MockRaftActorContext.MockPayload(Integer.toString(i))));
}
return this;
}
public MockReplicatedLogBuilder addEntry(int index, int term, MockPayload payload) {
- this.mockLog.append(new ReplicatedLogImplEntry(index, term, payload));
+ this.mockLog.append(new SimpleReplicatedLogEntry(index, term, payload));
return this;
}
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
new ServerInfo(leaderId, true), new ServerInfo(follower1Id, false),
new ServerInfo(follower2Id, true), new ServerInfo("downPeer", false)));
- ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, currentTerm,
+ SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, currentTerm,
persistedServerConfig);
InMemoryJournal.clear();
ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
new ServerInfo(leaderId, true), new ServerInfo(follower1Id, false)));
- ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, persistedTerm,
+ SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, persistedTerm,
persistedServerConfig);
InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(persistedTerm, leaderId));
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
- InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class).size());
+ InMemoryJournal.get(LEADER_ID, SimpleReplicatedLogEntry.class).size());
assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
- InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class).size());
+ InMemoryJournal.get(NEW_SERVER_ID, SimpleReplicatedLogEntry.class).size());
assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
- ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
+ SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
- ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
+ SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
- ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
+ SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
- InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1,
+ InMemoryJournal.addEntry(node2ID, 3, new SimpleReplicatedLogEntry(1, 1,
new MockRaftActorContext.MockPayload("2")));
InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
- ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
+ SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
// add another non-replicated entry
leaderActor.getReplicatedLog().append(
- new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
+ new SimpleReplicatedLogEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
//fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
- snapshotUnappliedEntries.add(new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
- new MockRaftActorContext.MockPayload("E")));
+ snapshotUnappliedEntries.add(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("E")));
int snapshotLastApplied = 3;
int snapshotLastIndex = 4;
import java.io.ObjectOutputStream;
import org.junit.Assert;
import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
/**
* Unit tests for ReplicatedLogImplEntry.
*
* @author Thomas Pantelis
*/
+@Deprecated
public class ReplicatedLogImplEntryTest {
@Test
try (FileInputStream fis = new FileInputStream("src/test/resources/helium-serialized-ReplicatedLogImplEntry")) {
ObjectInputStream ois = new ObjectInputStream(fis);
- ReplicatedLogImplEntry entry = (ReplicatedLogImplEntry) ois.readObject();
+ SimpleReplicatedLogEntry entry = (SimpleReplicatedLogEntry) ois.readObject();
ois.close();
Assert.assertEquals("getIndex", expIndex, entry.getIndex());
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
*/
public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorIntegrationTest {
- private List<ReplicatedLogImplEntry> origLeaderJournal;
+ private List<SimpleReplicatedLogEntry> origLeaderJournal;
private MockPayload recoveredPayload0;
private MockPayload recoveredPayload1;
long seqId = 1;
InMemoryJournal.addEntry(leaderId, seqId++, new UpdateElectionTerm(initialTerm, leaderId));
recoveredPayload0 = new MockPayload("zero");
- InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(0, initialTerm, recoveredPayload0));
+ InMemoryJournal.addEntry(leaderId, seqId++, new SimpleReplicatedLogEntry(0, initialTerm, recoveredPayload0));
recoveredPayload1 = new MockPayload("one");
- InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(1, initialTerm, recoveredPayload1));
+ InMemoryJournal.addEntry(leaderId, seqId++, new SimpleReplicatedLogEntry(1, initialTerm, recoveredPayload1));
recoveredPayload2 = new MockPayload("two");
- InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(2, initialTerm, recoveredPayload2));
+ InMemoryJournal.addEntry(leaderId, seqId++, new SimpleReplicatedLogEntry(2, initialTerm, recoveredPayload2));
InMemoryJournal.addEntry(leaderId, seqId++, new ApplyJournalEntries(2));
- origLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
+ origLeaderJournal = InMemoryJournal.get(leaderId, SimpleReplicatedLogEntry.class);
// Create the leader and 2 follower actors and verify initial syncing of the followers after leader
// persistence recovery.
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
// The leader's persisted journal log should be cleared since we snapshotted.
- List<ReplicatedLogImplEntry> persistedLeaderJournal =
- InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
+ List<SimpleReplicatedLogEntry> persistedLeaderJournal =
+ InMemoryJournal.get(leaderId, SimpleReplicatedLogEntry.class);
assertEquals("Persisted journal log size", 0, persistedLeaderJournal.size());
// Allow AppendEntries to both followers to proceed. This should catch up the followers and cause a
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
// The leader's persisted journal log should be cleared since we did a snapshot.
- List<ReplicatedLogImplEntry> persistedLeaderJournal = InMemoryJournal.get(
- leaderId, ReplicatedLogImplEntry.class);
+ List<SimpleReplicatedLogEntry> persistedLeaderJournal = InMemoryJournal.get(
+ leaderId, SimpleReplicatedLogEntry.class);
assertEquals("Persisted journal log size", 0, persistedLeaderJournal.size());
// Verify the followers apply all 4 new log entries.
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
// Verify the leaders's persisted journal log - it should only contain the last 2 ReplicatedLogEntries
// added after the snapshot as the persisted journal should've been purged to the snapshot
// sequence number.
- verifyPersistedJournal(leaderId, Arrays.asList(new ReplicatedLogImplEntry(5, currentTerm, payload5),
- new ReplicatedLogImplEntry(6, currentTerm, payload6)));
+ verifyPersistedJournal(leaderId, Arrays.asList(new SimpleReplicatedLogEntry(5, currentTerm, payload5),
+ new SimpleReplicatedLogEntry(6, currentTerm, payload6)));
// Verify the leaders's persisted journal contains an ApplyJournalEntries for at least the last entry index.
List<ApplyJournalEntries> persistedApplyJournalEntries =
private static Snapshot newLithiumSnapshot() {
byte[] state = {1, 2, 3, 4, 5};
List<ReplicatedLogEntry> entries = new ArrayList<>();
- entries.add(new ReplicatedLogImplEntry(6, 2, new MockPayload("payload")));
+ entries.add(new org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry(
+ 6, 2, new MockPayload("payload")));
long lastIndex = 6;
long lastTerm = 2;
long lastAppliedIndex = 5;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// new entry
- ReplicatedLogImplEntry entry =
- new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ SimpleReplicatedLogEntry entry =
+ new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
new MockRaftActorContext.MockPayload("D"));
actorContext.getReplicatedLog().append(entry);
leader.setSnapshot(null);
// new entry
- ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
new MockRaftActorContext.MockPayload("D"));
actorContext.getReplicatedLog().append(entry);
leader.setSnapshot(null);
for (int i = 0; i < 4; i++) {
- actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
+ actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
new MockRaftActorContext.MockPayload("X" + i)));
}
// new entry
- ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
new MockRaftActorContext.MockPayload("D"));
actorContext.getReplicatedLog().append(entry);
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
/**
* Unit tests for AppendEntries.
@Test
public void testSerialization() {
- ReplicatedLogEntry entry1 = new ReplicatedLogImplEntry(1, 2, new MockPayload("payload1"));
+ ReplicatedLogEntry entry1 = new SimpleReplicatedLogEntry(1, 2, new MockPayload("payload1"));
- ReplicatedLogEntry entry2 = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload2"));
+ ReplicatedLogEntry entry2 = new SimpleReplicatedLogEntry(3, 4, new MockPayload("payload2"));
short payloadVersion = 5;
AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L,
--- /dev/null
+/*
+ * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
+
+/**
+ * Unit tests for SimpleReplicatedLogEntry.
+ *
+ * @author Thomas Pantelis
+ */
+public class SimpleReplicatedLogEntryTest {
+
+ @Test
+ public void testSerialization() {
+ SimpleReplicatedLogEntry expected = new SimpleReplicatedLogEntry(0, 1,
+ new MockRaftActorContext.MockPayload("A"));
+ SimpleReplicatedLogEntry cloned = (SimpleReplicatedLogEntry) SerializationUtils.clone(expected);
+
+ assertEquals("getTerm", expected.getTerm(), cloned.getTerm());
+ assertEquals("getIndex", expected.getIndex(), cloned.getIndex());
+ assertEquals("getData", expected.getData(), cloned.getData());
+ assertEquals("isMigrated", false, cloned.isMigrated());
+ }
+}
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
final TransactionIdentifier tx = nextTransactionId();
final ApplyState applyState = new ApplyState(null, tx,
- new ReplicatedLogImplEntry(1, 2, payloadForModification(store, writeMod, tx)));
+ new SimpleReplicatedLogEntry(1, 2, payloadForModification(store, writeMod, tx)));
shard.tell(applyState, shard);
InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
// Set up the InMemoryJournal.
- InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1,
+ InMemoryJournal.addEntry(shardID.toString(), 1, new SimpleReplicatedLogEntry(0, 1,
payloadForModification(source, writeMod, nextTransactionId())));
final int nListEntries = 16;
mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
mod.ready();
- InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
+ InMemoryJournal.addEntry(shardID.toString(), i + 1, new SimpleReplicatedLogEntry(i, 1,
payloadForModification(source, mod, nextTransactionId())));
}