String newRaftPolicy = configParams.
getCustomRaftPolicyImplementationClass();
- LOG.debug ("RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}",
+ LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(),
oldRaftPolicy, newRaftPolicy);
context.setConfigParams(configParams);
if (!Objects.equal(oldRaftPolicy, newRaftPolicy)) {
- //RaftPolicy is modifed for the Actor. Re-initialize its current behaviour
- initializeBehavior();
+ // The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower
+ // but transfer the previous leaderId so it doesn't immediately try to schedule an election. This
+ // avoids potential disruption. Otherwise, switch to Follower normally.
+ RaftActorBehavior behavior = currentBehavior.getDelegate();
+ if(behavior instanceof Follower) {
+ String previousLeaderId = ((Follower)behavior).getLeaderId();
+
+ LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId);
+
+ changeCurrentBehavior(new Follower(context, previousLeaderId));
+ } else {
+ initializeBehavior();
+ }
}
}
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
+import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.slf4j.Logger;
public class RaftActorContextImpl implements RaftActorContext {
peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
peerInfo.setAddress(peerAddress);
}
+ } else {
+ peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
}
return peerAddress;
private static final int SYNC_THRESHOLD = 10;
public Follower(RaftActorContext context) {
+ this(context, null);
+ }
+
+ public Follower(RaftActorContext context, String initialLeaderId) {
super(context, RaftState.Follower);
+ leaderId = initialLeaderId;
initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
if(context.getRaftPolicy().automaticElectionsEnabled()) {
- if (context.getPeerIds().isEmpty()) {
+ if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
actor().tell(ELECTION_TIMEOUT, actor());
} else {
scheduleElection(electionDuration());
logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+ leaderId = installSnapshot.getLeaderId();
+
if(snapshotTracker == null){
snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
}
PeerAddressResolver mockResolver = mock(PeerAddressResolver.class);
doReturn("peerAddress2").when(mockResolver).resolve("peer2");
+ doReturn("peerAddress3").when(mockResolver).resolve("peer3");
configParams.setPeerAddressResolver(mockResolver);
assertEquals("getPeerAddress", "peerAddress2", context.getPeerAddress("peer2"));
+ assertEquals("getPeerAddress", "peerAddress3", context.getPeerAddress("peer3"));
reset(mockResolver);
- assertEquals("getPeerAddress", "peerAddress2", context.getPeerAddress("peer2"));
assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1"));
+ assertEquals("getPeerAddress", "peerAddress2", context.getPeerAddress("peer2"));
verify(mockResolver, never()).resolve(anyString());
}
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
private DatastoreSnapshot restoreFromSnapshot;
+ private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
+
+ private final String id;
+
/**
*/
protected ShardManager(Builder builder) {
this.primaryShardInfoCache = builder.primaryShardInfoCache;
this.restoreFromSnapshot = builder.restoreFromSnapshot;
+ id = "shard-manager-" + type;
+
peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
// Subscribe this actor to cluster member events
private void onCreateShard(CreateShard createShard) {
Object reply;
try {
- ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
- if(localShards.containsKey(moduleShardConfig.getShardName())) {
- throw new IllegalStateException(String.format("Shard with name %s already exists",
- moduleShardConfig.getShardName()));
+ String shardName = createShard.getModuleShardConfig().getShardName();
+ if(localShards.containsKey(shardName)) {
+ reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
+ } else {
+ doCreateShard(createShard);
+ reply = new akka.actor.Status.Success(null);
}
+ } catch (Exception e) {
+ LOG.error("onCreateShard failed", e);
+ reply = new akka.actor.Status.Failure(e);
+ }
- configuration.addModuleShardConfiguration(moduleShardConfig);
+ if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+ getSender().tell(reply, getSelf());
+ }
+ }
- ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), moduleShardConfig.getShardName());
- Map<String, String> peerAddresses = getPeerAddresses(moduleShardConfig.getShardName()/*,
- moduleShardConfig.getShardMemberNames()*/);
+ private void doCreateShard(CreateShard createShard) {
+ ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+ String shardName = moduleShardConfig.getShardName();
- LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
- moduleShardConfig.getShardMemberNames(), peerAddresses);
+ configuration.addModuleShardConfiguration(moduleShardConfig);
- DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
- if(shardDatastoreContext == null) {
- shardDatastoreContext = newShardDatastoreContext(moduleShardConfig.getShardName());
- } else {
- shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
- peerAddressResolver).build();
- }
+ DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
+ if(shardDatastoreContext == null) {
+ shardDatastoreContext = newShardDatastoreContext(shardName);
+ } else {
+ shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
+ peerAddressResolver).build();
+ }
- ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
- shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
- localShards.put(info.getShardName(), info);
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
- mBean.addLocalShard(shardId.toString());
+ Map<String, String> peerAddresses;
+ boolean isActiveMember;
+ if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) {
+ peerAddresses = getPeerAddresses(shardName);
+ isActiveMember = true;
+ } else {
+ // The local member is not in the given shard member configuration. In this case we'll create
+ // the shard with no peers and with elections disabled so it stays as follower. A
+ // subsequent AddServer request will be needed to make it an active member.
+ isActiveMember = false;
+ peerAddresses = Collections.emptyMap();
+ shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
+ customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
+ }
- if(schemaContext != null) {
- info.setActor(newShardActor(schemaContext, info));
- }
+ LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
+ moduleShardConfig.getShardMemberNames(), peerAddresses);
- reply = new CreateShardReply();
- } catch (Exception e) {
- LOG.error("onCreateShard failed", e);
- reply = new akka.actor.Status.Failure(e);
- }
+ ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
+ shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
+ info.setActiveMember(isActiveMember);
+ localShards.put(info.getShardName(), info);
- if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
- getSender().tell(reply, getSelf());
+ mBean.addLocalShard(shardId.toString());
+
+ if(schemaContext != null) {
+ info.setActor(newShardActor(schemaContext, info));
}
}
// First see if the there is a local replica for the shard
final ShardInformation info = localShards.get(shardName);
- if (info != null) {
+ if (info != null && info.isActiveMember()) {
sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
@Override
public Object get() {
@Override
public String persistenceId() {
- return "shard-manager-" + type;
+ return id;
}
@VisibleForTesting
return mBean;
}
- private void checkLocalShardExists(final String shardName, final ActorRef sender) {
- if (localShards.containsKey(shardName)) {
- String msg = String.format("Local shard %s already exists", shardName);
+ private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
+ if (shardReplicaOperationsInProgress.contains(shardName)) {
+ String msg = String.format("A shard replica operation for %s is already in progress", shardName);
LOG.debug ("{}: {}", persistenceId(), msg);
- sender.tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+ sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+ return true;
}
+
+ return false;
}
private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
final String shardName = shardReplicaMsg.getShardName();
- // verify the local shard replica is already available in the controller node
LOG.debug ("onAddShardReplica: {}", shardReplicaMsg);
- checkLocalShardExists(shardName, getSender());
-
// verify the shard with the specified name is present in the cluster configuration
if (!(this.configuration.isShardConfigured(shardName))) {
String msg = String.format("No module configuration exists for shard %s", shardName);
return;
}
- Map<String, String> peerAddresses = getPeerAddresses(shardName);
- if (peerAddresses.isEmpty()) {
- String msg = String.format("Cannot add replica for shard %s because no peer is available", shardName);
- LOG.debug ("{}: {}", persistenceId(), msg);
- getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
- return;
- }
-
Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
getShardInitializationTimeout().duration().$times(2));
final ActorRef sender = getSender();
- Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout);
+ Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
futureObj.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object response) {
if (failure != null) {
LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure);
sender.tell(new akka.actor.Status.Failure(new RuntimeException(
- String.format("Failed to find leader for shard %s", shardName), failure)),
- getSelf());
+ String.format("Failed to find leader for shard %s", shardName), failure)), getSelf());
} else {
- if (!(response instanceof RemotePrimaryShardFound)) {
+ if(response instanceof RemotePrimaryShardFound) {
+ RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
+ addShard (shardName, message, sender);
+ } else if(response instanceof LocalPrimaryShardFound) {
+ sendLocalReplicaAlreadyExistsReply(shardName, sender);
+ } else {
String msg = String.format("Failed to find leader for shard %s: received response: %s",
shardName, response);
LOG.debug ("{}: {}", persistenceId(), msg);
- sender.tell(new akka.actor.Status.Failure(new RuntimeException(msg)), getSelf());
- return;
+ sender.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable)response :
+ new RuntimeException(msg)), getSelf());
}
-
- RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
- addShard (shardName, message, sender);
}
}
}, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
}
+ private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
+ String msg = String.format("Local shard %s already exists", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
+ }
+
private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
- checkLocalShardExists(shardName, sender);
+ if(isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
+ }
- ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
- String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+ shardReplicaOperationsInProgress.add(shardName);
+
+ final ShardInformation shardInfo;
+ final boolean removeShardOnFailure;
+ ShardInformation existingShardInfo = localShards.get(shardName);
+ if(existingShardInfo == null) {
+ removeShardOnFailure = true;
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+ DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
+ DisableElectionsRaftPolicy.class.getName()).build();
- DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
- DisableElectionsRaftPolicy.class.getName()).build();
+ shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
+ Shard.builder(), peerAddressResolver);
+ shardInfo.setActiveMember(false);
+ localShards.put(shardName, shardInfo);
+ shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+ } else {
+ removeShardOnFailure = false;
+ shardInfo = existingShardInfo;
+ }
- final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
- getPeerAddresses(shardName), datastoreContext,
- Shard.builder(), peerAddressResolver);
- shardInfo.setShardActiveMember(false);
- localShards.put(shardName, shardInfo);
- shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+ String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
//inform ShardLeader to add this shard as a replica by sending an AddServer message
LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
- response.getPrimaryPath(), shardId);
+ response.getPrimaryPath(), shardInfo.getShardId());
- Timeout addServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(4));
+ Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
+ duration());
Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
- new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout);
+ new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
futureObj.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object addServerResponse) {
+ shardReplicaOperationsInProgress.remove(shardName);
if (failure != null) {
LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
response.getPrimaryPath(), shardName, failure);
- // Remove the shard
- localShards.remove(shardName);
- if (shardInfo.getActor() != null) {
- shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
- }
-
- sender.tell(new akka.actor.Status.Failure(new RuntimeException(
- String.format("AddServer request to leader %s for shard %s failed",
- response.getPrimaryPath(), shardName), failure)), getSelf());
+ onAddServerFailure(shardName, String.format("AddServer request to leader %s for shard %s failed",
+ response.getPrimaryPath(), shardName), failure, sender, removeShardOnFailure);
} else {
AddServerReply reply = (AddServerReply)addServerResponse;
- onAddServerReply(shardName, shardInfo, reply, sender, response.getPrimaryPath());
+ onAddServerReply(shardInfo, reply, sender, response.getPrimaryPath(), removeShardOnFailure);
}
}
- }, new Dispatchers(context().system().dispatchers()).
- getDispatcher(Dispatchers.DispatcherType.Client));
- return;
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+ private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
+ boolean removeShardOnFailure) {
+ if(removeShardOnFailure) {
+ ShardInformation shardInfo = localShards.remove(shardName);
+ if (shardInfo.getActor() != null) {
+ shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+ }
+ }
+
+ sender.tell(new akka.actor.Status.Failure(message == null ? failure :
+ new RuntimeException(message, failure)), getSelf());
}
- private void onAddServerReply (String shardName, ShardInformation shardInfo,
- AddServerReply replyMsg, ActorRef sender, String leaderPath) {
+ private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
+ String leaderPath, boolean removeShardOnFailure) {
+ String shardName = shardInfo.getShardName();
LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
// Make the local shard voting capable
shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
- shardInfo.setShardActiveMember(true);
+ shardInfo.setActiveMember(true);
persistShardList();
mBean.addLocalShard(shardInfo.getShardId().toString());
- sender.tell(new akka.actor.Status.Success(true), getSelf());
+ sender.tell(new akka.actor.Status.Success(null), getSelf());
+ } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
+ sendLocalReplicaAlreadyExistsReply(shardName, sender);
} else {
- LOG.warn ("{}: Leader failed to add shard replica {} with status {} - removing the local shard",
+ LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
persistenceId(), shardName, replyMsg.getStatus());
- //remove the local replica created
- localShards.remove(shardName);
- if (shardInfo.getActor() != null) {
- shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
- }
+ Exception failure;
switch (replyMsg.getStatus()) {
case TIMEOUT:
- sender.tell(new akka.actor.Status.Failure(new RuntimeException(
- String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
- leaderPath, shardName))), getSelf());
+ failure = new TimeoutException(String.format(
+ "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
+ "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+ leaderPath, shardName));
break;
case NO_LEADER:
- sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
- "There is no shard leader available for shard %s", shardName))), getSelf());
+ failure = createNoShardLeaderException(shardInfo.getShardId());
break;
default :
- sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
- "AddServer request to leader %s for shard %s failed with status %s",
- leaderPath, shardName, replyMsg.getStatus()))), getSelf());
+ failure = new RuntimeException(String.format(
+ "AddServer request to leader %s for shard %s failed with status %s",
+ leaderPath, shardName, replyMsg.getStatus()));
}
+
+ onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
}
}
}
private void persistShardList() {
- List<String> shardList = new ArrayList(localShards.keySet());
+ List<String> shardList = new ArrayList<>(localShards.keySet());
for (ShardInformation shardInfo : localShards.values()) {
- if (!shardInfo.isShardActiveMember()) {
+ if (!shardInfo.isActiveMember()) {
shardList.remove(shardInfo.getShardName());
}
}
private DatastoreContext datastoreContext;
private Shard.AbstractBuilder<?, ?> builder;
private final ShardPeerAddressResolver addressResolver;
- private boolean shardActiveStatus = true;
+ private boolean isActiveMember = true;
private ShardInformation(String shardName, ShardIdentifier shardId,
Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
this.leaderVersion = leaderVersion;
}
- void setShardActiveMember(boolean flag) {
- shardActiveStatus = flag;
+ boolean isActiveMember() {
+ return isActiveMember;
}
- boolean isShardActiveMember() {
- return shardActiveStatus;
+ void setActiveMember(boolean isActiveMember) {
+ this.isActiveMember = isActiveMember;
}
}
protected void onStateChanged() {
super.onStateChanged();
- commitCoordinator.onStateChanged(this, isLeader());
+ boolean isLeader = isLeader();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: onStateChanged: isLeader: {}, hasLeader: {}", persistenceId(), isLeader, hasLeader());
+ }
+
+ commitCoordinator.onStateChanged(this, isLeader);
}
@Override
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.cluster.datastore.messages;
+package org.opendaylight.controller.cluster.datastore.exceptions;
-/**
- * Reply message for CreateShard.
- *
- * @author Thomas Pantelis
- */
-public class CreateShardReply {
+public class AlreadyExistsException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public AlreadyExistsException(String message) {
+ super(message);
+ }
}
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Failure;
+import akka.actor.Status.Success;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.dispatch.Dispatchers;
import akka.japi.Creator;
import akka.pattern.Patterns;
import akka.persistence.RecoveryCompleted;
+import akka.serialization.Serialization;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
private static int ID_COUNTER = 1;
private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
- private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
+ private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
@Mock
private static CountDownLatch ready;
@Test
public void testOnRecoveryJournalIsCleaned() {
- InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
+ String persistenceID = "shard-manager-" + shardMrgIDSuffix;
+ InMemoryJournal.addEntry(persistenceID, 1L, new ShardManager.SchemaContextModules(
ImmutableSet.of("foo")));
- InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
+ InMemoryJournal.addEntry(persistenceID, 2L, new ShardManager.SchemaContextModules(
ImmutableSet.of("bar")));
- InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
+ InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID);
- new JavaTestKit(getSystem()) {{
- TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+ TestShardManager shardManager = newTestShardManager();
- shardManager.underlyingActor().waitForRecoveryComplete();
- InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
+ InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);
- // Journal entries up to the last one should've been deleted
- Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
- synchronized (journal) {
- assertEquals("Journal size", 0, journal.size());
- }
- }};
+ // Journal entries up to the last one should've been deleted
+ Map<Long, Object> journal = InMemoryJournal.get(persistenceID);
+ synchronized (journal) {
+ assertEquals("Journal size", 0, journal.size());
+ }
}
@Test
}
@Test
- public void testOnReceiveCreateShard() {
+ public void testOnCreateShard() {
new JavaTestKit(getSystem()) {{
datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
"foo", null, Arrays.asList("member-1", "member-5", "member-6"));
shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
- expectMsgClass(duration("5 seconds"), CreateShardReply.class);
+ expectMsgClass(duration("5 seconds"), Success.class);
shardManager.tell(new FindLocalShard("foo", true), getRef());
shardBuilder.getId());
assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
- // Send CreateShard with same name - should fail.
+ // Send CreateShard with same name - should return Success with a message.
+
+ shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
+
+ Success success = expectMsgClass(duration("5 seconds"), Success.class);
+ assertNotNull("Success status is null", success.status());
+ }};
+ }
+
+ @Test
+ public void testOnCreateShardWithLocalMemberNotInShardConfig() {
+ new JavaTestKit(getSystem()) {{
+ datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
+
+ ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
+ new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
+
+ Shard.Builder shardBuilder = Shard.builder();
+ ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
+ "foo", null, Arrays.asList("member-5", "member-6"));
shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
+ expectMsgClass(duration("5 seconds"), Success.class);
- expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+ shardManager.tell(new FindLocalShard("foo", true), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
+ assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(),
+ shardBuilder.getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
}};
}
@Test
- public void testOnReceiveCreateShardWithNoInitialSchemaContext() {
+ public void testOnCreateShardWithNoInitialSchemaContext() {
new JavaTestKit(getSystem()) {{
ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
"foo", null, Arrays.asList("member-1"));
shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
- expectMsgClass(duration("5 seconds"), CreateShardReply.class);
+ expectMsgClass(duration("5 seconds"), Success.class);
SchemaContext schemaContext = TestModel.createTestContext();
shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
}
@Test
- public void testAddShardReplicaForNonExistentShard() throws Exception {
+ public void testAddShardReplicaForNonExistentShardConfig() throws Exception {
new JavaTestKit(getSystem()) {{
ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
}};
}
- @Test
- public void testAddShardReplicaForAlreadyCreatedShard() throws Exception {
- new JavaTestKit(getSystem()) {{
- ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
- shardManager.tell(new AddShardReplica("default"), getRef());
- Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
- assertEquals("Failure obtained", true,
- (resp.cause() instanceof IllegalArgumentException));
- }};
- }
-
@Test
public void testAddShardReplica() throws Exception {
MockConfiguration mockConfig =
}
@Test
- public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
- MockConfiguration mockConfig =
- new MockConfiguration(ImmutableMap.<String, List<String>>builder().
- put("default", Arrays.asList("member-1", "member-2")).
- put("astronauts", Arrays.asList("member-2")).build());
+ public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ TestActorRef<ForwardingShardManager> shardManager = TestActorRef.create(getSystem(),
+ newPropsShardMgrWithMockShardActor(), shardMgrID);
- String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
- // Create an ActorSystem ShardManager actor for member-1.
- final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
- Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
- ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
- final TestActorRef<ForwardingShardManager> newReplicaShardManager = TestActorRef.create(system1,
- newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor,
- new ClusterWrapperImpl(system1), mockConfig), shardManagerID);
+ String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
+ AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
+ ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
+ Props.create(MockRespondActor.class, addServerReply), leaderId);
- new JavaTestKit(system1) {{
+ MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
+
+ String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.tell(new RoleChangeNotification(newReplicaId,
+ RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+ shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId, Optional.<DataTree>absent(),
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+
+ MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
+
+ Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
+
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ // Send message again to verify previous in progress state is cleared
+
+ shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+ resp = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
+
+ // Send message again with an AddServer timeout to verify the pre-existing shard actor isn't terminated.
+
+ shardManager.tell(newDatastoreContextFactory(datastoreContextBuilder.
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), getRef());
+ leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
+ shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+ expectMsgClass(duration("5 seconds"), Failure.class);
+
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ leaderShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+ ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)),
+ DataStoreVersions.CURRENT_VERSION), getRef());
+ shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
+ RaftState.Leader.name())), mockShardActor);
+
+ shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+ Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
+
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+ }};
+ }
+
+ @Test
+ public void testAddShardReplicaWithAddServerReplyFailure() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
+
+ MockConfiguration mockConfig =
+ new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("astronauts", Arrays.asList("member-2")).build());
+
+ ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
+ TestActorRef<ForwardingShardManager> shardManager = TestActorRef.create(getSystem(),
+ newPropsShardMgrWithMockShardActor("newReplicaShardManager", mockNewReplicaShardActor,
+ new MockClusterWrapper(), mockConfig), shardMgrID);
+ shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ JavaTestKit terminateWatcher = new JavaTestKit(getSystem());
+ terminateWatcher.watch(mockNewReplicaShardActor);
+
+ shardManager.tell(new AddShardReplica("astronauts"), getRef());
+
+ AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
+ assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
+ addServerMsg.getNewServerId());
+ mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
+
+ Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
+
+ shardManager.tell(new FindLocalShard("astronauts", false), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+
+ terminateWatcher.expectTerminated(mockNewReplicaShardActor);
+
+ shardManager.tell(new AddShardReplica("astronauts"), getRef());
+ mockShardLeaderKit.expectMsgClass(AddServer.class);
+ mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
+ failure = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
+ }};
+ }
+
+ @Test
+ public void testAddShardReplicaWithAlreadyInProgress() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
+ JavaTestKit secondRequestKit = new JavaTestKit(getSystem());
+
+ MockConfiguration mockConfig =
+ new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("astronauts", Arrays.asList("member-2")).build());
+
+ TestActorRef<ForwardingShardManager> shardManager = TestActorRef.create(getSystem(),
+ newPropsShardMgrWithMockShardActor("newReplicaShardManager", mockShardActor,
+ new MockClusterWrapper(), mockConfig), shardMgrID);
+ shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ shardManager.tell(new AddShardReplica("astronauts"), getRef());
+
+ mockShardLeaderKit.expectMsgClass(AddServer.class);
+
+ shardManager.tell(new AddShardReplica("astronauts"), secondRequestKit.getRef());
+
+ secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class);
+ }};
+ }
+
+ @Test
+ public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("astronauts", Arrays.asList("member-2")).build());
+
+ ActorRef newReplicaShardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(
+ "shardManager", mockShardActor, new MockClusterWrapper(), mockConfig));
newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString());
- newReplicaShardManager.underlyingActor().waitForMemberUp();
newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
assertEquals("Failure obtained", true,
(resp.cause() instanceof RuntimeException));
}};
-
- JavaTestKit.shutdownActorSystem(system1);
}
@Test
put("people", Arrays.asList("member-1", "member-2")).build());
String[] restoredShards = {"default", "astronauts"};
ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
- InMemorySnapshotStore.addSnapshot(shardMgrID, snapshot);
+ InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
//create shardManager to come up with restored data
TestActorRef<TestShardManager> newRestoredShardManager = TestActorRef.create(getSystem(),
}
}
+ interface MessageInterceptor extends Function<Object, Object> {
+ boolean canIntercept(Object message);
+ }
+
+ private MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
+ return new MessageInterceptor(){
+ @Override
+ public Object apply(Object message) {
+ return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
+ }
+
+ @Override
+ public boolean canIntercept(Object message) {
+ return message instanceof FindPrimary;
+ }
+ };
+ }
+
private static class ForwardingShardManager extends ShardManager {
private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
private CountDownLatch memberUpReceived = new CountDownLatch(1);
private final String name;
private final CountDownLatch snapshotPersist = new CountDownLatch(1);
private ShardManagerSnapshot snapshot;
+ private volatile MessageInterceptor messageInterceptor;
public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) {
super(builder);
this.name = name;
}
+ void setMessageInterceptor(MessageInterceptor messageInterceptor) {
+ this.messageInterceptor = messageInterceptor;
+ }
+
+
@Override
public void handleCommand(Object message) throws Exception {
try{
- super.handleCommand(message);
+ if(messageInterceptor != null && messageInterceptor.canIntercept(message)) {
+ getSender().tell(messageInterceptor.apply(message), getSelf());
+ } else {
+ super.handleCommand(message);
+ }
} finally {
if(message instanceof FindPrimary) {
findPrimaryMessageReceived.countDown();
}
private static class MockRespondActor extends MessageCollectorActor {
+ static final String CLEAR_RESPONSE = "clear-response";
private volatile Object responseMsg;
+ @SuppressWarnings("unused")
+ public MockRespondActor() {
+ }
+
+ @SuppressWarnings("unused")
+ public MockRespondActor(Object responseMsg) {
+ this.responseMsg = responseMsg;
+ }
+
public void updateResponse(Object response) {
responseMsg = response;
}
if (message instanceof AddServer) {
if (responseMsg != null) {
getSender().tell(responseMsg, getSelf());
- responseMsg = null;
}
+ } if(message.equals(CLEAR_RESPONSE)) {
+ responseMsg = null;
}
}
}
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.entityownership.AbstractEntityOwnershipTest.ownershipChange;
+import static org.opendaylight.controller.cluster.datastore.entityownership.DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressFromURIString;
+import akka.actor.Status.Failure;
+import akka.actor.Status.Success;
import akka.cluster.Cluster;
import akka.testkit.JavaTestKit;
import com.google.common.base.Function;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
+import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
public class DistributedEntityOwnershipIntegrationTest {
private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
private static final String MODULE_SHARDS_CONFIG = "module-shards-default.conf";
+ private static final String MODULE_SHARDS_MEMBER_1_CONFIG = "module-shards-default-member-1.conf";
private static final String ENTITY_TYPE1 = "entityType1";
private static final String ENTITY_TYPE2 = "entityType2";
private static final Entity ENTITY1 = new Entity(ENTITY_TYPE1, "entity1");
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
+ }
- leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
- Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
+ @After
+ public void tearDown() {
+ if(leaderSystem != null) {
+ JavaTestKit.shutdownActorSystem(leaderSystem);
+ }
- follower1System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
- Cluster.get(follower1System).join(MEMBER_1_ADDRESS);
+ if(follower1System != null) {
+ JavaTestKit.shutdownActorSystem(follower1System);
+ }
+
+ if(follower2System != null) {
+ JavaTestKit.shutdownActorSystem(follower2System);
+ }
+ }
+ private void startAllSystems() {
+ startLeaderSystem();
+ startFollower1System();
+ startFollower2System();
+ }
+
+ private void startFollower2System() {
follower2System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member3"));
Cluster.get(follower2System).join(MEMBER_1_ADDRESS);
}
- @After
- public void tearDown() {
- JavaTestKit.shutdownActorSystem(leaderSystem);
- JavaTestKit.shutdownActorSystem(follower1System);
- JavaTestKit.shutdownActorSystem(follower2System);
+ private void startFollower1System() {
+ follower1System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+ Cluster.get(follower1System).join(MEMBER_1_ADDRESS);
+ }
+
+ private void startLeaderSystem() {
+ leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+ Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
}
private void initDatastores(String type) {
- leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
- leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(
- type, MODULE_SHARDS_CONFIG, false, SCHEMA_CONTEXT);
+ initLeaderDatastore(type, MODULE_SHARDS_CONFIG);
- follower1TestKit = new IntegrationTestKit(follower1System, followerDatastoreContextBuilder);
- follower1DistributedDataStore = follower1TestKit.setupDistributedDataStore(
- type, MODULE_SHARDS_CONFIG, false, SCHEMA_CONTEXT);
+ initFollower1Datastore(type, MODULE_SHARDS_CONFIG);
follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder);
follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(
follower1DistributedDataStore.waitTillReady();
follower2DistributedDataStore.waitTillReady();
- leaderEntityOwnershipService = new DistributedEntityOwnershipService(leaderDistributedDataStore, EntityOwnerSelectionStrategyConfig.newBuilder().build());
- leaderEntityOwnershipService.start();
+ startLeaderService();
- follower1EntityOwnershipService = new DistributedEntityOwnershipService(follower1DistributedDataStore, EntityOwnerSelectionStrategyConfig.newBuilder().build());
- follower1EntityOwnershipService.start();
+ startFollower1Service();
- follower2EntityOwnershipService = new DistributedEntityOwnershipService(follower2DistributedDataStore, EntityOwnerSelectionStrategyConfig.newBuilder().build());
+ follower2EntityOwnershipService = new DistributedEntityOwnershipService(follower2DistributedDataStore,
+ EntityOwnerSelectionStrategyConfig.newBuilder().build());
follower2EntityOwnershipService.start();
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
- DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME);
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
+ }
+
+ private void startFollower1Service() {
+ follower1EntityOwnershipService = new DistributedEntityOwnershipService(follower1DistributedDataStore,
+ EntityOwnerSelectionStrategyConfig.newBuilder().build());
+ follower1EntityOwnershipService.start();
+ }
+
+ private void startLeaderService() {
+ leaderEntityOwnershipService = new DistributedEntityOwnershipService(leaderDistributedDataStore,
+ EntityOwnerSelectionStrategyConfig.newBuilder().build());
+ leaderEntityOwnershipService.start();
+ }
+
+ private void initFollower1Datastore(String type, String moduleConfig) {
+ follower1TestKit = new IntegrationTestKit(follower1System, followerDatastoreContextBuilder);
+ follower1DistributedDataStore = follower1TestKit.setupDistributedDataStore(
+ type, moduleConfig, false, SCHEMA_CONTEXT);
+ }
+
+ private void initLeaderDatastore(String type, String moduleConfig) {
+ leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
+ leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(
+ type, moduleConfig, false, SCHEMA_CONTEXT);
}
@Test
public void test() throws Exception {
+ startAllSystems();
initDatastores("test");
leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
*/
@Test
public void testCloseCandidateRegistrationInQuickSuccession() throws CandidateAlreadyRegisteredException {
+ startAllSystems();
initDatastores("testCloseCandidateRegistrationInQuickSuccession");
leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
assertFalse(follower2ChangeCaptor.getAllValues().get(follower2ChangeCaptor.getAllValues().size()-1).hasOwner());
}
+ /**
+ * Tests bootstrapping the entity-ownership shard when there's no shards initially configured for local
+ * member. The entity-ownership shard is initially created as inactive (ie remains a follower), requiring
+ * an AddShardReplica request to join it to an existing leader.
+ */
+ @Test
+ public void testEntityOwnershipShardBootstrapping() throws Throwable {
+ startLeaderSystem();
+ startFollower1System();
+ String type = "testEntityOwnershipShardBootstrapping";
+ initLeaderDatastore(type, MODULE_SHARDS_MEMBER_1_CONFIG);
+ initFollower1Datastore(type, MODULE_SHARDS_MEMBER_1_CONFIG);
+
+ leaderDistributedDataStore.waitTillReady();
+ follower1DistributedDataStore.waitTillReady();
+
+ startLeaderService();
+ startFollower1Service();
+
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
+
+ leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
+
+ // Register a candidate for follower1 - should get queued since follower1 has no leader
+ follower1EntityOwnershipService.registerCandidate(ENTITY1);
+ verify(leaderMockListener, timeout(300).never()).ownershipChanged(ownershipChange(ENTITY1));
+
+ // Add replica in follower1
+ AddShardReplica addReplica = new AddShardReplica(ENTITY_OWNERSHIP_SHARD_NAME);
+ follower1DistributedDataStore.getActorContext().getShardManager().tell(addReplica , follower1TestKit.getRef());
+ Object reply = follower1TestKit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), Success.class, Failure.class);
+ if(reply instanceof Failure) {
+ throw ((Failure)reply).cause();
+ }
+
+ // The queued candidate registration should proceed
+ verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1));
+
+ }
+
private static void verifyGetOwnershipState(DistributedEntityOwnershipService service, Entity entity,
boolean isOwner, boolean hasOwner) {
Optional<EntityOwnershipState> state = service.getOwnershipState(entity);
import akka.actor.Props;
import com.google.common.base.Function;
import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
DatastoreContext datastoreContext = DatastoreContext.newBuilder().dataStoreType(dataStoreType).
shardInitializationTimeout(10, TimeUnit.SECONDS).build();
- Configuration configuration = new ConfigurationImpl(new ModuleShardConfigProvider() {
+ ModuleShardConfigProvider configProvider = new ModuleShardConfigProvider() {
@Override
public Map<String, ModuleConfig> retrieveModuleConfigs(Configuration configuration) {
return Collections.emptyMap();
}
- });
+ };
+ Configuration configuration = new ConfigurationImpl(configProvider) {
+ @Override
+ public Collection<String> getUniqueMemberNamesForAllShards() {
+ return Sets.newHashSet("member-1");
+ }
+ };
DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
--- /dev/null
+module-shards = [
+ {
+ name = "default"
+ shards = [
+ {
+ name="default",
+ replicas = [
+ "member-1"
+ ]
+ }
+ ]
+ }
+]