X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=724c8d2c03dc787549f511f60ac3e289ea763978;hb=9b013985b871adb08173dd8e90d5cf2af82fa5a1;hp=5f59672ed987b4f8cb8b47cb4e82da67ca4b4f69;hpb=bbc8a16efdc6bfa0d742b73af3374a33a12e2a1c;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 5f59672ed9..724c8d2c03 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -8,18 +8,22 @@ package org.opendaylight.controller.cluster.datastore; +import static akka.pattern.Patterns.ask; import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.Address; import akka.actor.Cancellable; import akka.actor.OneForOneStrategy; +import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.SupervisorStrategy; import akka.cluster.ClusterEvent; +import akka.dispatch.OnComplete; import akka.japi.Creator; import akka.japi.Function; import akka.persistence.RecoveryCompleted; import akka.serialization.Serialization; +import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.google.common.base.Optional; @@ -29,6 +33,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.Sets; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -37,22 +42,29 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; +import org.opendaylight.controller.cluster.datastore.config.Configuration; +import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; -import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; +import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; +import org.opendaylight.controller.cluster.datastore.messages.CreateShard; +import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; +import org.opendaylight.controller.cluster.datastore.messages.PeerDown; +import org.opendaylight.controller.cluster.datastore.messages.PeerUp; import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary; import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; @@ -63,10 +75,15 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; +import org.opendaylight.controller.cluster.raft.messages.AddServer; +import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; +import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -83,11 +100,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class); - // Stores a mapping between a member name and the address of the member - // Member names look like "member-1", "member-2" etc and are as specified - // in configuration - private final Map memberNameToAddress = new HashMap<>(); - // Stores a mapping between a shard name and it's corresponding information // Shard names look like inventory, topology etc and are as specified in // configuration @@ -97,8 +109,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // A data store could be of type config/operational private final String type; - private final String shardManagerIdentifierString; - private final ClusterWrapper cluster; private final Configuration configuration; @@ -113,6 +123,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final PrimaryShardInfoFutureCache primaryShardInfoCache; + private final ShardPeerAddressResolver peerAddressResolver; + + private SchemaContext schemaContext; + /** */ protected ShardManager(ClusterWrapper cluster, Configuration configuration, @@ -123,12 +137,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); this.datastoreContext = datastoreContext; this.type = datastoreContext.getDataStoreType(); - this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString(); this.shardDispatcherPath = new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; this.primaryShardInfoCache = primaryShardInfoCache; + peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName()); + this.datastoreContext = DatastoreContext.newBuilderFrom(datastoreContext).shardPeerAddressResolver( + peerAddressResolver).build(); + // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); @@ -136,11 +153,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } public static Props props( - final ClusterWrapper cluster, - final Configuration configuration, - final DatastoreContext datastoreContext, - final CountDownLatch waitTillReadyCountdownLatch, - final PrimaryShardInfoFutureCache primaryShardInfoCache) { + final ClusterWrapper cluster, + final Configuration configuration, + final DatastoreContext datastoreContext, + final CountDownLatch waitTillReadyCountdownLatch, + final PrimaryShardInfoFutureCache primaryShardInfoCache) { Preconditions.checkNotNull(cluster, "cluster should not be null"); Preconditions.checkNotNull(configuration, "configuration should not be null"); @@ -170,6 +187,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onActorInitialized(message); } else if (message instanceof ClusterEvent.MemberUp){ memberUp((ClusterEvent.MemberUp) message); + } else if (message instanceof ClusterEvent.MemberExited){ + memberExited((ClusterEvent.MemberExited) message); } else if(message instanceof ClusterEvent.MemberRemoved) { memberRemoved((ClusterEvent.MemberRemoved) message); } else if(message instanceof ClusterEvent.UnreachableMember) { @@ -188,12 +207,65 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onLeaderStateChanged((ShardLeaderStateChanged) message); } else if(message instanceof SwitchShardBehavior){ onSwitchShardBehavior((SwitchShardBehavior) message); + } else if(message instanceof CreateShard) { + onCreateShard((CreateShard)message); + } else if(message instanceof AddShardReplica){ + onAddShardReplica((AddShardReplica)message); + } else if(message instanceof RemoveShardReplica){ + onRemoveShardReplica((RemoveShardReplica)message); } else { unknownMessage(message); } } + private void onCreateShard(CreateShard createShard) { + Object reply; + try { + ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig(); + if(localShards.containsKey(moduleShardConfig.getShardName())) { + throw new IllegalStateException(String.format("Shard with name %s already exists", + moduleShardConfig.getShardName())); + } + + configuration.addModuleShardConfiguration(moduleShardConfig); + + ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), moduleShardConfig.getShardName()); + Map peerAddresses = getPeerAddresses(moduleShardConfig.getShardName()/*, + moduleShardConfig.getShardMemberNames()*/); + + LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId, + moduleShardConfig.getShardMemberNames(), peerAddresses); + + DatastoreContext shardDatastoreContext = createShard.getDatastoreContext(); + if(shardDatastoreContext == null) { + shardDatastoreContext = datastoreContext; + } else { + shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver( + peerAddressResolver).build(); + } + + ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses, + shardDatastoreContext, createShard.getShardPropsCreator(), peerAddressResolver); + localShards.put(info.getShardName(), info); + + mBean.addLocalShard(shardId.toString()); + + if(schemaContext != null) { + info.setActor(newShardActor(schemaContext, info)); + } + + reply = new CreateShardReply(); + } catch (Exception e) { + LOG.error("onCreateShard failed", e); + reply = new akka.actor.Status.Failure(e); + } + + if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) { + getSender().tell(reply, getSelf()); + } + } + private void checkReady(){ if (isReadyWithLeaderId()) { LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", @@ -402,11 +474,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { getSender().tell(messageSupplier.get(), getSelf()); } - private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) { + private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) { return new NoShardLeaderException(null, shardId.toString()); } - private NotInitializedException createNotInitializedException(ShardIdentifier shardId) { + private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) { return new NotInitializedException(String.format( "Found primary shard %s but it's not initialized yet. Please try again later", shardId)); } @@ -417,7 +489,24 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); - memberNameToAddress.remove(message.member().roles().head()); + peerAddressResolver.removePeerAddress(memberName); + + for(ShardInformation info : localShards.values()){ + info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); + } + } + + private void memberExited(ClusterEvent.MemberExited message) { + String memberName = message.member().roles().head(); + + LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName, + message.member().address()); + + peerAddressResolver.removePeerAddress(memberName); + + for(ShardInformation info : localShards.values()){ + info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); + } } private void memberUp(ClusterEvent.MemberUp message) { @@ -426,21 +515,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); - memberNameToAddress.put(memberName, message.member().address()); + addPeerAddress(memberName, message.member().address()); + + checkReady(); + } + + private void addPeerAddress(String memberName, Address address) { + peerAddressResolver.addPeerAddress(memberName, address); for(ShardInformation info : localShards.values()){ String shardName = info.getShardName(); - info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(), - getShardActorPath(shardName, memberName), getSelf()); - } + String peerId = getShardIdentifier(memberName, shardName).toString(); + info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf()); - checkReady(); + info.peerUp(memberName, peerId, getSelf()); + } } private void memberReachable(ClusterEvent.ReachableMember message) { String memberName = message.member().roles().head(); LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address()); + addPeerAddress(memberName, message.member().address()); + markMemberAvailable(memberName); } @@ -460,6 +557,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { primaryShardInfoCache.remove(info.getShardName()); } + + info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); } } @@ -470,11 +569,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("Marking Leader {} as available.", leaderId); info.setLeaderAvailable(true); } + + info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); } } private void onDatastoreContext(DatastoreContext context) { - datastoreContext = context; + datastoreContext = DatastoreContext.newBuilderFrom(context).shardPeerAddressResolver( + peerAddressResolver).build(); for (ShardInformation info : localShards.values()) { if (info.getActor() != null) { info.getActor().tell(datastoreContext, getSelf()); @@ -502,7 +604,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param message */ private void updateSchemaContext(final Object message) { - final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext(); + schemaContext = ((UpdateSchemaContext) message).getSchemaContext(); LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size()); @@ -523,9 +625,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @VisibleForTesting protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) { - return getContext().actorOf(Shard.props(info.getShardId(), - info.getPeerAddresses(), datastoreContext, schemaContext) - .withDispatcher(shardDispatcherPath), info.getShardId().toString()); + return getContext().actorOf(info.newProps(schemaContext) + .withDispatcher(shardDispatcherPath), info.getShardId().toString()); } private void findPrimary(FindPrimary message) { @@ -545,28 +646,24 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) : new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion()); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); - } + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); + } - return found; + return found; } }); return; } - for(Map.Entry entry: memberNameToAddress.entrySet()) { - if(!cluster.getCurrentMemberName().equals(entry.getKey())) { - String path = getShardManagerActorPathBuilder(entry.getValue()).toString(); - - LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(), - shardName, path); + for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) { + LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(), + shardName, address); - getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName, - message.isWaitUntilReady()), getContext()); - return; - } + getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName, + message.isWaitUntilReady()), getContext()); + return; } LOG.debug("{}: No shard found for {}", persistenceId(), shardName); @@ -575,23 +672,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { String.format("No primary shard found for %s.", shardName)), getSelf()); } - private StringBuilder getShardManagerActorPathBuilder(Address address) { - StringBuilder builder = new StringBuilder(); - builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString); - return builder; - } - - private String getShardActorPath(String shardName, String memberName) { - Address address = memberNameToAddress.get(memberName); - if(address != null) { - StringBuilder builder = getShardManagerActorPathBuilder(address); - builder.append("/") - .append(getShardIdentifier(memberName, shardName)); - return builder.toString(); - } - return null; - } - /** * Construct the name of the shard actor given the name of the member on * which the shard resides and the name of the shard @@ -601,7 +681,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @return */ private ShardIdentifier getShardIdentifier(String memberName, String shardName){ - return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build(); + return peerAddressResolver.getShardIdentifier(memberName, shardName); } /** @@ -611,19 +691,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { */ private void createLocalShards() { String memberName = this.cluster.getCurrentMemberName(); - List memberShardNames = - this.configuration.getMemberShardNames(memberName); + Collection memberShardNames = this.configuration.getMemberShardNames(memberName); + ShardPropsCreator shardPropsCreator = new DefaultShardPropsCreator(); List localShardActorNames = new ArrayList<>(); for(String shardName : memberShardNames){ ShardIdentifier shardId = getShardIdentifier(memberName, shardName); Map peerAddresses = getPeerAddresses(shardName); localShardActorNames.add(shardId.toString()); - localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses)); + localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext, + shardPropsCreator, peerAddressResolver)); } mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type, - datastoreContext.getDataStoreMXBeanType(), localShardActorNames); + datastoreContext.getDataStoreMXBeanType(), localShardActorNames); mBean.setShardManager(this); } @@ -632,21 +713,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * Given the name of the shard find the addresses of all it's peers * * @param shardName - * @return */ - private Map getPeerAddresses(String shardName){ - + private Map getPeerAddresses(String shardName) { + Collection members = configuration.getMembersFromShardName(shardName); Map peerAddresses = new HashMap<>(); - List members = this.configuration.getMembersFromShardName(shardName); - String currentMemberName = this.cluster.getCurrentMemberName(); - for(String memberName : members){ - if(!currentMemberName.equals(memberName)){ + for(String memberName : members) { + if(!currentMemberName.equals(memberName)) { ShardIdentifier shardId = getShardIdentifier(memberName, shardName); - String path = getShardActorPath(shardName, currentMemberName); - peerAddresses.put(shardId.toString(), path); + String address = peerAddressResolver.getShardActorAddress(shardName, memberName); + peerAddresses.put(shardId.toString(), address); } } return peerAddresses; @@ -656,14 +734,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { public SupervisorStrategy supervisorStrategy() { return new OneForOneStrategy(10, Duration.create("1 minute"), - new Function() { - @Override - public SupervisorStrategy.Directive apply(Throwable t) { - LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t); - return SupervisorStrategy.resume(); - } + new Function() { + @Override + public SupervisorStrategy.Directive apply(Throwable t) { + LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t); + return SupervisorStrategy.resume(); } - ); + } + ); } @@ -677,13 +755,188 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return mBean; } + private DatastoreContext getInitShardDataStoreContext() { + return (DatastoreContext.newBuilderFrom(datastoreContext) + .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()) + .build()); + } + + private void checkLocalShardExists(final String shardName, final ActorRef sender) { + if (localShards.containsKey(shardName)) { + String msg = String.format("Local shard %s already exists", shardName); + LOG.debug ("{}: {}", persistenceId(), msg); + sender.tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf()); + } + } + + private void onAddShardReplica (AddShardReplica shardReplicaMsg) { + final String shardName = shardReplicaMsg.getShardName(); + + // verify the local shard replica is already available in the controller node + LOG.debug ("onAddShardReplica: {}", shardReplicaMsg); + + checkLocalShardExists(shardName, getSender()); + + // verify the shard with the specified name is present in the cluster configuration + if (!(this.configuration.isShardConfigured(shardName))) { + String msg = String.format("No module configuration exists for shard %s", shardName); + LOG.debug ("{}: {}", persistenceId(), msg); + getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf()); + return; + } + + // Create the localShard + if (schemaContext == null) { + String msg = String.format( + "No SchemaContext is available in order to create a local shard instance for %s", shardName); + LOG.debug ("{}: {}", persistenceId(), msg); + getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf()); + return; + } + + Map peerAddresses = getPeerAddresses(shardName); + if (peerAddresses.isEmpty()) { + String msg = String.format("Cannot add replica for shard %s because no peer is available", shardName); + LOG.debug ("{}: {}", persistenceId(), msg); + getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf()); + return; + } + + Timeout findPrimaryTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2)); + + final ActorRef sender = getSender(); + Future futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout); + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure); + sender.tell(new akka.actor.Status.Failure(new RuntimeException( + String.format("Failed to find leader for shard %s", shardName), failure)), + getSelf()); + } else { + if (!(response instanceof RemotePrimaryShardFound)) { + String msg = String.format("Failed to find leader for shard %s: received response: %s", + shardName, response); + LOG.debug ("{}: {}", persistenceId(), msg); + sender.tell(new akka.actor.Status.Failure(new RuntimeException(msg)), getSelf()); + return; + } + + RemotePrimaryShardFound message = (RemotePrimaryShardFound)response; + addShard (shardName, message, sender); + } + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + + private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) { + checkLocalShardExists(shardName, sender); + + ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); + String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName()); + final ShardInformation shardInfo = new ShardInformation(shardName, shardId, + getPeerAddresses(shardName), getInitShardDataStoreContext(), + new DefaultShardPropsCreator(), peerAddressResolver); + localShards.put(shardName, shardInfo); + shardInfo.setActor(newShardActor(schemaContext, shardInfo)); + + //inform ShardLeader to add this shard as a replica by sending an AddServer message + LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(), + response.getPrimaryPath(), shardId); + + Timeout addServerTimeout = new Timeout(datastoreContext + .getShardLeaderElectionTimeout().duration().$times(4)); + Future futureObj = ask(getContext().actorSelection(response.getPrimaryPath()), + new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout); + + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object addServerResponse) { + if (failure != null) { + LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(), + response.getPrimaryPath(), shardName, failure); + + // Remove the shard + localShards.remove(shardName); + if (shardInfo.getActor() != null) { + shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf()); + } + + sender.tell(new akka.actor.Status.Failure(new RuntimeException( + String.format("AddServer request to leader %s for shard %s failed", + response.getPrimaryPath(), shardName), failure)), getSelf()); + } else { + AddServerReply reply = (AddServerReply)addServerResponse; + onAddServerReply(shardName, shardInfo, reply, sender, response.getPrimaryPath()); + } + } + }, new Dispatchers(context().system().dispatchers()). + getDispatcher(Dispatchers.DispatcherType.Client)); + return; + } + + private void onAddServerReply (String shardName, ShardInformation shardInfo, + AddServerReply replyMsg, ActorRef sender, String leaderPath) { + LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath); + + if (replyMsg.getStatus() == ServerChangeStatus.OK) { + LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName); + + // Make the local shard voting capable + shardInfo.setDatastoreContext(datastoreContext, getSelf()); + + mBean.addLocalShard(shardInfo.getShardId().toString()); + sender.tell(new akka.actor.Status.Success(true), getSelf()); + } else { + LOG.warn ("{}: Leader failed to add shard replica {} with status {} - removing the local shard", + persistenceId(), shardName, replyMsg.getStatus()); + + //remove the local replica created + localShards.remove(shardName); + if (shardInfo.getActor() != null) { + shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf()); + } + switch (replyMsg.getStatus()) { + case TIMEOUT: + sender.tell(new akka.actor.Status.Failure(new RuntimeException( + String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data", + leaderPath, shardName))), getSelf()); + break; + case NO_LEADER: + sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format( + "There is no shard leader available for shard %s", shardName))), getSelf()); + break; + default : + sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format( + "AddServer request to leader %s for shard %s failed with status %s", + leaderPath, shardName, replyMsg.getStatus()))), getSelf()); + } + } + } + + private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) { + String shardName = shardReplicaMsg.getShardName(); + + // verify the local shard replica is available in the controller node + if (!localShards.containsKey(shardName)) { + String msg = String.format("Local shard %s does not", shardName); + LOG.debug ("{}: {}", persistenceId(), msg); + getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf()); + return; + } + // call RemoveShard for the shardName + getSender().tell(new akka.actor.Status.Success(true), getSelf()); + return; + } + @VisibleForTesting protected static class ShardInformation { private final ShardIdentifier shardId; private final String shardName; private ActorRef actor; private ActorPath actorPath; - private final Map peerAddresses; + private final Map initialPeerAddresses; private Optional localShardDataTree; private boolean leaderAvailable = false; @@ -697,11 +950,23 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private String leaderId; private short leaderVersion; + private DatastoreContext datastoreContext; + private final ShardPropsCreator shardPropsCreator; + private final ShardPeerAddressResolver addressResolver; + private ShardInformation(String shardName, ShardIdentifier shardId, - Map peerAddresses) { + Map initialPeerAddresses, DatastoreContext datastoreContext, + ShardPropsCreator shardPropsCreator, ShardPeerAddressResolver addressResolver) { this.shardName = shardName; this.shardId = shardId; - this.peerAddresses = peerAddresses; + this.initialPeerAddresses = initialPeerAddresses; + this.datastoreContext = datastoreContext; + this.shardPropsCreator = shardPropsCreator; + this.addressResolver = addressResolver; + } + + Props newProps(SchemaContext schemaContext) { + return shardPropsCreator.newProps(shardId, initialPeerAddresses, datastoreContext, schemaContext); } String getShardName() { @@ -733,26 +998,30 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return localShardDataTree; } - Map getPeerAddresses() { - return peerAddresses; - } - void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){ - LOG.info("updatePeerAddress for peer {} with address {}", peerId, - peerAddress); - if(peerAddresses.containsKey(peerId)){ - peerAddresses.put(peerId, peerAddress); - - if(actor != null) { - if(LOG.isDebugEnabled()) { - LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}", - peerId, peerAddress, actor.path()); - } + LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress); - actor.tell(new PeerAddressResolved(peerId.toString(), peerAddress), sender); + if(actor != null) { + if(LOG.isDebugEnabled()) { + LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}", + peerId, peerAddress, actor.path()); } - notifyOnShardInitializedCallbacks(); + actor.tell(new PeerAddressResolved(peerId, peerAddress), sender); + } + + notifyOnShardInitializedCallbacks(); + } + + void peerDown(String memberName, String peerId, ActorRef sender) { + if(actor != null) { + actor.tell(new PeerDown(memberName, peerId), sender); + } + } + + void peerUp(String memberName, String peerId, ActorRef sender) { + if(actor != null) { + actor.tell(new PeerUp(memberName, peerId), sender); } } @@ -762,7 +1031,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { boolean isShardReadyWithLeaderId() { return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) && - (isLeader() || peerAddresses.get(leaderId) != null); + (isLeader() || addressResolver.resolve(leaderId) != null); } boolean isShardInitialized() { @@ -777,7 +1046,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { if(isLeader()) { return Serialization.serializedActorPath(getActor()); } else { - return peerAddresses.get(leaderId); + return addressResolver.resolve(leaderId); } } @@ -866,6 +1135,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { void setLeaderVersion(short leaderVersion) { this.leaderVersion = leaderVersion; } + + void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) { + this.datastoreContext = datastoreContext; + //notify the datastoreContextchange + LOG.debug ("Notifying RaftPolicy change via datastoreContextChange for {}", + this.shardName); + if (actor != null) { + actor.tell(this.datastoreContext, sender); + } + } } private static class ShardManagerCreator implements Creator {