import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
-import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
}
protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
- return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
+ return (persistent) ? new PersistentDataProvider(this) : new NonPersistentDataProvider();
}
public static Props props(
onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
} else if(message instanceof ShardNotInitializedTimeout) {
onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
- } else if(message instanceof LeaderStateChanged) {
- onLeaderStateChanged((LeaderStateChanged)message);
+ } else if(message instanceof ShardLeaderStateChanged) {
+ onLeaderStateChanged((ShardLeaderStateChanged)message);
} else {
unknownMessage(message);
}
}
- private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) {
+ private void checkReady(){
+ if (isReadyWithLeaderId()) {
+ LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+ persistenceId(), type, waitTillReadyCountdownLatch.getCount());
+
+ waitTillReadyCountdownLatch.countDown();
+ }
+ }
+
+ private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
if(shardInformation != null) {
+ shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
- if (isReadyWithLeaderId()) {
- LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
- persistenceId(), type, waitTillReadyCountdownLatch.getCount());
-
- waitTillReadyCountdownLatch.countDown();
- }
-
+ checkReady();
} else {
LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
}
ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
if(shardInformation != null) {
shardInformation.setRole(roleChanged.getNewRole());
-
- if (isReadyWithLeaderId()) {
- LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
- persistenceId(), type, waitTillReadyCountdownLatch.getCount());
-
- waitTillReadyCountdownLatch.countDown();
- }
-
+ checkReady();
mBean.setSyncStatus(isInSync());
}
}
info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(),
getShardActorPath(shardName, memberName), getSelf());
}
+
+ checkReady();
}
private void onDatastoreContext(DatastoreContext context) {
LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
final String shardName = message.getShardName();
+ final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
// First see if the there is a local replica for the shard
final ShardInformation info = localShards.get(shardName);
sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
@Override
public Object get() {
- Object found = new PrimaryFound(info.getSerializedLeaderActor());
+ String primaryPath = info.getSerializedLeaderActor();
+ Object found = canReturnLocalShardState && info.isLeader() ?
+ new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
+ new RemotePrimaryShardFound(primaryPath);
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
shardName, path);
- getContext().actorSelection(path).forward(message, getContext());
+ getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName,
+ message.isWaitUntilReady()), getContext());
return;
}
}
private ActorRef actor;
private ActorPath actorPath;
private final Map<String, String> peerAddresses;
+ private Optional<DataTree> localShardDataTree;
// flag that determines if the actor is ready for business
private boolean actorInitialized = false;
return shardId;
}
+ void setLocalDataTree(Optional<DataTree> localShardDataTree) {
+ this.localShardDataTree = localShardDataTree;
+ }
+
+ Optional<DataTree> getLocalShardDataTree() {
+ return localShardDataTree;
+ }
+
Map<String, String> getPeerAddresses() {
return peerAddresses;
}
}
boolean isShardReadyWithLeaderId() {
- return isShardReady() && (isLeader() || peerAddresses.containsKey(leaderId));
+ return isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
}
boolean isShardInitialized() {