import akka.dispatch.Dispatchers;
import akka.dispatch.OnComplete;
import akka.japi.Creator;
-import akka.japi.Procedure;
import akka.pattern.Patterns;
-import akka.persistence.SnapshotSelectionCriteria;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Function;
import org.junit.Test;
import org.mockito.InOrder;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
- class DelegatingPersistentDataProvider implements DataPersistenceProvider {
- DataPersistenceProvider delegate;
-
- DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public boolean isRecoveryApplicable() {
- return delegate.isRecoveryApplicable();
- }
-
- @Override
- public <T> void persist(T o, Procedure<T> procedure) {
- delegate.persist(o, procedure);
+ class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
+ TestPersistentDataProvider(DataPersistenceProvider delegate) {
+ super(delegate);
}
@Override
public void saveSnapshot(Object o) {
savedSnapshot.set(o);
- delegate.saveSnapshot(o);
- }
-
- @Override
- public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
- delegate.deleteSnapshots(criteria);
- }
-
- @Override
- public void deleteMessages(long sequenceNumber) {
- delegate.deleteMessages(sequenceNumber);
+ super.saveSnapshot(o);
}
}
dataStoreContextBuilder.persistent(persistent);
-
-
new ShardTestKit(getSystem()) {{
final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
DatastoreContext datastoreContext, SchemaContext schemaContext) {
super(name, peerAddresses, datastoreContext, schemaContext);
- }
-
- DelegatingPersistentDataProvider delegating;
-
- protected DataPersistenceProvider persistence() {
- if(delegating == null) {
- delegating = new DelegatingPersistentDataProvider(super.persistence());
- }
- return delegating;
+ setPersistence(new TestPersistentDataProvider(super.persistence()));
}
@Override
TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
persistentProps, "testPersistence1");
- assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+ assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
nonPersistentProps, "testPersistence2");
- assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+ assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
assertEquals("isRecoveryApplicable", true,
- shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+ shard.underlyingActor().persistence().isRecoveryApplicable());
waitUntilLeader(shard);
shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
assertEquals("isRecoveryApplicable", false,
- shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+ shard.underlyingActor().persistence().isRecoveryApplicable());
shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
assertEquals("isRecoveryApplicable", true,
- shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+ shard.underlyingActor().persistence().isRecoveryApplicable());
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};