import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
ignoreMessage(message);
} else if(message instanceof DatastoreContext) {
onDatastoreContext((DatastoreContext)message);
- } else if(message instanceof RoleChangeNotification){
+ } else if(message instanceof RoleChangeNotification) {
onRoleChangeNotification((RoleChangeNotification) message);
+ } else if(message instanceof FollowerInitialSyncUpStatus){
+ onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
} else{
unknownMessage(message);
}
}
- private void onRoleChangeNotification(RoleChangeNotification message) {
- RoleChangeNotification roleChanged = message;
+ private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
+ LOG.info("Received follower initial sync status for {} status sync done {}", status.getName(),
+ status.isInitialSyncDone());
+
+ ShardInformation shardInformation = findShardInformation(status.getName());
+
+ if(shardInformation != null) {
+ shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
+
+ mBean.setSyncStatus(isInSync());
+ }
+
+ }
+
+ private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(),
roleChanged.getOldRole(), roleChanged.getNewRole());
waitTillReadyCountdownLatch.countDown();
}
+
+ mBean.setSyncStatus(isInSync());
}
}
return isReady;
}
+ private boolean isInSync(){
+ for (ShardInformation info : localShards.values()) {
+ if(!info.isInSync()){
+ return false;
+ }
+ }
+ return true;
+ }
+
private void onActorInitialized(Object message) {
final ActorRef sender = getSender();
return dataPersistenceProvider;
}
+ @VisibleForTesting
+ ShardManagerInfoMBean getMBean(){
+ return mBean;
+ }
+
private class ShardInformation {
private final ShardIdentifier shardId;
private final String shardName;
// flag that determines if the actor is ready for business
private boolean actorInitialized = false;
+ private boolean followerSyncStatus = false;
+
private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
private String role ;
return this.role;
}
+ public void setFollowerSyncStatus(boolean syncStatus){
+ this.followerSyncStatus = syncStatus;
+ }
+
+ public boolean isInSync(){
+ if(RaftState.Follower.name().equals(this.role)){
+ return followerSyncStatus;
+ } else if(RaftState.Leader.name().equals(this.role)){
+ return true;
+ }
+
+ return false;
+ }
+
}
private static class ShardManagerCreator implements Creator<ShardManager> {