private final String txnDispatcherPath;
- protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+ protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- super(name.toString(), mapPeerAddresses(peerAddresses),
- Optional.of(datastoreContext.getShardRaftConfig()));
+ super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
this.name = name.toString();
this.datastoreContext = datastoreContext;
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
}
- private static Map<String, String> mapPeerAddresses(
- final Map<ShardIdentifier, String> peerAddresses) {
- Map<String, String> map = new HashMap<>();
-
- for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
- .entrySet()) {
- map.put(entry.getKey().toString(), entry.getValue());
- }
-
- return map;
- }
-
public static Props props(final ShardIdentifier name,
- final Map<ShardIdentifier, String> peerAddresses,
+ final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
Preconditions.checkNotNull(name, "name should not be null");
Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
private static final long serialVersionUID = 1L;
final ShardIdentifier name;
- final Map<ShardIdentifier, String> peerAddresses;
+ final Map<String, String> peerAddresses;
final DatastoreContext datastoreContext;
final SchemaContext schemaContext;
- ShardCreator(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+ ShardCreator(final ShardIdentifier name, final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
this.name = name;
this.peerAddresses = peerAddresses;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.Address;
+import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.japi.Procedure;
import akka.persistence.RecoveryCompleted;
import akka.persistence.RecoveryFailure;
+import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
*/
public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
- private final Logger LOG = LoggerFactory.getLogger(getClass());
+ private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
// Stores a mapping between a member name and the address of the member
// Member names look like "member-1", "member-2" etc and are as specified
onRoleChangeNotification((RoleChangeNotification) message);
} else if(message instanceof FollowerInitialSyncUpStatus){
onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
- } else{
+ } else if(message instanceof ShardNotInitializedTimeout) {
+ onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
+ } else if(message instanceof LeaderStateChanged) {
+ onLeaderStateChanged((LeaderStateChanged)message);
+ } else {
unknownMessage(message);
}
}
+ private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) {
+ LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
+
+ ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
+ if(shardInformation != null) {
+ shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
+ } else {
+ LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
+ }
+ }
+
+ private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
+ ShardInformation shardInfo = message.getShardInfo();
+
+ LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
+ shardInfo.getShardId());
+
+ shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
+
+ if(!shardInfo.isShardInitialized()) {
+ message.getSender().tell(new ActorNotInitialized(), getSelf());
+ } else {
+ message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
+ }
+ }
+
private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
- LOG.info("Received follower initial sync status for {} status sync done {}", status.getName(),
- status.isInitialSyncDone());
+ LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
+ status.getName(), status.isInitialSyncDone());
ShardInformation shardInformation = findShardInformation(status.getName());
}
private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
- LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(),
+ LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
roleChanged.getOldRole(), roleChanged.getNewRole());
ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
shardInformation.setRole(roleChanged.getNewRole());
if (isReady()) {
- LOG.info("All Shards are ready - data store {} is ready, available count is {}", type,
- waitTillReadyCountdownLatch.getCount());
+ LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+ persistenceId(), type, waitTillReadyCountdownLatch.getCount());
waitTillReadyCountdownLatch.countDown();
}
private boolean isReady() {
boolean isReady = true;
for (ShardInformation info : localShards.values()) {
- if(RaftState.Candidate.name().equals(info.getRole()) || Strings.isNullOrEmpty(info.getRole())){
+ if(!info.isShardReady()){
isReady = false;
break;
}
if (shardId.getShardName() == null) {
return;
}
+
markShardAsInitialized(shardId.getShardName());
}
private void markShardAsInitialized(String shardName) {
LOG.debug("Initializing shard [{}]", shardName);
+
ShardInformation shardInformation = localShards.get(shardName);
if (shardInformation != null) {
shardInformation.setActorInitialized();
+
+ shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
}
}
return;
}
- sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier<Object>() {
+ sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
@Override
public Object get() {
return new LocalShardFound(shardInformation.getActor());
});
}
- private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized,
- final Supplier<Object> messageSupplier) {
- if (!shardInformation.isShardInitialized()) {
- if(waitUntilInitialized) {
+ private void sendResponse(ShardInformation shardInformation, boolean doWait,
+ boolean wantShardReady, final Supplier<Object> messageSupplier) {
+ if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
+ if(doWait) {
final ActorRef sender = getSender();
final ActorRef self = self();
- shardInformation.addRunnableOnInitialized(new Runnable() {
+
+ Runnable replyRunnable = new Runnable() {
@Override
public void run() {
sender.tell(messageSupplier.get(), self);
}
- });
- } else {
+ };
+
+ OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
+ new OnShardInitialized(replyRunnable);
+
+ shardInformation.addOnShardInitialized(onShardInitialized);
+
+ Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
+ datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
+ new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
+ getContext().dispatcher(), getSelf());
+
+ onShardInitialized.setTimeoutSchedule(timeoutSchedule);
+
+ } else if (!shardInformation.isShardInitialized()) {
getSender().tell(new ActorNotInitialized(), getSelf());
+ } else {
+ getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
}
return;
getSender().tell(messageSupplier.get(), getSelf());
}
+ private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
+ return new NoShardLeaderException(String.format(
+ "Could not find a leader for shard %s. This typically happens when the system is coming up or " +
+ "recovering and a leader is being elected. Try again later.", shardId));
+ }
+
private void memberRemoved(ClusterEvent.MemberRemoved message) {
memberNameToAddress.remove(message.member().roles().head());
}
for(ShardInformation info : localShards.values()){
String shardName = info.getShardName();
- info.updatePeerAddress(getShardIdentifier(memberName, shardName),
- getShardActorPath(shardName, memberName));
+ info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(),
+ getShardActorPath(shardName, memberName), getSelf());
}
}
LOG.debug("Sending new SchemaContext to Shards");
for (ShardInformation info : localShards.values()) {
if (info.getActor() == null) {
- info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
- info.getPeerAddresses(), datastoreContext, schemaContext)
- .withDispatcher(shardDispatcherPath), info.getShardId().toString()));
+ info.setActor(newShardActor(schemaContext, info));
} else {
info.getActor().tell(message, getSelf());
}
- info.getActor().tell(new RegisterRoleChangeListener(), self());
}
}
}
+ @VisibleForTesting
+ protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
+ return getContext().actorOf(Shard.props(info.getShardId(),
+ info.getPeerAddresses(), datastoreContext, schemaContext)
+ .withDispatcher(shardDispatcherPath), info.getShardId().toString());
+ }
+
private void findPrimary(FindPrimary message) {
- String shardName = message.getShardName();
+ final String shardName = message.getShardName();
// First see if the there is a local replica for the shard
final ShardInformation info = localShards.get(shardName);
if (info != null) {
- sendResponse(info, message.isWaitUntilInitialized(), new Supplier<Object>() {
+ sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
@Override
public Object get() {
- return new PrimaryFound(info.getActorPath().toString()).toSerializable();
+ Object found = new PrimaryFound(info.getSerializedLeaderActor()).toSerializable();
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Found primary for {}: {}", shardName, found);
+ }
+
+ return found;
}
});
List<String> localShardActorNames = new ArrayList<>();
for(String shardName : memberShardNames){
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
- Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
+ Map<String, String> peerAddresses = getPeerAddresses(shardName);
localShardActorNames.add(shardId.toString());
localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
}
* @param shardName
* @return
*/
- private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
+ private Map<String, String> getPeerAddresses(String shardName){
- Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
+ Map<String, String> peerAddresses = new HashMap<>();
- List<String> members =
- this.configuration.getMembersFromShardName(shardName);
+ List<String> members = this.configuration.getMembersFromShardName(shardName);
String currentMemberName = this.cluster.getCurrentMemberName();
for(String memberName : members){
if(!currentMemberName.equals(memberName)){
- ShardIdentifier shardId = getShardIdentifier(memberName,
- shardName);
- String path =
- getShardActorPath(shardName, currentMemberName);
- peerAddresses.put(shardId, path);
+ ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+ String path = getShardActorPath(shardName, currentMemberName);
+ peerAddresses.put(shardId.toString(), path);
}
}
return peerAddresses;
return mBean;
}
- private class ShardInformation {
+ @VisibleForTesting
+ protected static class ShardInformation {
private final ShardIdentifier shardId;
private final String shardName;
private ActorRef actor;
private ActorPath actorPath;
- private final Map<ShardIdentifier, String> peerAddresses;
+ private final Map<String, String> peerAddresses;
// flag that determines if the actor is ready for business
private boolean actorInitialized = false;
private boolean followerSyncStatus = false;
- private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
+ private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
private String role ;
+ private String leaderId;
private ShardInformation(String shardName, ShardIdentifier shardId,
- Map<ShardIdentifier, String> peerAddresses) {
+ Map<String, String> peerAddresses) {
this.shardName = shardName;
this.shardId = shardId;
this.peerAddresses = peerAddresses;
return shardId;
}
- Map<ShardIdentifier, String> getPeerAddresses() {
+ Map<String, String> getPeerAddresses() {
return peerAddresses;
}
- void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
+ void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
LOG.info("updatePeerAddress for peer {} with address {}", peerId,
peerAddress);
if(peerAddresses.containsKey(peerId)){
peerId, peerAddress, actor.path());
}
- actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf());
+ actor.tell(new PeerAddressResolved(peerId.toString(), peerAddress), sender);
}
+
+ notifyOnShardInitializedCallbacks();
}
}
+ boolean isShardReady() {
+ return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
+ }
+
+ boolean isShardReadyWithLeaderId() {
+ return isShardReady() && (isLeader() || peerAddresses.containsKey(leaderId));
+ }
+
boolean isShardInitialized() {
return getActor() != null && actorInitialized;
}
+ boolean isLeader() {
+ return Objects.equal(leaderId, shardId.toString());
+ }
+
+ String getSerializedLeaderActor() {
+ if(isLeader()) {
+ return Serialization.serializedActorPath(getActor());
+ } else {
+ return peerAddresses.get(leaderId);
+ }
+ }
+
void setActorInitialized() {
+ LOG.debug("Shard {} is initialized", shardId);
+
this.actorInitialized = true;
- for(Runnable runnable: runnablesOnInitialized) {
- runnable.run();
+ notifyOnShardInitializedCallbacks();
+ }
+
+ private void notifyOnShardInitializedCallbacks() {
+ if(onShardInitializedSet.isEmpty()) {
+ return;
}
- runnablesOnInitialized.clear();
+ boolean ready = isShardReadyWithLeaderId();
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
+ ready ? "ready" : "initialized", onShardInitializedSet.size());
+ }
+
+ Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
+ while(iter.hasNext()) {
+ OnShardInitialized onShardInitialized = iter.next();
+ if(!(onShardInitialized instanceof OnShardReady) || ready) {
+ iter.remove();
+ onShardInitialized.getTimeoutSchedule().cancel();
+ onShardInitialized.getReplyRunnable().run();
+ }
+ }
}
- void addRunnableOnInitialized(Runnable runnable) {
- runnablesOnInitialized.add(runnable);
+ void addOnShardInitialized(OnShardInitialized onShardInitialized) {
+ onShardInitializedSet.add(onShardInitialized);
}
- public void setRole(String newRole) {
- this.role = newRole;
+ void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
+ onShardInitializedSet.remove(onShardInitialized);
}
- public String getRole(){
- return this.role;
+ void setRole(String newRole) {
+ this.role = newRole;
+
+ notifyOnShardInitializedCallbacks();
}
- public void setFollowerSyncStatus(boolean syncStatus){
+ void setFollowerSyncStatus(boolean syncStatus){
this.followerSyncStatus = syncStatus;
}
- public boolean isInSync(){
+ boolean isInSync(){
if(RaftState.Follower.name().equals(this.role)){
return followerSyncStatus;
} else if(RaftState.Leader.name().equals(this.role)){
return false;
}
+ void setLeaderId(String leaderId) {
+ this.leaderId = leaderId;
+
+ notifyOnShardInitializedCallbacks();
+ }
}
private static class ShardManagerCreator implements Creator<ShardManager> {
}
}
+ private static class OnShardInitialized {
+ private final Runnable replyRunnable;
+ private Cancellable timeoutSchedule;
+
+ OnShardInitialized(Runnable replyRunnable) {
+ this.replyRunnable = replyRunnable;
+ }
+
+ Runnable getReplyRunnable() {
+ return replyRunnable;
+ }
+
+ Cancellable getTimeoutSchedule() {
+ return timeoutSchedule;
+ }
+
+ void setTimeoutSchedule(Cancellable timeoutSchedule) {
+ this.timeoutSchedule = timeoutSchedule;
+ }
+ }
+
+ private static class OnShardReady extends OnShardInitialized {
+ OnShardReady(Runnable replyRunnable) {
+ super(replyRunnable);
+ }
+ }
+
+ private static class ShardNotInitializedTimeout {
+ private final ActorRef sender;
+ private final ShardInformation shardInfo;
+ private final OnShardInitialized onShardInitialized;
+
+ ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
+ this.sender = sender;
+ this.shardInfo = shardInfo;
+ this.onShardInitialized = onShardInitialized;
+ }
+
+ ActorRef getSender() {
+ return sender;
+ }
+
+ ShardInformation getShardInfo() {
+ return shardInfo;
+ }
+
+ OnShardInitialized getOnShardInitialized() {
+ return onShardInitialized;
+ }
+ }
+
static class SchemaContextModules implements Serializable {
private static final long serialVersionUID = -8884620101025936590L;
@Override
public void onComplete(Throwable failure, ActorSelection primaryShard) {
if(failure != null) {
- newTxFutureCallback.onComplete(failure, null);
+ newTxFutureCallback.createTransactionContext(failure, null);
} else {
newTxFutureCallback.setPrimaryShard(primaryShard);
}
if(transactionType == TransactionType.WRITE_ONLY &&
actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
- LOG.debug("Tx {} Primary shard found - creating WRITE_ONLY transaction context", identifier);
+ LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
+ identifier, primaryShard);
// For write-only Tx's we prepare the transaction modifications directly on the shard actor
// to avoid the overhead of creating a separate transaction actor.
* Performs a CreateTransaction try async.
*/
private void tryCreateTransaction() {
- LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} Primary shard {} found - trying create transaction", identifier, primaryShard);
+ }
Object serializedCreateMessage = new CreateTransaction(identifier.toString(),
TransactionProxy.this.transactionType.ordinal(),
}
}
+ createTransactionContext(failure, response);
+ }
+
+ private void createTransactionContext(Throwable failure, Object response) {
// Mainly checking for state violation here to perform a volatile read of "initialized" to
// ensure updates to operationLimter et al are visible to this thread (ie we're doing
// "piggy-back" synchronization here).
public static final Class<FindPrimary> SERIALIZABLE_CLASS = FindPrimary.class;
private final String shardName;
- private final boolean waitUntilInitialized;
+ private final boolean waitUntilReady;
- public FindPrimary(String shardName, boolean waitUntilInitialized){
+ public FindPrimary(String shardName, boolean waitUntilReady){
Preconditions.checkNotNull(shardName, "shardName should not be null");
this.shardName = shardName;
- this.waitUntilInitialized = waitUntilInitialized;
+ this.waitUntilReady = waitUntilReady;
}
public String getShardName() {
return shardName;
}
- public boolean isWaitUntilInitialized() {
- return waitUntilInitialized;
+ public boolean isWaitUntilReady() {
+ return waitUntilReady;
}
@Override
public static FindPrimary fromSerializable(Object message){
return (FindPrimary) message;
}
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("FindPrimary [shardName=").append(shardName).append(", waitUntilReady=").append(waitUntilReady)
+ .append("]");
+ return builder.toString();
+ }
}
package org.opendaylight.controller.cluster.datastore.messages;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
public class PeerAddressResolved {
- private final ShardIdentifier peerId;
+ private final String peerId;
private final String peerAddress;
- public PeerAddressResolved(ShardIdentifier peerId, String peerAddress) {
+ public PeerAddressResolved(String peerId, String peerAddress) {
this.peerId = peerId;
this.peerAddress = peerAddress;
}
- public ShardIdentifier getPeerId() {
+ public String getPeerId() {
return peerId;
}
import akka.actor.PoisonPill;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
+import akka.dispatch.OnComplete;
import akka.pattern.AskTimeoutException;
import akka.util.Timeout;
import com.codahale.metrics.JmxReporter;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
private final int transactionOutstandingOperationLimit;
private Timeout transactionCommitOperationTimeout;
+ private Timeout shardInitializationTimeout;
private final Dispatchers dispatchers;
- private final Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
+ private Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
private volatile SchemaContext schemaContext;
private volatile boolean updated;
this.dispatchers = new Dispatchers(actorSystem.dispatchers());
setCachedProperties();
- primaryShardActorSelectionCache = CacheBuilder.newBuilder()
- .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
- .build();
-
- operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
- operationTimeout = new Timeout(operationDuration);
- transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
- TimeUnit.SECONDS));
Address selfAddress = clusterWrapper.getSelfAddress();
if (selfAddress != null && !selfAddress.host().isEmpty()) {
transactionCommitOperationTimeout = new Timeout(Duration.create(
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
+
+ shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
+
+ primaryShardActorSelectionCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
+ .build();
}
public DatastoreContext getDatastoreContext() {
return schemaContext;
}
- /**
- * Finds the primary shard for the given shard name
- *
- * @param shardName
- * @return
- */
- public Optional<ActorSelection> findPrimaryShard(String shardName) {
- String path = findPrimaryPathOrNull(shardName);
- if (path == null){
- return Optional.absent();
- }
- return Optional.of(actorSystem.actorSelection(path));
- }
-
public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
if(ret != null){
return ret;
}
Future<Object> future = executeOperationAsync(shardManager,
- new FindPrimary(shardName, true).toSerializable(),
- datastoreContext.getShardInitializationTimeout());
+ new FindPrimary(shardName, true).toSerializable(), shardInitializationTimeout);
return future.transform(new Mapper<Object, ActorSelection>() {
@Override
} else if(response instanceof PrimaryNotFound) {
throw new PrimaryNotFoundException(
String.format("No primary shard found for %S.", shardName));
+ } else if(response instanceof NoShardLeaderException) {
+ throw (NoShardLeaderException)response;
}
throw new UnknownMessageException(String.format(
*/
public Future<ActorRef> findLocalShardAsync( final String shardName) {
Future<Object> future = executeOperationAsync(shardManager,
- new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
+ new FindLocalShard(shardName, true), shardInitializationTimeout);
return future.map(new Mapper<Object, ActorRef>() {
@Override
}, getClientDispatcher());
}
- private String findPrimaryPathOrNull(String shardName) {
- Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
-
- if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
- PrimaryFound found = PrimaryFound.fromSerializable(result);
-
- LOG.debug("Primary found {}", found.getPrimaryPath());
- return found.getPrimaryPath();
-
- } else if (result.getClass().equals(ActorNotInitialized.class)){
- throw new NotInitializedException(
- String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
- );
-
- } else {
- return null;
- }
- }
-
-
/**
* Executes an operation on a local actor and wait for it's response
*
*
* @param message
*/
- public void broadcast(Object message){
- for(String shardName : configuration.getAllShardNames()){
-
- Optional<ActorSelection> primary = findPrimaryShard(shardName);
- if (primary.isPresent()) {
- primary.get().tell(message, ActorRef.noSender());
- } else {
- LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
- message.getClass().getSimpleName(), shardName);
- }
+ public void broadcast(final Object message){
+ for(final String shardName : configuration.getAllShardNames()){
+
+ Future<ActorSelection> primaryFuture = findPrimaryShardAsync(shardName);
+ primaryFuture.onComplete(new OnComplete<ActorSelection>() {
+ @Override
+ public void onComplete(Throwable failure, ActorSelection primaryShard) {
+ if(failure != null) {
+ LOG.warn("broadcast failed to send message {} to shard {}: {}",
+ message.getClass().getSimpleName(), shardName, failure);
+ } else {
+ primaryShard.tell(message, ActorRef.noSender());
+ }
+ }
+ }, getClientDispatcher());
}
}
}
protected Props newShardProps() {
- return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+ return Shard.props(shardID, Collections.<String,String>emptyMap(),
newDatastoreContext(), SCHEMA_CONTEXT);
}
Creator<Shard> creator = new Creator<Shard>() {
@Override
public Shard create() throws Exception {
- return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
newDatastoreContext(), SCHEMA_CONTEXT) {
@Override
protected void onRecoveryComplete() {
}};
}
- @Test
- public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{
+ private void testTransactionWritesWithShardNotInitiallyReady(final boolean writeOnly) throws Exception {
new IntegrationTestKit(getSystem()) {{
String testName = "testTransactionWritesWithShardNotInitiallyReady";
String shardName = "test-1";
// Create the write Tx
- // TODO - we'll want to test this with write-only as well when FindPrimary returns the leader shard.
- final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction();
+ final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
+ dataStore.newReadWriteTransaction();
assertNotNull("newReadWriteTransaction returned null", writeTx);
// Do some modification operations and ready the Tx on a separate thread.
}
@Test
- public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{
+ public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
+ datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+ testTransactionWritesWithShardNotInitiallyReady(true);
+ }
+
+ @Test
+ public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
+ testTransactionWritesWithShardNotInitiallyReady(false);
+ }
+
+ @Test
+ public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
new IntegrationTestKit(getSystem()) {{
String testName = "testTransactionReadsWithShardNotInitiallyReady";
String shardName = "test-1";
}};
}
- @Test(expected=NoShardLeaderException.class)
- public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{
+ private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
new IntegrationTestKit(getSystem()) {{
String testName = "testTransactionCommitFailureWithNoShardLeader";
String shardName = "test-1";
// by setting the election timeout, which is based on the heartbeat interval, really high.
datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
+ datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
// Set the leader election timeout low for the test.
// Create the write Tx.
- final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction();
+ final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
+ dataStore.newReadWriteTransaction();
assertNotNull("newReadWriteTransaction returned null", writeTx);
// Do some modifications and ready the Tx on a separate thread.
}};
}
+ @Test(expected=NoShardLeaderException.class)
+ public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
+ datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+ testTransactionCommitFailureWithNoShardLeader(true);
+ }
+
+ @Test(expected=NoShardLeaderException.class)
+ public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
+ testTransactionCommitFailureWithNoShardLeader(false);
+ }
+
@Test
public void testTransactionAbort() throws Exception{
System.setProperty("shard.persistent", "true");
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Await;
import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
public class ShardManagerTest extends AbstractActorTest {
private static int ID_COUNTER = 1;
@Mock
private static CountDownLatch ready;
- private static ActorRef mockShardActor;
+ private static TestActorRef<MessageCollectorActor> mockShardActor;
+
+ private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
+ dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
@Before
public void setUp() {
InMemoryJournal.clear();
if(mockShardActor == null) {
- String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
- mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
+ String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
+ mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name);
}
+
+ mockShardActor.underlyingActor().clear();
}
@After
}
private Props newShardMgrProps() {
- DatastoreContext.Builder builder = DatastoreContext.newBuilder();
- builder.dataStoreType(shardMrgIDSuffix);
return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
- builder.build(), ready);
+ datastoreContextBuilder.build(), ready);
+ }
+
+ private Props newPropsShardMgrWithMockShardActor() {
+ Creator<ShardManager> creator = new Creator<ShardManager>() {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public ShardManager create() throws Exception {
+ return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
+ datastoreContextBuilder.build(), ready) {
+ @Override
+ protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+ return mockShardActor;
+ }
+ };
+ }
+ };
+
+ return Props.create(new DelegatingShardManagerCreator(creator));
}
@Test
public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
- expectMsgEquals(duration("5 seconds"),
- new PrimaryNotFound("non-existent").toSerializable());
+ expectMsgEquals(duration("5 seconds"), new PrimaryNotFound("non-existent").toSerializable());
}};
}
@Test
- public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
+ public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
+ shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef());
+
+ MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
+ shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
+ RaftState.Leader.name())), mockShardActor);
+
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
- expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
+ primaryFound.getPrimaryPath().contains("member-1-shard-default"));
}};
}
@Test
- public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
+ public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+ MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
+
+ String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.tell(new RoleChangeNotification(memberId1,
+ RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+ shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
+ primaryFound.getPrimaryPath().contains("member-2-shard-default"));
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
}
@Test
- public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
+ public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+
+ expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.tell(new RoleChangeNotification(memberId,
+ RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+
+ expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+
+ shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
+ primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// We're passing waitUntilInitialized = true to FindPrimary so the response should be
- // delayed until we send ActorInitialized.
- Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
- new Timeout(5, TimeUnit.SECONDS));
+ // delayed until we send ActorInitialized and RoleChangeNotification.
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+
+ expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
shardManager.tell(new ActorInitialized(), mockShardActor);
- Object resp = Await.result(future, duration("5 seconds"));
- assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
+ expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.tell(new RoleChangeNotification(memberId,
+ RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
+
+ expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+
+ shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
+
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
+ primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+
+ expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+
+ expectMsgClass(duration("2 seconds"), ActorNotInitialized.class);
+
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+ shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
+ null, RaftState.Candidate.name()), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+
+ expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+
+ expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
}};
}
@Test
public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
@Test
public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
@Test
public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
@Test
public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
public void testRoleChangeNotificationReleaseReady() throws Exception {
new JavaTestKit(getSystem()) {
{
- final Props persistentProps = ShardManager.props(
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(true).build(), ready);
- final TestActorRef<ShardManager> shardManager =
- TestActorRef.create(getSystem(), persistentProps);
+ TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
- shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+ memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
verify(ready, times(1)).countDown();
public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
new JavaTestKit(getSystem()) {
{
- final Props persistentProps = ShardManager.props(
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(true).build(), ready);
- final TestActorRef<ShardManager> shardManager =
- TestActorRef.create(getSystem(), persistentProps);
+ TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
- shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
+ shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+ "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
verify(ready, never()).countDown();
import org.junit.Test;
import org.mockito.InOrder;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@Override
public Shard create() throws Exception {
- return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
newDatastoreContext(), SCHEMA_CONTEXT) {
@Override
public void onReceiveCommand(final Object message) throws Exception {
final CountDownLatch recoveryComplete = new CountDownLatch(1);
class TestShard extends Shard {
TestShard() {
- super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
+ super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
newDatastoreContext(), SCHEMA_CONTEXT);
}
Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
String address = "akka://foobar";
- shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
+ shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
assertEquals("getPeerAddresses", address,
((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
Creator<Shard> creator = new Creator<Shard>() {
@Override
public Shard create() throws Exception {
- return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
newDatastoreContext(), SCHEMA_CONTEXT) {
@Override
protected boolean isLeader() {
Creator<Shard> creator = new Creator<Shard>() {
@Override
public Shard create() throws Exception {
- return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
newDatastoreContext(), SCHEMA_CONTEXT) {
DelegatingPersistentDataProvider delegating;
final DatastoreContext persistentContext = DatastoreContext.newBuilder().
shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
- final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+ final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
persistentContext, SCHEMA_CONTEXT);
final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
- final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+ final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
nonPersistentContext, SCHEMA_CONTEXT);
new ShardTestKit(getSystem()) {{
}
private ActorRef createShard(){
- return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.<ShardIdentifier, String>emptyMap(), datastoreContext,
+ return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.<String, String>emptyMap(), datastoreContext,
TestModel.createTestContext()));
}
private ActorRef createShard(){
return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
- Collections.<ShardIdentifier, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
+ Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
}
private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
verifyCohortFutures(proxy, TestException.class);
}
- @Test
- public void testReadyWithInitialCreateTransactionFailure() throws Exception {
-
- doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
- mockActorContext).findPrimaryShardAsync(anyString());
+ private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
+ doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- verifyCohortFutures(proxy, PrimaryNotFoundException.class);
+ verifyCohortFutures(proxy, toThrow.getClass());
+ }
+
+ @Test
+ public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
+ testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
+ }
+
+ @Test
+ public void testWriteOnlyTxWithNotInitializedException() throws Exception {
+ testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
+ }
+
+ @Test
+ public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
+ testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
}
@Test
SchemaContext schemaContext = TestModel.createTestContext();
Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
shardName("inventory").type("config").build(),
- Collections.<ShardIdentifier,String>emptyMap(),
+ Collections.<String,String>emptyMap(),
DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
SchemaContext schemaContext = TestModel.createTestContext();
Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
shardName("inventory").type("config").build(),
- Collections.<ShardIdentifier,String>emptyMap(),
+ Collections.<String,String>emptyMap(),
DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.ConfigFactory;
+import java.util.Arrays;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.time.StopWatch;
import org.junit.Assert;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
public class ActorContextTest extends AbstractActorTest{
+ static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
+
+ private static class TestMessage {
+ }
+
private static class MockShardManager extends UntypedActor {
private final boolean found;
private final ActorRef actorRef;
+ private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
private MockShardManager(boolean found, ActorRef actorRef){
}
@Override public void onReceive(Object message) throws Exception {
+ if(message instanceof FindPrimary) {
+ FindPrimary fp = (FindPrimary)message;
+ Object resp = findPrimaryResponses.get(fp.getShardName());
+ if(resp == null) {
+ log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
+ } else {
+ getSender().tell(resp, getSelf());
+ }
+
+ return;
+ }
+
if(found){
getSender().tell(new LocalShardFound(actorRef), getSelf());
} else {
}
}
+ void addFindPrimaryResp(String shardName, Object resp) {
+ findPrimaryResponses.put(shardName, resp);
+ }
+
private static Props props(final boolean found, final ActorRef actorRef){
return Props.create(new MockShardManagerCreator(found, actorRef) );
}
+ private static Props props(){
+ return Props.create(new MockShardManagerCreator() );
+ }
+
@SuppressWarnings("serial")
private static class MockShardManagerCreator implements Creator<MockShardManager> {
final boolean found;
final ActorRef actorRef;
+ MockShardManagerCreator() {
+ this.found = false;
+ this.actorRef = null;
+ }
+
MockShardManagerCreator(boolean found, ActorRef actorRef) {
this.found = found;
this.actorRef = actorRef;
@Test
public void testRateLimiting(){
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+ transactionCreationInitialRateLimit(155L).build();
ActorContext actorContext =
new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext);
+ mock(Configuration.class), dataStoreContext);
// Check that the initial value is being picked up from DataStoreContext
- assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
+ assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
actorContext.setTxCreationLimit(1.0);
@Test
public void testClientDispatcherIsGlobalDispatcher(){
-
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
-
ActorContext actorContext =
new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext);
+ mock(Configuration.class), DatastoreContext.newBuilder().build());
assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
@Test
public void testClientDispatcherIsNotGlobalDispatcher(){
-
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
-
ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
ActorContext actorContext =
new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext);
+ mock(Configuration.class), DatastoreContext.newBuilder().build());
assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
TestActorRef<MessageCollectorActor> shardManager =
TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext) {
+ mock(Configuration.class), dataStoreContext) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
TestActorRef<MessageCollectorActor> shardManager =
TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext) {
+ mock(Configuration.class), dataStoreContext) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
return Futures.successful((Object) new PrimaryNotFound("foobar"));
Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
assertNull(cached);
-
}
@Test
TestActorRef<MessageCollectorActor> shardManager =
TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext) {
+ mock(Configuration.class), dataStoreContext) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
return Futures.successful((Object) new ActorNotInitialized());
Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
assertNull(cached);
+ }
+
+ @Test
+ public void testBroadcast() {
+ new JavaTestKit(getSystem()) {{
+ ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
+ MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
+ shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()).toSerializable());
+ shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()).toSerializable());
+ shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
+
+ Configuration mockConfig = mock(Configuration.class);
+ doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
+ when(mockConfig).getAllShardNames();
+ ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+ mock(ClusterWrapper.class), mockConfig,
+ DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
+
+ actorContext.broadcast(new TestMessage());
+
+ expectFirstMatching(shardActorRef1, TestMessage.class);
+ expectFirstMatching(shardActorRef2, TestMessage.class);
+ }};
}
+ private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
+ int count = 5000 / 50;
+ for(int i = 0; i < count; i++) {
+ try {
+ T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
+ if(message != null) {
+ return message;
+ }
+ } catch (Exception e) {}
+
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ Assert.fail("Did not receive message of type " + clazz);
+ return null;
+ }
}
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
-
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
* </p>
*/
public class MessageCollectorActor extends UntypedActor {
- private List<Object> messages = new ArrayList<>();
+ private final List<Object> messages = new ArrayList<>();
@Override public void onReceive(Object message) throws Exception {
if(message instanceof String){
}
}
+ public void clear() {
+ messages.clear();
+ }
+
public static List<Object> getAllMessages(ActorRef actor) throws Exception {
FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
Timeout operationTimeout = new Timeout(operationDuration);
return output;
}
+ public static <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
+ int count = 5000 / 50;
+ for(int i = 0; i < count; i++) {
+ try {
+ T message = (T) getFirstMatching(actor, clazz);
+ if(message != null) {
+ return message;
+ }
+ } catch (Exception e) {}
+
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ Assert.fail("Did not receive message of type " + clazz);
+ return null;
+ }
}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
-import com.google.common.base.Optional;
public class MockActorContext extends ActorContext {
return executeRemoteOperationResponse;
}
- @Override public Optional<ActorSelection> findPrimaryShard(String shardName) {
- return Optional.absent();
- }
-
public void setExecuteShardOperationResponse(Object response){
executeShardOperationResponse = response;
}