import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
import org.opendaylight.controller.cluster.example.messages.PrintRole;
public class ExampleActor extends RaftActor {
private final Map<String, String> state = new HashMap();
- private final DataPersistenceProvider dataPersistenceProvider;
private long persistIdentifier = 1;
private final Optional<ActorRef> roleChangeNotifier;
public ExampleActor(String id, Map<String, String> peerAddresses,
Optional<ConfigParams> configParams) {
super(id, peerAddresses, configParams);
- this.dataPersistenceProvider = new PersistentDataProvider();
+ setPersistence(true);
roleChangeNotifier = createRoleChangeNotifier(id);
}
}
- @Override
- protected DataPersistenceProvider persistence() {
- return dataPersistenceProvider;
- }
-
@Override public void onReceiveRecover(Object message)throws Exception {
super.onReceiveRecover(message);
}
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
*/
private final RaftActorContextImpl context;
+ private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null);
+
private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
/**
context.setConfigParams(configParams);
}
+ public final DataPersistenceProvider persistence() {
+ return delegatingPersistenceProvider.getDelegate();
+ }
+
+ public void setPersistence(DataPersistenceProvider provider) {
+ delegatingPersistenceProvider.setDelegate(provider);
+ }
+
+ protected void setPersistence(boolean persistent) {
+ if(persistent) {
+ setPersistence(new PersistentDataProvider(this));
+ } else {
+ setPersistence(new NonPersistentDataProvider() {
+ /**
+ * The way snapshotting works is,
+ * <ol>
+ * <li> RaftActor calls createSnapshot on the Shard
+ * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
+ * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save
+ * the snapshot. The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the
+ * RaftActor gets SaveSnapshot success it commits the snapshot to the in-memory journal. This
+ * commitSnapshot is mimicking what is done in SaveSnapshotSuccess.
+ * </ol>
+ */
+ @Override
+ public void saveSnapshot(Object o) {
+ // Make saving Snapshot successful
+ // Committing the snapshot here would end up calling commit in the creating state which would
+ // be a state violation. That's why now we send a message to commit the snapshot.
+ self().tell(COMMIT_SNAPSHOT, self());
+ }
+ });
+ }
+ }
+
/**
* setPeerAddress sets the address of a known peer at a later time.
* <p>
*/
protected abstract void onStateChanged();
- protected abstract DataPersistenceProvider persistence();
-
/**
* Notifier Actor for this RaftActor to notify when a role change happens
* @return ActorRef - ActorRef of the notifier or Optional.absent if none.
}
}
- protected class NonPersistentRaftDataProvider extends NonPersistentDataProvider {
-
- public NonPersistentRaftDataProvider(){
-
- }
-
- /**
- * The way snapshotting works is,
- * <ol>
- * <li> RaftActor calls createSnapshot on the Shard
- * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
- * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save the snapshot.
- * The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the RaftActor gets SaveSnapshot
- * success it commits the snapshot to the in-memory journal. This commitSnapshot is mimicking what is done
- * in SaveSnapshotSuccess.
- * </ol>
- * @param o
- */
- @Override
- public void saveSnapshot(Object o) {
- // Make saving Snapshot successful
- // Committing the snapshot here would end up calling commit in the creating state which would
- // be a state violation. That's why now we send a message to commit the snapshot.
- self().tell(COMMIT_SNAPSHOT, self());
- }
- }
-
-
private class CreateSnapshotProcedure implements Procedure<Void> {
@Override
private TestRaftActor(String id, Map<String, String> peerAddresses, ConfigParams config,
TestActorRef<MessageCollectorActor> collectorActor) {
super(id, peerAddresses, Optional.of(config), null);
- dataPersistenceProvider = new PersistentDataProvider();
this.collectorActor = collectorActor;
}
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
public static class MockRaftActor extends RaftActor {
- protected DataPersistenceProvider dataPersistenceProvider;
private final RaftActor delegate;
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final List<Object> state;
state = new ArrayList<>();
this.delegate = mock(RaftActor.class);
if(dataPersistenceProvider == null){
- this.dataPersistenceProvider = new PersistentDataProvider();
+ setPersistence(true);
} else {
- this.dataPersistenceProvider = dataPersistenceProvider;
+ setPersistence(dataPersistenceProvider);
}
}
delegate.onStateChanged();
}
- @Override
- protected DataPersistenceProvider persistence() {
- return this.dataPersistenceProvider;
- }
-
@Override
protected Optional<ActorRef> getRoleChangeNotifier() {
return Optional.fromNullable(roleChangeNotifier);
TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
- new NonPersistentProvider()), persistenceId);
+ new NonPersistentDataProvider()), persistenceId);
List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
- leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentProvider()
+ leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider()
, snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
+ leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
// capture snapshot reply should remove the snapshotted entries only
assertEquals(3, leaderActor.getReplicatedLog().size());
assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
+ followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
// capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
};
}
-
- private static class NonPersistentProvider implements DataPersistenceProvider {
- @Override
- public boolean isRecoveryApplicable() {
- return false;
- }
-
- @Override
- public <T> void persist(T o, Procedure<T> procedure) {
- try {
- procedure.apply(o);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void saveSnapshot(Object o) {
-
- }
-
- @Override
- public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
-
- }
-
- @Override
- public void deleteMessages(long sequenceNumber) {
-
- }
- }
-
@Test
public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
new JavaTestKit(getSystem()) {{
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setSnapshotBatchCount(5);
- DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
+ DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
Map<String, String> peerAddresses = new HashMap<>();
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setSnapshotBatchCount(5);
- DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
+ DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
Map<String, String> peerAddresses = new HashMap<>();
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+
+/**
+ * A DataPersistenceProvider implementation that delegates to another implementation.
+ *
+ * @author Thomas Pantelis
+ */
+public class DelegatingPersistentDataProvider implements DataPersistenceProvider {
+ private DataPersistenceProvider delegate;
+
+ public DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
+ this.delegate = delegate;
+ }
+
+ public void setDelegate(DataPersistenceProvider delegate) {
+ this.delegate = delegate;
+ }
+
+ public DataPersistenceProvider getDelegate() {
+ return delegate;
+ }
+
+ @Override
+ public boolean isRecoveryApplicable() {
+ return delegate.isRecoveryApplicable();
+ }
+
+ @Override
+ public <T> void persist(T o, Procedure<T> procedure) {
+ delegate.persist(o, procedure);
+ }
+
+ @Override
+ public void saveSnapshot(Object o) {
+ delegate.saveSnapshot(o);
+ }
+
+ @Override
+ public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+ delegate.deleteSnapshots(criteria);
+ }
+
+ @Override
+ public void deleteMessages(long sequenceNumber) {
+ delegate.deleteMessages(sequenceNumber);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A DataPersistenceProvider implementation with persistence disabled, essentially a no-op.
+ */
+public class NonPersistentDataProvider implements DataPersistenceProvider {
+ private static final Logger LOG = LoggerFactory.getLogger(NonPersistentDataProvider.class);
+
+ @Override
+ public boolean isRecoveryApplicable() {
+ return false;
+ }
+
+ @Override
+ public <T> void persist(T o, Procedure<T> procedure) {
+ try {
+ procedure.apply(o);
+ } catch (Exception e) {
+ LOG.error("An unexpected error occurred", e);
+ }
+ }
+
+ @Override
+ public void saveSnapshot(Object o) {
+ }
+
+ @Override
+ public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+ }
+
+ @Override
+ public void deleteMessages(long sequenceNumber) {
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.UntypedPersistentActor;
+import com.google.common.base.Preconditions;
+
+/**
+ * A DataPersistenceProvider implementation with persistence enabled.
+ */
+public class PersistentDataProvider implements DataPersistenceProvider {
+
+ private final UntypedPersistentActor persistentActor;
+
+ public PersistentDataProvider(UntypedPersistentActor persistentActor) {
+ this.persistentActor = Preconditions.checkNotNull(persistentActor, "persistentActor can't be null");
+ }
+
+ @Override
+ public boolean isRecoveryApplicable() {
+ return true;
+ }
+
+ @Override
+ public <T> void persist(T o, Procedure<T> procedure) {
+ persistentActor.persist(o, procedure);
+ }
+
+ @Override
+ public void saveSnapshot(Object o) {
+ persistentActor.saveSnapshot(o);
+ }
+
+ @Override
+ public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+ persistentActor.deleteSnapshots(criteria);
+ }
+
+ @Override
+ public void deleteMessages(long sequenceNumber) {
+ persistentActor.deleteMessages(sequenceNumber);
+ }
+}
\ No newline at end of file
package org.opendaylight.controller.cluster.common.actor;
-import akka.japi.Procedure;
-import akka.persistence.SnapshotSelectionCriteria;
import akka.persistence.UntypedPersistentActor;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
unhandled(message);
}
-
- protected class PersistentDataProvider implements DataPersistenceProvider {
-
- public PersistentDataProvider(){
-
- }
-
- @Override
- public boolean isRecoveryApplicable() {
- return true;
- }
-
- @Override
- public <T> void persist(T o, Procedure<T> procedure) {
- AbstractUntypedPersistentActor.this.persist(o, procedure);
- }
-
- @Override
- public void saveSnapshot(Object o) {
- AbstractUntypedPersistentActor.this.saveSnapshot(o);
- }
-
- @Override
- public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
- AbstractUntypedPersistentActor.this.deleteSnapshots(criteria);
- }
-
- @Override
- public void deleteMessages(long sequenceNumber) {
- AbstractUntypedPersistentActor.this.deleteMessages(sequenceNumber);
- }
- }
-
- protected class NonPersistentDataProvider implements DataPersistenceProvider {
-
- public NonPersistentDataProvider(){
-
- }
-
- @Override
- public boolean isRecoveryApplicable() {
- return false;
- }
-
- @Override
- public <T> void persist(T o, Procedure<T> procedure) {
- try {
- procedure.apply(o);
- } catch (Exception e) {
- LOG.error("An unexpected error occurred", e);
- }
- }
-
- @Override
- public void saveSnapshot(Object o) {
- }
-
- @Override
- public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
-
- }
-
- @Override
- public void deleteMessages(long sequenceNumber) {
-
- }
- }
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
private DatastoreContext datastoreContext;
- private DataPersistenceProvider dataPersistenceProvider;
-
private SchemaContext schemaContext;
private int createSnapshotTransactionCounter;
this.name = name.toString();
this.datastoreContext = datastoreContext;
this.schemaContext = schemaContext;
- this.dataPersistenceProvider = (datastoreContext.isPersistent())
- ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
this.txnDispatcherPath = new Dispatchers(context().system().dispatchers())
.getDispatcherPath(Dispatchers.DispatcherType.Transaction);
+ setPersistence(datastoreContext.isPersistent());
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
setTransactionCommitTimeout();
- if(datastoreContext.isPersistent() &&
- dataPersistenceProvider instanceof NonPersistentRaftDataProvider) {
- dataPersistenceProvider = new PersistentDataProvider();
- } else if(!datastoreContext.isPersistent() &&
- dataPersistenceProvider instanceof PersistentDataProvider) {
- dataPersistenceProvider = new NonPersistentRaftDataProvider();
+ if(datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) {
+ setPersistence(true);
+ } else if(!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) {
+ setPersistence(false);
}
updateConfigParams(datastoreContext.getShardRaftConfig());
}
@Override
- protected DataPersistenceProvider persistence() {
- return dataPersistenceProvider;
- }
-
- @Override public String persistenceId() {
+ public String persistenceId() {
return this.name;
}
- @VisibleForTesting
- DataPersistenceProvider getDataPersistenceProvider() {
- return dataPersistenceProvider;
- }
-
@VisibleForTesting
ShardCommitCoordinator getCommitCoordinator() {
return commitCoordinator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
}
protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
- return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
+ return (persistent) ? new PersistentDataProvider(this) : new NonPersistentDataProvider();
}
public static Props props(
import akka.dispatch.Dispatchers;
import akka.dispatch.OnComplete;
import akka.japi.Creator;
-import akka.japi.Procedure;
import akka.pattern.Patterns;
-import akka.persistence.SnapshotSelectionCriteria;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Function;
import org.junit.Test;
import org.mockito.InOrder;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
- class DelegatingPersistentDataProvider implements DataPersistenceProvider {
- DataPersistenceProvider delegate;
-
- DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public boolean isRecoveryApplicable() {
- return delegate.isRecoveryApplicable();
- }
-
- @Override
- public <T> void persist(T o, Procedure<T> procedure) {
- delegate.persist(o, procedure);
+ class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
+ TestPersistentDataProvider(DataPersistenceProvider delegate) {
+ super(delegate);
}
@Override
public void saveSnapshot(Object o) {
savedSnapshot.set(o);
- delegate.saveSnapshot(o);
- }
-
- @Override
- public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
- delegate.deleteSnapshots(criteria);
- }
-
- @Override
- public void deleteMessages(long sequenceNumber) {
- delegate.deleteMessages(sequenceNumber);
+ super.saveSnapshot(o);
}
}
dataStoreContextBuilder.persistent(persistent);
-
-
new ShardTestKit(getSystem()) {{
final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
DatastoreContext datastoreContext, SchemaContext schemaContext) {
super(name, peerAddresses, datastoreContext, schemaContext);
- }
-
- DelegatingPersistentDataProvider delegating;
-
- protected DataPersistenceProvider persistence() {
- if(delegating == null) {
- delegating = new DelegatingPersistentDataProvider(super.persistence());
- }
- return delegating;
+ setPersistence(new TestPersistentDataProvider(super.persistence()));
}
@Override
TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
persistentProps, "testPersistence1");
- assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+ assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
nonPersistentProps, "testPersistence2");
- assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+ assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
assertEquals("isRecoveryApplicable", true,
- shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+ shard.underlyingActor().persistence().isRecoveryApplicable());
waitUntilLeader(shard);
shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
assertEquals("isRecoveryApplicable", false,
- shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+ shard.underlyingActor().persistence().isRecoveryApplicable());
shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
assertEquals("isRecoveryApplicable", true,
- shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+ shard.underlyingActor().persistence().isRecoveryApplicable());
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};