X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=58cdefe5371d2b58be6e7c9f5e461734f34acd07;hb=35f74293edf98402e2b622e060185f7874d10857;hp=64c6821120f94f99a389c12700757a7b8c7266f5;hpb=86f71bd3f15ce34a9181bb01f0f71fc6979917c5;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 64c6821120..58cdefe537 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -18,6 +18,10 @@ import akka.cluster.ClusterEvent; import akka.japi.Creator; import akka.japi.Function; import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; @@ -26,8 +30,11 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,36 +68,39 @@ public class ShardManager extends AbstractUntypedActor { private final Configuration configuration; + private ShardManagerInfoMBean mBean; + + private final DatastoreContext datastoreContext; + /** * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be * configuration or operational */ - private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) { + private ShardManager(String type, ClusterWrapper cluster, Configuration configuration, + DatastoreContext datastoreContext) { this.type = Preconditions.checkNotNull(type, "type should not be null"); this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); + this.datastoreContext = datastoreContext; // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); - // Create all the local Shards and make them a child of the ShardManager - // TODO: This may need to be initiated when we first get the schema context - createLocalShards(); + //createLocalShards(null); } public static Props props(final String type, final ClusterWrapper cluster, - final Configuration configuration) { - return Props.create(new Creator() { + final Configuration configuration, + final DatastoreContext datastoreContext) { - @Override - public ShardManager create() throws Exception { - return new ShardManager(type, cluster, configuration); - } - }); - } + Preconditions.checkNotNull(type, "type should not be null"); + Preconditions.checkNotNull(cluster, "cluster should not be null"); + Preconditions.checkNotNull(configuration, "configuration should not be null"); + return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext)); + } @Override public void handleReceive(Object message) throws Exception { @@ -108,7 +118,7 @@ public class ShardManager extends AbstractUntypedActor { } else if(message instanceof ClusterEvent.UnreachableMember) { ignoreMessage(message); } else{ - throw new Exception ("Not recognized message received, message="+message); + unknownMessage(message); } } @@ -122,11 +132,8 @@ public class ShardManager extends AbstractUntypedActor { return; } - getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf()); - } - - private void ignoreMessage(Object message){ - LOG.debug("Unhandled message : " + message); + getSender().tell(new LocalShardNotFound(message.getShardName()), + getSelf()); } private void memberRemoved(ClusterEvent.MemberRemoved message) { @@ -140,7 +147,7 @@ public class ShardManager extends AbstractUntypedActor { for(ShardInformation info : localShards.values()){ String shardName = info.getShardName(); - info.updatePeerAddress(getShardActorName(memberName, shardName), + info.updatePeerAddress(getShardIdentifier(memberName, shardName), getShardActorPath(shardName, memberName)); } } @@ -151,17 +158,20 @@ public class ShardManager extends AbstractUntypedActor { * @param message */ private void updateSchemaContext(Object message) { - for(ShardInformation info : localShards.values()){ - info.getActor().tell(message,getSelf()); + SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext(); + + if(localShards.size() == 0){ + createLocalShards(schemaContext); + } else { + for (ShardInformation info : localShards.values()) { + info.getActor().tell(message, getSelf()); + } } } private void findPrimary(FindPrimary message) { String shardName = message.getShardName(); - List members = - configuration.getMembersFromShardName(shardName); - // First see if the there is a local replica for the shard ShardInformation info = localShards.get(shardName); if(info != null) { @@ -175,6 +185,9 @@ public class ShardManager extends AbstractUntypedActor { } } + List members = + configuration.getMembersFromShardName(shardName); + if(cluster.getCurrentMemberName() != null) { members.remove(cluster.getCurrentMemberName()); } @@ -196,9 +209,13 @@ public class ShardManager extends AbstractUntypedActor { private String getShardActorPath(String shardName, String memberName) { Address address = memberNameToAddress.get(memberName); if(address != null) { - return address.toString() + "/user/shardmanager-" + this.type + "/" - + getShardActorName( - memberName, shardName); + StringBuilder builder = new StringBuilder(); + builder.append(address.toString()) + .append("/user/") + .append(ShardManagerIdentifier.builder().type(type).build().toString()) + .append("/") + .append(getShardIdentifier(memberName, shardName)); + return builder.toString(); } return null; } @@ -211,8 +228,8 @@ public class ShardManager extends AbstractUntypedActor { * @param shardName * @return */ - private String getShardActorName(String memberName, String shardName){ - return memberName + "-shard-" + shardName + "-" + this.type; + private ShardIdentifier getShardIdentifier(String memberName, String shardName){ + return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build(); } /** @@ -220,20 +237,24 @@ public class ShardManager extends AbstractUntypedActor { * runs * */ - private void createLocalShards() { + private void createLocalShards(SchemaContext schemaContext) { String memberName = this.cluster.getCurrentMemberName(); List memberShardNames = this.configuration.getMemberShardNames(memberName); + List localShardActorNames = new ArrayList<>(); for(String shardName : memberShardNames){ - String shardActorName = getShardActorName(memberName, shardName); - Map peerAddresses = getPeerAddresses(shardName); + ShardIdentifier shardId = getShardIdentifier(memberName, shardName); + Map peerAddresses = getPeerAddresses(shardName); ActorRef actor = getContext() - .actorOf(Shard.props(shardActorName, peerAddresses), - shardActorName); + .actorOf(Shard.props(shardId, peerAddresses, datastoreContext, schemaContext). + withMailbox(ActorContext.MAILBOX), shardId.toString()); + localShardActorNames.add(shardId.toString()); localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses)); } + mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type, + datastoreContext.getDataStoreMXBeanType(), localShardActorNames); } /** @@ -242,9 +263,9 @@ public class ShardManager extends AbstractUntypedActor { * @param shardName * @return */ - private Map getPeerAddresses(String shardName){ + private Map getPeerAddresses(String shardName){ - Map peerAddresses = new HashMap<>(); + Map peerAddresses = new HashMap<>(); List members = this.configuration.getMembersFromShardName(shardName); @@ -253,22 +274,29 @@ public class ShardManager extends AbstractUntypedActor { for(String memberName : members){ if(!currentMemberName.equals(memberName)){ - String shardActorName = getShardActorName(memberName, shardName); + ShardIdentifier shardId = getShardIdentifier(memberName, + shardName); String path = getShardActorPath(shardName, currentMemberName); - peerAddresses.put(shardActorName, path); + peerAddresses.put(shardId, path); } } return peerAddresses; } - @Override public SupervisorStrategy supervisorStrategy() { + return new OneForOneStrategy(10, Duration.create("1 minute"), new Function() { @Override public SupervisorStrategy.Directive apply(Throwable t) { + StringBuilder sb = new StringBuilder(); + for(StackTraceElement element : t.getStackTrace()) { + sb.append("\n\tat ") + .append(element.toString()); + } + LOG.warning("Supervisor Strategy of resume applied {}",sb.toString()); return SupervisorStrategy.resume(); } } @@ -280,10 +308,10 @@ public class ShardManager extends AbstractUntypedActor { private final String shardName; private final ActorRef actor; private final ActorPath actorPath; - private final Map peerAddresses; + private final Map peerAddresses; private ShardInformation(String shardName, ActorRef actor, - Map peerAddresses) { + Map peerAddresses) { this.shardName = shardName; this.actor = actor; this.actorPath = actor.path(); @@ -302,16 +330,15 @@ public class ShardManager extends AbstractUntypedActor { return actorPath; } - public Map getPeerAddresses() { - return peerAddresses; - } - - public void updatePeerAddress(String peerId, String peerAddress){ - LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress); + public void updatePeerAddress(ShardIdentifier peerId, String peerAddress){ + LOG.info("updatePeerAddress for peer {} with address {}", peerId, + peerAddress); if(peerAddresses.containsKey(peerId)){ peerAddresses.put(peerId, peerAddress); - LOG.info("Sending PeerAddressResolved for peer {} with address {} to {}", peerId, peerAddress, actor.path()); + LOG.debug( + "Sending PeerAddressResolved for peer {} with address {} to {}", + peerId, peerAddress, actor.path()); actor .tell(new PeerAddressResolved(peerId, peerAddress), @@ -320,4 +347,29 @@ public class ShardManager extends AbstractUntypedActor { } } } + + private static class ShardManagerCreator implements Creator { + private static final long serialVersionUID = 1L; + + final String type; + final ClusterWrapper cluster; + final Configuration configuration; + final DatastoreContext datastoreContext; + + ShardManagerCreator(String type, ClusterWrapper cluster, + Configuration configuration, DatastoreContext datastoreContext) { + this.type = type; + this.cluster = cluster; + this.configuration = configuration; + this.datastoreContext = datastoreContext; + } + + @Override + public ShardManager create() throws Exception { + return new ShardManager(type, cluster, configuration, datastoreContext); + } + } } + + +