X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=a91109c64b3fa5aac674677b95d2a31c83477fbd;hp=d61e12e1cb2fd2bc97e7f6b34429c22adaf72669;hb=d4d59200f8c56551755c36fbbd2b4aa52defa5cb;hpb=06af5e406a6698a7211bf4e4435c6aa2e8e3f628 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index d61e12e1cb..a91109c64b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -16,6 +16,7 @@ import akka.actor.Cancellable; import akka.actor.OneForOneStrategy; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.actor.Status; import akka.actor.SupervisorStrategy; import akka.cluster.ClusterEvent; import akka.dispatch.OnComplete; @@ -90,6 +91,8 @@ import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.RemoveServer; +import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; @@ -156,7 +159,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.cluster = builder.cluster; this.configuration = builder.configuration; this.datastoreContextFactory = builder.datastoreContextFactory; - this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreType(); + this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(); this.shardDispatcherPath = new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch; @@ -232,8 +235,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if(message instanceof PrimaryShardFoundForContext) { PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message; onPrimaryShardFoundContext(primaryShardFoundContext); - } else if(message instanceof RemoveShardReplica){ - onRemoveShardReplica((RemoveShardReplica)message); + } else if(message instanceof RemoveShardReplica) { + onRemoveShardReplica((RemoveShardReplica) message); + } else if(message instanceof WrappedShardResponse){ + onWrappedShardResponse((WrappedShardResponse) message); } else if(message instanceof GetSnapshot) { onGetSnapshot(); } else if(message instanceof ServerRemoved){ @@ -248,12 +253,67 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void onWrappedShardResponse(WrappedShardResponse message) { + if (message.getResponse() instanceof RemoveServerReply) { + onRemoveServerReply(getSender(), message.getShardName(), (RemoveServerReply) message.getResponse()); + } + } + + private void onRemoveServerReply(ActorRef originalSender, String shardName, RemoveServerReply response) { + shardReplicaOperationsInProgress.remove(shardName); + originalSender.tell(new Status.Success(null), self()); + } + private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) { if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) { - addShard(primaryShardFoundContext.shardName, primaryShardFoundContext.getRemotePrimaryShardFound(), getSender()); + addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(), + getSender()); + } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){ + removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(), + primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender()); } } + private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath, + final ActorRef sender) { + if(isShardReplicaOperationInProgress(shardName, sender)) { + return; + } + + shardReplicaOperationsInProgress.add(shardName); + + final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName); + + final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build(); + + //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message + LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(), + primaryPath, shardId); + + Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout(). + duration()); + Future futureObj = ask(getContext().actorSelection(primaryPath), + new RemoveServer(shardId.toString()), removeServerTimeout); + + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + String msg = String.format("RemoveServer request to leader %s for shard %s failed", + primaryPath, shardName); + + LOG.debug ("{}: {}", persistenceId(), msg, failure); + + // FAILURE + sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); + } else { + // SUCCESS + self().tell(new WrappedShardResponse(shardName, response), sender); + } + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + private void onShardReplicaRemoved(ServerRemoved message) { final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build(); final ShardInformation shardInformation = localShards.remove(shardId.getShardName()); @@ -797,12 +857,25 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } + Collection visitedAddresses; + if(message instanceof RemoteFindPrimary) { + visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses(); + } else { + visitedAddresses = new ArrayList<>(); + } + + visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString()); + for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) { + if(visitedAddresses.contains(address)) { + continue; + } + LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(), shardName, address); getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName, - message.isWaitUntilReady()), getContext()); + message.isWaitUntilReady(), visitedAddresses), getContext()); return; } @@ -939,7 +1012,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); - } @Override @@ -1050,40 +1122,52 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.warn ("{}: Leader failed to add shard replica {} with status {}", persistenceId(), shardName, replyMsg.getStatus()); - Exception failure; - switch (replyMsg.getStatus()) { - case TIMEOUT: - failure = new TimeoutException(String.format( - "The shard leader %s timed out trying to replicate the initial data to the new shard %s." + - "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data", - leaderPath, shardName)); - break; - case NO_LEADER: - failure = createNoShardLeaderException(shardInfo.getShardId()); - break; - default : - failure = new RuntimeException(String.format( - "AddServer request to leader %s for shard %s failed with status %s", - leaderPath, shardName, replyMsg.getStatus())); - } + Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId()); onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure); } } - private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) { - String shardName = shardReplicaMsg.getShardName(); - - // verify the local shard replica is available in the controller node - if (!localShards.containsKey(shardName)) { - String msg = String.format("Local shard %s does not", shardName); - LOG.debug ("{}: {}", persistenceId(), msg); - getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf()); - return; + private Exception getServerChangeException(Class serverChange, ServerChangeStatus serverChangeStatus, + String leaderPath, ShardIdentifier shardId) { + Exception failure; + switch (serverChangeStatus) { + case TIMEOUT: + failure = new TimeoutException(String.format( + "The shard leader %s timed out trying to replicate the initial data to the new shard %s." + + "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data", + leaderPath, shardId.getShardName())); + break; + case NO_LEADER: + failure = createNoShardLeaderException(shardId); + break; + case NOT_SUPPORTED: + failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s", + serverChange.getSimpleName(), shardId.getShardName())); + break; + default : + failure = new RuntimeException(String.format( + "%s request to leader %s for shard %s failed with status %s", + serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus)); } - // call RemoveShard for the shardName - getSender().tell(new akka.actor.Status.Success(true), getSelf()); - return; + return failure; + } + + private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) { + LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg); + + findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(), + shardReplicaMsg.getShardName(), persistenceId(), getSelf()) { + @Override + public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { + getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + } + + @Override + public void onLocalPrimaryFound(LocalPrimaryShardFound response) { + getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + } + }); } private void persistShardList() { @@ -1376,6 +1460,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { void setLeaderAvailable(boolean leaderAvailable) { this.leaderAvailable = leaderAvailable; + + if(leaderAvailable) { + notifyOnShardInitializedCallbacks(); + } } short getLeaderVersion() { @@ -1476,6 +1564,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private CountDownLatch waitTillReadyCountdownLatch; private PrimaryShardInfoFutureCache primaryShardInfoCache; private DatastoreSnapshot restoreFromSnapshot; + private volatile boolean sealed; @SuppressWarnings("unchecked") @@ -1629,10 +1718,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return shardName; } - public ActorRef getShardManagerActor() { - return shardManagerActor; - } - @Override public void onFailure(Throwable failure) { LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure); @@ -1662,45 +1747,60 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final RemotePrimaryShardFound remotePrimaryShardFound; private final LocalPrimaryShardFound localPrimaryShardFound; - public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, @Nonnull Object primaryFoundMessage) { + public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, + @Nonnull Object primaryFoundMessage) { this.shardName = Preconditions.checkNotNull(shardName); this.contextMessage = Preconditions.checkNotNull(contextMessage); Preconditions.checkNotNull(primaryFoundMessage); - this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? (RemotePrimaryShardFound) primaryFoundMessage : null; - this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? (LocalPrimaryShardFound) primaryFoundMessage : null; + this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? + (RemotePrimaryShardFound) primaryFoundMessage : null; + this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? + (LocalPrimaryShardFound) primaryFoundMessage : null; } @Nonnull - public String getPrimaryPath(){ - if(remotePrimaryShardFound != null){ + String getPrimaryPath(){ + if(remotePrimaryShardFound != null) { return remotePrimaryShardFound.getPrimaryPath(); } return localPrimaryShardFound.getPrimaryPath(); } @Nonnull - public Object getContextMessage() { + Object getContextMessage() { return contextMessage; } @Nullable - public RemotePrimaryShardFound getRemotePrimaryShardFound(){ + RemotePrimaryShardFound getRemotePrimaryShardFound() { return remotePrimaryShardFound; } - @Nullable - public LocalPrimaryShardFound getLocalPrimaryShardFound(){ - return localPrimaryShardFound; + @Nonnull + String getShardName() { + return shardName; } + } - boolean isPrimaryLocal(){ - return (remotePrimaryShardFound == null); + /** + * The WrappedShardResponse class wraps a response from a Shard. + */ + private static class WrappedShardResponse { + private final String shardName; + private final Object response; + + private WrappedShardResponse(String shardName, Object response) { + this.shardName = shardName; + this.response = response; } - @Nonnull - public String getShardName() { + String getShardName() { return shardName; } + + Object getResponse() { + return response; + } } }