Refactor DataPersistenceProviders and RaftActor#persistence 06/17206/6
authorTom Pantelis <tpanteli@brocade.com>
Thu, 26 Mar 2015 02:22:45 +0000 (22:22 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 26 Mar 2015 08:59:57 +0000 (04:59 -0400)
Refactored protected DataPersistenceProvider inner class impls to new
files for reuse in unit tests and to reduce inner code.

Also modified RaftActor so the DataPersistenceProvider is set rather
than having derived classes provide via the persistence() abstract method.
This makes it a little easier for derived RaftActors in that they don't
have to maintain a field and easier for unit tests to change the
DataPersistenceProvider impl.

Added a DelegatingPersistentDataProvider that holds the actual impl for
RaftActor. This allows the DataPersistenceProvider to be passed to
internal helper classes without having to update them if the underlyting
impl is changed.

These changes will facilitate further refactoring of code in RaftActor
into separate classes to simplify it.

Change-Id: I520b0d83635356f195e6bff33e44ac8f49e793cf
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DelegatingPersistentDataProvider.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/PersistentDataProvider.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index c5ae4c41b2f4822a04ca9da300171ae2e618d52e..ed19f21dedb8d5900a0df35aa31fc20ce6b8aee4 100644 (file)
@@ -19,7 +19,6 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.Map;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
@@ -38,7 +37,6 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa
 public class ExampleActor extends RaftActor {
 
     private final Map<String, String> state = new HashMap();
-    private final DataPersistenceProvider dataPersistenceProvider;
 
     private long persistIdentifier = 1;
     private final Optional<ActorRef> roleChangeNotifier;
@@ -47,7 +45,7 @@ public class ExampleActor extends RaftActor {
     public ExampleActor(String id, Map<String, String> peerAddresses,
         Optional<ConfigParams> configParams) {
         super(id, peerAddresses, configParams);
-        this.dataPersistenceProvider = new PersistentDataProvider();
+        setPersistence(true);
         roleChangeNotifier = createRoleChangeNotifier(id);
     }
 
@@ -185,11 +183,6 @@ public class ExampleActor extends RaftActor {
 
     }
 
-    @Override
-    protected DataPersistenceProvider persistence() {
-        return dataPersistenceProvider;
-    }
-
     @Override public void onReceiveRecover(Object message)throws Exception {
         super.onReceiveRecover(message);
     }
index b74259d4851153659df0c2866f6323b9234eff06..47ccfb7ed96cc5a9577ae6557f81ef1f4c7a798c 100644 (file)
@@ -29,6 +29,9 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
@@ -118,6 +121,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      */
     private final RaftActorContextImpl context;
 
+    private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null);
+
     private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
 
     /**
@@ -587,6 +592,41 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         context.setConfigParams(configParams);
     }
 
+    public final DataPersistenceProvider persistence() {
+        return delegatingPersistenceProvider.getDelegate();
+    }
+
+    public void setPersistence(DataPersistenceProvider provider) {
+        delegatingPersistenceProvider.setDelegate(provider);
+    }
+
+    protected void setPersistence(boolean persistent) {
+        if(persistent) {
+            setPersistence(new PersistentDataProvider(this));
+        } else {
+            setPersistence(new NonPersistentDataProvider() {
+                /**
+                 * The way snapshotting works is,
+                 * <ol>
+                 * <li> RaftActor calls createSnapshot on the Shard
+                 * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
+                 * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save
+                 * the snapshot. The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the
+                 * RaftActor gets SaveSnapshot success it commits the snapshot to the in-memory journal. This
+                 * commitSnapshot is mimicking what is done in SaveSnapshotSuccess.
+                 * </ol>
+                 */
+                @Override
+                public void saveSnapshot(Object o) {
+                    // Make saving Snapshot successful
+                    // Committing the snapshot here would end up calling commit in the creating state which would
+                    // be a state violation. That's why now we send a message to commit the snapshot.
+                    self().tell(COMMIT_SNAPSHOT, self());
+                }
+            });
+        }
+    }
+
     /**
      * setPeerAddress sets the address of a known peer at a later time.
      * <p>
@@ -688,8 +728,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      */
     protected abstract void onStateChanged();
 
-    protected abstract DataPersistenceProvider persistence();
-
     /**
      * Notifier Actor for this RaftActor to notify when a role change happens
      * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
@@ -911,34 +949,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    protected class NonPersistentRaftDataProvider extends NonPersistentDataProvider {
-
-        public NonPersistentRaftDataProvider(){
-
-        }
-
-        /**
-         * The way snapshotting works is,
-         * <ol>
-         * <li> RaftActor calls createSnapshot on the Shard
-         * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
-         * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save the snapshot.
-         * The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the RaftActor gets SaveSnapshot
-         * success it commits the snapshot to the in-memory journal. This commitSnapshot is mimicking what is done
-         * in SaveSnapshotSuccess.
-         * </ol>
-         * @param o
-         */
-        @Override
-        public void saveSnapshot(Object o) {
-            // Make saving Snapshot successful
-            // Committing the snapshot here would end up calling commit in the creating state which would
-            // be a state violation. That's why now we send a message to commit the snapshot.
-            self().tell(COMMIT_SNAPSHOT, self());
-        }
-    }
-
-
     private class CreateSnapshotProcedure implements Procedure<Void> {
 
         @Override
index 13445b0b26bb54f2794446647ab494ffb794fd26..dfaa8d55f6e913b7ce34559cc38d68a8343d71e1 100644 (file)
@@ -57,7 +57,6 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         private TestRaftActor(String id, Map<String, String> peerAddresses, ConfigParams config,
                 TestActorRef<MessageCollectorActor> collectorActor) {
             super(id, peerAddresses, Optional.of(config), null);
-            dataPersistenceProvider = new PersistentDataProvider();
             this.collectorActor = collectorActor;
         }
 
index 0a4a2c7717facfcc9fc883c2234d4577ff49f876..4cb555c4b547bb9cd3c796644da2f74352e75521 100644 (file)
@@ -54,6 +54,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
@@ -97,7 +98,6 @@ public class RaftActorTest extends AbstractActorTest {
 
     public static class MockRaftActor extends RaftActor {
 
-        protected DataPersistenceProvider dataPersistenceProvider;
         private final RaftActor delegate;
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
         private final List<Object> state;
@@ -137,9 +137,9 @@ public class RaftActorTest extends AbstractActorTest {
             state = new ArrayList<>();
             this.delegate = mock(RaftActor.class);
             if(dataPersistenceProvider == null){
-                this.dataPersistenceProvider = new PersistentDataProvider();
+                setPersistence(true);
             } else {
-                this.dataPersistenceProvider = dataPersistenceProvider;
+                setPersistence(dataPersistenceProvider);
             }
         }
 
@@ -252,11 +252,6 @@ public class RaftActorTest extends AbstractActorTest {
             delegate.onStateChanged();
         }
 
-        @Override
-        protected DataPersistenceProvider persistence() {
-            return this.dataPersistenceProvider;
-        }
-
         @Override
         protected Optional<ActorRef> getRoleChangeNotifier() {
             return Optional.fromNullable(roleChangeNotifier);
@@ -986,7 +981,7 @@ public class RaftActorTest extends AbstractActorTest {
 
             TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
-                    new NonPersistentProvider()), persistenceId);
+                    new NonPersistentDataProvider()), persistenceId);
 
             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
 
@@ -1161,13 +1156,13 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("foo-3"),
                         new MockRaftActorContext.MockPayload("foo-4")));
 
-                leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentProvider()
+                leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider()
                         , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
 
                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
                 // The commit is needed to complete the snapshot creation process
-                leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
+                leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
 
                 // capture snapshot reply should remove the snapshotted entries only
                 assertEquals(3, leaderActor.getReplicatedLog().size());
@@ -1271,7 +1266,7 @@ public class RaftActorTest extends AbstractActorTest {
                 assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
                 // The commit is needed to complete the snapshot creation process
-                followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
+                followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
 
                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
@@ -1380,38 +1375,6 @@ public class RaftActorTest extends AbstractActorTest {
         };
     }
 
-
-    private static class NonPersistentProvider implements DataPersistenceProvider {
-        @Override
-        public boolean isRecoveryApplicable() {
-            return false;
-        }
-
-        @Override
-        public <T> void persist(T o, Procedure<T> procedure) {
-            try {
-                procedure.apply(o);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-
-        @Override
-        public void saveSnapshot(Object o) {
-
-        }
-
-        @Override
-        public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
-
-        }
-
-        @Override
-        public void deleteMessages(long sequenceNumber) {
-
-        }
-    }
-
     @Test
     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -1421,7 +1384,7 @@ public class RaftActorTest extends AbstractActorTest {
             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
             config.setSnapshotBatchCount(5);
 
-            DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
+            DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
 
             Map<String, String> peerAddresses = new HashMap<>();
 
@@ -1468,7 +1431,7 @@ public class RaftActorTest extends AbstractActorTest {
             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
             config.setSnapshotBatchCount(5);
 
-            DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
+            DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
 
             Map<String, String> peerAddresses = new HashMap<>();
 
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DelegatingPersistentDataProvider.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DelegatingPersistentDataProvider.java
new file mode 100644 (file)
index 0000000..c74236b
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+
+/**
+ * A DataPersistenceProvider implementation that delegates to another implementation.
+ *
+ * @author Thomas Pantelis
+ */
+public class DelegatingPersistentDataProvider implements DataPersistenceProvider {
+    private DataPersistenceProvider delegate;
+
+    public DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
+        this.delegate = delegate;
+    }
+
+    public void setDelegate(DataPersistenceProvider delegate) {
+        this.delegate = delegate;
+    }
+
+    public DataPersistenceProvider getDelegate() {
+        return delegate;
+    }
+
+    @Override
+    public boolean isRecoveryApplicable() {
+        return delegate.isRecoveryApplicable();
+    }
+
+    @Override
+    public <T> void persist(T o, Procedure<T> procedure) {
+        delegate.persist(o, procedure);
+    }
+
+    @Override
+    public void saveSnapshot(Object o) {
+        delegate.saveSnapshot(o);
+    }
+
+    @Override
+    public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+        delegate.deleteSnapshots(criteria);
+    }
+
+    @Override
+    public void deleteMessages(long sequenceNumber) {
+        delegate.deleteMessages(sequenceNumber);
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java
new file mode 100644 (file)
index 0000000..fed8117
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A DataPersistenceProvider implementation with persistence disabled, essentially a no-op.
+ */
+public class NonPersistentDataProvider implements DataPersistenceProvider {
+    private static final Logger LOG = LoggerFactory.getLogger(NonPersistentDataProvider.class);
+
+    @Override
+    public boolean isRecoveryApplicable() {
+        return false;
+    }
+
+    @Override
+    public <T> void persist(T o, Procedure<T> procedure) {
+        try {
+            procedure.apply(o);
+        } catch (Exception e) {
+            LOG.error("An unexpected error occurred", e);
+        }
+    }
+
+    @Override
+    public void saveSnapshot(Object o) {
+    }
+
+    @Override
+    public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+    }
+
+    @Override
+    public void deleteMessages(long sequenceNumber) {
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/PersistentDataProvider.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/PersistentDataProvider.java
new file mode 100644 (file)
index 0000000..f130a1f
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.UntypedPersistentActor;
+import com.google.common.base.Preconditions;
+
+/**
+ * A DataPersistenceProvider implementation with persistence enabled.
+ */
+public class PersistentDataProvider implements DataPersistenceProvider {
+
+    private final UntypedPersistentActor persistentActor;
+
+    public PersistentDataProvider(UntypedPersistentActor persistentActor) {
+        this.persistentActor = Preconditions.checkNotNull(persistentActor, "persistentActor can't be null");
+    }
+
+    @Override
+    public boolean isRecoveryApplicable() {
+        return true;
+    }
+
+    @Override
+    public <T> void persist(T o, Procedure<T> procedure) {
+        persistentActor.persist(o, procedure);
+    }
+
+    @Override
+    public void saveSnapshot(Object o) {
+        persistentActor.saveSnapshot(o);
+    }
+
+    @Override
+    public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+        persistentActor.deleteSnapshots(criteria);
+    }
+
+    @Override
+    public void deleteMessages(long sequenceNumber) {
+        persistentActor.deleteMessages(sequenceNumber);
+    }
+}
\ No newline at end of file
index 432c2d5615227d7d67f856bef6215fd550053084..326733f377e21d62fcf4865a0eeab10ad14e7ad6 100644 (file)
@@ -8,10 +8,7 @@
 
 package org.opendaylight.controller.cluster.common.actor;
 
-import akka.japi.Procedure;
-import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,71 +66,4 @@ public abstract class AbstractUntypedPersistentActor extends UntypedPersistentAc
         }
         unhandled(message);
     }
-
-    protected class PersistentDataProvider implements DataPersistenceProvider {
-
-        public PersistentDataProvider(){
-
-        }
-
-        @Override
-        public boolean isRecoveryApplicable() {
-            return true;
-        }
-
-        @Override
-        public <T> void persist(T o, Procedure<T> procedure) {
-            AbstractUntypedPersistentActor.this.persist(o, procedure);
-        }
-
-        @Override
-        public void saveSnapshot(Object o) {
-            AbstractUntypedPersistentActor.this.saveSnapshot(o);
-        }
-
-        @Override
-        public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
-            AbstractUntypedPersistentActor.this.deleteSnapshots(criteria);
-        }
-
-        @Override
-        public void deleteMessages(long sequenceNumber) {
-            AbstractUntypedPersistentActor.this.deleteMessages(sequenceNumber);
-        }
-    }
-
-    protected class NonPersistentDataProvider implements DataPersistenceProvider {
-
-        public NonPersistentDataProvider(){
-
-        }
-
-        @Override
-        public boolean isRecoveryApplicable() {
-            return false;
-        }
-
-        @Override
-        public <T> void persist(T o, Procedure<T> procedure) {
-            try {
-                procedure.apply(o);
-            } catch (Exception e) {
-                LOG.error("An unexpected error occurred", e);
-            }
-        }
-
-        @Override
-        public void saveSnapshot(Object o) {
-        }
-
-        @Override
-        public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
-
-        }
-
-        @Override
-        public void deleteMessages(long sequenceNumber) {
-
-        }
-    }
 }
index 8e00a1389ca6a057bef39292ffcc0b04958be794..c04256a28efb5b01dd94429f81cb72b71ae3e1eb 100644 (file)
@@ -29,7 +29,6 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
@@ -115,8 +114,6 @@ public class Shard extends RaftActor {
 
     private DatastoreContext datastoreContext;
 
-    private DataPersistenceProvider dataPersistenceProvider;
-
     private SchemaContext schemaContext;
 
     private int createSnapshotTransactionCounter;
@@ -151,11 +148,10 @@ public class Shard extends RaftActor {
         this.name = name.toString();
         this.datastoreContext = datastoreContext;
         this.schemaContext = schemaContext;
-        this.dataPersistenceProvider = (datastoreContext.isPersistent())
-                ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
         this.txnDispatcherPath = new Dispatchers(context().system().dispatchers())
                 .getDispatcherPath(Dispatchers.DispatcherType.Transaction);
 
+        setPersistence(datastoreContext.isPersistent());
 
         LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
 
@@ -311,12 +307,10 @@ public class Shard extends RaftActor {
 
         setTransactionCommitTimeout();
 
-        if(datastoreContext.isPersistent() &&
-                dataPersistenceProvider instanceof NonPersistentRaftDataProvider) {
-            dataPersistenceProvider = new PersistentDataProvider();
-        } else if(!datastoreContext.isPersistent() &&
-                dataPersistenceProvider instanceof PersistentDataProvider) {
-            dataPersistenceProvider = new NonPersistentRaftDataProvider();
+        if(datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) {
+            setPersistence(true);
+        } else if(!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) {
+            setPersistence(false);
         }
 
         updateConfigParams(datastoreContext.getShardRaftConfig());
@@ -859,19 +853,10 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected DataPersistenceProvider persistence() {
-        return dataPersistenceProvider;
-    }
-
-    @Override public String persistenceId() {
+    public String persistenceId() {
         return this.name;
     }
 
-    @VisibleForTesting
-    DataPersistenceProvider getDataPersistenceProvider() {
-        return dataPersistenceProvider;
-    }
-
     @VisibleForTesting
     ShardCommitCoordinator getCommitCoordinator() {
         return commitCoordinator;
index 52762b4eb352ff9de295e44969b13c4410b7f2f0..cff44b13cb3edb1756ed6159e95d8fe259f91fde 100644 (file)
@@ -41,6 +41,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
@@ -136,7 +138,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
-        return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
+        return (persistent) ? new PersistentDataProvider(this) : new NonPersistentDataProvider();
     }
 
     public static Props props(
index cc96d0d3b0d070623c737dc8f78340c45a20539f..e04c1a5d185ffbb5bc81b4d035a07545c02ef3a7 100644 (file)
@@ -16,9 +16,7 @@ import akka.actor.Props;
 import akka.dispatch.Dispatchers;
 import akka.dispatch.OnComplete;
 import akka.japi.Creator;
-import akka.japi.Procedure;
 import akka.pattern.Patterns;
-import akka.persistence.SnapshotSelectionCriteria;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Function;
@@ -41,6 +39,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Test;
 import org.mockito.InOrder;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
@@ -1391,44 +1390,20 @@ public class ShardTest extends AbstractShardTest {
     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
 
         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
-        class DelegatingPersistentDataProvider implements DataPersistenceProvider {
-            DataPersistenceProvider delegate;
-
-            DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
-                this.delegate = delegate;
-            }
-
-            @Override
-            public boolean isRecoveryApplicable() {
-                return delegate.isRecoveryApplicable();
-            }
-
-            @Override
-            public <T> void persist(T o, Procedure<T> procedure) {
-                delegate.persist(o, procedure);
+        class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
+            TestPersistentDataProvider(DataPersistenceProvider delegate) {
+                super(delegate);
             }
 
             @Override
             public void saveSnapshot(Object o) {
                 savedSnapshot.set(o);
-                delegate.saveSnapshot(o);
-            }
-
-            @Override
-            public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
-                delegate.deleteSnapshots(criteria);
-            }
-
-            @Override
-            public void deleteMessages(long sequenceNumber) {
-                delegate.deleteMessages(sequenceNumber);
+                super.saveSnapshot(o);
             }
         }
 
         dataStoreContextBuilder.persistent(persistent);
 
-
-
         new ShardTestKit(getSystem()) {{
             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
 
@@ -1437,15 +1412,7 @@ public class ShardTest extends AbstractShardTest {
                 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
                                     DatastoreContext datastoreContext, SchemaContext schemaContext) {
                     super(name, peerAddresses, datastoreContext, schemaContext);
-                }
-
-                DelegatingPersistentDataProvider delegating;
-
-                protected DataPersistenceProvider persistence() {
-                    if(delegating == null) {
-                        delegating = new DelegatingPersistentDataProvider(super.persistence());
-                    }
-                    return delegating;
+                    setPersistence(new TestPersistentDataProvider(super.persistence()));
                 }
 
                 @Override
@@ -1560,14 +1527,14 @@ public class ShardTest extends AbstractShardTest {
             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
                     persistentProps, "testPersistence1");
 
-            assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+            assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
 
             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
 
             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
                     nonPersistentProps, "testPersistence2");
 
-            assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+            assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
 
             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
 
@@ -1583,19 +1550,19 @@ public class ShardTest extends AbstractShardTest {
             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
 
             assertEquals("isRecoveryApplicable", true,
-                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+                    shard.underlyingActor().persistence().isRecoveryApplicable());
 
             waitUntilLeader(shard);
 
             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
 
             assertEquals("isRecoveryApplicable", false,
-                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+                    shard.underlyingActor().persistence().isRecoveryApplicable());
 
             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
 
             assertEquals("isRecoveryApplicable", true,
-                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+                    shard.underlyingActor().persistence().isRecoveryApplicable());
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};