BUG 2134 : Make persistence configurable at the datastore level
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 789d51a19f88942e3a35ceb7fc4d69cb20c8abcb..7d67e0856fe69da8721cd7173c63ec79d294a83e 100644 (file)
@@ -27,9 +27,11 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
@@ -103,10 +105,6 @@ public class Shard extends RaftActor {
     private final LoggingAdapter LOG =
         Logging.getLogger(getContext().system(), this);
 
-    // By default persistent will be true and can be turned off using the system
-    // property shard.persistent
-    private final boolean persistent;
-
     /// The name of this shard
     private final ShardIdentifier name;
 
@@ -119,6 +117,8 @@ public class Shard extends RaftActor {
 
     private final DatastoreContext datastoreContext;
 
+    private final DataPersistenceProvider dataPersistenceProvider;
+
     private SchemaContext schemaContext;
 
     private ActorRef createSnapshotTransaction;
@@ -147,12 +147,9 @@ public class Shard extends RaftActor {
         this.name = name;
         this.datastoreContext = datastoreContext;
         this.schemaContext = schemaContext;
+        this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
 
-        String setting = System.getProperty("shard.persistent");
-
-        this.persistent = !"false".equals(setting);
-
-        LOG.info("Shard created : {} persistent : {}", name, persistent);
+        LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent());
 
         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
                 datastoreContext.getDataStoreProperties());
@@ -210,7 +207,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    public void onReceiveRecover(Object message) {
+    public void onReceiveRecover(Object message) throws Exception {
         if(LOG.isDebugEnabled()) {
             LOG.debug("onReceiveRecover: Received message {} from {}",
                 message.getClass().toString(),
@@ -229,7 +226,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    public void onReceiveCommand(Object message) {
+    public void onReceiveCommand(Object message) throws Exception {
         if(LOG.isDebugEnabled()) {
             LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
         }
@@ -307,12 +304,8 @@ public class Shard extends RaftActor {
             // currently uses a same thread executor anyway.
             cohortEntry.getCohort().preCommit().get();
 
-            if(persistent) {
-                Shard.this.persistData(getSender(), transactionID,
-                        new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
-            } else {
-                Shard.this.finishCommit(getSender(), transactionID);
-            }
+            Shard.this.persistData(getSender(), transactionID,
+                    new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
         } catch (InterruptedException | ExecutionException e) {
             LOG.error(e, "An exception occurred while preCommitting transaction {}",
                     cohortEntry.getTransactionID());
@@ -442,9 +435,9 @@ public class Shard extends RaftActor {
         } else if (getLeader() != null) {
             getLeader().forward(message, getContext());
         } else {
-            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(
+            getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
                 "Could not find shard leader so transaction cannot be created. This typically happens" +
-                " when system is coming up or recovering and a leader is being elected. Try again" +
+                " when the system is coming up or recovering and a leader is being elected. Try again" +
                 " later.")), getSelf());
         }
     }
@@ -842,6 +835,11 @@ public class Shard extends RaftActor {
         }
     }
 
+    @Override
+    protected DataPersistenceProvider persistence() {
+        return dataPersistenceProvider;
+    }
+
     @Override protected void onLeaderChanged(String oldLeader, String newLeader) {
         shardMBean.setLeader(newLeader);
     }
@@ -850,6 +848,11 @@ public class Shard extends RaftActor {
         return this.name.toString();
     }
 
+    @VisibleForTesting
+    DataPersistenceProvider getDataPersistenceProvider() {
+        return dataPersistenceProvider;
+    }
+
     private static class ShardCreator implements Creator<Shard> {
 
         private static final long serialVersionUID = 1L;