From: Tom Pantelis Date: Thu, 12 Nov 2015 08:47:27 +0000 (-0500) Subject: Bug 2187: Bootstrap EOS shard when no local shards configured X-Git-Tag: release/beryllium~156 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=8882e6077db69d22bcc57fcf12dd4a02a81a4967 Bug 2187: Bootstrap EOS shard when no local shards configured The intended workflow to initially form a cluster dynamically is to change the role for a second node to say member-2. Since the initial static shard config is bootstrapped to member-1, no local shards will be created. However, the entity-ownership shard is special in that it is intended to exist on every node. The EOS will be boostrapped as follows: For the EOS CreateShard message, all unique members for all shards are obtained from the static shard config. It assumes the local member is present in the config however in the above workflow it won't be. So on EOS CreateShard, if the local member isn’t in the initial member list then it will create the local shard with an empty peer list and DisableElectionsRaftPolicy so it stays as follower. Also the shard will be flagged as inactive in the ShardManager. A subsequent AddShardReplica will be needed to make it active. The other option is to not create EOS shard but there may be initial candidate registrations which would be missed unless we add retry logic in the service class. But the EOS shard already has retry logic so it would be ideal to leverage it. I also made changes to the AddShardReplica logic to handle an existing local shard as will occur for the EOS shard: - remove the failure reply if local shard already exists - if the local shard exists and the primary shard is the local shard, do nothing and return AlreadyExistsException failure reply - otherwise send AddServer to the primary - on FindPrimary, if the local shard exists but is not active, do a remote find as if the local shard doesn't exist - on AddServer, if the new server is already in the peer list, the ALREADY_EXISTS status is returned. Return AlreadyExistsException failure reply - on AddServer failure, if the local shard was pre-existing don't remove it. We still want to prevent an AddShardReplica request which one is already in progress so I added a Set to track this. I added an integration test for bootstrapping the EOS shard. It starts with an inactive shard and registers a candidate, which gets queued since there's no leader. It then issues AddShardReplica and verifies the candidate gets registered with the leader. To get this to work required some teaks in the RaftActor and Follower. When the ShardManager clears the DisableElectionsRaftPolicy, the RaftActor creates a new Follower instance however it loses the previous leader Id. If the new server config hasn't been replicated yet then it has no peers and immediately tries to start an election. Since it has no peers it goes to Leader wih no followers creating a 2 leader situation. To alleviate this I transferred the previous leader Id to the new Follower instance to prevent the immediate election. Eeven after that the test still didn't work b/c the leader was still not in the EOS shard peer list so lookup of the leader address returned null. So I changed getPeerAddress in the RaftActorContext to lookup in the resolver if no peer info exists. I also added more units for AddShardReplica to increase code coverage. Change-Id: Id2a12ae226af69611d5ca5155f5f018cef82dff4 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 5f6f3ec24e..b93ea4dd8a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -502,12 +502,23 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { String newRaftPolicy = configParams. getCustomRaftPolicyImplementationClass(); - LOG.debug ("RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", + LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(), oldRaftPolicy, newRaftPolicy); context.setConfigParams(configParams); if (!Objects.equal(oldRaftPolicy, newRaftPolicy)) { - //RaftPolicy is modifed for the Actor. Re-initialize its current behaviour - initializeBehavior(); + // The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower + // but transfer the previous leaderId so it doesn't immediately try to schedule an election. This + // avoids potential disruption. Otherwise, switch to Follower normally. + RaftActorBehavior behavior = currentBehavior.getDelegate(); + if(behavior instanceof Follower) { + String previousLeaderId = ((Follower)behavior).getLeaderId(); + + LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId); + + changeCurrentBehavior(new Follower(context, previousLeaderId)); + } else { + initializeBehavior(); + } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 66059b5d62..a96d502663 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -13,7 +13,6 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; import java.util.ArrayList; @@ -24,8 +23,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import org.opendaylight.controller.cluster.DataPersistenceProvider; -import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; +import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.slf4j.Logger; public class RaftActorContextImpl implements RaftActorContext { @@ -181,6 +180,8 @@ public class RaftActorContextImpl implements RaftActorContext { peerAddress = configParams.getPeerAddressResolver().resolve(peerId); peerInfo.setAddress(peerAddress); } + } else { + peerAddress = configParams.getPeerAddressResolver().resolve(peerId); } return peerAddress; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 3b1e69d0ac..0149b57e6e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -45,12 +45,17 @@ public class Follower extends AbstractRaftActorBehavior { private static final int SYNC_THRESHOLD = 10; public Follower(RaftActorContext context) { + this(context, null); + } + + public Follower(RaftActorContext context, String initialLeaderId) { super(context, RaftState.Follower); + leaderId = initialLeaderId; initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD); if(context.getRaftPolicy().automaticElectionsEnabled()) { - if (context.getPeerIds().isEmpty()) { + if (context.getPeerIds().isEmpty() && getLeaderId() == null) { actor().tell(ELECTION_TIMEOUT, actor()); } else { scheduleElection(electionDuration()); @@ -346,6 +351,8 @@ public class Follower extends AbstractRaftActorBehavior { logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(), installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()); + leaderId = installSnapshot.getLeaderId(); + if(snapshotTracker == null){ snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks()); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java index a2f9e78769..f2903983e9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java @@ -60,13 +60,15 @@ public class RaftActorContextImplTest extends AbstractActorTest { PeerAddressResolver mockResolver = mock(PeerAddressResolver.class); doReturn("peerAddress2").when(mockResolver).resolve("peer2"); + doReturn("peerAddress3").when(mockResolver).resolve("peer3"); configParams.setPeerAddressResolver(mockResolver); assertEquals("getPeerAddress", "peerAddress2", context.getPeerAddress("peer2")); + assertEquals("getPeerAddress", "peerAddress3", context.getPeerAddress("peer3")); reset(mockResolver); - assertEquals("getPeerAddress", "peerAddress2", context.getPeerAddress("peer2")); assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1")); + assertEquals("getPeerAddress", "peerAddress2", context.getPeerAddress("peer2")); verify(mockResolver, never()).resolve(anyString()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 98a6090514..7dcc52028c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -36,6 +36,7 @@ import com.google.common.collect.Sets; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -44,9 +45,11 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; +import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; @@ -56,7 +59,6 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.Sha import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; -import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; @@ -134,6 +136,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private DatastoreSnapshot restoreFromSnapshot; + private final Set shardReplicaOperationsInProgress = new HashSet<>(); + + private final String id; + /** */ protected ShardManager(Builder builder) { @@ -148,6 +154,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.primaryShardInfoCache = builder.primaryShardInfoCache; this.restoreFromSnapshot = builder.restoreFromSnapshot; + id = "shard-manager-" + type; + peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName()); // Subscribe this actor to cluster member events @@ -252,47 +260,66 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onCreateShard(CreateShard createShard) { Object reply; try { - ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig(); - if(localShards.containsKey(moduleShardConfig.getShardName())) { - throw new IllegalStateException(String.format("Shard with name %s already exists", - moduleShardConfig.getShardName())); + String shardName = createShard.getModuleShardConfig().getShardName(); + if(localShards.containsKey(shardName)) { + reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName)); + } else { + doCreateShard(createShard); + reply = new akka.actor.Status.Success(null); } + } catch (Exception e) { + LOG.error("onCreateShard failed", e); + reply = new akka.actor.Status.Failure(e); + } - configuration.addModuleShardConfiguration(moduleShardConfig); + if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) { + getSender().tell(reply, getSelf()); + } + } - ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), moduleShardConfig.getShardName()); - Map peerAddresses = getPeerAddresses(moduleShardConfig.getShardName()/*, - moduleShardConfig.getShardMemberNames()*/); + private void doCreateShard(CreateShard createShard) { + ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig(); + String shardName = moduleShardConfig.getShardName(); - LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId, - moduleShardConfig.getShardMemberNames(), peerAddresses); + configuration.addModuleShardConfiguration(moduleShardConfig); - DatastoreContext shardDatastoreContext = createShard.getDatastoreContext(); - if(shardDatastoreContext == null) { - shardDatastoreContext = newShardDatastoreContext(moduleShardConfig.getShardName()); - } else { - shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver( - peerAddressResolver).build(); - } + DatastoreContext shardDatastoreContext = createShard.getDatastoreContext(); + if(shardDatastoreContext == null) { + shardDatastoreContext = newShardDatastoreContext(shardName); + } else { + shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver( + peerAddressResolver).build(); + } - ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses, - shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver); - localShards.put(info.getShardName(), info); + ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); - mBean.addLocalShard(shardId.toString()); + Map peerAddresses; + boolean isActiveMember; + if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) { + peerAddresses = getPeerAddresses(shardName); + isActiveMember = true; + } else { + // The local member is not in the given shard member configuration. In this case we'll create + // the shard with no peers and with elections disabled so it stays as follower. A + // subsequent AddServer request will be needed to make it an active member. + isActiveMember = false; + peerAddresses = Collections.emptyMap(); + shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext). + customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build(); + } - if(schemaContext != null) { - info.setActor(newShardActor(schemaContext, info)); - } + LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId, + moduleShardConfig.getShardMemberNames(), peerAddresses); - reply = new CreateShardReply(); - } catch (Exception e) { - LOG.error("onCreateShard failed", e); - reply = new akka.actor.Status.Failure(e); - } + ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses, + shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver); + info.setActiveMember(isActiveMember); + localShards.put(info.getShardName(), info); - if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) { - getSender().tell(reply, getSelf()); + mBean.addLocalShard(shardId.toString()); + + if(schemaContext != null) { + info.setActor(newShardActor(schemaContext, info)); } } @@ -677,7 +704,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // First see if the there is a local replica for the shard final ShardInformation info = localShards.get(shardName); - if (info != null) { + if (info != null && info.isActiveMember()) { sendResponse(info, message.isWaitUntilReady(), true, new Supplier() { @Override public Object get() { @@ -791,7 +818,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public String persistenceId() { - return "shard-manager-" + type; + return id; } @VisibleForTesting @@ -799,22 +826,22 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return mBean; } - private void checkLocalShardExists(final String shardName, final ActorRef sender) { - if (localShards.containsKey(shardName)) { - String msg = String.format("Local shard %s already exists", shardName); + private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) { + if (shardReplicaOperationsInProgress.contains(shardName)) { + String msg = String.format("A shard replica operation for %s is already in progress", shardName); LOG.debug ("{}: {}", persistenceId(), msg); - sender.tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf()); + sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf()); + return true; } + + return false; } private void onAddShardReplica (AddShardReplica shardReplicaMsg) { final String shardName = shardReplicaMsg.getShardName(); - // verify the local shard replica is already available in the controller node LOG.debug ("onAddShardReplica: {}", shardReplicaMsg); - checkLocalShardExists(shardName, getSender()); - // verify the shard with the specified name is present in the cluster configuration if (!(this.configuration.isShardConfigured(shardName))) { String msg = String.format("No module configuration exists for shard %s", shardName); @@ -832,95 +859,114 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } - Map peerAddresses = getPeerAddresses(shardName); - if (peerAddresses.isEmpty()) { - String msg = String.format("Cannot add replica for shard %s because no peer is available", shardName); - LOG.debug ("{}: {}", persistenceId(), msg); - getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf()); - return; - } - Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext(). getShardInitializationTimeout().duration().$times(2)); final ActorRef sender = getSender(); - Future futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout); + Future futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout); futureObj.onComplete(new OnComplete() { @Override public void onComplete(Throwable failure, Object response) { if (failure != null) { LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure); sender.tell(new akka.actor.Status.Failure(new RuntimeException( - String.format("Failed to find leader for shard %s", shardName), failure)), - getSelf()); + String.format("Failed to find leader for shard %s", shardName), failure)), getSelf()); } else { - if (!(response instanceof RemotePrimaryShardFound)) { + if(response instanceof RemotePrimaryShardFound) { + RemotePrimaryShardFound message = (RemotePrimaryShardFound)response; + addShard (shardName, message, sender); + } else if(response instanceof LocalPrimaryShardFound) { + sendLocalReplicaAlreadyExistsReply(shardName, sender); + } else { String msg = String.format("Failed to find leader for shard %s: received response: %s", shardName, response); LOG.debug ("{}: {}", persistenceId(), msg); - sender.tell(new akka.actor.Status.Failure(new RuntimeException(msg)), getSelf()); - return; + sender.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable)response : + new RuntimeException(msg)), getSelf()); } - - RemotePrimaryShardFound message = (RemotePrimaryShardFound)response; - addShard (shardName, message, sender); } } }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); } + private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) { + String msg = String.format("Local shard %s already exists", shardName); + LOG.debug ("{}: {}", persistenceId(), msg); + sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf()); + } + private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) { - checkLocalShardExists(shardName, sender); + if(isShardReplicaOperationInProgress(shardName, sender)) { + return; + } - ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); - String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName()); + shardReplicaOperationsInProgress.add(shardName); + + final ShardInformation shardInfo; + final boolean removeShardOnFailure; + ShardInformation existingShardInfo = localShards.get(shardName); + if(existingShardInfo == null) { + removeShardOnFailure = true; + ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); + + DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation( + DisableElectionsRaftPolicy.class.getName()).build(); - DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation( - DisableElectionsRaftPolicy.class.getName()).build(); + shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext, + Shard.builder(), peerAddressResolver); + shardInfo.setActiveMember(false); + localShards.put(shardName, shardInfo); + shardInfo.setActor(newShardActor(schemaContext, shardInfo)); + } else { + removeShardOnFailure = false; + shardInfo = existingShardInfo; + } - final ShardInformation shardInfo = new ShardInformation(shardName, shardId, - getPeerAddresses(shardName), datastoreContext, - Shard.builder(), peerAddressResolver); - shardInfo.setShardActiveMember(false); - localShards.put(shardName, shardInfo); - shardInfo.setActor(newShardActor(schemaContext, shardInfo)); + String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName()); //inform ShardLeader to add this shard as a replica by sending an AddServer message LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(), - response.getPrimaryPath(), shardId); + response.getPrimaryPath(), shardInfo.getShardId()); - Timeout addServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(4)); + Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout(). + duration()); Future futureObj = ask(getContext().actorSelection(response.getPrimaryPath()), - new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout); + new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout); futureObj.onComplete(new OnComplete() { @Override public void onComplete(Throwable failure, Object addServerResponse) { + shardReplicaOperationsInProgress.remove(shardName); if (failure != null) { LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(), response.getPrimaryPath(), shardName, failure); - // Remove the shard - localShards.remove(shardName); - if (shardInfo.getActor() != null) { - shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf()); - } - - sender.tell(new akka.actor.Status.Failure(new RuntimeException( - String.format("AddServer request to leader %s for shard %s failed", - response.getPrimaryPath(), shardName), failure)), getSelf()); + onAddServerFailure(shardName, String.format("AddServer request to leader %s for shard %s failed", + response.getPrimaryPath(), shardName), failure, sender, removeShardOnFailure); } else { AddServerReply reply = (AddServerReply)addServerResponse; - onAddServerReply(shardName, shardInfo, reply, sender, response.getPrimaryPath()); + onAddServerReply(shardInfo, reply, sender, response.getPrimaryPath(), removeShardOnFailure); } } - }, new Dispatchers(context().system().dispatchers()). - getDispatcher(Dispatchers.DispatcherType.Client)); - return; + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + + private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender, + boolean removeShardOnFailure) { + if(removeShardOnFailure) { + ShardInformation shardInfo = localShards.remove(shardName); + if (shardInfo.getActor() != null) { + shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf()); + } + } + + sender.tell(new akka.actor.Status.Failure(message == null ? failure : + new RuntimeException(message, failure)), getSelf()); } - private void onAddServerReply (String shardName, ShardInformation shardInfo, - AddServerReply replyMsg, ActorRef sender, String leaderPath) { + private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender, + String leaderPath, boolean removeShardOnFailure) { + String shardName = shardInfo.getShardName(); LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath); if (replyMsg.getStatus() == ServerChangeStatus.OK) { @@ -928,35 +974,35 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // Make the local shard voting capable shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf()); - shardInfo.setShardActiveMember(true); + shardInfo.setActiveMember(true); persistShardList(); mBean.addLocalShard(shardInfo.getShardId().toString()); - sender.tell(new akka.actor.Status.Success(true), getSelf()); + sender.tell(new akka.actor.Status.Success(null), getSelf()); + } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) { + sendLocalReplicaAlreadyExistsReply(shardName, sender); } else { - LOG.warn ("{}: Leader failed to add shard replica {} with status {} - removing the local shard", + LOG.warn ("{}: Leader failed to add shard replica {} with status {}", persistenceId(), shardName, replyMsg.getStatus()); - //remove the local replica created - localShards.remove(shardName); - if (shardInfo.getActor() != null) { - shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf()); - } + Exception failure; switch (replyMsg.getStatus()) { case TIMEOUT: - sender.tell(new akka.actor.Status.Failure(new RuntimeException( - String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data", - leaderPath, shardName))), getSelf()); + failure = new TimeoutException(String.format( + "The shard leader %s timed out trying to replicate the initial data to the new shard %s." + + "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data", + leaderPath, shardName)); break; case NO_LEADER: - sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format( - "There is no shard leader available for shard %s", shardName))), getSelf()); + failure = createNoShardLeaderException(shardInfo.getShardId()); break; default : - sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format( - "AddServer request to leader %s for shard %s failed with status %s", - leaderPath, shardName, replyMsg.getStatus()))), getSelf()); + failure = new RuntimeException(String.format( + "AddServer request to leader %s for shard %s failed with status %s", + leaderPath, shardName, replyMsg.getStatus())); } + + onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure); } } @@ -976,9 +1022,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void persistShardList() { - List shardList = new ArrayList(localShards.keySet()); + List shardList = new ArrayList<>(localShards.keySet()); for (ShardInformation shardInfo : localShards.values()) { - if (!shardInfo.isShardActiveMember()) { + if (!shardInfo.isActiveMember()) { shardList.remove(shardInfo.getShardName()); } } @@ -1031,7 +1077,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private DatastoreContext datastoreContext; private Shard.AbstractBuilder builder; private final ShardPeerAddressResolver addressResolver; - private boolean shardActiveStatus = true; + private boolean isActiveMember = true; private ShardInformation(String shardName, ShardIdentifier shardId, Map initialPeerAddresses, DatastoreContext datastoreContext, @@ -1231,12 +1277,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.leaderVersion = leaderVersion; } - void setShardActiveMember(boolean flag) { - shardActiveStatus = flag; + boolean isActiveMember() { + return isActiveMember; } - boolean isShardActiveMember() { - return shardActiveStatus; + void setActiveMember(boolean isActiveMember) { + this.isActiveMember = isActiveMember; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java index 1f31d122cd..bd07dc550a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java @@ -244,7 +244,12 @@ class EntityOwnershipShard extends Shard { protected void onStateChanged() { super.onStateChanged(); - commitCoordinator.onStateChanged(this, isLeader()); + boolean isLeader = isLeader(); + if(LOG.isDebugEnabled()) { + LOG.debug("{}: onStateChanged: isLeader: {}, hasLeader: {}", persistenceId(), isLeader, hasLeader()); + } + + commitCoordinator.onStateChanged(this, isLeader); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateShardReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/AlreadyExistsException.java similarity index 55% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateShardReply.java rename to opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/AlreadyExistsException.java index 1882391369..8d6d4f5170 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateShardReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/AlreadyExistsException.java @@ -5,12 +5,12 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.cluster.datastore.messages; +package org.opendaylight.controller.cluster.datastore.exceptions; -/** - * Reply message for CreateShard. - * - * @author Thomas Pantelis - */ -public class CreateShardReply { +public class AlreadyExistsException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public AlreadyExistsException(String message) { + super(message); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index b6ea14ff95..2072af68d4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -24,15 +24,18 @@ import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Status; import akka.actor.Status.Failure; +import akka.actor.Status.Success; import akka.cluster.Cluster; import akka.cluster.ClusterEvent; import akka.dispatch.Dispatchers; import akka.japi.Creator; import akka.pattern.Patterns; import akka.persistence.RecoveryCompleted; +import akka.serialization.Serialization; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; +import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -52,6 +55,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -62,6 +66,7 @@ import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; +import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; @@ -70,7 +75,6 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIde import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; -import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; @@ -99,6 +103,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; +import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; @@ -113,7 +118,7 @@ public class ShardManagerTest extends AbstractActorTest { private static int ID_COUNTER = 1; private final String shardMrgIDSuffix = "config" + ID_COUNTER++; - private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix; + private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); @Mock private static CountDownLatch ready; @@ -851,24 +856,22 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnRecoveryJournalIsCleaned() { - InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules( + String persistenceID = "shard-manager-" + shardMrgIDSuffix; + InMemoryJournal.addEntry(persistenceID, 1L, new ShardManager.SchemaContextModules( ImmutableSet.of("foo"))); - InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules( + InMemoryJournal.addEntry(persistenceID, 2L, new ShardManager.SchemaContextModules( ImmutableSet.of("bar"))); - InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID); + InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID); - new JavaTestKit(getSystem()) {{ - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + TestShardManager shardManager = newTestShardManager(); - shardManager.underlyingActor().waitForRecoveryComplete(); - InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID); + InMemoryJournal.waitForDeleteMessagesComplete(persistenceID); - // Journal entries up to the last one should've been deleted - Map journal = InMemoryJournal.get(shardMgrID); - synchronized (journal) { - assertEquals("Journal size", 0, journal.size()); - } - }}; + // Journal entries up to the last one should've been deleted + Map journal = InMemoryJournal.get(persistenceID); + synchronized (journal) { + assertEquals("Journal size", 0, journal.size()); + } } @Test @@ -1056,7 +1059,7 @@ public class ShardManagerTest extends AbstractActorTest { } @Test - public void testOnReceiveCreateShard() { + public void testOnCreateShard() { new JavaTestKit(getSystem()) {{ datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); @@ -1074,7 +1077,7 @@ public class ShardManagerTest extends AbstractActorTest { "foo", null, Arrays.asList("member-1", "member-5", "member-6")); shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef()); - expectMsgClass(duration("5 seconds"), CreateShardReply.class); + expectMsgClass(duration("5 seconds"), Success.class); shardManager.tell(new FindLocalShard("foo", true), getRef()); @@ -1090,16 +1093,43 @@ public class ShardManagerTest extends AbstractActorTest { shardBuilder.getId()); assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext()); - // Send CreateShard with same name - should fail. + // Send CreateShard with same name - should return Success with a message. + + shardManager.tell(new CreateShard(config, shardBuilder, null), getRef()); + + Success success = expectMsgClass(duration("5 seconds"), Success.class); + assertNotNull("Success status is null", success.status()); + }}; + } + + @Test + public void testOnCreateShardWithLocalMemberNotInShardConfig() { + new JavaTestKit(getSystem()) {{ + datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); + + ActorRef shardManager = getSystem().actorOf(newShardMgrProps( + new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender()); + + Shard.Builder shardBuilder = Shard.builder(); + ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", + "foo", null, Arrays.asList("member-5", "member-6")); shardManager.tell(new CreateShard(config, shardBuilder, null), getRef()); + expectMsgClass(duration("5 seconds"), Success.class); - expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + shardManager.tell(new FindLocalShard("foo", true), getRef()); + expectMsgClass(duration("5 seconds"), LocalShardFound.class); + + assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size()); + assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), + shardBuilder.getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass()); }}; } @Test - public void testOnReceiveCreateShardWithNoInitialSchemaContext() { + public void testOnCreateShardWithNoInitialSchemaContext() { new JavaTestKit(getSystem()) {{ ActorRef shardManager = getSystem().actorOf(newShardMgrProps( new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); @@ -1110,7 +1140,7 @@ public class ShardManagerTest extends AbstractActorTest { "foo", null, Arrays.asList("member-1")); shardManager.tell(new CreateShard(config, shardBuilder, null), getRef()); - expectMsgClass(duration("5 seconds"), CreateShardReply.class); + expectMsgClass(duration("5 seconds"), Success.class); SchemaContext schemaContext = TestModel.createTestContext(); shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); @@ -1165,7 +1195,7 @@ public class ShardManagerTest extends AbstractActorTest { } @Test - public void testAddShardReplicaForNonExistentShard() throws Exception { + public void testAddShardReplicaForNonExistentShardConfig() throws Exception { new JavaTestKit(getSystem()) {{ ActorRef shardManager = getSystem().actorOf(newShardMgrProps( new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); @@ -1178,17 +1208,6 @@ public class ShardManagerTest extends AbstractActorTest { }}; } - @Test - public void testAddShardReplicaForAlreadyCreatedShard() throws Exception { - new JavaTestKit(getSystem()) {{ - ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); - shardManager.tell(new AddShardReplica("default"), getRef()); - Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class); - assertEquals("Failure obtained", true, - (resp.cause() instanceof IllegalArgumentException)); - }}; - } - @Test public void testAddShardReplica() throws Exception { MockConfiguration mockConfig = @@ -1253,35 +1272,167 @@ public class ShardManagerTest extends AbstractActorTest { } @Test - public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception { - MockConfiguration mockConfig = - new MockConfiguration(ImmutableMap.>builder(). - put("default", Arrays.asList("member-1", "member-2")). - put("astronauts", Arrays.asList("member-2")).build()); + public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception { + new JavaTestKit(getSystem()) {{ + TestActorRef shardManager = TestActorRef.create(getSystem(), + newPropsShardMgrWithMockShardActor(), shardMgrID); - String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - // Create an ActorSystem ShardManager actor for member-1. - final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); - Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); - ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); - final TestActorRef newReplicaShardManager = TestActorRef.create(system1, - newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor, - new ClusterWrapperImpl(system1), mockConfig), shardManagerID); + String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix; + AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null); + ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf( + Props.create(MockRespondActor.class, addServerReply), leaderId); - new JavaTestKit(system1) {{ + MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString()); + + String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(newReplicaId, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); + shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId, Optional.absent(), + DataStoreVersions.CURRENT_VERSION), mockShardActor); + + shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef()); + + MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class); + + Failure resp = expectMsgClass(duration("5 seconds"), Failure.class); + assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass()); + + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); + expectMsgClass(duration("5 seconds"), LocalShardFound.class); + + // Send message again to verify previous in progress state is cleared + + shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef()); + resp = expectMsgClass(duration("5 seconds"), Failure.class); + assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass()); + + // Send message again with an AddServer timeout to verify the pre-existing shard actor isn't terminated. + + shardManager.tell(newDatastoreContextFactory(datastoreContextBuilder. + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), getRef()); + leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender()); + shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef()); + expectMsgClass(duration("5 seconds"), Failure.class); + + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); + expectMsgClass(duration("5 seconds"), LocalShardFound.class); + + leaderShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception { + new JavaTestKit(getSystem()) {{ + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION), getRef()); + shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), + RaftState.Leader.name())), mockShardActor); + + shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef()); + Failure resp = expectMsgClass(duration("5 seconds"), Failure.class); + assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass()); + + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); + expectMsgClass(duration("5 seconds"), LocalShardFound.class); + }}; + } + + @Test + public void testAddShardReplicaWithAddServerReplyFailure() throws Exception { + new JavaTestKit(getSystem()) {{ + JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem()); + + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("astronauts", Arrays.asList("member-2")).build()); + + ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1"); + TestActorRef shardManager = TestActorRef.create(getSystem(), + newPropsShardMgrWithMockShardActor("newReplicaShardManager", mockNewReplicaShardActor, + new MockClusterWrapper(), mockConfig), shardMgrID); + shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + JavaTestKit terminateWatcher = new JavaTestKit(getSystem()); + terminateWatcher.watch(mockNewReplicaShardActor); + + shardManager.tell(new AddShardReplica("astronauts"), getRef()); + + AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class); + assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix, + addServerMsg.getNewServerId()); + mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null)); + + Failure failure = expectMsgClass(duration("5 seconds"), Failure.class); + assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass()); + + shardManager.tell(new FindLocalShard("astronauts", false), getRef()); + expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); + + terminateWatcher.expectTerminated(mockNewReplicaShardActor); + + shardManager.tell(new AddShardReplica("astronauts"), getRef()); + mockShardLeaderKit.expectMsgClass(AddServer.class); + mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null)); + failure = expectMsgClass(duration("5 seconds"), Failure.class); + assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass()); + }}; + } + + @Test + public void testAddShardReplicaWithAlreadyInProgress() throws Exception { + new JavaTestKit(getSystem()) {{ + JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem()); + JavaTestKit secondRequestKit = new JavaTestKit(getSystem()); + + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("astronauts", Arrays.asList("member-2")).build()); + + TestActorRef shardManager = TestActorRef.create(getSystem(), + newPropsShardMgrWithMockShardActor("newReplicaShardManager", mockShardActor, + new MockClusterWrapper(), mockConfig), shardMgrID); + shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + shardManager.tell(new AddShardReplica("astronauts"), getRef()); + + mockShardLeaderKit.expectMsgClass(AddServer.class); + + shardManager.tell(new AddShardReplica("astronauts"), secondRequestKit.getRef()); + + secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class); + }}; + } + + @Test + public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception { + new JavaTestKit(getSystem()) {{ + MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). + put("astronauts", Arrays.asList("member-2")).build()); + + ActorRef newReplicaShardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor( + "shardManager", mockShardActor, new MockClusterWrapper(), mockConfig)); newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString()); - newReplicaShardManager.underlyingActor().waitForMemberUp(); newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef()); Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class); assertEquals("Failure obtained", true, (resp.cause() instanceof RuntimeException)); }}; - - JavaTestKit.shutdownActorSystem(system1); } @Test @@ -1308,7 +1459,7 @@ public class ShardManagerTest extends AbstractActorTest { put("people", Arrays.asList("member-1", "member-2")).build()); String[] restoredShards = {"default", "astronauts"}; ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards)); - InMemorySnapshotStore.addSnapshot(shardMgrID, snapshot); + InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot); //create shardManager to come up with restored data TestActorRef newRestoredShardManager = TestActorRef.create(getSystem(), @@ -1388,6 +1539,24 @@ public class ShardManagerTest extends AbstractActorTest { } } + interface MessageInterceptor extends Function { + boolean canIntercept(Object message); + } + + private MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) { + return new MessageInterceptor(){ + @Override + public Object apply(Object message) { + return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1); + } + + @Override + public boolean canIntercept(Object message) { + return message instanceof FindPrimary; + } + }; + } + private static class ForwardingShardManager extends ShardManager { private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1); private CountDownLatch memberUpReceived = new CountDownLatch(1); @@ -1398,6 +1567,7 @@ public class ShardManagerTest extends AbstractActorTest { private final String name; private final CountDownLatch snapshotPersist = new CountDownLatch(1); private ShardManagerSnapshot snapshot; + private volatile MessageInterceptor messageInterceptor; public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) { super(builder); @@ -1405,10 +1575,19 @@ public class ShardManagerTest extends AbstractActorTest { this.name = name; } + void setMessageInterceptor(MessageInterceptor messageInterceptor) { + this.messageInterceptor = messageInterceptor; + } + + @Override public void handleCommand(Object message) throws Exception { try{ - super.handleCommand(message); + if(messageInterceptor != null && messageInterceptor.canIntercept(message)) { + getSender().tell(messageInterceptor.apply(message), getSelf()); + } else { + super.handleCommand(message); + } } finally { if(message instanceof FindPrimary) { findPrimaryMessageReceived.countDown(); @@ -1491,9 +1670,19 @@ public class ShardManagerTest extends AbstractActorTest { } private static class MockRespondActor extends MessageCollectorActor { + static final String CLEAR_RESPONSE = "clear-response"; private volatile Object responseMsg; + @SuppressWarnings("unused") + public MockRespondActor() { + } + + @SuppressWarnings("unused") + public MockRespondActor(Object responseMsg) { + this.responseMsg = responseMsg; + } + public void updateResponse(Object response) { responseMsg = response; } @@ -1504,8 +1693,9 @@ public class ShardManagerTest extends AbstractActorTest { if (message instanceof AddServer) { if (responseMsg != null) { getSender().tell(responseMsg, getSelf()); - responseMsg = null; } + } if(message.equals(CLEAR_RESPONSE)) { + responseMsg = null; } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java index 2ca128271e..2167bd275d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java @@ -17,11 +17,14 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.datastore.entityownership.AbstractEntityOwnershipTest.ownershipChange; +import static org.opendaylight.controller.cluster.datastore.entityownership.DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; +import akka.actor.Status.Failure; +import akka.actor.Status.Success; import akka.cluster.Cluster; import akka.testkit.JavaTestKit; import com.google.common.base.Function; @@ -44,6 +47,7 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.IntegrationTestKit; import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig; +import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException; import org.opendaylight.controller.md.sal.common.api.clustering.Entity; @@ -66,6 +70,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; public class DistributedEntityOwnershipIntegrationTest { private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); private static final String MODULE_SHARDS_CONFIG = "module-shards-default.conf"; + private static final String MODULE_SHARDS_MEMBER_1_CONFIG = "module-shards-default-member-1.conf"; private static final String ENTITY_TYPE1 = "entityType1"; private static final String ENTITY_TYPE2 = "entityType2"; private static final Entity ENTITY1 = new Entity(ENTITY_TYPE1, "entity1"); @@ -110,32 +115,48 @@ public class DistributedEntityOwnershipIntegrationTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); + } - leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); - Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS); + @After + public void tearDown() { + if(leaderSystem != null) { + JavaTestKit.shutdownActorSystem(leaderSystem); + } - follower1System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); - Cluster.get(follower1System).join(MEMBER_1_ADDRESS); + if(follower1System != null) { + JavaTestKit.shutdownActorSystem(follower1System); + } + + if(follower2System != null) { + JavaTestKit.shutdownActorSystem(follower2System); + } + } + private void startAllSystems() { + startLeaderSystem(); + startFollower1System(); + startFollower2System(); + } + + private void startFollower2System() { follower2System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member3")); Cluster.get(follower2System).join(MEMBER_1_ADDRESS); } - @After - public void tearDown() { - JavaTestKit.shutdownActorSystem(leaderSystem); - JavaTestKit.shutdownActorSystem(follower1System); - JavaTestKit.shutdownActorSystem(follower2System); + private void startFollower1System() { + follower1System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); + Cluster.get(follower1System).join(MEMBER_1_ADDRESS); + } + + private void startLeaderSystem() { + leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS); } private void initDatastores(String type) { - leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder); - leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore( - type, MODULE_SHARDS_CONFIG, false, SCHEMA_CONTEXT); + initLeaderDatastore(type, MODULE_SHARDS_CONFIG); - follower1TestKit = new IntegrationTestKit(follower1System, followerDatastoreContextBuilder); - follower1DistributedDataStore = follower1TestKit.setupDistributedDataStore( - type, MODULE_SHARDS_CONFIG, false, SCHEMA_CONTEXT); + initFollower1Datastore(type, MODULE_SHARDS_CONFIG); follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder); follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore( @@ -145,21 +166,44 @@ public class DistributedEntityOwnershipIntegrationTest { follower1DistributedDataStore.waitTillReady(); follower2DistributedDataStore.waitTillReady(); - leaderEntityOwnershipService = new DistributedEntityOwnershipService(leaderDistributedDataStore, EntityOwnerSelectionStrategyConfig.newBuilder().build()); - leaderEntityOwnershipService.start(); + startLeaderService(); - follower1EntityOwnershipService = new DistributedEntityOwnershipService(follower1DistributedDataStore, EntityOwnerSelectionStrategyConfig.newBuilder().build()); - follower1EntityOwnershipService.start(); + startFollower1Service(); - follower2EntityOwnershipService = new DistributedEntityOwnershipService(follower2DistributedDataStore, EntityOwnerSelectionStrategyConfig.newBuilder().build()); + follower2EntityOwnershipService = new DistributedEntityOwnershipService(follower2DistributedDataStore, + EntityOwnerSelectionStrategyConfig.newBuilder().build()); follower2EntityOwnershipService.start(); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), - DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME); + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME); + } + + private void startFollower1Service() { + follower1EntityOwnershipService = new DistributedEntityOwnershipService(follower1DistributedDataStore, + EntityOwnerSelectionStrategyConfig.newBuilder().build()); + follower1EntityOwnershipService.start(); + } + + private void startLeaderService() { + leaderEntityOwnershipService = new DistributedEntityOwnershipService(leaderDistributedDataStore, + EntityOwnerSelectionStrategyConfig.newBuilder().build()); + leaderEntityOwnershipService.start(); + } + + private void initFollower1Datastore(String type, String moduleConfig) { + follower1TestKit = new IntegrationTestKit(follower1System, followerDatastoreContextBuilder); + follower1DistributedDataStore = follower1TestKit.setupDistributedDataStore( + type, moduleConfig, false, SCHEMA_CONTEXT); + } + + private void initLeaderDatastore(String type, String moduleConfig) { + leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder); + leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore( + type, moduleConfig, false, SCHEMA_CONTEXT); } @Test public void test() throws Exception { + startAllSystems(); initDatastores("test"); leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener); @@ -275,6 +319,7 @@ public class DistributedEntityOwnershipIntegrationTest { */ @Test public void testCloseCandidateRegistrationInQuickSuccession() throws CandidateAlreadyRegisteredException { + startAllSystems(); initDatastores("testCloseCandidateRegistrationInQuickSuccession"); leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener); @@ -318,6 +363,46 @@ public class DistributedEntityOwnershipIntegrationTest { assertFalse(follower2ChangeCaptor.getAllValues().get(follower2ChangeCaptor.getAllValues().size()-1).hasOwner()); } + /** + * Tests bootstrapping the entity-ownership shard when there's no shards initially configured for local + * member. The entity-ownership shard is initially created as inactive (ie remains a follower), requiring + * an AddShardReplica request to join it to an existing leader. + */ + @Test + public void testEntityOwnershipShardBootstrapping() throws Throwable { + startLeaderSystem(); + startFollower1System(); + String type = "testEntityOwnershipShardBootstrapping"; + initLeaderDatastore(type, MODULE_SHARDS_MEMBER_1_CONFIG); + initFollower1Datastore(type, MODULE_SHARDS_MEMBER_1_CONFIG); + + leaderDistributedDataStore.waitTillReady(); + follower1DistributedDataStore.waitTillReady(); + + startLeaderService(); + startFollower1Service(); + + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME); + + leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener); + + // Register a candidate for follower1 - should get queued since follower1 has no leader + follower1EntityOwnershipService.registerCandidate(ENTITY1); + verify(leaderMockListener, timeout(300).never()).ownershipChanged(ownershipChange(ENTITY1)); + + // Add replica in follower1 + AddShardReplica addReplica = new AddShardReplica(ENTITY_OWNERSHIP_SHARD_NAME); + follower1DistributedDataStore.getActorContext().getShardManager().tell(addReplica , follower1TestKit.getRef()); + Object reply = follower1TestKit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), Success.class, Failure.class); + if(reply instanceof Failure) { + throw ((Failure)reply).cause(); + } + + // The queued candidate registration should proceed + verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1)); + + } + private static void verifyGetOwnershipState(DistributedEntityOwnershipService service, Entity entity, boolean isOwner, boolean hasOwner) { Optional state = service.getOwnershipState(entity); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java index 75fa987544..100e6dec01 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java @@ -26,7 +26,9 @@ import akka.actor.PoisonPill; import akka.actor.Props; import com.google.common.base.Function; import com.google.common.base.Optional; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -86,12 +88,18 @@ public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnersh DatastoreContext datastoreContext = DatastoreContext.newBuilder().dataStoreType(dataStoreType). shardInitializationTimeout(10, TimeUnit.SECONDS).build(); - Configuration configuration = new ConfigurationImpl(new ModuleShardConfigProvider() { + ModuleShardConfigProvider configProvider = new ModuleShardConfigProvider() { @Override public Map retrieveModuleConfigs(Configuration configuration) { return Collections.emptyMap(); } - }); + }; + Configuration configuration = new ConfigurationImpl(configProvider) { + @Override + public Collection getUniqueMemberNamesForAllShards() { + return Sets.newHashSet("member-1"); + } + }; DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class); Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-default-member-1.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-default-member-1.conf new file mode 100644 index 0000000000..627d232253 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-default-member-1.conf @@ -0,0 +1,13 @@ +module-shards = [ + { + name = "default" + shards = [ + { + name="default", + replicas = [ + "member-1" + ] + } + ] + } +]