<type>xml</type>
<classifier>config</classifier>
</dependency>
-
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-clustering-config</artifactId>
+ <version>${mdsal.version}</version>
+ <type>cfg</type>
+ <classifier>datastore</classifier>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-distributed-datastore</artifactId>
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
} else if (message instanceof ReplicatedLogEntry) {
onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
} else if (message instanceof ApplyLogEntries) {
- onRecoveredApplyLogEntries((ApplyLogEntries) message);
+ // Handle this message for backwards compatibility with pre-Lithium versions.
+ onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
+ } else if (message instanceof ApplyJournalEntries) {
+ onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
} else if (message instanceof DeleteEntries) {
replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
} else if (message instanceof UpdateElectionTerm) {
replicatedLog.append(logEntry);
}
- private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
+ private void onRecoveredApplyLogEntries(long toIndex) {
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
- persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
+ persistenceId(), context.getLastApplied() + 1, toIndex);
}
- for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+ for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
batchRecoveredLogEntry(replicatedLog.get(i));
}
- context.setLastApplied(ale.getToIndex());
- context.setCommitIndex(ale.getToIndex());
+ context.setLastApplied(toIndex);
+ context.setCommitIndex(toIndex);
}
private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
applyState(applyState.getClientActor(), applyState.getIdentifier(),
applyState.getReplicatedLogEntry().getData());
- } else if (message instanceof ApplyLogEntries){
- ApplyLogEntries ale = (ApplyLogEntries) message;
+ } else if (message instanceof ApplyJournalEntries){
+ ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex());
+ LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex());
}
- persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
+ persistence().persist(applyEntries, new Procedure<ApplyJournalEntries>() {
@Override
- public void apply(ApplyLogEntries param) throws Exception {
+ public void apply(ApplyJournalEntries param) throws Exception {
}
});
// Apply the state immediately
applyState(clientActor, identifier, data);
- // Send a ApplyLogEntries message so that we write the fact that we applied
+ // Send a ApplyJournalEntries message so that we write the fact that we applied
// the state to durable storage
- self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
+ self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
// Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
if(!context.isSnapshotCaptureInitiated()){
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import java.io.Serializable;
+
+/**
+ * This is an internal message that is stored in the akka's persistent journal. During recovery, this
+ * message is used to apply recovered journal entries to the state whose indexes range from the context's
+ * current lastApplied index to "toIndex" contained in the message. This message is sent internally from a
+ * behavior to the RaftActor to persist.
+ *
+ * @author Thomas Pantelis
+ */
+public class ApplyJournalEntries implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final long toIndex;
+
+ public ApplyJournalEntries(long toIndex) {
+ this.toIndex = toIndex;
+ }
+
+ public long getToIndex() {
+ return toIndex;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("ApplyJournalEntries [toIndex=").append(toIndex).append("]");
+ return builder.toString();
+ }
+}
* This class is also used as a internal message sent from Behaviour to
* RaftActor to persist the ApplyLogEntries
*
+ * @deprecated Deprecated in favor of ApplyJournalEntries whose type for toIndex is long instead of int.
+ * This class was kept for backwards compatibility with Helium.
*/
+@Deprecated
public class ApplyLogEntries implements Serializable {
- private static final long serialVersionUID = 1L;
private final int toIndex;
public ApplyLogEntries(int toIndex) {
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
// will be used during recovery
//in case if the above code throws an error and this message is not sent, it would be fine
// as the append entries received later would initiate add this message to the journal
- actor().tell(new ApplyLogEntries((int) context.getLastApplied()), actor());
+ actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor());
}
protected Object fromSerializableMessage(Object serializable){
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
MockAkkaJournal.addToJournal(5, entry2);
// 2 entries are applied to state besides the 4 entries in snapshot
- MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
+ MockAkkaJournal.addToJournal(6, new ApplyJournalEntries(lastAppliedToState));
MockAkkaJournal.addToJournal(7, entry3);
MockAkkaJournal.addToJournal(8, entry4);
+
// kill the actor
followerActor.tell(PoisonPill.getInstance(), null);
expectMsgClass(duration("5 seconds"), Terminated.class);
}};
}
+ @Test
+ public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String persistenceId = factory.generateActorId("leader-");
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ // Setup the persisted journal with some entries
+ ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+ new MockRaftActorContext.MockPayload("zero"));
+ ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
+ new MockRaftActorContext.MockPayload("oen"));
+ ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
+ new MockRaftActorContext.MockPayload("two"));
+
+ long seqNr = 1;
+ MockAkkaJournal.addToJournal(seqNr++, entry0);
+ MockAkkaJournal.addToJournal(seqNr++, entry1);
+ MockAkkaJournal.addToJournal(seqNr++, new ApplyLogEntries(1));
+ MockAkkaJournal.addToJournal(seqNr++, entry2);
+
+ int lastAppliedToState = 1;
+ int lastIndex = 2;
+
+ //reinstate the actor
+ TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
+ MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
+ Optional.<ConfigParams>of(config)));
+
+ leaderActor.underlyingActor().waitForRecoveryComplete();
+
+ RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
+ assertEquals("Journal log size", 3, context.getReplicatedLog().size());
+ assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
+ assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
+ assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
+ }};
+ }
+
/**
* This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
* process recovery messages
assertEquals("add replicated log entry", 2, replicatedLog.size());
- mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
+ mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
assertEquals("add replicated log entry", 0, replicatedLog.size());
- mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
+ mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
}
@Test
- public void testApplyLogEntriesCallsDataPersistence() throws Exception {
+ public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
new JavaTestKit(getSystem()) {
{
String persistenceId = factory.generateActorId("leader-");
mockRaftActor.waitForInitializeBehaviorComplete();
- mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
+ mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
assertEquals(2, leaderActorContext.getCommitIndex());
- ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching(
- leaderActor, ApplyLogEntries.class);
+ ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
+ leaderActor, ApplyJournalEntries.class);
assertEquals(2, leaderActorContext.getLastApplied());
- assertEquals(2, applyLogEntries.getToIndex());
+ assertEquals(2, applyJournalEntries.getToIndex());
List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
ApplyState.class);
import akka.persistence.PersistentRepr;
import akka.persistence.journal.japi.AsyncWriteJournal;
import com.google.common.collect.Maps;
-import scala.concurrent.Future;
-
import java.util.Map;
import java.util.concurrent.Callable;
+import scala.concurrent.Future;
public class MockAkkaJournal extends AsyncWriteJournal {
- private static Map<Long, Object> journal = Maps.newHashMap();
+ private static Map<Long, Object> journal = Maps.newLinkedHashMap();
public static void addToJournal(long sequenceNr, Object message) {
journal.put(sequenceNr, message);
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
}
InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
- new ApplyLogEntries(nListEntries));
+ new ApplyJournalEntries(nListEntries));
testRecovery(listEntryKeys);
}
final ExecutorService executor = Executors.newCachedThreadPool();
final Disruptor<DOMNotificationRouterEvent> disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth, executor, ProducerType.MULTI, DEFAULT_STRATEGY);
+ disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE);
disruptor.start();