import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
private Cancellable txCommitTimeoutCheckSchedule;
+ private Optional<ActorRef> roleChangeNotifier;
+
/**
* Coordinates persistence recovery on startup.
*/
transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+
+ // create a notifier actor for each cluster member
+ roleChangeNotifier = createRoleChangeNotifier(name.toString());
}
private static Map<String, String> mapPeerAddresses(
return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
}
+ private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
+ ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
+ RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
+ return Optional.<ActorRef>of(shardRoleChangeNotifier);
+ }
+
@Override
public void postStop() {
super.postStop();
}
}
+ @Override
+ protected Optional<ActorRef> getRoleChangeNotifier() {
+ return roleChangeNotifier;
+ }
+
private void handleTransactionCommitTimeoutCheck() {
CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
if(cohortEntry != null) {
shardMBean.setCommitIndex(getCommitIndex());
shardMBean.setLastApplied(getLastApplied());
+ shardMBean.setDataSize(getRaftActorContext().getReplicatedLog().dataSize());
}
@Override