import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.Creator;
-
import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
-
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
import org.opendaylight.controller.cluster.example.messages.PrintRole;
public class ExampleActor extends RaftActor {
private final Map<String, String> state = new HashMap();
+ private final DataPersistenceProvider dataPersistenceProvider;
private long persistIdentifier = 1;
public ExampleActor(String id, Map<String, String> peerAddresses,
Optional<ConfigParams> configParams) {
super(id, peerAddresses, configParams);
+ this.dataPersistenceProvider = new PersistentDataProvider();
}
public static Props props(final String id, final Map<String, String> peerAddresses,
});
}
- @Override public void onReceiveCommand(Object message){
+ @Override public void onReceiveCommand(Object message) throws Exception{
if(message instanceof KeyValue){
if(isLeader()) {
String persistId = Long.toString(persistIdentifier++);
}
- @Override public void onReceiveRecover(Object message) {
+ @Override
+ protected DataPersistenceProvider persistence() {
+ return dataPersistenceProvider;
+ }
+
+ @Override public void onReceiveRecover(Object message)throws Exception {
super.onReceiveRecover(message);
}
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
-import akka.persistence.UntypedPersistentActor;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+
import java.io.Serializable;
import java.util.Map;
* <li> when a snapshot should be saved </li>
* </ul>
*/
-public abstract class RaftActor extends UntypedPersistentActor {
+public abstract class RaftActor extends AbstractUntypedPersistentActor {
protected final LoggingAdapter LOG =
Logging.getLogger(getContext().system(), this);
public void preStart() throws Exception {
LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
context.getConfigParams().getJournalRecoveryLogBatchSize());
+
super.preStart();
}
@Override
- public void onReceiveRecover(Object message) {
- if (message instanceof SnapshotOffer) {
- onRecoveredSnapshot((SnapshotOffer)message);
- } else if (message instanceof ReplicatedLogEntry) {
- onRecoveredJournalLogEntry((ReplicatedLogEntry)message);
- } else if (message instanceof ApplyLogEntries) {
- onRecoveredApplyLogEntries((ApplyLogEntries)message);
- } else if (message instanceof DeleteEntries) {
- replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
- } else if (message instanceof UpdateElectionTerm) {
- context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
- ((UpdateElectionTerm) message).getVotedFor());
- } else if (message instanceof RecoveryCompleted) {
- onRecoveryCompletedMessage();
+ public void handleRecover(Object message) {
+ if(persistence().isRecoveryApplicable()) {
+ if (message instanceof SnapshotOffer) {
+ onRecoveredSnapshot((SnapshotOffer) message);
+ } else if (message instanceof ReplicatedLogEntry) {
+ onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
+ } else if (message instanceof ApplyLogEntries) {
+ onRecoveredApplyLogEntries((ApplyLogEntries) message);
+ } else if (message instanceof DeleteEntries) {
+ replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+ } else if (message instanceof UpdateElectionTerm) {
+ context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+ ((UpdateElectionTerm) message).getVotedFor());
+ } else if (message instanceof RecoveryCompleted) {
+ onRecoveryCompletedMessage();
+ }
+ } else {
+ if (message instanceof RecoveryCompleted) {
+ // Delete all the messages from the akka journal so that we do not end up with consistency issues
+ // Note I am not using the dataPersistenceProvider and directly using the akka api here
+ deleteMessages(lastSequenceNr());
+
+ // Delete all the akka snapshots as they will not be needed
+ deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
+
+ onRecoveryComplete();
+ currentBehavior = new Follower(context);
+ onStateChanged();
+ }
}
}
onStateChanged();
}
- @Override public void onReceiveCommand(Object message) {
+ @Override public void handleCommand(Object message) {
if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
if(LOG.isDebugEnabled()) {
LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex());
}
- persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
+ persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
@Override
public void apply(ApplyLogEntries param) throws Exception {
}
SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
LOG.info("SaveSnapshotSuccess received for snapshot");
- context.getReplicatedLog().snapshotCommit();
+ long sequenceNumber = success.metadata().sequenceNr();
- // TODO: Not sure if we want to be this aggressive with trimming stuff
- trimPersistentData(success.metadata().sequenceNr());
+ commitSnapshot(sequenceNumber);
} else if (message instanceof SaveSnapshotFailure) {
SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
context.setPeerAddress(peerId, peerAddress);
}
+ protected void commitSnapshot(long sequenceNumber) {
+ context.getReplicatedLog().snapshotCommit();
+ // TODO: Not sure if we want to be this aggressive with trimming stuff
+ trimPersistentData(sequenceNumber);
+ }
/**
* The applyState method will be called by the RaftActor when some data
/**
* This method is called during recovery to append state data to the current batch. This method
- * is called 1 or more times after {@link #startRecoveryStateBatch}.
+ * is called 1 or more times after {@link #startLogRecoveryBatch}.
*
* @param data the state data
*/
/**
* This method is called during recovery at the end of a batch to apply the current batched
- * log entries. This method is called after {@link #appendRecoveryLogEntry}.
+ * log entries. This method is called after {@link #appendRecoveredLogEntry}.
*/
protected abstract void applyCurrentLogRecoveryBatch();
*/
protected abstract void onStateChanged();
+ protected abstract DataPersistenceProvider persistence();
+
protected void onLeaderChanged(String oldLeader, String newLeader){};
private void trimPersistentData(long sequenceNumber) {
// Trim akka snapshots
// FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
// For now guessing that it is ANDed.
- deleteSnapshots(new SnapshotSelectionCriteria(
+ persistence().deleteSnapshots(new SnapshotSelectionCriteria(
sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
// Trim akka journal
- deleteMessages(sequenceNumber);
+ persistence().deleteMessages(sequenceNumber);
}
private String getLeaderAddress(){
captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
- saveSnapshot(sn);
+ persistence().saveSnapshot(sn);
LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
// FIXME: Maybe this should be done after the command is saved
journal.subList(adjustedIndex , journal.size()).clear();
- persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+ persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
@Override public void apply(DeleteEntries param)
throws Exception {
// persist call and the execution(s) of the associated event
// handler. This also holds for multiple persist calls in context
// of a single command.
- persist(replicatedLogEntry,
+ persistence().persist(replicatedLogEntry,
new Procedure<ReplicatedLogEntry>() {
@Override
public void apply(ReplicatedLogEntry evt) throws Exception {
}
- private static class DeleteEntries implements Serializable {
+ static class DeleteEntries implements Serializable {
private final int fromIndex;
public void updateAndPersist(long currentTerm, String votedFor){
update(currentTerm, votedFor);
// FIXME : Maybe first persist then update the state
- persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
+ persistence().persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
@Override public void apply(UpdateElectionTerm param)
throws Exception {
}
}
- private static class UpdateElectionTerm implements Serializable {
+ static class UpdateElectionTerm implements Serializable {
private final long currentTerm;
private final String votedFor;
}
}
+ protected class NonPersistentRaftDataProvider extends NonPersistentDataProvider {
+
+ public NonPersistentRaftDataProvider(){
+
+ }
+
+ /**
+ * The way snapshotting works is,
+ * <ol>
+ * <li> RaftActor calls createSnapshot on the Shard
+ * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
+ * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save the snapshot.
+ * The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the RaftActor gets SaveSnapshot
+ * success it commits the snapshot to the in-memory journal. This commitSnapshot is mimicking what is done
+ * in SaveSnapshotSuccess.
+ * </ol>
+ * @param o
+ */
+ @Override
+ public void saveSnapshot(Object o) {
+ // Make saving Snapshot successful
+ commitSnapshot(-1L);
+ }
+ }
+
}
import akka.actor.Terminated;
import akka.event.Logging;
import akka.japi.Creator;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotMetadata;
+import akka.persistence.SnapshotOffer;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import org.junit.After;
import org.junit.Test;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
import scala.concurrent.duration.FiniteDuration;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Mockito.mock;
public class RaftActorTest extends AbstractActorTest {
public static class MockRaftActor extends RaftActor {
+ private final DataPersistenceProvider dataPersistenceProvider;
+
public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
private final Map<String, String> peerAddresses;
private final String id;
private final Optional<ConfigParams> config;
+ private final DataPersistenceProvider dataPersistenceProvider;
private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
- Optional<ConfigParams> config) {
+ Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
this.peerAddresses = peerAddresses;
this.id = id;
this.config = config;
+ this.dataPersistenceProvider = dataPersistenceProvider;
}
@Override
public MockRaftActor create() throws Exception {
- return new MockRaftActor(id, peerAddresses, config);
+ return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider);
}
}
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+ private final CountDownLatch applyRecoverySnapshot = new CountDownLatch(1);
+ private final CountDownLatch applyStateLatch = new CountDownLatch(1);
+
private final List<Object> state;
- public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config) {
+ public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
super(id, peerAddresses, config);
state = new ArrayList<>();
+ if(dataPersistenceProvider == null){
+ this.dataPersistenceProvider = new PersistentDataProvider();
+ } else {
+ this.dataPersistenceProvider = dataPersistenceProvider;
+ }
}
public void waitForRecoveryComplete() {
}
}
+ public CountDownLatch getApplyRecoverySnapshotLatch(){
+ return applyRecoverySnapshot;
+ }
+
public List<Object> getState() {
return state;
}
public static Props props(final String id, final Map<String, String> peerAddresses,
Optional<ConfigParams> config){
- return Props.create(new MockRaftActorCreator(peerAddresses, id, config));
+ return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null));
}
+ public static Props props(final String id, final Map<String, String> peerAddresses,
+ Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
+ return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider));
+ }
+
+
@Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
+ applyStateLatch.countDown();
}
@Override
@Override
protected void applyRecoverySnapshot(ByteString snapshot) {
+ applyRecoverySnapshot.countDown();
try {
Object data = toObject(snapshot);
System.out.println("!!!!!applyRecoverySnapshot: "+data);
}
@Override protected void createSnapshot() {
- throw new UnsupportedOperationException("createSnapshot");
}
@Override protected void applySnapshot(ByteString snapshot) {
@Override protected void onStateChanged() {
}
+ @Override
+ protected DataPersistenceProvider persistence() {
+ return this.dataPersistenceProvider;
+ }
+
@Override public String persistenceId() {
return this.getId();
}
return obj;
}
+ public ReplicatedLog getReplicatedLog(){
+ return this.getRaftActorContext().getReplicatedLog();
+ }
}
}};
}
+ /**
+ * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
+ * process recovery messages
+ *
+ * @throws Exception
+ */
+
+ @Test
+ public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable";
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+ Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
+
+ ByteString snapshotBytes = fromObject(Arrays.asList(
+ new MockRaftActorContext.MockPayload("A"),
+ new MockRaftActorContext.MockPayload("B"),
+ new MockRaftActorContext.MockPayload("C"),
+ new MockRaftActorContext.MockPayload("D")));
+
+ Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
+ Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
+
+ MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+ mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
+
+ CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch();
+
+ assertEquals("apply recovery snapshot", true, applyRecoverySnapshotLatch.await(5, TimeUnit.SECONDS));
+
+ mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
+
+ ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
+
+ assertEquals("add replicated log entry", 1, replicatedLog.size());
+
+ mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
+
+ assertEquals("add replicated log entry", 2, replicatedLog.size());
+
+ mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
+
+ assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
+
+ // The snapshot had 4 items + we added 2 more items during the test
+ // We start removing from 5 and we should get 1 item in the replicated log
+ mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
+
+ assertEquals("remove log entries", 1, replicatedLog.size());
+
+ mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
+
+ assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+ assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
+
+ mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
+
+ mockRaftActor.waitForRecoveryComplete();
+
+ mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+ }};
+ }
+
+ /**
+ * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
+ * not process recovery messages
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable";
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+ Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
+
+ MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+ ByteString snapshotBytes = fromObject(Arrays.asList(
+ new MockRaftActorContext.MockPayload("A"),
+ new MockRaftActorContext.MockPayload("B"),
+ new MockRaftActorContext.MockPayload("C"),
+ new MockRaftActorContext.MockPayload("D")));
+
+ Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
+ Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
+
+ mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
+
+ CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch();
+
+ assertEquals("apply recovery snapshot", false, applyRecoverySnapshotLatch.await(1, TimeUnit.SECONDS));
+
+ mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
+
+ ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
+
+ assertEquals("add replicated log entry", 0, replicatedLog.size());
+
+ mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
+
+ assertEquals("add replicated log entry", 0, replicatedLog.size());
+
+ mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
+
+ assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
+
+ mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
+
+ assertEquals("remove log entries", 0, replicatedLog.size());
+
+ mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
+
+ assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+ assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
+
+ mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
+
+ mockRaftActor.waitForRecoveryComplete();
+
+ mockActorRef.tell(PoisonPill.getInstance(), getRef());
+ }};
+ }
+
+
+ @Test
+ public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ String persistenceId = "testUpdatingElectionTermCallsDataPersistence";
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ CountDownLatch persistLatch = new CountDownLatch(1);
+ DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
+ dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+
+ TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+ Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+
+ MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+ mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
+
+ assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
+
+ mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+ }
+ };
+ }
+
+ @Test
+ public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence";
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ CountDownLatch persistLatch = new CountDownLatch(1);
+ DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
+ dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+
+ TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+ Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+
+ MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+ mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class)));
+
+ assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
+
+ mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+ }
+ };
+ }
+
+ @Test
+ public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence";
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ CountDownLatch persistLatch = new CountDownLatch(2);
+ DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
+ dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+
+ TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+ Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+
+ MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+ mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
+
+ mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
+
+ assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
+
+ mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+ }
+ };
+ }
+
+ @Test
+ public void testApplyLogEntriesCallsDataPersistence() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ String persistenceId = "testApplyLogEntriesCallsDataPersistence";
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ CountDownLatch persistLatch = new CountDownLatch(1);
+ DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
+ dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+
+ TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+ Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+
+ MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+ mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
+
+ assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
+
+ mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+ }
+ };
+ }
+
+ @Test
+ public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence";
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ CountDownLatch persistLatch = new CountDownLatch(1);
+ DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
+ dataPersistenceProviderMonitor.setSaveSnapshotLatch(persistLatch);
+
+ TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+ Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+
+ MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+ ByteString snapshotBytes = fromObject(Arrays.asList(
+ new MockRaftActorContext.MockPayload("A"),
+ new MockRaftActorContext.MockPayload("B"),
+ new MockRaftActorContext.MockPayload("C"),
+ new MockRaftActorContext.MockPayload("D")));
+
+ mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
+
+ mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
+
+ assertEquals("Save Snapshot called", true, persistLatch.await(5, TimeUnit.SECONDS));
+
+ mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+ }
+ };
+ }
+
+ @Test
+ public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence";
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ CountDownLatch deleteMessagesLatch = new CountDownLatch(1);
+ CountDownLatch deleteSnapshotsLatch = new CountDownLatch(1);
+ DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
+ dataPersistenceProviderMonitor.setDeleteMessagesLatch(deleteMessagesLatch);
+ dataPersistenceProviderMonitor.setDeleteSnapshotsLatch(deleteSnapshotsLatch);
+
+ TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+ Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+
+ MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+ ByteString snapshotBytes = fromObject(Arrays.asList(
+ new MockRaftActorContext.MockPayload("A"),
+ new MockRaftActorContext.MockPayload("B"),
+ new MockRaftActorContext.MockPayload("C"),
+ new MockRaftActorContext.MockPayload("D")));
+
+ mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
+
+ mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
+
+ mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
+
+ assertEquals("Delete Messages called", true, deleteMessagesLatch.await(5, TimeUnit.SECONDS));
+
+ assertEquals("Delete Snapshots called", true, deleteSnapshotsLatch.await(5, TimeUnit.SECONDS));
+
+ mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+ }
+ };
+ }
+
private ByteString fromObject(Object snapshot) throws Exception {
ByteArrayOutputStream b = null;
ObjectOutputStream o = null;
}
serialization-bindings {
+ "org.opendaylight.controller.cluster.common.actor.Monitor" = java
"org.opendaylight.controller.cluster.raft.client.messages.FindLeader" = java
"org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry" = java
"com.google.protobuf.Message" = proto
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+
+/**
+ * DataPersistenceProvider provides methods to persist data and is an abstraction of the akka-persistence persistence
+ * API.
+ */
+public interface DataPersistenceProvider {
+ /**
+ * @return false if recovery is not applicable. In that case the provider is not persistent and may not have
+ * anything to be recovered
+ */
+ boolean isRecoveryApplicable();
+
+ /**
+ * Persist a journal entry.
+ *
+ * @param o
+ * @param procedure
+ * @param <T>
+ */
+ <T> void persist(T o, Procedure<T> procedure);
+
+ /**
+ * Save a snapshot
+ *
+ * @param o
+ */
+ void saveSnapshot(Object o);
+
+ /**
+ * Delete snapshots based on the criteria
+ *
+ * @param criteria
+ */
+ void deleteSnapshots(SnapshotSelectionCriteria criteria);
+
+ /**
+ * Delete journal entries up to the sequence number
+ *
+ * @param sequenceNumber
+ */
+ void deleteMessages(long sequenceNumber);
+
+}
import akka.event.Logging;
import akka.event.LoggingAdapter;
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
import akka.persistence.UntypedPersistentActor;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor {
}
unhandled(message);
}
+
+ protected class PersistentDataProvider implements DataPersistenceProvider {
+
+ public PersistentDataProvider(){
+
+ }
+
+ @Override
+ public boolean isRecoveryApplicable() {
+ return true;
+ }
+
+ @Override
+ public <T> void persist(T o, Procedure<T> procedure) {
+ AbstractUntypedPersistentActor.this.persist(o, procedure);
+ }
+
+ @Override
+ public void saveSnapshot(Object o) {
+ AbstractUntypedPersistentActor.this.saveSnapshot(o);
+ }
+
+ @Override
+ public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+ AbstractUntypedPersistentActor.this.deleteSnapshots(criteria);
+ }
+
+ @Override
+ public void deleteMessages(long sequenceNumber) {
+ AbstractUntypedPersistentActor.this.deleteMessages(sequenceNumber);
+ }
+ }
+
+ protected class NonPersistentDataProvider implements DataPersistenceProvider {
+
+ public NonPersistentDataProvider(){
+
+ }
+
+ @Override
+ public boolean isRecoveryApplicable() {
+ return false;
+ }
+
+ @Override
+ public <T> void persist(T o, Procedure<T> procedure) {
+ try {
+ procedure.apply(o);
+ } catch (Exception e) {
+ LOG.error(e, "An unexpected error occurred");
+ }
+ }
+
+ @Override
+ public void saveSnapshot(Object o) {
+ }
+
+ @Override
+ public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+
+ }
+
+ @Override
+ public void deleteMessages(long sequenceNumber) {
+
+ }
+ }
}
import akka.actor.ActorRef;
-public class Monitor {
+import java.io.Serializable;
+
+public class Monitor implements Serializable {
private final ActorRef actorRef;
public Monitor(ActorRef actorRef){
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * This class is intended for testing purposes. It just triggers CountDownLatch's in each method.
+ * This class really should be under src/test/java but it was problematic trying to uses it in other projects.
+ */
+public class DataPersistenceProviderMonitor implements DataPersistenceProvider {
+
+ private CountDownLatch persistLatch = new CountDownLatch(1);
+ private CountDownLatch saveSnapshotLatch = new CountDownLatch(1);
+ private CountDownLatch deleteSnapshotsLatch = new CountDownLatch(1);;
+ private CountDownLatch deleteMessagesLatch = new CountDownLatch(1);;
+
+ @Override
+ public boolean isRecoveryApplicable() {
+ return false;
+ }
+
+ @Override
+ public <T> void persist(T o, Procedure<T> procedure) {
+ persistLatch.countDown();
+ }
+
+ @Override
+ public void saveSnapshot(Object o) {
+ saveSnapshotLatch.countDown();
+ }
+
+ @Override
+ public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+ deleteSnapshotsLatch.countDown();
+ }
+
+ @Override
+ public void deleteMessages(long sequenceNumber) {
+ deleteMessagesLatch.countDown();
+ }
+
+ public void setPersistLatch(CountDownLatch persistLatch) {
+ this.persistLatch = persistLatch;
+ }
+
+ public void setSaveSnapshotLatch(CountDownLatch saveSnapshotLatch) {
+ this.saveSnapshotLatch = saveSnapshotLatch;
+ }
+
+ public void setDeleteSnapshotsLatch(CountDownLatch deleteSnapshotsLatch) {
+ this.deleteSnapshotsLatch = deleteSnapshotsLatch;
+ }
+
+ public void setDeleteMessagesLatch(CountDownLatch deleteMessagesLatch) {
+ this.deleteMessagesLatch = deleteMessagesLatch;
+ }
+}
package org.opendaylight.controller.cluster.datastore;
+import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader;
+import org.opendaylight.controller.cluster.datastore.config.FileConfigurationReader;
import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import akka.util.Timeout;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
+
import java.util.concurrent.TimeUnit;
/**
private final int shardTransactionCommitQueueCapacity;
private final Timeout shardInitializationTimeout;
private final Timeout shardLeaderElectionTimeout;
+ private final boolean persistent;
+ private final ConfigurationReader configurationReader;
private DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties,
ConfigParams shardRaftConfig, String dataStoreMXBeanType, int operationTimeoutInSeconds,
Duration shardTransactionIdleTimeout, int shardTransactionCommitTimeoutInSeconds,
int shardTransactionCommitQueueCapacity, Timeout shardInitializationTimeout,
- Timeout shardLeaderElectionTimeout) {
+ Timeout shardLeaderElectionTimeout,
+ boolean persistent, ConfigurationReader configurationReader) {
this.dataStoreProperties = dataStoreProperties;
this.shardRaftConfig = shardRaftConfig;
this.dataStoreMXBeanType = dataStoreMXBeanType;
this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
this.shardInitializationTimeout = shardInitializationTimeout;
this.shardLeaderElectionTimeout = shardLeaderElectionTimeout;
+ this.persistent = persistent;
+ this.configurationReader = configurationReader;
}
public static Builder newBuilder() {
return shardLeaderElectionTimeout;
}
+ public boolean isPersistent() {
+ return persistent;
+ }
+
+ public ConfigurationReader getConfigurationReader() {
+ return configurationReader;
+ }
+
public static class Builder {
private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
private Duration shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
private int shardTransactionCommitQueueCapacity = 20000;
private Timeout shardInitializationTimeout = new Timeout(5, TimeUnit.MINUTES);
private Timeout shardLeaderElectionTimeout = new Timeout(30, TimeUnit.SECONDS);
+ private boolean persistent = true;
+ private ConfigurationReader configurationReader = new FileConfigurationReader();
public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
return this;
}
+ public Builder configurationReader(ConfigurationReader configurationReader){
+ this.configurationReader = configurationReader;
+ return this;
+ }
+
+
+ public Builder persistent(boolean persistent){
+ this.persistent = persistent;
+ return this;
+ }
+
public DatastoreContext build() {
DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
return new DatastoreContext(dataStoreProperties, raftConfig, dataStoreMXBeanType,
operationTimeoutInSeconds, shardTransactionIdleTimeout,
shardTransactionCommitTimeoutInSeconds, shardTransactionCommitQueueCapacity,
- shardInitializationTimeout, shardLeaderElectionTimeout);
+ shardInitializationTimeout, shardLeaderElectionTimeout,
+ persistent, configurationReader);
}
}
}
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.osgi.BundleDelegatingClassLoader;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.osgi.framework.BundleContext;
-import java.io.File;
import java.util.concurrent.atomic.AtomicReference;
public class DistributedDataStoreFactory {
- public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf";
public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
+
public static final String CONFIGURATION_NAME = "odl-cluster-data";
- private static AtomicReference<ActorSystem> actorSystem = new AtomicReference<>();
+
+ private static AtomicReference<ActorSystem> persistentActorSystem = new AtomicReference<>();
public static DistributedDataStore createInstance(String name, SchemaService schemaService,
DatastoreContext datastoreContext, BundleContext bundleContext) {
- ActorSystem actorSystem = getOrCreateInstance(bundleContext);
+ ActorSystem actorSystem = getOrCreateInstance(bundleContext, datastoreContext.getConfigurationReader());
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
final DistributedDataStore dataStore =
new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
return dataStore;
}
- synchronized private static final ActorSystem getOrCreateInstance(final BundleContext bundleContext) {
+ synchronized private static final ActorSystem getOrCreateInstance(final BundleContext bundleContext, ConfigurationReader configurationReader) {
+
+ AtomicReference<ActorSystem> actorSystemReference = persistentActorSystem;
+ String configurationName = CONFIGURATION_NAME;
+ String actorSystemName = ACTOR_SYSTEM_NAME;
- if (actorSystem.get() != null){
- return actorSystem.get();
+ if (actorSystemReference.get() != null){
+ return actorSystemReference.get();
}
+
// Create an OSGi bundle classloader for actor system
BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
Thread.currentThread().getContextClassLoader());
- ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME,
- ConfigFactory.load(readAkkaConfiguration()).getConfig(CONFIGURATION_NAME), classLoader);
+ ActorSystem system = ActorSystem.create(actorSystemName,
+ ConfigFactory.load(configurationReader.read()).getConfig(configurationName), classLoader);
system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
- actorSystem.set(system);
+ actorSystemReference.set(system);
return system;
}
-
- private static final Config readAkkaConfiguration() {
- File defaultConfigFile = new File(AKKA_CONF_PATH);
- Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing");
- return ConfigFactory.parseFile(defaultConfigFile);
- }
}
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
private final LoggingAdapter LOG =
Logging.getLogger(getContext().system(), this);
- // By default persistent will be true and can be turned off using the system
- // property shard.persistent
- private final boolean persistent;
-
/// The name of this shard
private final ShardIdentifier name;
private final DatastoreContext datastoreContext;
+ private final DataPersistenceProvider dataPersistenceProvider;
+
private SchemaContext schemaContext;
private ActorRef createSnapshotTransaction;
this.name = name;
this.datastoreContext = datastoreContext;
this.schemaContext = schemaContext;
+ this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
- String setting = System.getProperty("shard.persistent");
-
- this.persistent = !"false".equals(setting);
-
- LOG.info("Shard created : {} persistent : {}", name, persistent);
+ LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent());
store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
datastoreContext.getDataStoreProperties());
}
@Override
- public void onReceiveRecover(Object message) {
+ public void onReceiveRecover(Object message) throws Exception {
if(LOG.isDebugEnabled()) {
LOG.debug("onReceiveRecover: Received message {} from {}",
message.getClass().toString(),
}
@Override
- public void onReceiveCommand(Object message) {
+ public void onReceiveCommand(Object message) throws Exception {
if(LOG.isDebugEnabled()) {
LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
}
// currently uses a same thread executor anyway.
cohortEntry.getCohort().preCommit().get();
- if(persistent) {
- Shard.this.persistData(getSender(), transactionID,
- new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
- } else {
- Shard.this.finishCommit(getSender(), transactionID);
- }
+ Shard.this.persistData(getSender(), transactionID,
+ new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
} catch (InterruptedException | ExecutionException e) {
LOG.error(e, "An exception occurred while preCommitting transaction {}",
cohortEntry.getTransactionID());
}
}
+ @Override
+ protected DataPersistenceProvider persistence() {
+ return dataPersistenceProvider;
+ }
+
@Override protected void onLeaderChanged(String oldLeader, String newLeader) {
shardMBean.setLeader(newLeader);
}
return this.name.toString();
}
+ @VisibleForTesting
+ DataPersistenceProvider getDataPersistenceProvider() {
+ return dataPersistenceProvider;
+ }
+
private static class ShardCreator implements Creator<Shard> {
private static final long serialVersionUID = 1L;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
private final Collection<String> knownModules = new HashSet<>(128);
+ private final DataPersistenceProvider dataPersistenceProvider;
+
/**
* @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
* configuration or operational
this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
this.datastoreContext = datastoreContext;
+ this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
createLocalShards();
}
+ protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
+ return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
+ }
+
public static Props props(final String type,
final ClusterWrapper cluster,
final Configuration configuration,
@Override
protected void handleRecover(Object message) throws Exception {
- if(message instanceof SchemaContextModules){
- SchemaContextModules msg = (SchemaContextModules) message;
- knownModules.clear();
- knownModules.addAll(msg.getModules());
- } else if(message instanceof RecoveryFailure){
- RecoveryFailure failure = (RecoveryFailure) message;
- LOG.error(failure.cause(), "Recovery failed");
- } else if(message instanceof RecoveryCompleted){
- LOG.info("Recovery complete : {}", persistenceId());
-
- // Delete all the messages from the akka journal except the last one
- deleteMessages(lastSequenceNr() - 1);
+ if(dataPersistenceProvider.isRecoveryApplicable()) {
+ if (message instanceof SchemaContextModules) {
+ SchemaContextModules msg = (SchemaContextModules) message;
+ knownModules.clear();
+ knownModules.addAll(msg.getModules());
+ } else if (message instanceof RecoveryFailure) {
+ RecoveryFailure failure = (RecoveryFailure) message;
+ LOG.error(failure.cause(), "Recovery failed");
+ } else if (message instanceof RecoveryCompleted) {
+ LOG.info("Recovery complete : {}", persistenceId());
+
+ // Delete all the messages from the akka journal except the last one
+ deleteMessages(lastSequenceNr() - 1);
+ }
+ } else {
+ if (message instanceof RecoveryCompleted) {
+ LOG.info("Recovery complete : {}", persistenceId());
+
+ // Delete all the messages from the akka journal
+ deleteMessages(lastSequenceNr());
+ }
}
}
knownModules.clear();
knownModules.addAll(newModules);
- persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
+ dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
@Override
public void apply(SchemaContextModules param) throws Exception {
LOG.info("Sending new SchemaContext to Shards");
for (ShardInformation info : localShards.values()) {
- if(info.getActor() == null) {
+ if (info.getActor() == null) {
info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
- info.getPeerAddresses(), datastoreContext, schemaContext),
+ info.getPeerAddresses(), datastoreContext, schemaContext),
info.getShardId().toString()));
} else {
info.getActor().tell(message, getSelf());
return knownModules;
}
+ @VisibleForTesting
+ DataPersistenceProvider getDataPersistenceProvider() {
+ return dataPersistenceProvider;
+ }
+
private class ShardInformation {
private final ShardIdentifier shardId;
private final String shardName;
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.config;
+
+import com.typesafe.config.Config;
+
+public interface ConfigurationReader {
+ Config read();
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.config;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import java.io.File;
+
+public class FileConfigurationReader implements ConfigurationReader{
+
+ public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf";
+
+ @Override
+ public Config read() {
+ File defaultConfigFile = new File(AKKA_CONF_PATH);
+ Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing");
+ return ConfigFactory.parseFile(defaultConfigFile);
+
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.config;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+public class ResourceConfigurationReader implements ConfigurationReader {
+ @Override
+ public Config read() {
+ return ConfigFactory.load();
+ }
+}
props.getShardTransactionCommitTimeoutInSeconds().getValue().intValue())
.shardTransactionCommitQueueCapacity(
props.getShardTransactionCommitQueueCapacity().getValue().intValue())
+ .persistent(props.getPersistent().booleanValue())
.build();
return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
props.getShardTransactionCommitTimeoutInSeconds().getValue().intValue())
.shardTransactionCommitQueueCapacity(
props.getShardTransactionCommitQueueCapacity().getValue().intValue())
+ .persistent(props.getPersistent().booleanValue())
.build();
return DistributedDataStoreFactory.createInstance("operational",
type non-zero-uint32-type;
description "Max queue size that an actor's mailbox can reach";
}
+
+ leaf persistent {
+ default true;
+ type boolean;
+ description "Enable or disable data persistence";
+ }
}
// Augments the 'configuration' choice node under modules/module.
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Uninterruptibles;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
private final DatastoreContext.Builder datastoreContextBuilder =
@Test
public void testWriteTransactionWithSingleShard() throws Exception{
- System.setProperty("shard.persistent", "true");
new IntegrationTestKit(getSystem()) {{
DistributedDataStore dataStore =
setupDistributedDataStore("transactionIntegrationTest", "test-1");
@Test
public void testWriteTransactionWithMultipleShards() throws Exception{
- System.setProperty("shard.persistent", "true");
new IntegrationTestKit(getSystem()) {{
DistributedDataStore dataStore =
setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
import akka.actor.ActorRef;
import akka.actor.Props;
+import akka.japi.Creator;
import akka.pattern.Patterns;
import akka.persistence.RecoveryCompleted;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
-import akka.japi.Creator;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Await;
import scala.concurrent.Future;
+
import java.net.URI;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
}};
}
+ @Test
+ public void testRecoveryApplicable(){
+ new JavaTestKit(getSystem()) {
+ {
+ final Props persistentProps = ShardManager.props(shardMrgIDSuffix,
+ new MockClusterWrapper(),
+ new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(true).build());
+ final TestActorRef<ShardManager> persistentShardManager =
+ TestActorRef.create(getSystem(), persistentProps);
+
+ DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
+
+ assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
+
+ final Props nonPersistentProps = ShardManager.props(shardMrgIDSuffix,
+ new MockClusterWrapper(),
+ new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(false).build());
+ final TestActorRef<ShardManager> nonPersistentShardManager =
+ TestActorRef.create(getSystem(), nonPersistentProps);
+
+ DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
+
+ assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
+
+
+ }};
+
+ }
+
+ @Test
+ public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
+ throws Exception {
+ final CountDownLatch persistLatch = new CountDownLatch(1);
+ final Creator<ShardManager> creator = new Creator<ShardManager>() {
+ @Override
+ public ShardManager create() throws Exception {
+ return new ShardManager(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build()) {
+ @Override
+ protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
+ DataPersistenceProviderMonitor dataPersistenceProviderMonitor
+ = new DataPersistenceProviderMonitor();
+ dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+ return dataPersistenceProviderMonitor;
+ }
+ };
+ }
+ };
+
+ new JavaTestKit(getSystem()) {{
+
+ final TestActorRef<ShardManager> shardManager =
+ TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
+
+ ModuleIdentifier foo = mock(ModuleIdentifier.class);
+ when(foo.getNamespace()).thenReturn(new URI("foo"));
+
+ Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
+ moduleIdentifierSet.add(foo);
+
+ SchemaContext schemaContext = mock(SchemaContext.class);
+ when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
+
+ shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertEquals("Persisted", true,
+ Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
+
+ }};
+ }
+
+
private static class TestShardManager extends ShardManager {
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
}
}
+
+ private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
+ private Creator<ShardManager> delegate;
+
+ public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public ShardManager create() throws Exception {
+ return delegate.create();
+ }
+ }
}
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
+
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+
public class ShardTest extends AbstractActorTest {
@Before
public void setUp() {
- System.setProperty("shard.persistent", "false");
-
InMemorySnapshotStore.clear();
InMemoryJournal.clear();
}
return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
dataStoreContext, SCHEMA_CONTEXT) {
@Override
- public void onReceiveCommand(final Object message) {
+ public void onReceiveCommand(final Object message) throws Exception {
if(message instanceof ElectionTimeout && firstElectionTimeout) {
// Got the first ElectionTimeout. We don't forward it to the
// base Shard yet until we've sent the RegisterChangeListener
}
@Test
- public void testPeerAddressResolved(){
+ public void testPeerAddressResolved() throws Exception {
new ShardTestKit(getSystem()) {{
final CountDownLatch recoveryComplete = new CountDownLatch(1);
class TestShard extends Shard {
}
@Test
- public void testApplySnapshot() throws ExecutionException, InterruptedException {
+ public void testApplySnapshot() throws Exception {
TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
"testApplySnapshot");
@SuppressWarnings({ "unchecked" })
@Test
public void testConcurrentThreePhaseCommits() throws Throwable {
- System.setProperty("shard.persistent", "true");
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
@Test
public void testAbortBeforeFinishCommit() throws Throwable {
- System.setProperty("shard.persistent", "true");
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
@Test
public void testCreateSnapshot() throws IOException, InterruptedException {
+ testCreateSnapshot(true, "testCreateSnapshot");
+ }
+
+ @Test
+ public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
+ testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
+ }
+
+ public void testCreateSnapshot(boolean persistent, final String shardActorName) throws IOException, InterruptedException {
+ final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
+ shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
+
new ShardTestKit(getSystem()) {{
final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
Creator<Shard> creator = new Creator<Shard>() {
return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
dataStoreContext, SCHEMA_CONTEXT) {
@Override
- public void saveSnapshot(Object snapshot) {
- super.saveSnapshot(snapshot);
+ protected void commitSnapshot(long sequenceNumber) {
+ super.commitSnapshot(sequenceNumber);
latch.get().countDown();
}
};
};
TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(creator)), "testCreateSnapshot");
+ Props.create(new DelegatingShardCreator(creator)), shardActorName);
waitUntilLeader(shard);
}
+ @Test
+ public void testRecoveryApplicable(){
+
+ final DatastoreContext persistentContext = DatastoreContext.newBuilder().
+ shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
+
+ final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+ persistentContext, SCHEMA_CONTEXT);
+
+ final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
+ shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
+
+ final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+ nonPersistentContext, SCHEMA_CONTEXT);
+
+ new ShardTestKit(getSystem()) {{
+ TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
+ persistentProps, "testPersistence1");
+
+ assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+ shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+ TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
+ nonPersistentProps, "testPersistence2");
+
+ assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+ shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+ }};
+
+ }
+
+
private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
*/
package org.opendaylight.controller.cluster.datastore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Assert;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import com.google.common.util.concurrent.Uninterruptibles;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
class ShardTestKit extends JavaTestKit {
Assert.fail("Leader not found for shard " + shard.path());
}
+
}
\ No newline at end of file
future.checkedGet(5, TimeUnit.SECONDS);
fail("Expected ReadFailedException");
} catch(ReadFailedException e) {
- e.printStackTrace();
throw e.getCause();
}
}