X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=7d67e0856fe69da8721cd7173c63ec79d294a83e;hb=refs%2Fchanges%2F52%2F12352%2F5;hp=789d51a19f88942e3a35ceb7fc4d69cb20c8abcb;hpb=007324841fd870fc3d3491f300da6383d8874d6a;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 789d51a19f..7d67e0856f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -27,9 +27,11 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; @@ -103,10 +105,6 @@ public class Shard extends RaftActor { private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); - // By default persistent will be true and can be turned off using the system - // property shard.persistent - private final boolean persistent; - /// The name of this shard private final ShardIdentifier name; @@ -119,6 +117,8 @@ public class Shard extends RaftActor { private final DatastoreContext datastoreContext; + private final DataPersistenceProvider dataPersistenceProvider; + private SchemaContext schemaContext; private ActorRef createSnapshotTransaction; @@ -147,12 +147,9 @@ public class Shard extends RaftActor { this.name = name; this.datastoreContext = datastoreContext; this.schemaContext = schemaContext; + this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider(); - String setting = System.getProperty("shard.persistent"); - - this.persistent = !"false".equals(setting); - - LOG.info("Shard created : {} persistent : {}", name, persistent); + LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent()); store = InMemoryDOMDataStoreFactory.create(name.toString(), null, datastoreContext.getDataStoreProperties()); @@ -210,7 +207,7 @@ public class Shard extends RaftActor { } @Override - public void onReceiveRecover(Object message) { + public void onReceiveRecover(Object message) throws Exception { if(LOG.isDebugEnabled()) { LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(), @@ -229,7 +226,7 @@ public class Shard extends RaftActor { } @Override - public void onReceiveCommand(Object message) { + public void onReceiveCommand(Object message) throws Exception { if(LOG.isDebugEnabled()) { LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender()); } @@ -307,12 +304,8 @@ public class Shard extends RaftActor { // currently uses a same thread executor anyway. cohortEntry.getCohort().preCommit().get(); - if(persistent) { - Shard.this.persistData(getSender(), transactionID, - new CompositeModificationPayload(cohortEntry.getModification().toSerializable())); - } else { - Shard.this.finishCommit(getSender(), transactionID); - } + Shard.this.persistData(getSender(), transactionID, + new CompositeModificationPayload(cohortEntry.getModification().toSerializable())); } catch (InterruptedException | ExecutionException e) { LOG.error(e, "An exception occurred while preCommitting transaction {}", cohortEntry.getTransactionID()); @@ -442,9 +435,9 @@ public class Shard extends RaftActor { } else if (getLeader() != null) { getLeader().forward(message, getContext()); } else { - getSender().tell(new akka.actor.Status.Failure(new IllegalStateException( + getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException( "Could not find shard leader so transaction cannot be created. This typically happens" + - " when system is coming up or recovering and a leader is being elected. Try again" + + " when the system is coming up or recovering and a leader is being elected. Try again" + " later.")), getSelf()); } } @@ -842,6 +835,11 @@ public class Shard extends RaftActor { } } + @Override + protected DataPersistenceProvider persistence() { + return dataPersistenceProvider; + } + @Override protected void onLeaderChanged(String oldLeader, String newLeader) { shardMBean.setLeader(newLeader); } @@ -850,6 +848,11 @@ public class Shard extends RaftActor { return this.name.toString(); } + @VisibleForTesting + DataPersistenceProvider getDataPersistenceProvider() { + return dataPersistenceProvider; + } + private static class ShardCreator implements Creator { private static final long serialVersionUID = 1L;