X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=6868cc15cd8b21dd899d3b066142fc7fbeefce23;hp=8e00a1389ca6a057bef39292ffcc0b04958be794;hb=8f0395b38dbfdf6b3164cb68b1cba651b1075a07;hpb=84df20a29292cfb9f52acb0e0a2ebab2b996aa0b diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 8e00a1389c..6868cc15cd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; -import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry; @@ -57,6 +56,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; @@ -115,8 +115,6 @@ public class Shard extends RaftActor { private DatastoreContext datastoreContext; - private DataPersistenceProvider dataPersistenceProvider; - private SchemaContext schemaContext; private int createSnapshotTransactionCounter; @@ -144,6 +142,8 @@ public class Shard extends RaftActor { private final String txnDispatcherPath; + private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this); + protected Shard(final ShardIdentifier name, final Map peerAddresses, final DatastoreContext datastoreContext, final SchemaContext schemaContext) { super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig())); @@ -151,18 +151,17 @@ public class Shard extends RaftActor { this.name = name.toString(); this.datastoreContext = datastoreContext; this.schemaContext = schemaContext; - this.dataPersistenceProvider = (datastoreContext.isPersistent()) - ? new PersistentDataProvider() : new NonPersistentRaftDataProvider(); this.txnDispatcherPath = new Dispatchers(context().system().dispatchers()) .getDispatcherPath(Dispatchers.DispatcherType.Transaction); + setPersistence(datastoreContext.isPersistent()); LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent()); store = InMemoryDOMDataStoreFactory.create(name.toString(), null, datastoreContext.getDataStoreProperties()); - if(schemaContext != null) { + if (schemaContext != null) { store.onGlobalContextUpdated(schemaContext); } @@ -276,6 +275,8 @@ public class Shard extends RaftActor { closeTransactionChain(CloseTransactionChain.fromSerializable(message)); } else if (message instanceof RegisterChangeListener) { registerChangeListener((RegisterChangeListener) message); + } else if (message instanceof RegisterDataTreeChangeListener) { + treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader()); } else if (message instanceof UpdateSchemaContext) { updateSchemaContext((UpdateSchemaContext) message); } else if (message instanceof PeerAddressResolved) { @@ -311,12 +312,10 @@ public class Shard extends RaftActor { setTransactionCommitTimeout(); - if(datastoreContext.isPersistent() && - dataPersistenceProvider instanceof NonPersistentRaftDataProvider) { - dataPersistenceProvider = new PersistentDataProvider(); - } else if(!datastoreContext.isPersistent() && - dataPersistenceProvider instanceof PersistentDataProvider) { - dataPersistenceProvider = new NonPersistentRaftDataProvider(); + if(datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) { + setPersistence(true); + } else if(!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) { + setPersistence(false); } updateConfigParams(datastoreContext.getShardRaftConfig()); @@ -832,6 +831,8 @@ public class Shard extends RaftActor { @Override protected void onStateChanged() { boolean isLeader = isLeader(); + treeChangeSupport.onLeadershipChange(isLeader); + for (ActorSelection dataChangeListener : dataChangeListeners) { dataChangeListener.tell(new EnableNotification(isLeader), getSelf()); } @@ -859,19 +860,10 @@ public class Shard extends RaftActor { } @Override - protected DataPersistenceProvider persistence() { - return dataPersistenceProvider; - } - - @Override public String persistenceId() { + public String persistenceId() { return this.name; } - @VisibleForTesting - DataPersistenceProvider getDataPersistenceProvider() { - return dataPersistenceProvider; - } - @VisibleForTesting ShardCommitCoordinator getCommitCoordinator() { return commitCoordinator;