X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=8386e669b83f46cb102771f7c12f1b9458f5e485;hp=aa2c524fe7cf5acd3c0b49fc8f210df013ff6e44;hb=4639f61a41a93d6a762af97b819d164781b0f9f8;hpb=4e3f49788c05730b29468deebc2aaa4ed0d94eef;ds=sidebyside diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index aa2c524fe7..8386e669b8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -18,6 +18,7 @@ import akka.actor.Status; import akka.actor.SupervisorStrategy; import akka.actor.SupervisorStrategy.Directive; import akka.cluster.ClusterEvent; +import akka.cluster.ClusterEvent.MemberWeaklyUp; import akka.cluster.Member; import akka.dispatch.Futures; import akka.dispatch.OnComplete; @@ -40,13 +41,13 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import java.util.function.Supplier; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; @@ -63,10 +64,12 @@ import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundE import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; +import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; @@ -81,12 +84,16 @@ import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListe import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus; import org.opendaylight.controller.cluster.raft.messages.RemoveServer; import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply; +import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; @@ -189,6 +196,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onActorInitialized(message); } else if (message instanceof ClusterEvent.MemberUp){ memberUp((ClusterEvent.MemberUp) message); + } else if (message instanceof ClusterEvent.MemberWeaklyUp){ + memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message); } else if (message instanceof ClusterEvent.MemberExited){ memberExited((ClusterEvent.MemberExited) message); } else if(message instanceof ClusterEvent.MemberRemoved) { @@ -220,9 +229,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if(message instanceof ForwardedAddServerFailure) { ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message; onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure); - } else if(message instanceof PrimaryShardFoundForContext) { - PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message; - onPrimaryShardFoundContext(primaryShardFoundContext); } else if(message instanceof RemoveShardReplica) { onRemoveShardReplica((RemoveShardReplica) message); } else if(message instanceof WrappedShardResponse){ @@ -231,6 +237,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onGetSnapshot(); } else if(message instanceof ServerRemoved){ onShardReplicaRemoved((ServerRemoved) message); + } else if(message instanceof ChangeShardMembersVotingStatus){ + onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message); + } else if(message instanceof FlipShardMembersVotingStatus){ + onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message); } else if(message instanceof SaveSnapshotSuccess) { onSaveSnapshotSuccess((SaveSnapshotSuccess)message); } else if(message instanceof SaveSnapshotFailure) { @@ -240,6 +250,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onShutDown(); } else if (message instanceof GetLocalShardIds) { onGetLocalShardIds(); + } else if(message instanceof RunnableMessage) { + ((RunnableMessage)message).run(); } else { unknownMessage(message); } @@ -295,7 +307,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg, String leaderPath) { - shardReplicaOperationsInProgress.remove(shardId); + shardReplicaOperationsInProgress.remove(shardId.getShardName()); LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName()); @@ -312,16 +324,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) { - if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) { - addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(), - getSender()); - } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){ - removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(), - primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender()); - } - } - private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath, final ActorRef sender) { if(isShardReplicaOperationInProgress(shardName, sender)) { @@ -347,6 +349,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void onComplete(Throwable failure, Object response) { if (failure != null) { + shardReplicaOperationsInProgress.remove(shardName); String msg = String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName); @@ -601,9 +604,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String actorName = sender.path().name(); //find shard name from actor name; actor name is stringified shardId - ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build(); - if (shardId.getShardName() == null) { + final ShardIdentifier shardId; + try { + shardId = ShardIdentifier.fromShardIdString(actorName); + } catch (IllegalArgumentException e) { + LOG.debug("{}: ignoring actor {}", actorName, e); return; } @@ -730,7 +736,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void memberRemoved(ClusterEvent.MemberRemoved message) { MemberName memberName = memberToName(message.member()); - LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, + LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); peerAddressResolver.removePeerAddress(memberName); @@ -743,7 +749,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void memberExited(ClusterEvent.MemberExited message) { MemberName memberName = memberToName(message.member()); - LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName, + LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); peerAddressResolver.removePeerAddress(memberName); @@ -756,14 +762,26 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void memberUp(ClusterEvent.MemberUp message) { MemberName memberName = memberToName(message.member()); - LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, + LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); - addPeerAddress(memberName, message.member().address()); + memberUp(memberName, message.member().address()); + } + private void memberUp(MemberName memberName, Address address) { + addPeerAddress(memberName, address); checkReady(); } + private void memberWeaklyUp(MemberWeaklyUp message) { + MemberName memberName = memberToName(message.member()); + + LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName, + message.member().address()); + + memberUp(memberName, message.member().address()); + } + private void addPeerAddress(MemberName memberName, Address address) { peerAddressResolver.addPeerAddress(memberName, address); @@ -778,7 +796,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void memberReachable(ClusterEvent.ReachableMember message) { MemberName memberName = memberToName(message.member()); - LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address()); + LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address()); addPeerAddress(memberName, message.member().address()); @@ -787,7 +805,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void memberUnreachable(ClusterEvent.UnreachableMember message) { MemberName memberName = memberToName(message.member()); - LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address()); + LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address()); markMemberUnavailable(memberName); } @@ -1076,7 +1094,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) { @Override public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { - getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()), getTargetActor()); } @Override @@ -1224,12 +1242,16 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardReplicaMsg.getShardName(), persistenceId(), getSelf()) { @Override public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { - getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + doRemoveShardReplicaAsync(response.getPrimaryPath()); } @Override public void onLocalPrimaryFound(LocalPrimaryShardFound response) { - getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + doRemoveShardReplicaAsync(response.getPrimaryPath()); + } + + private void doRemoveShardReplicaAsync(final String primaryPath) { + getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender()), getTargetActor()); } }); } @@ -1281,6 +1303,140 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { 0, 0)); } + private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) { + LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus); + + String shardName = changeMembersVotingStatus.getShardName(); + Map serverVotingStatusMap = new HashMap<>(); + for(Entry e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) { + serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(), + e.getValue()); + } + + ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap); + + findLocalShard(shardName, getSender(), + localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName, + localShardFound.getPath(), getSender())); + } + + private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) { + LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus); + + ActorRef sender = getSender(); + final String shardName = flipMembersVotingStatus.getShardName(); + findLocalShard(shardName, sender, localShardFound -> { + Future future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE, + Timeout.apply(30, TimeUnit.SECONDS)); + + future.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + sender.tell(new Status.Failure(new RuntimeException( + String.format("Failed to access local shard %s", shardName), failure)), self()); + return; + } + + OnDemandRaftState raftState = (OnDemandRaftState) response; + Map serverVotingStatusMap = new HashMap<>(); + for(Entry e: raftState.getPeerVotingStates().entrySet()) { + serverVotingStatusMap.put(e.getKey(), !e.getValue()); + } + + serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName). + toString(), !raftState.isVoting()); + + changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap), + shardName, localShardFound.getPath(), sender); + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + }); + + } + + private void findLocalShard(final String shardName, final ActorRef sender, + final Consumer onLocalShardFound) { + Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext(). + getShardInitializationTimeout().duration().$times(2)); + + Future futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout); + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + LOG.debug ("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName, failure); + sender.tell(new Status.Failure(new RuntimeException( + String.format("Failed to find local shard %s", shardName), failure)), self()); + } else { + if(response instanceof LocalShardFound) { + getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response), sender); + } else if(response instanceof LocalShardNotFound) { + String msg = String.format("Local shard %s does not exist", shardName); + LOG.debug ("{}: {}", persistenceId, msg); + sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self()); + } else { + String msg = String.format("Failed to find local shard %s: received response: %s", + shardName, response); + LOG.debug ("{}: {}", persistenceId, msg); + sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response : + new RuntimeException(msg)), self()); + } + } + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + + private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus, + final String shardName, final ActorRef shardActorRef, final ActorRef sender) { + if(isShardReplicaOperationInProgress(shardName, sender)) { + return; + } + + shardReplicaOperationsInProgress.add(shardName); + + DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build(); + final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); + + LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(), + changeServersVotingStatus, shardActorRef.path()); + + Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2)); + Future futureObj = ask(shardActorRef, changeServersVotingStatus, timeout); + + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + shardReplicaOperationsInProgress.remove(shardName); + if (failure != null) { + String msg = String.format("ChangeServersVotingStatus request to local shard %s failed", + shardActorRef.path()); + LOG.debug ("{}: {}", persistenceId(), msg, failure); + sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); + } else { + LOG.debug ("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path()); + + ServerChangeReply replyMsg = (ServerChangeReply) response; + if(replyMsg.getStatus() == ServerChangeStatus.OK) { + LOG.debug ("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName); + sender.tell(new Status.Success(null), getSelf()); + } else if(replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) { + sender.tell(new Status.Failure(new IllegalArgumentException(String.format( + "The requested voting state change for shard %s is invalid. At least one member must be voting", + shardId.getShardName()))), getSelf()); + } else { + LOG.warn ("{}: ChangeServersVotingStatus failed for shard {} with status {}", + persistenceId(), shardName, replyMsg.getStatus()); + + Exception error = getServerChangeException(ChangeServersVotingStatus.class, + replyMsg.getStatus(), shardActorRef.path().toString(), shardId); + sender.tell(new Status.Failure(error), getSelf()); + } + } + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + private static final class ForwardedAddServerReply { ShardInformation shardInfo; AddServerReply addServerReply; @@ -1362,6 +1518,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); } + private static interface RunnableMessage extends Runnable { + } + /** * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the * a remote or local find primary message is processed @@ -1443,52 +1602,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - /** - * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be - * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received - * as a successful response to find primary. - */ - private static class PrimaryShardFoundForContext { - private final String shardName; - private final Object contextMessage; - private final RemotePrimaryShardFound remotePrimaryShardFound; - private final LocalPrimaryShardFound localPrimaryShardFound; - - public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, - @Nonnull Object primaryFoundMessage) { - this.shardName = Preconditions.checkNotNull(shardName); - this.contextMessage = Preconditions.checkNotNull(contextMessage); - Preconditions.checkNotNull(primaryFoundMessage); - this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? - (RemotePrimaryShardFound) primaryFoundMessage : null; - this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? - (LocalPrimaryShardFound) primaryFoundMessage : null; - } - - @Nonnull - String getPrimaryPath(){ - if(remotePrimaryShardFound != null) { - return remotePrimaryShardFound.getPrimaryPath(); - } - return localPrimaryShardFound.getPrimaryPath(); - } - - @Nonnull - Object getContextMessage() { - return contextMessage; - } - - @Nullable - RemotePrimaryShardFound getRemotePrimaryShardFound() { - return remotePrimaryShardFound; - } - - @Nonnull - String getShardName() { - return shardName; - } - } - /** * The WrappedShardResponse class wraps a response from a Shard. */