X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=7073ea758b2b751d93708914bfc3935779076908;hp=d53cb48e501c75c25e3532ce2c71e481717612ad;hb=a469dbcec569cc972df0cd57cf725a2173d2604a;hpb=ebb4158697741bc88fde22ac5f75edf65869506f;ds=sidebyside diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index d53cb48e50..7073ea758b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -65,6 +65,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; @@ -131,6 +132,8 @@ public class Shard extends RaftActor { private Cancellable txCommitTimeoutCheckSchedule; + private Optional roleChangeNotifier; + /** * Coordinates persistence recovery on startup. */ @@ -171,6 +174,9 @@ public class Shard extends RaftActor { transactionCommitTimeout = TimeUnit.MILLISECONDS.convert( datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); + + // create a notifier actor for each cluster member + roleChangeNotifier = createRoleChangeNotifier(name.toString()); } private static Map mapPeerAddresses( @@ -196,6 +202,12 @@ public class Shard extends RaftActor { return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext)); } + private Optional createRoleChangeNotifier(String shardId) { + ActorRef shardRoleChangeNotifier = this.getContext().actorOf( + RoleChangeNotifier.getProps(shardId), shardId + "-notifier"); + return Optional.of(shardRoleChangeNotifier); + } + @Override public void postStop() { super.postStop(); @@ -259,6 +271,11 @@ public class Shard extends RaftActor { } } + @Override + protected Optional getRoleChangeNotifier() { + return roleChangeNotifier; + } + private void handleTransactionCommitTimeoutCheck() { CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry(); if(cohortEntry != null) {