From: Moiz Raja Date: Wed, 25 Mar 2015 16:15:58 +0000 (+0000) Subject: Merge "Bug 2194: Modify FindPrimary to check for leader" X-Git-Tag: release/lithium~354 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=468b9523001807db03b3a545328ac9bf819278c7;hp=66ca1b633a7840c973bd802fb10f31caaa3470a4 Merge "Bug 2194: Modify FindPrimary to check for leader" --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 99bc9de6a2..66467af130 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -146,10 +146,9 @@ public class Shard extends RaftActor { private final String txnDispatcherPath; - protected Shard(final ShardIdentifier name, final Map peerAddresses, + protected Shard(final ShardIdentifier name, final Map peerAddresses, final DatastoreContext datastoreContext, final SchemaContext schemaContext) { - super(name.toString(), mapPeerAddresses(peerAddresses), - Optional.of(datastoreContext.getShardRaftConfig())); + super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig())); this.name = name.toString(); this.datastoreContext = datastoreContext; @@ -198,20 +197,8 @@ public class Shard extends RaftActor { datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); } - private static Map mapPeerAddresses( - final Map peerAddresses) { - Map map = new HashMap<>(); - - for (Map.Entry entry : peerAddresses - .entrySet()) { - map.put(entry.getKey().toString(), entry.getValue()); - } - - return map; - } - public static Props props(final ShardIdentifier name, - final Map peerAddresses, + final Map peerAddresses, final DatastoreContext datastoreContext, final SchemaContext schemaContext) { Preconditions.checkNotNull(name, "name should not be null"); Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null"); @@ -950,11 +937,11 @@ public class Shard extends RaftActor { private static final long serialVersionUID = 1L; final ShardIdentifier name; - final Map peerAddresses; + final Map peerAddresses; final DatastoreContext datastoreContext; final SchemaContext schemaContext; - ShardCreator(final ShardIdentifier name, final Map peerAddresses, + ShardCreator(final ShardIdentifier name, final Map peerAddresses, final DatastoreContext datastoreContext, final SchemaContext schemaContext) { this.name = name; this.peerAddresses = peerAddresses; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 136c6813ea..bc4c825351 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.Address; +import akka.actor.Cancellable; import akka.actor.OneForOneStrategy; import akka.actor.Props; import akka.actor.SupervisorStrategy; @@ -20,24 +21,28 @@ import akka.japi.Function; import akka.japi.Procedure; import akka.persistence.RecoveryCompleted; import akka.persistence.RecoveryFailure; +import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; @@ -53,6 +58,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; @@ -74,7 +80,7 @@ import scala.concurrent.duration.Duration; */ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { - private final Logger LOG = LoggerFactory.getLogger(getClass()); + private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class); // Stores a mapping between a member name and the address of the member // Member names look like "member-1", "member-2" etc and are as specified @@ -172,15 +178,45 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onRoleChangeNotification((RoleChangeNotification) message); } else if(message instanceof FollowerInitialSyncUpStatus){ onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message); - } else{ + } else if(message instanceof ShardNotInitializedTimeout) { + onShardNotInitializedTimeout((ShardNotInitializedTimeout)message); + } else if(message instanceof LeaderStateChanged) { + onLeaderStateChanged((LeaderStateChanged)message); + } else { unknownMessage(message); } } + private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) { + LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged); + + ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId()); + if(shardInformation != null) { + shardInformation.setLeaderId(leaderStateChanged.getLeaderId()); + } else { + LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId()); + } + } + + private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) { + ShardInformation shardInfo = message.getShardInfo(); + + LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(), + shardInfo.getShardId()); + + shardInfo.removeOnShardInitialized(message.getOnShardInitialized()); + + if(!shardInfo.isShardInitialized()) { + message.getSender().tell(new ActorNotInitialized(), getSelf()); + } else { + message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf()); + } + } + private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) { - LOG.info("Received follower initial sync status for {} status sync done {}", status.getName(), - status.isInitialSyncDone()); + LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(), + status.getName(), status.isInitialSyncDone()); ShardInformation shardInformation = findShardInformation(status.getName()); @@ -193,7 +229,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void onRoleChangeNotification(RoleChangeNotification roleChanged) { - LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(), + LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(), roleChanged.getOldRole(), roleChanged.getNewRole()); ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId()); @@ -201,8 +237,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardInformation.setRole(roleChanged.getNewRole()); if (isReady()) { - LOG.info("All Shards are ready - data store {} is ready, available count is {}", type, - waitTillReadyCountdownLatch.getCount()); + LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", + persistenceId(), type, waitTillReadyCountdownLatch.getCount()); waitTillReadyCountdownLatch.countDown(); } @@ -225,7 +261,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private boolean isReady() { boolean isReady = true; for (ShardInformation info : localShards.values()) { - if(RaftState.Candidate.name().equals(info.getRole()) || Strings.isNullOrEmpty(info.getRole())){ + if(!info.isShardReady()){ isReady = false; break; } @@ -256,14 +292,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { if (shardId.getShardName() == null) { return; } + markShardAsInitialized(shardId.getShardName()); } private void markShardAsInitialized(String shardName) { LOG.debug("Initializing shard [{}]", shardName); + ShardInformation shardInformation = localShards.get(shardName); if (shardInformation != null) { shardInformation.setActorInitialized(); + + shardInformation.getActor().tell(new RegisterRoleChangeListener(), self()); } } @@ -300,7 +340,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } - sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier() { + sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier() { @Override public Object get() { return new LocalShardFound(shardInformation.getActor()); @@ -308,20 +348,36 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { }); } - private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized, - final Supplier messageSupplier) { - if (!shardInformation.isShardInitialized()) { - if(waitUntilInitialized) { + private void sendResponse(ShardInformation shardInformation, boolean doWait, + boolean wantShardReady, final Supplier messageSupplier) { + if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) { + if(doWait) { final ActorRef sender = getSender(); final ActorRef self = self(); - shardInformation.addRunnableOnInitialized(new Runnable() { + + Runnable replyRunnable = new Runnable() { @Override public void run() { sender.tell(messageSupplier.get(), self); } - }); - } else { + }; + + OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) : + new OnShardInitialized(replyRunnable); + + shardInformation.addOnShardInitialized(onShardInitialized); + + Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce( + datastoreContext.getShardInitializationTimeout().duration(), getSelf(), + new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender), + getContext().dispatcher(), getSelf()); + + onShardInitialized.setTimeoutSchedule(timeoutSchedule); + + } else if (!shardInformation.isShardInitialized()) { getSender().tell(new ActorNotInitialized(), getSelf()); + } else { + getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf()); } return; @@ -330,6 +386,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { getSender().tell(messageSupplier.get(), getSelf()); } + private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) { + return new NoShardLeaderException(String.format( + "Could not find a leader for shard %s. This typically happens when the system is coming up or " + + "recovering and a leader is being elected. Try again later.", shardId)); + } + private void memberRemoved(ClusterEvent.MemberRemoved message) { memberNameToAddress.remove(message.member().roles().head()); } @@ -341,8 +403,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { for(ShardInformation info : localShards.values()){ String shardName = info.getShardName(); - info.updatePeerAddress(getShardIdentifier(memberName, shardName), - getShardActorPath(shardName, memberName)); + info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(), + getShardActorPath(shardName, memberName), getSelf()); } } @@ -384,13 +446,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("Sending new SchemaContext to Shards"); for (ShardInformation info : localShards.values()) { if (info.getActor() == null) { - info.setActor(getContext().actorOf(Shard.props(info.getShardId(), - info.getPeerAddresses(), datastoreContext, schemaContext) - .withDispatcher(shardDispatcherPath), info.getShardId().toString())); + info.setActor(newShardActor(schemaContext, info)); } else { info.getActor().tell(message, getSelf()); } - info.getActor().tell(new RegisterRoleChangeListener(), self()); } } @@ -402,16 +461,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } + @VisibleForTesting + protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) { + return getContext().actorOf(Shard.props(info.getShardId(), + info.getPeerAddresses(), datastoreContext, schemaContext) + .withDispatcher(shardDispatcherPath), info.getShardId().toString()); + } + private void findPrimary(FindPrimary message) { - String shardName = message.getShardName(); + final String shardName = message.getShardName(); // First see if the there is a local replica for the shard final ShardInformation info = localShards.get(shardName); if (info != null) { - sendResponse(info, message.isWaitUntilInitialized(), new Supplier() { + sendResponse(info, message.isWaitUntilReady(), true, new Supplier() { @Override public Object get() { - return new PrimaryFound(info.getActorPath().toString()).toSerializable(); + Object found = new PrimaryFound(info.getSerializedLeaderActor()).toSerializable(); + + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Found primary for {}: {}", shardName, found); + } + + return found; } }); @@ -481,7 +553,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { List localShardActorNames = new ArrayList<>(); for(String shardName : memberShardNames){ ShardIdentifier shardId = getShardIdentifier(memberName, shardName); - Map peerAddresses = getPeerAddresses(shardName); + Map peerAddresses = getPeerAddresses(shardName); localShardActorNames.add(shardId.toString()); localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses)); } @@ -496,22 +568,19 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param shardName * @return */ - private Map getPeerAddresses(String shardName){ + private Map getPeerAddresses(String shardName){ - Map peerAddresses = new HashMap<>(); + Map peerAddresses = new HashMap<>(); - List members = - this.configuration.getMembersFromShardName(shardName); + List members = this.configuration.getMembersFromShardName(shardName); String currentMemberName = this.cluster.getCurrentMemberName(); for(String memberName : members){ if(!currentMemberName.equals(memberName)){ - ShardIdentifier shardId = getShardIdentifier(memberName, - shardName); - String path = - getShardActorPath(shardName, currentMemberName); - peerAddresses.put(shardId, path); + ShardIdentifier shardId = getShardIdentifier(memberName, shardName); + String path = getShardActorPath(shardName, currentMemberName); + peerAddresses.put(shardId.toString(), path); } } return peerAddresses; @@ -552,23 +621,25 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return mBean; } - private class ShardInformation { + @VisibleForTesting + protected static class ShardInformation { private final ShardIdentifier shardId; private final String shardName; private ActorRef actor; private ActorPath actorPath; - private final Map peerAddresses; + private final Map peerAddresses; // flag that determines if the actor is ready for business private boolean actorInitialized = false; private boolean followerSyncStatus = false; - private final List runnablesOnInitialized = Lists.newArrayList(); + private final Set onShardInitializedSet = Sets.newHashSet(); private String role ; + private String leaderId; private ShardInformation(String shardName, ShardIdentifier shardId, - Map peerAddresses) { + Map peerAddresses) { this.shardName = shardName; this.shardId = shardId; this.peerAddresses = peerAddresses; @@ -595,11 +666,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return shardId; } - Map getPeerAddresses() { + Map getPeerAddresses() { return peerAddresses; } - void updatePeerAddress(ShardIdentifier peerId, String peerAddress){ + void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){ LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress); if(peerAddresses.containsKey(peerId)){ @@ -611,42 +682,87 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { peerId, peerAddress, actor.path()); } - actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf()); + actor.tell(new PeerAddressResolved(peerId.toString(), peerAddress), sender); } + + notifyOnShardInitializedCallbacks(); } } + boolean isShardReady() { + return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role); + } + + boolean isShardReadyWithLeaderId() { + return isShardReady() && (isLeader() || peerAddresses.containsKey(leaderId)); + } + boolean isShardInitialized() { return getActor() != null && actorInitialized; } + boolean isLeader() { + return Objects.equal(leaderId, shardId.toString()); + } + + String getSerializedLeaderActor() { + if(isLeader()) { + return Serialization.serializedActorPath(getActor()); + } else { + return peerAddresses.get(leaderId); + } + } + void setActorInitialized() { + LOG.debug("Shard {} is initialized", shardId); + this.actorInitialized = true; - for(Runnable runnable: runnablesOnInitialized) { - runnable.run(); + notifyOnShardInitializedCallbacks(); + } + + private void notifyOnShardInitializedCallbacks() { + if(onShardInitializedSet.isEmpty()) { + return; } - runnablesOnInitialized.clear(); + boolean ready = isShardReadyWithLeaderId(); + + if(LOG.isDebugEnabled()) { + LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId, + ready ? "ready" : "initialized", onShardInitializedSet.size()); + } + + Iterator iter = onShardInitializedSet.iterator(); + while(iter.hasNext()) { + OnShardInitialized onShardInitialized = iter.next(); + if(!(onShardInitialized instanceof OnShardReady) || ready) { + iter.remove(); + onShardInitialized.getTimeoutSchedule().cancel(); + onShardInitialized.getReplyRunnable().run(); + } + } } - void addRunnableOnInitialized(Runnable runnable) { - runnablesOnInitialized.add(runnable); + void addOnShardInitialized(OnShardInitialized onShardInitialized) { + onShardInitializedSet.add(onShardInitialized); } - public void setRole(String newRole) { - this.role = newRole; + void removeOnShardInitialized(OnShardInitialized onShardInitialized) { + onShardInitializedSet.remove(onShardInitialized); } - public String getRole(){ - return this.role; + void setRole(String newRole) { + this.role = newRole; + + notifyOnShardInitializedCallbacks(); } - public void setFollowerSyncStatus(boolean syncStatus){ + void setFollowerSyncStatus(boolean syncStatus){ this.followerSyncStatus = syncStatus; } - public boolean isInSync(){ + boolean isInSync(){ if(RaftState.Follower.name().equals(this.role)){ return followerSyncStatus; } else if(RaftState.Leader.name().equals(this.role)){ @@ -656,6 +772,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return false; } + void setLeaderId(String leaderId) { + this.leaderId = leaderId; + + notifyOnShardInitializedCallbacks(); + } } private static class ShardManagerCreator implements Creator { @@ -680,6 +801,57 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private static class OnShardInitialized { + private final Runnable replyRunnable; + private Cancellable timeoutSchedule; + + OnShardInitialized(Runnable replyRunnable) { + this.replyRunnable = replyRunnable; + } + + Runnable getReplyRunnable() { + return replyRunnable; + } + + Cancellable getTimeoutSchedule() { + return timeoutSchedule; + } + + void setTimeoutSchedule(Cancellable timeoutSchedule) { + this.timeoutSchedule = timeoutSchedule; + } + } + + private static class OnShardReady extends OnShardInitialized { + OnShardReady(Runnable replyRunnable) { + super(replyRunnable); + } + } + + private static class ShardNotInitializedTimeout { + private final ActorRef sender; + private final ShardInformation shardInfo; + private final OnShardInitialized onShardInitialized; + + ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) { + this.sender = sender; + this.shardInfo = shardInfo; + this.onShardInitialized = onShardInitialized; + } + + ActorRef getSender() { + return sender; + } + + ShardInformation getShardInfo() { + return shardInfo; + } + + OnShardInitialized getOnShardInitialized() { + return onShardInitialized; + } + } + static class SchemaContextModules implements Serializable { private static final long serialVersionUID = -8884620101025936590L; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 7f2f328135..64f914b19f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -485,7 +485,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void onComplete(Throwable failure, ActorSelection primaryShard) { if(failure != null) { - newTxFutureCallback.onComplete(failure, null); + newTxFutureCallback.createTransactionContext(failure, null); } else { newTxFutureCallback.setPrimaryShard(primaryShard); } @@ -566,7 +566,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(transactionType == TransactionType.WRITE_ONLY && actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { - LOG.debug("Tx {} Primary shard found - creating WRITE_ONLY transaction context", identifier); + LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context", + identifier, primaryShard); // For write-only Tx's we prepare the transaction modifications directly on the shard actor // to avoid the overhead of creating a separate transaction actor. @@ -612,7 +613,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { * Performs a CreateTransaction try async. */ private void tryCreateTransaction() { - LOG.debug("Tx {} Primary shard found - trying create transaction", identifier); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} Primary shard {} found - trying create transaction", identifier, primaryShard); + } Object serializedCreateMessage = new CreateTransaction(identifier.toString(), TransactionProxy.this.transactionType.ordinal(), @@ -645,6 +648,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } + createTransactionContext(failure, response); + } + + private void createTransactionContext(Throwable failure, Object response) { // Mainly checking for state violation here to perform a volatile read of "initialized" to // ensure updates to operationLimter et al are visible to this thread (ie we're doing // "piggy-back" synchronization here). diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java index a34330bcf6..d51d6800a2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java @@ -18,22 +18,22 @@ public class FindPrimary implements SerializableMessage{ public static final Class SERIALIZABLE_CLASS = FindPrimary.class; private final String shardName; - private final boolean waitUntilInitialized; + private final boolean waitUntilReady; - public FindPrimary(String shardName, boolean waitUntilInitialized){ + public FindPrimary(String shardName, boolean waitUntilReady){ Preconditions.checkNotNull(shardName, "shardName should not be null"); this.shardName = shardName; - this.waitUntilInitialized = waitUntilInitialized; + this.waitUntilReady = waitUntilReady; } public String getShardName() { return shardName; } - public boolean isWaitUntilInitialized() { - return waitUntilInitialized; + public boolean isWaitUntilReady() { + return waitUntilReady; } @Override @@ -44,4 +44,12 @@ public class FindPrimary implements SerializableMessage{ public static FindPrimary fromSerializable(Object message){ return (FindPrimary) message; } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("FindPrimary [shardName=").append(shardName).append(", waitUntilReady=").append(waitUntilReady) + .append("]"); + return builder.toString(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java index 346519ed5a..82f3649939 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java @@ -8,18 +8,17 @@ package org.opendaylight.controller.cluster.datastore.messages; -import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; public class PeerAddressResolved { - private final ShardIdentifier peerId; + private final String peerId; private final String peerAddress; - public PeerAddressResolved(ShardIdentifier peerId, String peerAddress) { + public PeerAddressResolved(String peerId, String peerAddress) { this.peerId = peerId; this.peerAddress = peerAddress; } - public ShardIdentifier getPeerId() { + public String getPeerId() { return peerId; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 0fb09d8231..6f9bb7fc9f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -17,6 +17,7 @@ import akka.actor.Address; import akka.actor.PoisonPill; import akka.dispatch.Futures; import akka.dispatch.Mapper; +import akka.dispatch.OnComplete; import akka.pattern.AskTimeoutException; import akka.util.Timeout; import com.codahale.metrics.JmxReporter; @@ -35,6 +36,7 @@ import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; @@ -98,8 +100,9 @@ public class ActorContext { private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build(); private final int transactionOutstandingOperationLimit; private Timeout transactionCommitOperationTimeout; + private Timeout shardInitializationTimeout; private final Dispatchers dispatchers; - private final Cache> primaryShardActorSelectionCache; + private Cache> primaryShardActorSelectionCache; private volatile SchemaContext schemaContext; private volatile boolean updated; @@ -121,14 +124,6 @@ public class ActorContext { this.dispatchers = new Dispatchers(actorSystem.dispatchers()); setCachedProperties(); - primaryShardActorSelectionCache = CacheBuilder.newBuilder() - .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS) - .build(); - - operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); - operationTimeout = new Timeout(operationDuration); - transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(), - TimeUnit.SECONDS)); Address selfAddress = clusterWrapper.getSelfAddress(); if (selfAddress != null && !selfAddress.host().isEmpty()) { @@ -150,6 +145,12 @@ public class ActorContext { transactionCommitOperationTimeout = new Timeout(Duration.create( datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS)); + + shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2)); + + primaryShardActorSelectionCache = CacheBuilder.newBuilder() + .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS) + .build(); } public DatastoreContext getDatastoreContext() { @@ -202,28 +203,13 @@ public class ActorContext { return schemaContext; } - /** - * Finds the primary shard for the given shard name - * - * @param shardName - * @return - */ - public Optional findPrimaryShard(String shardName) { - String path = findPrimaryPathOrNull(shardName); - if (path == null){ - return Optional.absent(); - } - return Optional.of(actorSystem.actorSelection(path)); - } - public Future findPrimaryShardAsync(final String shardName) { Future ret = primaryShardActorSelectionCache.getIfPresent(shardName); if(ret != null){ return ret; } Future future = executeOperationAsync(shardManager, - new FindPrimary(shardName, true).toSerializable(), - datastoreContext.getShardInitializationTimeout()); + new FindPrimary(shardName, true).toSerializable(), shardInitializationTimeout); return future.transform(new Mapper() { @Override @@ -242,6 +228,8 @@ public class ActorContext { } else if(response instanceof PrimaryNotFound) { throw new PrimaryNotFoundException( String.format("No primary shard found for %S.", shardName)); + } else if(response instanceof NoShardLeaderException) { + throw (NoShardLeaderException)response; } throw new UnknownMessageException(String.format( @@ -277,7 +265,7 @@ public class ActorContext { */ public Future findLocalShardAsync( final String shardName) { Future future = executeOperationAsync(shardManager, - new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout()); + new FindLocalShard(shardName, true), shardInitializationTimeout); return future.map(new Mapper() { @Override @@ -301,26 +289,6 @@ public class ActorContext { }, getClientDispatcher()); } - private String findPrimaryPathOrNull(String shardName) { - Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable()); - - if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) { - PrimaryFound found = PrimaryFound.fromSerializable(result); - - LOG.debug("Primary found {}", found.getPrimaryPath()); - return found.getPrimaryPath(); - - } else if (result.getClass().equals(ActorNotInitialized.class)){ - throw new NotInitializedException( - String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName) - ); - - } else { - return null; - } - } - - /** * Executes an operation on a local actor and wait for it's response * @@ -428,16 +396,21 @@ public class ActorContext { * * @param message */ - public void broadcast(Object message){ - for(String shardName : configuration.getAllShardNames()){ - - Optional primary = findPrimaryShard(shardName); - if (primary.isPresent()) { - primary.get().tell(message, ActorRef.noSender()); - } else { - LOG.warn("broadcast failed to send message {} to shard {}. Primary not found", - message.getClass().getSimpleName(), shardName); - } + public void broadcast(final Object message){ + for(final String shardName : configuration.getAllShardNames()){ + + Future primaryFuture = findPrimaryShardAsync(shardName); + primaryFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, ActorSelection primaryShard) { + if(failure != null) { + LOG.warn("broadcast failed to send message {} to shard {}: {}", + message.getClass().getSimpleName(), shardName, failure); + } else { + primaryShard.tell(message, ActorRef.noSender()); + } + } + }, getClientDispatcher()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 8cafb46528..378bc717f4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -87,7 +87,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{ } protected Props newShardProps() { - return Shard.props(shardID, Collections.emptyMap(), + return Shard.props(shardID, Collections.emptyMap(), newDatastoreContext(), SCHEMA_CONTEXT); } @@ -102,7 +102,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{ Creator creator = new Creator() { @Override public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), + return new Shard(shardID, Collections.emptyMap(), newDatastoreContext(), SCHEMA_CONTEXT) { @Override protected void onRecoveryComplete() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index a3c5eb4b00..fdc7e664c2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -147,8 +147,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } - @Test - public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{ + private void testTransactionWritesWithShardNotInitiallyReady(final boolean writeOnly) throws Exception { new IntegrationTestKit(getSystem()) {{ String testName = "testTransactionWritesWithShardNotInitiallyReady"; String shardName = "test-1"; @@ -163,8 +162,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // Create the write Tx - // TODO - we'll want to test this with write-only as well when FindPrimary returns the leader shard. - final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction(); + final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() : + dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", writeTx); // Do some modification operations and ready the Tx on a separate thread. @@ -240,7 +239,18 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { } @Test - public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{ + public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception { + datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + testTransactionWritesWithShardNotInitiallyReady(true); + } + + @Test + public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception { + testTransactionWritesWithShardNotInitiallyReady(false); + } + + @Test + public void testTransactionReadsWithShardNotInitiallyReady() throws Exception { new IntegrationTestKit(getSystem()) {{ String testName = "testTransactionReadsWithShardNotInitiallyReady"; String shardName = "test-1"; @@ -455,8 +465,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } - @Test(expected=NoShardLeaderException.class) - public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{ + private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable { new IntegrationTestKit(getSystem()) {{ String testName = "testTransactionCommitFailureWithNoShardLeader"; String shardName = "test-1"; @@ -465,6 +474,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // by setting the election timeout, which is based on the heartbeat interval, really high. datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000); + datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); // Set the leader election timeout low for the test. @@ -474,7 +484,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // Create the write Tx. - final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction(); + final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() : + dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", writeTx); // Do some modifications and ready the Tx on a separate thread. @@ -523,6 +534,17 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } + @Test(expected=NoShardLeaderException.class) + public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable { + datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + testTransactionCommitFailureWithNoShardLeader(true); + } + + @Test(expected=NoShardLeaderException.class) + public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable { + testTransactionCommitFailureWithNoShardLeader(false); + } + @Test public void testTransactionAbort() throws Exception{ System.setProperty("shard.persistent", "true"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 99417076bf..ae7a4f96c5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -34,6 +34,7 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; @@ -44,9 +45,11 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; +import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; @@ -56,6 +59,7 @@ import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; public class ShardManagerTest extends AbstractActorTest { private static int ID_COUNTER = 1; @@ -66,7 +70,10 @@ public class ShardManagerTest extends AbstractActorTest { @Mock private static CountDownLatch ready; - private static ActorRef mockShardActor; + private static TestActorRef mockShardActor; + + private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). + dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS); @Before public void setUp() { @@ -75,9 +82,11 @@ public class ShardManagerTest extends AbstractActorTest { InMemoryJournal.clear(); if(mockShardActor == null) { - String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString(); - mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name); + String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString(); + mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name); } + + mockShardActor.underlyingActor().clear(); } @After @@ -86,44 +95,93 @@ public class ShardManagerTest extends AbstractActorTest { } private Props newShardMgrProps() { - DatastoreContext.Builder builder = DatastoreContext.newBuilder(); - builder.dataStoreType(shardMrgIDSuffix); return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), - builder.build(), ready); + datastoreContextBuilder.build(), ready); + } + + private Props newPropsShardMgrWithMockShardActor() { + Creator creator = new Creator() { + private static final long serialVersionUID = 1L; + @Override + public ShardManager create() throws Exception { + return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), + datastoreContextBuilder.build(), ready) { + @Override + protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { + return mockShardActor; + } + }; + } + }; + + return Props.create(new DelegatingShardManagerCreator(creator)); } @Test public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef()); - expectMsgEquals(duration("5 seconds"), - new PrimaryNotFound("non-existent").toSerializable()); + expectMsgEquals(duration("5 seconds"), new PrimaryNotFound("non-existent").toSerializable()); }}; } @Test - public void testOnReceiveFindPrimaryForExistentShard() throws Exception { + public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef()); + + MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class); + shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), + RaftState.Leader.name())), mockShardActor); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); - expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-1-shard-default")); }}; } @Test - public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception { + public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); + + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(memberId1, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); + shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-2-shard-default")); + }}; + } + + @Test + public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); @@ -132,28 +190,129 @@ public class ShardManagerTest extends AbstractActorTest { } @Test - public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception { + public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); + }}; + } + + @Test + public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(memberId, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); + + shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-1-shard-default")); + }}; + } + + @Test + public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); // We're passing waitUntilInitialized = true to FindPrimary so the response should be - // delayed until we send ActorInitialized. - Future future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true), - new Timeout(5, TimeUnit.SECONDS)); + // delayed until we send ActorInitialized and RoleChangeNotification. + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + + expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); shardManager.tell(new ActorInitialized(), mockShardActor); - Object resp = Await.result(future, duration("5 seconds")); - assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound); + expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); + + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(memberId, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor); + + expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); + + shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); + + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-1-shard-default")); + + expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); + }}; + } + + @Test + public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + + expectMsgClass(duration("2 seconds"), ActorNotInitialized.class); + + shardManager.tell(new ActorInitialized(), mockShardActor); + + expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); + }}; + } + + @Test + public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, + null, RaftState.Candidate.name()), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + + expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); + }}; + } + + @Test + public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + + expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); }}; } @Test public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -168,7 +327,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindLocalShardForExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -185,7 +344,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); @@ -196,7 +355,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -439,14 +598,11 @@ public class ShardManagerTest extends AbstractActorTest { public void testRoleChangeNotificationReleaseReady() throws Exception { new JavaTestKit(getSystem()) { { - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); - final TestActorRef shardManager = - TestActorRef.create(getSystem(), persistentProps); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", RaftState.Candidate.name(), RaftState.Leader.name())); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( + memberId, RaftState.Candidate.name(), RaftState.Leader.name())); verify(ready, times(1)).countDown(); @@ -457,14 +613,10 @@ public class ShardManagerTest extends AbstractActorTest { public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception { new JavaTestKit(getSystem()) { { - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); - final TestActorRef shardManager = - TestActorRef.create(getSystem(), persistentProps); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(), RaftState.Leader.name())); + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( + "unknown", RaftState.Candidate.name(), RaftState.Leader.name())); verify(ready, never()).countDown(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 0fbe68665e..adc7f4706c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -41,7 +41,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.mockito.InOrder; import org.opendaylight.controller.cluster.DataPersistenceProvider; -import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; @@ -159,7 +158,7 @@ public class ShardTest extends AbstractShardTest { @Override public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), + return new Shard(shardID, Collections.emptyMap(), newDatastoreContext(), SCHEMA_CONTEXT) { @Override public void onReceiveCommand(final Object message) throws Exception { @@ -287,7 +286,7 @@ public class ShardTest extends AbstractShardTest { final CountDownLatch recoveryComplete = new CountDownLatch(1); class TestShard extends Shard { TestShard() { - super(shardID, Collections.singletonMap(shardID, null), + super(shardID, Collections.singletonMap(shardID.toString(), null), newDatastoreContext(), SCHEMA_CONTEXT); } @@ -318,7 +317,7 @@ public class ShardTest extends AbstractShardTest { Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS)); String address = "akka://foobar"; - shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address)); + shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address)); assertEquals("getPeerAddresses", address, ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString())); @@ -763,7 +762,7 @@ public class ShardTest extends AbstractShardTest { Creator creator = new Creator() { @Override public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), + return new Shard(shardID, Collections.emptyMap(), newDatastoreContext(), SCHEMA_CONTEXT) { @Override protected boolean isLeader() { @@ -1427,7 +1426,7 @@ public class ShardTest extends AbstractShardTest { Creator creator = new Creator() { @Override public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), + return new Shard(shardID, Collections.emptyMap(), newDatastoreContext(), SCHEMA_CONTEXT) { DelegatingPersistentDataProvider delegating; @@ -1528,13 +1527,13 @@ public class ShardTest extends AbstractShardTest { final DatastoreContext persistentContext = DatastoreContext.newBuilder(). shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build(); - final Props persistentProps = Shard.props(shardID, Collections.emptyMap(), + final Props persistentProps = Shard.props(shardID, Collections.emptyMap(), persistentContext, SCHEMA_CONTEXT); final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder(). shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build(); - final Props nonPersistentProps = Shard.props(shardID, Collections.emptyMap(), + final Props nonPersistentProps = Shard.props(shardID, Collections.emptyMap(), nonPersistentContext, SCHEMA_CONTEXT); new ShardTestKit(getSystem()) {{ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java index 09a4532b53..c3fef611e3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java @@ -60,7 +60,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { } private ActorRef createShard(){ - return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.emptyMap(), datastoreContext, + return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.emptyMap(), datastoreContext, TestModel.createTestContext())); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index 8ebb145728..e63ace3e2c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -79,7 +79,7 @@ public class ShardTransactionTest extends AbstractActorTest { private ActorRef createShard(){ return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, - Collections.emptyMap(), datastoreContext, TestModel.createTestContext())); + Collections.emptyMap(), datastoreContext, TestModel.createTestContext())); } private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index ac2c079641..265ec59f1c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -33,6 +33,8 @@ import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; @@ -659,11 +661,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verifyCohortFutures(proxy, TestException.class); } - @Test - public void testReadyWithInitialCreateTransactionFailure() throws Exception { - - doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when( - mockActorContext).findPrimaryShardAsync(anyString()); + private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception { + doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -681,7 +680,22 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - verifyCohortFutures(proxy, PrimaryNotFoundException.class); + verifyCohortFutures(proxy, toThrow.getClass()); + } + + @Test + public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception { + testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock")); + } + + @Test + public void testWriteOnlyTxWithNotInitializedException() throws Exception { + testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock")); + } + + @Test + public void testWriteOnlyTxWithNoShardLeaderException() throws Exception { + testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock")); } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java index 2a29d2c089..e206e69cda 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java @@ -59,7 +59,7 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc SchemaContext schemaContext = TestModel.createTestContext(); Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1"). shardName("inventory").type("config").build(), - Collections.emptyMap(), + Collections.emptyMap(), DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(), schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId()); @@ -133,7 +133,7 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc SchemaContext schemaContext = TestModel.createTestContext(); Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1"). shardName("inventory").type("config").build(), - Collections.emptyMap(), + Collections.emptyMap(), DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(), schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 6bd732e038..2746bcf982 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -20,8 +20,12 @@ import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Optional; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; +import java.util.Arrays; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.time.StopWatch; import org.junit.Assert; @@ -30,14 +34,18 @@ import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; +import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -45,10 +53,16 @@ import scala.concurrent.duration.FiniteDuration; public class ActorContextTest extends AbstractActorTest{ + static final Logger log = LoggerFactory.getLogger(ActorContextTest.class); + + private static class TestMessage { + } + private static class MockShardManager extends UntypedActor { private final boolean found; private final ActorRef actorRef; + private final Map findPrimaryResponses = Maps.newHashMap(); private MockShardManager(boolean found, ActorRef actorRef){ @@ -57,6 +71,18 @@ public class ActorContextTest extends AbstractActorTest{ } @Override public void onReceive(Object message) throws Exception { + if(message instanceof FindPrimary) { + FindPrimary fp = (FindPrimary)message; + Object resp = findPrimaryResponses.get(fp.getShardName()); + if(resp == null) { + log.error("No expected FindPrimary response found for shard name {}", fp.getShardName()); + } else { + getSender().tell(resp, getSelf()); + } + + return; + } + if(found){ getSender().tell(new LocalShardFound(actorRef), getSelf()); } else { @@ -64,15 +90,28 @@ public class ActorContextTest extends AbstractActorTest{ } } + void addFindPrimaryResp(String shardName, Object resp) { + findPrimaryResponses.put(shardName, resp); + } + private static Props props(final boolean found, final ActorRef actorRef){ return Props.create(new MockShardManagerCreator(found, actorRef) ); } + private static Props props(){ + return Props.create(new MockShardManagerCreator() ); + } + @SuppressWarnings("serial") private static class MockShardManagerCreator implements Creator { final boolean found; final ActorRef actorRef; + MockShardManagerCreator() { + this.found = false; + this.actorRef = null; + } + MockShardManagerCreator(boolean found, ActorRef actorRef) { this.found = found; this.actorRef = actorRef; @@ -287,18 +326,15 @@ public class ActorContextTest extends AbstractActorTest{ @Test public void testRateLimiting(){ - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + transactionCreationInitialRateLimit(155L).build(); ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext); + mock(Configuration.class), dataStoreContext); // Check that the initial value is being picked up from DataStoreContext - assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15); + assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15); actorContext.setTxCreationLimit(1.0); @@ -320,16 +356,9 @@ public class ActorContextTest extends AbstractActorTest{ @Test public void testClientDispatcherIsGlobalDispatcher(){ - - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); - ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext); + mock(Configuration.class), DatastoreContext.newBuilder().build()); assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); @@ -337,18 +366,11 @@ public class ActorContextTest extends AbstractActorTest{ @Test public void testClientDispatcherIsNotGlobalDispatcher(){ - - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); - ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf")); ActorContext actorContext = new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext); + mock(Configuration.class), DatastoreContext.newBuilder().build()); assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); @@ -388,15 +410,12 @@ public class ActorContextTest extends AbstractActorTest{ TestActorRef shardManager = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext) { + mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { return Futures.successful((Object) new PrimaryFound("akka://test-system/test")); @@ -431,15 +450,12 @@ public class ActorContextTest extends AbstractActorTest{ TestActorRef shardManager = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext) { + mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { return Futures.successful((Object) new PrimaryNotFound("foobar")); @@ -459,7 +475,6 @@ public class ActorContextTest extends AbstractActorTest{ Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); assertNull(cached); - } @Test @@ -468,15 +483,12 @@ public class ActorContextTest extends AbstractActorTest{ TestActorRef shardManager = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext) { + mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { return Futures.successful((Object) new ActorNotInitialized()); @@ -496,7 +508,49 @@ public class ActorContextTest extends AbstractActorTest{ Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); assertNull(cached); + } + + @Test + public void testBroadcast() { + new JavaTestKit(getSystem()) {{ + ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + TestActorRef shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props()); + MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor(); + shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()).toSerializable()); + shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()).toSerializable()); + shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found")); + + Configuration mockConfig = mock(Configuration.class); + doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))). + when(mockConfig).getAllShardNames(); + ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, + mock(ClusterWrapper.class), mockConfig, + DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build()); + + actorContext.broadcast(new TestMessage()); + + expectFirstMatching(shardActorRef1, TestMessage.class); + expectFirstMatching(shardActorRef2, TestMessage.class); + }}; } + private T expectFirstMatching(ActorRef actor, Class clazz) { + int count = 5000 / 50; + for(int i = 0; i < count; i++) { + try { + T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz); + if(message != null) { + return message; + } + } catch (Exception e) {} + + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + Assert.fail("Did not receive message of type " + clazz); + return null; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java index 4bd0ad818f..d62c9dbc28 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java @@ -10,13 +10,14 @@ package org.opendaylight.controller.cluster.datastore.utils; import akka.actor.ActorRef; import akka.actor.UntypedActor; - import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import org.junit.Assert; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -31,7 +32,7 @@ import scala.concurrent.duration.FiniteDuration; *

*/ public class MessageCollectorActor extends UntypedActor { - private List messages = new ArrayList<>(); + private final List messages = new ArrayList<>(); @Override public void onReceive(Object message) throws Exception { if(message instanceof String){ @@ -43,6 +44,10 @@ public class MessageCollectorActor extends UntypedActor { } } + public void clear() { + messages.clear(); + } + public static List getAllMessages(ActorRef actor) throws Exception { FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS); Timeout operationTimeout = new Timeout(operationDuration); @@ -87,4 +92,20 @@ public class MessageCollectorActor extends UntypedActor { return output; } + public static T expectFirstMatching(ActorRef actor, Class clazz) { + int count = 5000 / 50; + for(int i = 0; i < count; i++) { + try { + T message = (T) getFirstMatching(actor, clazz); + if(message != null) { + return message; + } + } catch (Exception e) {} + + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + Assert.fail("Did not receive message of type " + clazz); + return null; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java index 81b6bccaf0..63878df23c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java @@ -12,7 +12,6 @@ import static org.junit.Assert.assertNotNull; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; -import com.google.common.base.Optional; public class MockActorContext extends ActorContext { @@ -36,10 +35,6 @@ public class MockActorContext extends ActorContext { return executeRemoteOperationResponse; } - @Override public Optional findPrimaryShard(String shardName) { - return Optional.absent(); - } - public void setExecuteShardOperationResponse(Object response){ executeShardOperationResponse = response; }