From faad081396a7c6489d234809ca9446ab96d14504 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 25 Mar 2015 22:22:45 -0400 Subject: [PATCH] Refactor DataPersistenceProviders and RaftActor#persistence Refactored protected DataPersistenceProvider inner class impls to new files for reuse in unit tests and to reduce inner code. Also modified RaftActor so the DataPersistenceProvider is set rather than having derived classes provide via the persistence() abstract method. This makes it a little easier for derived RaftActors in that they don't have to maintain a field and easier for unit tests to change the DataPersistenceProvider impl. Added a DelegatingPersistentDataProvider that holds the actual impl for RaftActor. This allows the DataPersistenceProvider to be passed to internal helper classes without having to update them if the underlyting impl is changed. These changes will facilitate further refactoring of code in RaftActor into separate classes to simplify it. Change-Id: I520b0d83635356f195e6bff33e44ac8f49e793cf Signed-off-by: Tom Pantelis --- .../cluster/example/ExampleActor.java | 9 +-- .../controller/cluster/raft/RaftActor.java | 70 +++++++++++-------- .../AbstractRaftActorIntegrationTest.java | 1 - .../cluster/raft/RaftActorTest.java | 55 +++------------ .../DelegatingPersistentDataProvider.java | 57 +++++++++++++++ .../cluster/NonPersistentDataProvider.java | 46 ++++++++++++ .../cluster/PersistentDataProvider.java | 50 +++++++++++++ .../actor/AbstractUntypedPersistentActor.java | 70 ------------------- .../controller/cluster/datastore/Shard.java | 27 ++----- .../cluster/datastore/ShardManager.java | 4 +- .../cluster/datastore/ShardTest.java | 55 +++------------ 11 files changed, 223 insertions(+), 221 deletions(-) create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DelegatingPersistentDataProvider.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/PersistentDataProvider.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index c5ae4c41b2..ed19f21ded 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -19,7 +19,6 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.HashMap; import java.util.Map; -import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; @@ -38,7 +37,6 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa public class ExampleActor extends RaftActor { private final Map state = new HashMap(); - private final DataPersistenceProvider dataPersistenceProvider; private long persistIdentifier = 1; private final Optional roleChangeNotifier; @@ -47,7 +45,7 @@ public class ExampleActor extends RaftActor { public ExampleActor(String id, Map peerAddresses, Optional configParams) { super(id, peerAddresses, configParams); - this.dataPersistenceProvider = new PersistentDataProvider(); + setPersistence(true); roleChangeNotifier = createRoleChangeNotifier(id); } @@ -185,11 +183,6 @@ public class ExampleActor extends RaftActor { } - @Override - protected DataPersistenceProvider persistence() { - return dataPersistenceProvider; - } - @Override public void onReceiveRecover(Object message)throws Exception { super.onReceiveRecover(message); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index b74259d485..47ccfb7ed9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -29,6 +29,9 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.time.DurationFormatUtils; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; +import org.opendaylight.controller.cluster.NonPersistentDataProvider; +import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; @@ -118,6 +121,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { */ private final RaftActorContextImpl context; + private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null); + private final Procedure createSnapshotProcedure = new CreateSnapshotProcedure(); /** @@ -587,6 +592,41 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.setConfigParams(configParams); } + public final DataPersistenceProvider persistence() { + return delegatingPersistenceProvider.getDelegate(); + } + + public void setPersistence(DataPersistenceProvider provider) { + delegatingPersistenceProvider.setDelegate(provider); + } + + protected void setPersistence(boolean persistent) { + if(persistent) { + setPersistence(new PersistentDataProvider(this)); + } else { + setPersistence(new NonPersistentDataProvider() { + /** + * The way snapshotting works is, + *
    + *
  1. RaftActor calls createSnapshot on the Shard + *
  2. Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot + *
  3. When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save + * the snapshot. The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the + * RaftActor gets SaveSnapshot success it commits the snapshot to the in-memory journal. This + * commitSnapshot is mimicking what is done in SaveSnapshotSuccess. + *
+ */ + @Override + public void saveSnapshot(Object o) { + // Make saving Snapshot successful + // Committing the snapshot here would end up calling commit in the creating state which would + // be a state violation. That's why now we send a message to commit the snapshot. + self().tell(COMMIT_SNAPSHOT, self()); + } + }); + } + } + /** * setPeerAddress sets the address of a known peer at a later time. *

@@ -688,8 +728,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { */ protected abstract void onStateChanged(); - protected abstract DataPersistenceProvider persistence(); - /** * Notifier Actor for this RaftActor to notify when a role change happens * @return ActorRef - ActorRef of the notifier or Optional.absent if none. @@ -911,34 +949,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - protected class NonPersistentRaftDataProvider extends NonPersistentDataProvider { - - public NonPersistentRaftDataProvider(){ - - } - - /** - * The way snapshotting works is, - *

    - *
  1. RaftActor calls createSnapshot on the Shard - *
  2. Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot - *
  3. When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save the snapshot. - * The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the RaftActor gets SaveSnapshot - * success it commits the snapshot to the in-memory journal. This commitSnapshot is mimicking what is done - * in SaveSnapshotSuccess. - *
- * @param o - */ - @Override - public void saveSnapshot(Object o) { - // Make saving Snapshot successful - // Committing the snapshot here would end up calling commit in the creating state which would - // be a state violation. That's why now we send a message to commit the snapshot. - self().tell(COMMIT_SNAPSHOT, self()); - } - } - - private class CreateSnapshotProcedure implements Procedure { @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index 13445b0b26..dfaa8d55f6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -57,7 +57,6 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest private TestRaftActor(String id, Map peerAddresses, ConfigParams config, TestActorRef collectorActor) { super(id, peerAddresses, Optional.of(config), null); - dataPersistenceProvider = new PersistentDataProvider(); this.collectorActor = collectorActor; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 0a4a2c7717..4cb555c4b5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -54,6 +54,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; @@ -97,7 +98,6 @@ public class RaftActorTest extends AbstractActorTest { public static class MockRaftActor extends RaftActor { - protected DataPersistenceProvider dataPersistenceProvider; private final RaftActor delegate; private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final List state; @@ -137,9 +137,9 @@ public class RaftActorTest extends AbstractActorTest { state = new ArrayList<>(); this.delegate = mock(RaftActor.class); if(dataPersistenceProvider == null){ - this.dataPersistenceProvider = new PersistentDataProvider(); + setPersistence(true); } else { - this.dataPersistenceProvider = dataPersistenceProvider; + setPersistence(dataPersistenceProvider); } } @@ -252,11 +252,6 @@ public class RaftActorTest extends AbstractActorTest { delegate.onStateChanged(); } - @Override - protected DataPersistenceProvider persistence() { - return this.dataPersistenceProvider; - } - @Override protected Optional getRoleChangeNotifier() { return Optional.fromNullable(roleChangeNotifier); @@ -986,7 +981,7 @@ public class RaftActorTest extends AbstractActorTest { TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, Collections.emptyMap(), Optional.of(config), notifierActor, - new NonPersistentProvider()), persistenceId); + new NonPersistentDataProvider()), persistenceId); List matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3); @@ -1161,13 +1156,13 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); - leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentProvider() + leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider() , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory()); assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); // The commit is needed to complete the snapshot creation process - leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1); + leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1); // capture snapshot reply should remove the snapshotted entries only assertEquals(3, leaderActor.getReplicatedLog().size()); @@ -1271,7 +1266,7 @@ public class RaftActorTest extends AbstractActorTest { assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing()); // The commit is needed to complete the snapshot creation process - followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1); + followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1); // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log @@ -1380,38 +1375,6 @@ public class RaftActorTest extends AbstractActorTest { }; } - - private static class NonPersistentProvider implements DataPersistenceProvider { - @Override - public boolean isRecoveryApplicable() { - return false; - } - - @Override - public void persist(T o, Procedure procedure) { - try { - procedure.apply(o); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Override - public void saveSnapshot(Object o) { - - } - - @Override - public void deleteSnapshots(SnapshotSelectionCriteria criteria) { - - } - - @Override - public void deleteMessages(long sequenceNumber) { - - } - } - @Test public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception { new JavaTestKit(getSystem()) {{ @@ -1421,7 +1384,7 @@ public class RaftActorTest extends AbstractActorTest { config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); config.setSnapshotBatchCount(5); - DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider(); + DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider(); Map peerAddresses = new HashMap<>(); @@ -1468,7 +1431,7 @@ public class RaftActorTest extends AbstractActorTest { config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); config.setSnapshotBatchCount(5); - DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider(); + DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider(); Map peerAddresses = new HashMap<>(); diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DelegatingPersistentDataProvider.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DelegatingPersistentDataProvider.java new file mode 100644 index 0000000000..c74236bb47 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DelegatingPersistentDataProvider.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster; + +import akka.japi.Procedure; +import akka.persistence.SnapshotSelectionCriteria; + +/** + * A DataPersistenceProvider implementation that delegates to another implementation. + * + * @author Thomas Pantelis + */ +public class DelegatingPersistentDataProvider implements DataPersistenceProvider { + private DataPersistenceProvider delegate; + + public DelegatingPersistentDataProvider(DataPersistenceProvider delegate) { + this.delegate = delegate; + } + + public void setDelegate(DataPersistenceProvider delegate) { + this.delegate = delegate; + } + + public DataPersistenceProvider getDelegate() { + return delegate; + } + + @Override + public boolean isRecoveryApplicable() { + return delegate.isRecoveryApplicable(); + } + + @Override + public void persist(T o, Procedure procedure) { + delegate.persist(o, procedure); + } + + @Override + public void saveSnapshot(Object o) { + delegate.saveSnapshot(o); + } + + @Override + public void deleteSnapshots(SnapshotSelectionCriteria criteria) { + delegate.deleteSnapshots(criteria); + } + + @Override + public void deleteMessages(long sequenceNumber) { + delegate.deleteMessages(sequenceNumber); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java new file mode 100644 index 0000000000..fed81177a1 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster; + +import akka.japi.Procedure; +import akka.persistence.SnapshotSelectionCriteria; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A DataPersistenceProvider implementation with persistence disabled, essentially a no-op. + */ +public class NonPersistentDataProvider implements DataPersistenceProvider { + private static final Logger LOG = LoggerFactory.getLogger(NonPersistentDataProvider.class); + + @Override + public boolean isRecoveryApplicable() { + return false; + } + + @Override + public void persist(T o, Procedure procedure) { + try { + procedure.apply(o); + } catch (Exception e) { + LOG.error("An unexpected error occurred", e); + } + } + + @Override + public void saveSnapshot(Object o) { + } + + @Override + public void deleteSnapshots(SnapshotSelectionCriteria criteria) { + } + + @Override + public void deleteMessages(long sequenceNumber) { + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/PersistentDataProvider.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/PersistentDataProvider.java new file mode 100644 index 0000000000..f130a1f27e --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/PersistentDataProvider.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster; + +import akka.japi.Procedure; +import akka.persistence.SnapshotSelectionCriteria; +import akka.persistence.UntypedPersistentActor; +import com.google.common.base.Preconditions; + +/** + * A DataPersistenceProvider implementation with persistence enabled. + */ +public class PersistentDataProvider implements DataPersistenceProvider { + + private final UntypedPersistentActor persistentActor; + + public PersistentDataProvider(UntypedPersistentActor persistentActor) { + this.persistentActor = Preconditions.checkNotNull(persistentActor, "persistentActor can't be null"); + } + + @Override + public boolean isRecoveryApplicable() { + return true; + } + + @Override + public void persist(T o, Procedure procedure) { + persistentActor.persist(o, procedure); + } + + @Override + public void saveSnapshot(Object o) { + persistentActor.saveSnapshot(o); + } + + @Override + public void deleteSnapshots(SnapshotSelectionCriteria criteria) { + persistentActor.deleteSnapshots(criteria); + } + + @Override + public void deleteMessages(long sequenceNumber) { + persistentActor.deleteMessages(sequenceNumber); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java index 432c2d5615..326733f377 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java @@ -8,10 +8,7 @@ package org.opendaylight.controller.cluster.common.actor; -import akka.japi.Procedure; -import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.UntypedPersistentActor; -import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,71 +66,4 @@ public abstract class AbstractUntypedPersistentActor extends UntypedPersistentAc } unhandled(message); } - - protected class PersistentDataProvider implements DataPersistenceProvider { - - public PersistentDataProvider(){ - - } - - @Override - public boolean isRecoveryApplicable() { - return true; - } - - @Override - public void persist(T o, Procedure procedure) { - AbstractUntypedPersistentActor.this.persist(o, procedure); - } - - @Override - public void saveSnapshot(Object o) { - AbstractUntypedPersistentActor.this.saveSnapshot(o); - } - - @Override - public void deleteSnapshots(SnapshotSelectionCriteria criteria) { - AbstractUntypedPersistentActor.this.deleteSnapshots(criteria); - } - - @Override - public void deleteMessages(long sequenceNumber) { - AbstractUntypedPersistentActor.this.deleteMessages(sequenceNumber); - } - } - - protected class NonPersistentDataProvider implements DataPersistenceProvider { - - public NonPersistentDataProvider(){ - - } - - @Override - public boolean isRecoveryApplicable() { - return false; - } - - @Override - public void persist(T o, Procedure procedure) { - try { - procedure.apply(o); - } catch (Exception e) { - LOG.error("An unexpected error occurred", e); - } - } - - @Override - public void saveSnapshot(Object o) { - } - - @Override - public void deleteSnapshots(SnapshotSelectionCriteria criteria) { - - } - - @Override - public void deleteMessages(long sequenceNumber) { - - } - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 8e00a1389c..c04256a28e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; -import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry; @@ -115,8 +114,6 @@ public class Shard extends RaftActor { private DatastoreContext datastoreContext; - private DataPersistenceProvider dataPersistenceProvider; - private SchemaContext schemaContext; private int createSnapshotTransactionCounter; @@ -151,11 +148,10 @@ public class Shard extends RaftActor { this.name = name.toString(); this.datastoreContext = datastoreContext; this.schemaContext = schemaContext; - this.dataPersistenceProvider = (datastoreContext.isPersistent()) - ? new PersistentDataProvider() : new NonPersistentRaftDataProvider(); this.txnDispatcherPath = new Dispatchers(context().system().dispatchers()) .getDispatcherPath(Dispatchers.DispatcherType.Transaction); + setPersistence(datastoreContext.isPersistent()); LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent()); @@ -311,12 +307,10 @@ public class Shard extends RaftActor { setTransactionCommitTimeout(); - if(datastoreContext.isPersistent() && - dataPersistenceProvider instanceof NonPersistentRaftDataProvider) { - dataPersistenceProvider = new PersistentDataProvider(); - } else if(!datastoreContext.isPersistent() && - dataPersistenceProvider instanceof PersistentDataProvider) { - dataPersistenceProvider = new NonPersistentRaftDataProvider(); + if(datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) { + setPersistence(true); + } else if(!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) { + setPersistence(false); } updateConfigParams(datastoreContext.getShardRaftConfig()); @@ -859,19 +853,10 @@ public class Shard extends RaftActor { } @Override - protected DataPersistenceProvider persistence() { - return dataPersistenceProvider; - } - - @Override public String persistenceId() { + public String persistenceId() { return this.name; } - @VisibleForTesting - DataPersistenceProvider getDataPersistenceProvider() { - return dataPersistenceProvider; - } - @VisibleForTesting ShardCommitCoordinator getCommitCoordinator() { return commitCoordinator; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 52762b4eb3..cff44b13cb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -41,6 +41,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.NonPersistentDataProvider; +import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; @@ -136,7 +138,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { - return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider(); + return (persistent) ? new PersistentDataProvider(this) : new NonPersistentDataProvider(); } public static Props props( diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index cc96d0d3b0..e04c1a5d18 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -16,9 +16,7 @@ import akka.actor.Props; import akka.dispatch.Dispatchers; import akka.dispatch.OnComplete; import akka.japi.Creator; -import akka.japi.Procedure; import akka.pattern.Patterns; -import akka.persistence.SnapshotSelectionCriteria; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Function; @@ -41,6 +39,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.mockito.InOrder; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; @@ -1391,44 +1390,20 @@ public class ShardTest extends AbstractShardTest { public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{ final AtomicReference savedSnapshot = new AtomicReference<>(); - class DelegatingPersistentDataProvider implements DataPersistenceProvider { - DataPersistenceProvider delegate; - - DelegatingPersistentDataProvider(DataPersistenceProvider delegate) { - this.delegate = delegate; - } - - @Override - public boolean isRecoveryApplicable() { - return delegate.isRecoveryApplicable(); - } - - @Override - public void persist(T o, Procedure procedure) { - delegate.persist(o, procedure); + class TestPersistentDataProvider extends DelegatingPersistentDataProvider { + TestPersistentDataProvider(DataPersistenceProvider delegate) { + super(delegate); } @Override public void saveSnapshot(Object o) { savedSnapshot.set(o); - delegate.saveSnapshot(o); - } - - @Override - public void deleteSnapshots(SnapshotSelectionCriteria criteria) { - delegate.deleteSnapshots(criteria); - } - - @Override - public void deleteMessages(long sequenceNumber) { - delegate.deleteMessages(sequenceNumber); + super.saveSnapshot(o); } } dataStoreContextBuilder.persistent(persistent); - - new ShardTestKit(getSystem()) {{ final AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); @@ -1437,15 +1412,7 @@ public class ShardTest extends AbstractShardTest { protected TestShard(ShardIdentifier name, Map peerAddresses, DatastoreContext datastoreContext, SchemaContext schemaContext) { super(name, peerAddresses, datastoreContext, schemaContext); - } - - DelegatingPersistentDataProvider delegating; - - protected DataPersistenceProvider persistence() { - if(delegating == null) { - delegating = new DelegatingPersistentDataProvider(super.persistence()); - } - return delegating; + setPersistence(new TestPersistentDataProvider(super.persistence())); } @Override @@ -1560,14 +1527,14 @@ public class ShardTest extends AbstractShardTest { TestActorRef shard1 = TestActorRef.create(getSystem(), persistentProps, "testPersistence1"); - assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable()); shard1.tell(PoisonPill.getInstance(), ActorRef.noSender()); TestActorRef shard2 = TestActorRef.create(getSystem(), nonPersistentProps, "testPersistence2"); - assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable()); shard2.tell(PoisonPill.getInstance(), ActorRef.noSender()); @@ -1583,19 +1550,19 @@ public class ShardTest extends AbstractShardTest { TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext"); assertEquals("isRecoveryApplicable", true, - shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + shard.underlyingActor().persistence().isRecoveryApplicable()); waitUntilLeader(shard); shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender()); assertEquals("isRecoveryApplicable", false, - shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + shard.underlyingActor().persistence().isRecoveryApplicable()); shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender()); assertEquals("isRecoveryApplicable", true, - shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + shard.underlyingActor().persistence().isRecoveryApplicable()); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; -- 2.36.6