X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=e68628dbf5c1fc992afb30ae986fff8ed8f6eef1;hb=f9f6d8ed4b12e39386801f6b16b9485f5558336e;hp=a8a182380911db26f8c9530e363441d2d2b491cc;hpb=b3e553ce5b3d3e972cbe19465ab7af2fcb39934c;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
index a8a1823809..e68628dbf5 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
@@ -24,11 +24,13 @@ import akka.persistence.RecoveryCompleted;
import akka.persistence.RecoveryFailure;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
+import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
@@ -59,7 +61,7 @@ import java.util.Set;
*
Monitor the cluster members and store their addresses
*
*/
-public class ShardManager extends AbstractUntypedPersistentActor {
+public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
protected final LoggingAdapter LOG =
Logging.getLogger(getContext().system(), this);
@@ -127,6 +129,8 @@ public class ShardManager extends AbstractUntypedPersistentActor {
findLocalShard((FindLocalShard) message);
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext(message);
+ } else if(message instanceof ActorInitialized) {
+ onActorInitialized(message);
} else if (message instanceof ClusterEvent.MemberUp){
memberUp((ClusterEvent.MemberUp) message);
} else if(message instanceof ClusterEvent.MemberRemoved) {
@@ -139,6 +143,31 @@ public class ShardManager extends AbstractUntypedPersistentActor {
}
+ private void onActorInitialized(Object message) {
+ final ActorRef sender = getSender();
+
+ if (sender == null) {
+ return; //why is a non-actor sending this message? Just ignore.
+ }
+
+ String actorName = sender.path().name();
+ //find shard name from actor name; actor name is stringified shardId
+ ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
+
+ if (shardId.getShardName() == null) {
+ return;
+ }
+ markShardAsInitialized(shardId.getShardName());
+ }
+
+ @VisibleForTesting protected void markShardAsInitialized(String shardName) {
+ LOG.debug("Initializing shard [{}]", shardName);
+ ShardInformation shardInformation = localShards.get(shardName);
+ if (shardInformation != null) {
+ shardInformation.setShardInitialized(true);
+ }
+ }
+
@Override protected void handleRecover(Object message) throws Exception {
if(message instanceof SchemaContextModules){
@@ -157,16 +186,23 @@ public class ShardManager extends AbstractUntypedPersistentActor {
}
private void findLocalShard(FindLocalShard message) {
- ShardInformation shardInformation =
- localShards.get(message.getShardName());
+ ShardInformation shardInformation = localShards.get(message.getShardName());
- if(shardInformation != null){
- getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf());
+ if(shardInformation == null){
+ getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
return;
}
- getSender().tell(new LocalShardNotFound(message.getShardName()),
- getSelf());
+ sendResponse(shardInformation, new LocalShardFound(shardInformation.getActor()));
+ }
+
+ private void sendResponse(ShardInformation shardInformation, Object message) {
+ if (!shardInformation.isShardInitialized()) {
+ getSender().tell(new ActorNotInitialized(), getSelf());
+ return;
+ }
+
+ getSender().tell(message, getSelf());
}
private void memberRemoved(ClusterEvent.MemberRemoved message) {
@@ -176,7 +212,7 @@ public class ShardManager extends AbstractUntypedPersistentActor {
private void memberUp(ClusterEvent.MemberUp message) {
String memberName = message.member().roles().head();
- memberNameToAddress.put(memberName , message.member().address());
+ memberNameToAddress.put(memberName, message.member().address());
for(ShardInformation info : localShards.values()){
String shardName = info.getShardName();
@@ -229,28 +265,27 @@ public class ShardManager extends AbstractUntypedPersistentActor {
}
private void findPrimary(FindPrimary message) {
+ final ActorRef sender = getSender();
String shardName = message.getShardName();
// First see if the there is a local replica for the shard
ShardInformation info = localShards.get(shardName);
- if(info != null) {
+ if (info != null) {
ActorPath shardPath = info.getActorPath();
- if (shardPath != null) {
- getSender()
- .tell(
- new PrimaryFound(shardPath.toString()).toSerializable(),
- getSelf());
- return;
- }
+ sendResponse(info, new PrimaryFound(shardPath.toString()).toSerializable());
+ return;
}
- List members =
- configuration.getMembersFromShardName(shardName);
+ List members = configuration.getMembersFromShardName(shardName);
if(cluster.getCurrentMemberName() != null) {
members.remove(cluster.getCurrentMemberName());
}
+ /**
+ * FIXME: Instead of sending remote shard actor path back to sender,
+ * forward FindPrimary message to remote shard manager
+ */
// There is no way for us to figure out the primary (for now) so assume
// that one of the remote nodes is a primary
for(String memberName : members) {
@@ -376,6 +411,7 @@ public class ShardManager extends AbstractUntypedPersistentActor {
private final ActorRef actor;
private final ActorPath actorPath;
private final Map peerAddresses;
+ private boolean shardInitialized = false; //flag that determines if the actor is ready for business
private ShardInformation(String shardName, ActorRef actor,
Map peerAddresses) {
@@ -413,6 +449,14 @@ public class ShardManager extends AbstractUntypedPersistentActor {
}
}
+
+ public boolean isShardInitialized() {
+ return shardInitialized;
+ }
+
+ public void setShardInitialized(boolean shardInitialized) {
+ this.shardInitialized = shardInitialized;
+ }
}
private static class ShardManagerCreator implements Creator {