Bug 2194: Modify FindPrimary to check for leader 64/16164/18
authorTom Pantelis <tpanteli@brocade.com>
Tue, 24 Mar 2015 18:46:21 +0000 (14:46 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 24 Mar 2015 19:12:39 +0000 (15:12 -0400)
Write-only transaactions now bypass sending the CreateTransaction
message to the shard however we still need to check if the shard has
a leader.

So I modified the ShardManager to check for this on FindPrimaryShard
message utilizing the RoleChanged and LeaderStateChanged events recently
added. On FindPrimaryShard, if the shard is not ready, ie its role is
Candidate or it has no leaderId or is not initialized, the sender is
recorded and a timeout schedule is started. If the shard becomes ready,
the success message is sent to the sender. If the timer expires, an error
message is sent to the sender, either ActorNotInitilized or
NoShardLeaderException.

I also changed the ShardInformation peerAddresses map to String key
instead of ShardIdentifier for convenient lookup of the leaderId to
obviate the need to parse a ShardIdentifier from a String. The key is
not used/needed as a ShardIdentifier and the Shard converts the keys to
Strings anyway so it made sense to define the key as String. Several
unit tests had to be changed as a result of this.

In ActorContext#findPrimaryShardAsync, I added a check for
NoShardLeaderException.  I also removed the synchronous findPrimaryShard
and findPrimaryShardOrNull methods to avoid having to duplicate the
exception handling. ActorContext#broadcast was the only caller and it
should use findPrimaryShardAsync anyway.

Change-Id: I22bc661314b25812024b7d274afb390768408b0b
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
17 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java

index 99bc9de..66467af 100644 (file)
@@ -146,10 +146,9 @@ public class Shard extends RaftActor {
 
     private final String txnDispatcherPath;
 
-    protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+    protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
             final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
-        super(name.toString(), mapPeerAddresses(peerAddresses),
-                Optional.of(datastoreContext.getShardRaftConfig()));
+        super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
 
         this.name = name.toString();
         this.datastoreContext = datastoreContext;
@@ -198,20 +197,8 @@ public class Shard extends RaftActor {
                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
     }
 
-    private static Map<String, String> mapPeerAddresses(
-        final Map<ShardIdentifier, String> peerAddresses) {
-        Map<String, String> map = new HashMap<>();
-
-        for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
-            .entrySet()) {
-            map.put(entry.getKey().toString(), entry.getValue());
-        }
-
-        return map;
-    }
-
     public static Props props(final ShardIdentifier name,
-        final Map<ShardIdentifier, String> peerAddresses,
+        final Map<String, String> peerAddresses,
         final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
         Preconditions.checkNotNull(name, "name should not be null");
         Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
@@ -950,11 +937,11 @@ public class Shard extends RaftActor {
         private static final long serialVersionUID = 1L;
 
         final ShardIdentifier name;
-        final Map<ShardIdentifier, String> peerAddresses;
+        final Map<String, String> peerAddresses;
         final DatastoreContext datastoreContext;
         final SchemaContext schemaContext;
 
-        ShardCreator(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+        ShardCreator(final ShardIdentifier name, final Map<String, String> peerAddresses,
                 final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
             this.name = name;
             this.peerAddresses = peerAddresses;
index 136c681..bc4c825 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.Address;
+import akka.actor.Cancellable;
 import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
@@ -20,24 +21,28 @@ import akka.japi.Function;
 import akka.japi.Procedure;
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.RecoveryFailure;
+import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
@@ -53,6 +58,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -74,7 +80,7 @@ import scala.concurrent.duration.Duration;
  */
 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
-    private final Logger LOG = LoggerFactory.getLogger(getClass());
+    private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
 
     // Stores a mapping between a member name and the address of the member
     // Member names look like "member-1", "member-2" etc and are as specified
@@ -172,15 +178,45 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onRoleChangeNotification((RoleChangeNotification) message);
         } else if(message instanceof FollowerInitialSyncUpStatus){
             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
-        } else{
+        } else if(message instanceof ShardNotInitializedTimeout) {
+            onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
+        } else if(message instanceof LeaderStateChanged) {
+            onLeaderStateChanged((LeaderStateChanged)message);
+        } else {
             unknownMessage(message);
         }
 
     }
 
+    private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) {
+        LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
+
+        ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
+        if(shardInformation != null) {
+            shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
+        } else {
+            LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
+        }
+    }
+
+    private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
+        ShardInformation shardInfo = message.getShardInfo();
+
+        LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
+                shardInfo.getShardId());
+
+        shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
+
+        if(!shardInfo.isShardInitialized()) {
+            message.getSender().tell(new ActorNotInitialized(), getSelf());
+        } else {
+            message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
+        }
+    }
+
     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
-        LOG.info("Received follower initial sync status for {} status sync done {}", status.getName(),
-                status.isInitialSyncDone());
+        LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
+                status.getName(), status.isInitialSyncDone());
 
         ShardInformation shardInformation = findShardInformation(status.getName());
 
@@ -193,7 +229,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
-        LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(),
+        LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
                 roleChanged.getOldRole(), roleChanged.getNewRole());
 
         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
@@ -201,8 +237,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             shardInformation.setRole(roleChanged.getNewRole());
 
             if (isReady()) {
-                LOG.info("All Shards are ready - data store {} is ready, available count is {}", type,
-                        waitTillReadyCountdownLatch.getCount());
+                LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+                        persistenceId(), type, waitTillReadyCountdownLatch.getCount());
 
                 waitTillReadyCountdownLatch.countDown();
             }
@@ -225,7 +261,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private boolean isReady() {
         boolean isReady = true;
         for (ShardInformation info : localShards.values()) {
-            if(RaftState.Candidate.name().equals(info.getRole()) || Strings.isNullOrEmpty(info.getRole())){
+            if(!info.isShardReady()){
                 isReady = false;
                 break;
             }
@@ -256,14 +292,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         if (shardId.getShardName() == null) {
             return;
         }
+
         markShardAsInitialized(shardId.getShardName());
     }
 
     private void markShardAsInitialized(String shardName) {
         LOG.debug("Initializing shard [{}]", shardName);
+
         ShardInformation shardInformation = localShards.get(shardName);
         if (shardInformation != null) {
             shardInformation.setActorInitialized();
+
+            shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
         }
     }
 
@@ -300,7 +340,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
-        sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier<Object>() {
+        sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
             @Override
             public Object get() {
                 return new LocalShardFound(shardInformation.getActor());
@@ -308,20 +348,36 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         });
     }
 
-    private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized,
-            final Supplier<Object> messageSupplier) {
-        if (!shardInformation.isShardInitialized()) {
-            if(waitUntilInitialized) {
+    private void sendResponse(ShardInformation shardInformation, boolean doWait,
+            boolean wantShardReady, final Supplier<Object> messageSupplier) {
+        if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
+            if(doWait) {
                 final ActorRef sender = getSender();
                 final ActorRef self = self();
-                shardInformation.addRunnableOnInitialized(new Runnable() {
+
+                Runnable replyRunnable = new Runnable() {
                     @Override
                     public void run() {
                         sender.tell(messageSupplier.get(), self);
                     }
-                });
-            } else {
+                };
+
+                OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
+                    new OnShardInitialized(replyRunnable);
+
+                shardInformation.addOnShardInitialized(onShardInitialized);
+
+                Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
+                        datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
+                        new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
+                        getContext().dispatcher(), getSelf());
+
+                onShardInitialized.setTimeoutSchedule(timeoutSchedule);
+
+            } else if (!shardInformation.isShardInitialized()) {
                 getSender().tell(new ActorNotInitialized(), getSelf());
+            } else {
+                getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
             }
 
             return;
@@ -330,6 +386,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         getSender().tell(messageSupplier.get(), getSelf());
     }
 
+    private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
+        return new NoShardLeaderException(String.format(
+                "Could not find a leader for shard %s. This typically happens when the system is coming up or " +
+                "recovering and a leader is being elected. Try again later.", shardId));
+    }
+
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
         memberNameToAddress.remove(message.member().roles().head());
     }
@@ -341,8 +403,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         for(ShardInformation info : localShards.values()){
             String shardName = info.getShardName();
-            info.updatePeerAddress(getShardIdentifier(memberName, shardName),
-                getShardActorPath(shardName, memberName));
+            info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(),
+                getShardActorPath(shardName, memberName), getSelf());
         }
     }
 
@@ -384,13 +446,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     LOG.debug("Sending new SchemaContext to Shards");
                     for (ShardInformation info : localShards.values()) {
                         if (info.getActor() == null) {
-                            info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
-                                    info.getPeerAddresses(), datastoreContext, schemaContext)
-                                            .withDispatcher(shardDispatcherPath), info.getShardId().toString()));
+                            info.setActor(newShardActor(schemaContext, info));
                         } else {
                             info.getActor().tell(message, getSelf());
                         }
-                        info.getActor().tell(new RegisterRoleChangeListener(), self());
                     }
                 }
 
@@ -402,16 +461,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     }
 
+    @VisibleForTesting
+    protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
+        return getContext().actorOf(Shard.props(info.getShardId(),
+                info.getPeerAddresses(), datastoreContext, schemaContext)
+                        .withDispatcher(shardDispatcherPath), info.getShardId().toString());
+    }
+
     private void findPrimary(FindPrimary message) {
-        String shardName = message.getShardName();
+        final String shardName = message.getShardName();
 
         // First see if the there is a local replica for the shard
         final ShardInformation info = localShards.get(shardName);
         if (info != null) {
-            sendResponse(info, message.isWaitUntilInitialized(), new Supplier<Object>() {
+            sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
                 @Override
                 public Object get() {
-                    return new PrimaryFound(info.getActorPath().toString()).toSerializable();
+                    Object found = new PrimaryFound(info.getSerializedLeaderActor()).toSerializable();
+
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("{}: Found primary for {}: {}", shardName, found);
+                    }
+
+                    return found;
                 }
             });
 
@@ -481,7 +553,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         List<String> localShardActorNames = new ArrayList<>();
         for(String shardName : memberShardNames){
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
-            Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
+            Map<String, String> peerAddresses = getPeerAddresses(shardName);
             localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
         }
@@ -496,22 +568,19 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * @param shardName
      * @return
      */
-    private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
+    private Map<String, String> getPeerAddresses(String shardName){
 
-        Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
+        Map<String, String> peerAddresses = new HashMap<>();
 
-        List<String> members =
-            this.configuration.getMembersFromShardName(shardName);
+        List<String> members = this.configuration.getMembersFromShardName(shardName);
 
         String currentMemberName = this.cluster.getCurrentMemberName();
 
         for(String memberName : members){
             if(!currentMemberName.equals(memberName)){
-                ShardIdentifier shardId = getShardIdentifier(memberName,
-                    shardName);
-                String path =
-                    getShardActorPath(shardName, currentMemberName);
-                peerAddresses.put(shardId, path);
+                ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+                String path = getShardActorPath(shardName, currentMemberName);
+                peerAddresses.put(shardId.toString(), path);
             }
         }
         return peerAddresses;
@@ -552,23 +621,25 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return mBean;
     }
 
-    private class ShardInformation {
+    @VisibleForTesting
+    protected static class ShardInformation {
         private final ShardIdentifier shardId;
         private final String shardName;
         private ActorRef actor;
         private ActorPath actorPath;
-        private final Map<ShardIdentifier, String> peerAddresses;
+        private final Map<String, String> peerAddresses;
 
         // flag that determines if the actor is ready for business
         private boolean actorInitialized = false;
 
         private boolean followerSyncStatus = false;
 
-        private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
+        private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
         private String role ;
+        private String leaderId;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
-                Map<ShardIdentifier, String> peerAddresses) {
+                Map<String, String> peerAddresses) {
             this.shardName = shardName;
             this.shardId = shardId;
             this.peerAddresses = peerAddresses;
@@ -595,11 +666,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return shardId;
         }
 
-        Map<ShardIdentifier, String> getPeerAddresses() {
+        Map<String, String> getPeerAddresses() {
             return peerAddresses;
         }
 
-        void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
+        void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
             LOG.info("updatePeerAddress for peer {} with address {}", peerId,
                 peerAddress);
             if(peerAddresses.containsKey(peerId)){
@@ -611,42 +682,87 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                                 peerId, peerAddress, actor.path());
                     }
 
-                    actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf());
+                    actor.tell(new PeerAddressResolved(peerId.toString(), peerAddress), sender);
                 }
+
+                notifyOnShardInitializedCallbacks();
             }
         }
 
+        boolean isShardReady() {
+            return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
+        }
+
+        boolean isShardReadyWithLeaderId() {
+            return isShardReady() && (isLeader() || peerAddresses.containsKey(leaderId));
+        }
+
         boolean isShardInitialized() {
             return getActor() != null && actorInitialized;
         }
 
+        boolean isLeader() {
+            return Objects.equal(leaderId, shardId.toString());
+        }
+
+        String getSerializedLeaderActor() {
+            if(isLeader()) {
+                return Serialization.serializedActorPath(getActor());
+            } else {
+                return peerAddresses.get(leaderId);
+            }
+        }
+
         void setActorInitialized() {
+            LOG.debug("Shard {} is initialized", shardId);
+
             this.actorInitialized = true;
 
-            for(Runnable runnable: runnablesOnInitialized) {
-                runnable.run();
+            notifyOnShardInitializedCallbacks();
+        }
+
+        private void notifyOnShardInitializedCallbacks() {
+            if(onShardInitializedSet.isEmpty()) {
+                return;
             }
 
-            runnablesOnInitialized.clear();
+            boolean ready = isShardReadyWithLeaderId();
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
+                        ready ? "ready" : "initialized", onShardInitializedSet.size());
+            }
+
+            Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
+            while(iter.hasNext()) {
+                OnShardInitialized onShardInitialized = iter.next();
+                if(!(onShardInitialized instanceof OnShardReady) || ready) {
+                    iter.remove();
+                    onShardInitialized.getTimeoutSchedule().cancel();
+                    onShardInitialized.getReplyRunnable().run();
+                }
+            }
         }
 
-        void addRunnableOnInitialized(Runnable runnable) {
-            runnablesOnInitialized.add(runnable);
+        void addOnShardInitialized(OnShardInitialized onShardInitialized) {
+            onShardInitializedSet.add(onShardInitialized);
         }
 
-        public void setRole(String newRole) {
-            this.role = newRole;
+        void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
+            onShardInitializedSet.remove(onShardInitialized);
         }
 
-        public String getRole(){
-            return this.role;
+        void setRole(String newRole) {
+            this.role = newRole;
+
+            notifyOnShardInitializedCallbacks();
         }
 
-        public void setFollowerSyncStatus(boolean syncStatus){
+        void setFollowerSyncStatus(boolean syncStatus){
             this.followerSyncStatus = syncStatus;
         }
 
-        public boolean isInSync(){
+        boolean isInSync(){
             if(RaftState.Follower.name().equals(this.role)){
                 return followerSyncStatus;
             } else if(RaftState.Leader.name().equals(this.role)){
@@ -656,6 +772,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return false;
         }
 
+        void setLeaderId(String leaderId) {
+            this.leaderId = leaderId;
+
+            notifyOnShardInitializedCallbacks();
+        }
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {
@@ -680,6 +801,57 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private static class OnShardInitialized {
+        private final Runnable replyRunnable;
+        private Cancellable timeoutSchedule;
+
+        OnShardInitialized(Runnable replyRunnable) {
+            this.replyRunnable = replyRunnable;
+        }
+
+        Runnable getReplyRunnable() {
+            return replyRunnable;
+        }
+
+        Cancellable getTimeoutSchedule() {
+            return timeoutSchedule;
+        }
+
+        void setTimeoutSchedule(Cancellable timeoutSchedule) {
+            this.timeoutSchedule = timeoutSchedule;
+        }
+    }
+
+    private static class OnShardReady extends OnShardInitialized {
+        OnShardReady(Runnable replyRunnable) {
+            super(replyRunnable);
+        }
+    }
+
+    private static class ShardNotInitializedTimeout {
+        private final ActorRef sender;
+        private final ShardInformation shardInfo;
+        private final OnShardInitialized onShardInitialized;
+
+        ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
+            this.sender = sender;
+            this.shardInfo = shardInfo;
+            this.onShardInitialized = onShardInitialized;
+        }
+
+        ActorRef getSender() {
+            return sender;
+        }
+
+        ShardInformation getShardInfo() {
+            return shardInfo;
+        }
+
+        OnShardInitialized getOnShardInitialized() {
+            return onShardInitialized;
+        }
+    }
+
     static class SchemaContextModules implements Serializable {
         private static final long serialVersionUID = -8884620101025936590L;
 
index 7f2f328..64f914b 100644 (file)
@@ -485,7 +485,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 @Override
                 public void onComplete(Throwable failure, ActorSelection primaryShard) {
                     if(failure != null) {
-                        newTxFutureCallback.onComplete(failure, null);
+                        newTxFutureCallback.createTransactionContext(failure, null);
                     } else {
                         newTxFutureCallback.setPrimaryShard(primaryShard);
                     }
@@ -566,7 +566,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
             if(transactionType == TransactionType.WRITE_ONLY &&
                     actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
-                LOG.debug("Tx {} Primary shard found - creating WRITE_ONLY transaction context", identifier);
+                LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
+                        identifier, primaryShard);
 
                 // For write-only Tx's we prepare the transaction modifications directly on the shard actor
                 // to avoid the overhead of creating a separate transaction actor.
@@ -612,7 +613,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Performs a CreateTransaction try async.
          */
         private void tryCreateTransaction() {
-            LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} Primary shard {} found - trying create transaction", identifier, primaryShard);
+            }
 
             Object serializedCreateMessage = new CreateTransaction(identifier.toString(),
                     TransactionProxy.this.transactionType.ordinal(),
@@ -645,6 +648,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
             }
 
+            createTransactionContext(failure, response);
+        }
+
+        private void createTransactionContext(Throwable failure, Object response) {
             // Mainly checking for state violation here to perform a volatile read of "initialized" to
             // ensure updates to operationLimter et al are visible to this thread (ie we're doing
             // "piggy-back" synchronization here).
index a34330b..d51d680 100644 (file)
@@ -18,22 +18,22 @@ public class FindPrimary implements SerializableMessage{
     public static final Class<FindPrimary> SERIALIZABLE_CLASS = FindPrimary.class;
 
     private final String shardName;
-    private final boolean waitUntilInitialized;
+    private final boolean waitUntilReady;
 
-    public FindPrimary(String shardName, boolean waitUntilInitialized){
+    public FindPrimary(String shardName, boolean waitUntilReady){
 
         Preconditions.checkNotNull(shardName, "shardName should not be null");
 
         this.shardName = shardName;
-        this.waitUntilInitialized = waitUntilInitialized;
+        this.waitUntilReady = waitUntilReady;
     }
 
     public String getShardName() {
         return shardName;
     }
 
-    public boolean isWaitUntilInitialized() {
-        return waitUntilInitialized;
+    public boolean isWaitUntilReady() {
+        return waitUntilReady;
     }
 
     @Override
@@ -44,4 +44,12 @@ public class FindPrimary implements SerializableMessage{
     public static FindPrimary fromSerializable(Object message){
         return (FindPrimary) message;
     }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("FindPrimary [shardName=").append(shardName).append(", waitUntilReady=").append(waitUntilReady)
+                .append("]");
+        return builder.toString();
+    }
 }
index 346519e..82f3649 100644 (file)
@@ -8,18 +8,17 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 
 public class PeerAddressResolved {
-    private final ShardIdentifier peerId;
+    private final String peerId;
     private final String peerAddress;
 
-    public PeerAddressResolved(ShardIdentifier peerId, String peerAddress) {
+    public PeerAddressResolved(String peerId, String peerAddress) {
         this.peerId = peerId;
         this.peerAddress = peerAddress;
     }
 
-    public ShardIdentifier getPeerId() {
+    public String getPeerId() {
         return peerId;
     }
 
index 0fb09d8..6f9bb7f 100644 (file)
@@ -17,6 +17,7 @@ import akka.actor.Address;
 import akka.actor.PoisonPill;
 import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
+import akka.dispatch.OnComplete;
 import akka.pattern.AskTimeoutException;
 import akka.util.Timeout;
 import com.codahale.metrics.JmxReporter;
@@ -35,6 +36,7 @@ import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
@@ -98,8 +100,9 @@ public class ActorContext {
     private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
     private final int transactionOutstandingOperationLimit;
     private Timeout transactionCommitOperationTimeout;
+    private Timeout shardInitializationTimeout;
     private final Dispatchers dispatchers;
-    private final Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
+    private Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
 
     private volatile SchemaContext schemaContext;
     private volatile boolean updated;
@@ -121,14 +124,6 @@ public class ActorContext {
         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
 
         setCachedProperties();
-        primaryShardActorSelectionCache = CacheBuilder.newBuilder()
-                .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
-                .build();
-
-        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
-        operationTimeout = new Timeout(operationDuration);
-        transactionCommitOperationTimeout =  new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
-                TimeUnit.SECONDS));
 
         Address selfAddress = clusterWrapper.getSelfAddress();
         if (selfAddress != null && !selfAddress.host().isEmpty()) {
@@ -150,6 +145,12 @@ public class ActorContext {
 
         transactionCommitOperationTimeout =  new Timeout(Duration.create(
                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
+
+        shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
+
+        primaryShardActorSelectionCache = CacheBuilder.newBuilder()
+                .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
+                .build();
     }
 
     public DatastoreContext getDatastoreContext() {
@@ -202,28 +203,13 @@ public class ActorContext {
         return schemaContext;
     }
 
-    /**
-     * Finds the primary shard for the given shard name
-     *
-     * @param shardName
-     * @return
-     */
-    public Optional<ActorSelection> findPrimaryShard(String shardName) {
-        String path = findPrimaryPathOrNull(shardName);
-        if (path == null){
-            return Optional.absent();
-        }
-        return Optional.of(actorSystem.actorSelection(path));
-    }
-
     public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
         Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
         if(ret != null){
             return ret;
         }
         Future<Object> future = executeOperationAsync(shardManager,
-                new FindPrimary(shardName, true).toSerializable(),
-                datastoreContext.getShardInitializationTimeout());
+                new FindPrimary(shardName, true).toSerializable(), shardInitializationTimeout);
 
         return future.transform(new Mapper<Object, ActorSelection>() {
             @Override
@@ -242,6 +228,8 @@ public class ActorContext {
                 } else if(response instanceof PrimaryNotFound) {
                     throw new PrimaryNotFoundException(
                             String.format("No primary shard found for %S.", shardName));
+                } else if(response instanceof NoShardLeaderException) {
+                    throw (NoShardLeaderException)response;
                 }
 
                 throw new UnknownMessageException(String.format(
@@ -277,7 +265,7 @@ public class ActorContext {
      */
     public Future<ActorRef> findLocalShardAsync( final String shardName) {
         Future<Object> future = executeOperationAsync(shardManager,
-                new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
+                new FindLocalShard(shardName, true), shardInitializationTimeout);
 
         return future.map(new Mapper<Object, ActorRef>() {
             @Override
@@ -301,26 +289,6 @@ public class ActorContext {
         }, getClientDispatcher());
     }
 
-    private String findPrimaryPathOrNull(String shardName) {
-        Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
-
-        if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
-            PrimaryFound found = PrimaryFound.fromSerializable(result);
-
-            LOG.debug("Primary found {}", found.getPrimaryPath());
-            return found.getPrimaryPath();
-
-        } else if (result.getClass().equals(ActorNotInitialized.class)){
-            throw new NotInitializedException(
-                String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
-            );
-
-        } else {
-            return null;
-        }
-    }
-
-
     /**
      * Executes an operation on a local actor and wait for it's response
      *
@@ -428,16 +396,21 @@ public class ActorContext {
      *
      * @param message
      */
-    public void broadcast(Object message){
-        for(String shardName : configuration.getAllShardNames()){
-
-            Optional<ActorSelection> primary = findPrimaryShard(shardName);
-            if (primary.isPresent()) {
-                primary.get().tell(message, ActorRef.noSender());
-            } else {
-                LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
-                        message.getClass().getSimpleName(), shardName);
-            }
+    public void broadcast(final Object message){
+        for(final String shardName : configuration.getAllShardNames()){
+
+            Future<ActorSelection> primaryFuture = findPrimaryShardAsync(shardName);
+            primaryFuture.onComplete(new OnComplete<ActorSelection>() {
+                @Override
+                public void onComplete(Throwable failure, ActorSelection primaryShard) {
+                    if(failure != null) {
+                        LOG.warn("broadcast failed to send message {} to shard {}:  {}",
+                                message.getClass().getSimpleName(), shardName, failure);
+                    } else {
+                        primaryShard.tell(message, ActorRef.noSender());
+                    }
+                }
+            }, getClientDispatcher());
         }
     }
 
index 8cafb46..378bc71 100644 (file)
@@ -87,7 +87,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{
     }
 
     protected Props newShardProps() {
-        return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+        return Shard.props(shardID, Collections.<String,String>emptyMap(),
                 newDatastoreContext(), SCHEMA_CONTEXT);
     }
 
@@ -102,7 +102,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{
         Creator<Shard> creator = new Creator<Shard>() {
             @Override
             public Shard create() throws Exception {
-                return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+                return new Shard(shardID, Collections.<String,String>emptyMap(),
                         newDatastoreContext(), SCHEMA_CONTEXT) {
                     @Override
                     protected void onRecoveryComplete() {
index a3c5eb4..fdc7e66 100644 (file)
@@ -147,8 +147,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
         }};
     }
 
-    @Test
-    public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{
+    private void testTransactionWritesWithShardNotInitiallyReady(final boolean writeOnly) throws Exception {
         new IntegrationTestKit(getSystem()) {{
             String testName = "testTransactionWritesWithShardNotInitiallyReady";
             String shardName = "test-1";
@@ -163,8 +162,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
             // Create the write Tx
 
-            // TODO - we'll want to test this with write-only as well when FindPrimary returns the leader shard.
-            final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction();
+            final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
+                    dataStore.newReadWriteTransaction();
             assertNotNull("newReadWriteTransaction returned null", writeTx);
 
             // Do some modification operations and ready the Tx on a separate thread.
@@ -240,7 +239,18 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
     }
 
     @Test
-    public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{
+    public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
+        datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+        testTransactionWritesWithShardNotInitiallyReady(true);
+    }
+
+    @Test
+    public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
+        testTransactionWritesWithShardNotInitiallyReady(false);
+    }
+
+    @Test
+    public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
         new IntegrationTestKit(getSystem()) {{
             String testName = "testTransactionReadsWithShardNotInitiallyReady";
             String shardName = "test-1";
@@ -455,8 +465,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
         }};
     }
 
-    @Test(expected=NoShardLeaderException.class)
-    public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{
+    private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
         new IntegrationTestKit(getSystem()) {{
             String testName = "testTransactionCommitFailureWithNoShardLeader";
             String shardName = "test-1";
@@ -465,6 +474,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
             // by setting the election timeout, which is based on the heartbeat interval, really high.
 
             datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
+            datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
 
             // Set the leader election timeout low for the test.
 
@@ -474,7 +484,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
             // Create the write Tx.
 
-            final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction();
+            final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
+                dataStore.newReadWriteTransaction();
             assertNotNull("newReadWriteTransaction returned null", writeTx);
 
             // Do some modifications and ready the Tx on a separate thread.
@@ -523,6 +534,17 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
         }};
     }
 
+    @Test(expected=NoShardLeaderException.class)
+    public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
+        datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+        testTransactionCommitFailureWithNoShardLeader(true);
+    }
+
+    @Test(expected=NoShardLeaderException.class)
+    public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
+        testTransactionCommitFailureWithNoShardLeader(false);
+    }
+
     @Test
     public void testTransactionAbort() throws Exception{
         System.setProperty("shard.persistent", "true");
index 9941707..ae7a4f9 100644 (file)
@@ -34,6 +34,7 @@ import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
@@ -44,9 +45,11 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
@@ -56,6 +59,7 @@ import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 public class ShardManagerTest extends AbstractActorTest {
     private static int ID_COUNTER = 1;
@@ -66,7 +70,10 @@ public class ShardManagerTest extends AbstractActorTest {
     @Mock
     private static CountDownLatch ready;
 
-    private static ActorRef mockShardActor;
+    private static TestActorRef<MessageCollectorActor> mockShardActor;
+
+    private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
+            dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
 
     @Before
     public void setUp() {
@@ -75,9 +82,11 @@ public class ShardManagerTest extends AbstractActorTest {
         InMemoryJournal.clear();
 
         if(mockShardActor == null) {
-            String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
-            mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
+            String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
+            mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name);
         }
+
+        mockShardActor.underlyingActor().clear();
     }
 
     @After
@@ -86,44 +95,93 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     private Props newShardMgrProps() {
-        DatastoreContext.Builder builder = DatastoreContext.newBuilder();
-        builder.dataStoreType(shardMrgIDSuffix);
         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
-                builder.build(), ready);
+                datastoreContextBuilder.build(), ready);
+    }
+
+    private Props newPropsShardMgrWithMockShardActor() {
+        Creator<ShardManager> creator = new Creator<ShardManager>() {
+            private static final long serialVersionUID = 1L;
+            @Override
+            public ShardManager create() throws Exception {
+                return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
+                        datastoreContextBuilder.build(), ready) {
+                    @Override
+                    protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+                        return mockShardActor;
+                    }
+                };
+            }
+        };
+
+        return Props.create(new DelegatingShardManagerCreator(creator));
     }
 
     @Test
     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
             shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
 
-            expectMsgEquals(duration("5 seconds"),
-                    new PrimaryNotFound("non-existent").toSerializable());
+            expectMsgEquals(duration("5 seconds"), new PrimaryNotFound("non-existent").toSerializable());
         }};
     }
 
     @Test
-    public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
+    public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+            String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
+            shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef());
+
+            MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
+            shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
+                    RaftState.Leader.name())), mockShardActor);
+
             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
 
-            expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
+                    primaryFound.getPrimaryPath().contains("member-1-shard-default"));
         }};
     }
 
     @Test
-    public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
+    public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+
+            String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+            MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
+
+            String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager.tell(new RoleChangeNotification(memberId1,
+                    RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+            shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
+
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
+                    primaryFound.getPrimaryPath().contains("member-2-shard-default"));
+        }};
+    }
+
+    @Test
+    public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
 
@@ -132,28 +190,129 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
+    public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+
+            expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+        }};
+    }
+
+    @Test
+    public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+
+            String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager.tell(new RoleChangeNotification(memberId,
+                    RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+
+            expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+
+            shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
+
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
+                    primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+        }};
+    }
+
+    @Test
+    public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
-            // delayed until we send ActorInitialized.
-            Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
-                    new Timeout(5, TimeUnit.SECONDS));
+            // delayed until we send ActorInitialized and RoleChangeNotification.
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+
+            expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
 
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
-            Object resp = Await.result(future, duration("5 seconds"));
-            assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
+            expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+
+            String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager.tell(new RoleChangeNotification(memberId,
+                    RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
+
+            expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+
+            shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
+
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
+                    primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+
+            expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
+        }};
+    }
+
+    @Test
+    public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+
+            expectMsgClass(duration("2 seconds"), ActorNotInitialized.class);
+
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+
+            expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
+        }};
+    }
+
+    @Test
+    public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+            shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
+                    null, RaftState.Candidate.name()), mockShardActor);
+
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+
+            expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
+        }};
+    }
+
+    @Test
+    public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+
+            expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
         }};
     }
 
     @Test
     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
@@ -168,7 +327,7 @@ public class ShardManagerTest extends AbstractActorTest {
     @Test
     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), mockShardActor);
@@ -185,7 +344,7 @@ public class ShardManagerTest extends AbstractActorTest {
     @Test
     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
 
@@ -196,7 +355,7 @@ public class ShardManagerTest extends AbstractActorTest {
     @Test
     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
@@ -439,14 +598,11 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRoleChangeNotificationReleaseReady() throws Exception {
         new JavaTestKit(getSystem()) {
             {
-                final Props persistentProps = ShardManager.props(
-                        new MockClusterWrapper(),
-                        new MockConfiguration(),
-                        DatastoreContext.newBuilder().persistent(true).build(), ready);
-                final TestActorRef<ShardManager> shardManager =
-                        TestActorRef.create(getSystem(), persistentProps);
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
 
-                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
+                String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+                        memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
 
                 verify(ready, times(1)).countDown();
 
@@ -457,14 +613,10 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
         new JavaTestKit(getSystem()) {
             {
-                final Props persistentProps = ShardManager.props(
-                        new MockClusterWrapper(),
-                        new MockConfiguration(),
-                        DatastoreContext.newBuilder().persistent(true).build(), ready);
-                final TestActorRef<ShardManager> shardManager =
-                        TestActorRef.create(getSystem(), persistentProps);
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
 
-                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
+                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+                        "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
 
                 verify(ready, never()).countDown();
 
index 0fbe686..adc7f47 100644 (file)
@@ -41,7 +41,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Test;
 import org.mockito.InOrder;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@@ -159,7 +158,7 @@ public class ShardTest extends AbstractShardTest {
 
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+                    return new Shard(shardID, Collections.<String,String>emptyMap(),
                             newDatastoreContext(), SCHEMA_CONTEXT) {
                         @Override
                         public void onReceiveCommand(final Object message) throws Exception {
@@ -287,7 +286,7 @@ public class ShardTest extends AbstractShardTest {
             final CountDownLatch recoveryComplete = new CountDownLatch(1);
             class TestShard extends Shard {
                 TestShard() {
-                    super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
+                    super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
                             newDatastoreContext(), SCHEMA_CONTEXT);
                 }
 
@@ -318,7 +317,7 @@ public class ShardTest extends AbstractShardTest {
                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
 
             String address = "akka://foobar";
-            shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
+            shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
 
             assertEquals("getPeerAddresses", address,
                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
@@ -763,7 +762,7 @@ public class ShardTest extends AbstractShardTest {
             Creator<Shard> creator = new Creator<Shard>() {
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+                    return new Shard(shardID, Collections.<String,String>emptyMap(),
                             newDatastoreContext(), SCHEMA_CONTEXT) {
                         @Override
                         protected boolean isLeader() {
@@ -1427,7 +1426,7 @@ public class ShardTest extends AbstractShardTest {
             Creator<Shard> creator = new Creator<Shard>() {
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+                    return new Shard(shardID, Collections.<String,String>emptyMap(),
                             newDatastoreContext(), SCHEMA_CONTEXT) {
 
                         DelegatingPersistentDataProvider delegating;
@@ -1528,13 +1527,13 @@ public class ShardTest extends AbstractShardTest {
         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
 
-        final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+        final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
                 persistentContext, SCHEMA_CONTEXT);
 
         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
 
-        final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+        final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
                 nonPersistentContext, SCHEMA_CONTEXT);
 
         new ShardTestKit(getSystem()) {{
index 09a4532..c3fef61 100644 (file)
@@ -60,7 +60,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
     }
 
     private ActorRef createShard(){
-        return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.<ShardIdentifier, String>emptyMap(), datastoreContext,
+        return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.<String, String>emptyMap(), datastoreContext,
                 TestModel.createTestContext()));
     }
 
index 8ebb145..e63ace3 100644 (file)
@@ -79,7 +79,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     private ActorRef createShard(){
         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-            Collections.<ShardIdentifier, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
+            Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
     }
 
     private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
index ac2c079..265ec59 100644 (file)
@@ -33,6 +33,8 @@ import org.junit.Test;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@@ -659,11 +661,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         verifyCohortFutures(proxy, TestException.class);
     }
 
-    @Test
-    public void testReadyWithInitialCreateTransactionFailure() throws Exception {
-
-        doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
-                mockActorContext).findPrimaryShardAsync(anyString());
+    private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
+        doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
@@ -681,7 +680,22 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
-        verifyCohortFutures(proxy, PrimaryNotFoundException.class);
+        verifyCohortFutures(proxy, toThrow.getClass());
+    }
+
+    @Test
+    public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
+        testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
+    }
+
+    @Test
+    public void testWriteOnlyTxWithNotInitializedException() throws Exception {
+        testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
+    }
+
+    @Test
+    public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
+        testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
     }
 
     @Test
index 2a29d2c..e206e69 100644 (file)
@@ -59,7 +59,7 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc
             SchemaContext schemaContext = TestModel.createTestContext();
             Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
                     shardName("inventory").type("config").build(),
-                    Collections.<ShardIdentifier,String>emptyMap(),
+                    Collections.<String,String>emptyMap(),
                     DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
                     schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
 
@@ -133,7 +133,7 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc
             SchemaContext schemaContext = TestModel.createTestContext();
             Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
                     shardName("inventory").type("config").build(),
-                    Collections.<ShardIdentifier,String>emptyMap(),
+                    Collections.<String,String>emptyMap(),
                     DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
                     schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
 
index 6bd732e..2746bcf 100644 (file)
@@ -20,8 +20,12 @@ import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
+import java.util.Arrays;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.time.StopWatch;
 import org.junit.Assert;
@@ -30,14 +34,18 @@ import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
@@ -45,10 +53,16 @@ import scala.concurrent.duration.FiniteDuration;
 
 public class ActorContextTest extends AbstractActorTest{
 
+    static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
+
+    private static class TestMessage {
+    }
+
     private static class MockShardManager extends UntypedActor {
 
         private final boolean found;
         private final ActorRef actorRef;
+        private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
 
         private MockShardManager(boolean found, ActorRef actorRef){
 
@@ -57,6 +71,18 @@ public class ActorContextTest extends AbstractActorTest{
         }
 
         @Override public void onReceive(Object message) throws Exception {
+            if(message instanceof FindPrimary) {
+                FindPrimary fp = (FindPrimary)message;
+                Object resp = findPrimaryResponses.get(fp.getShardName());
+                if(resp == null) {
+                    log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
+                } else {
+                    getSender().tell(resp, getSelf());
+                }
+
+                return;
+            }
+
             if(found){
                 getSender().tell(new LocalShardFound(actorRef), getSelf());
             } else {
@@ -64,15 +90,28 @@ public class ActorContextTest extends AbstractActorTest{
             }
         }
 
+        void addFindPrimaryResp(String shardName, Object resp) {
+            findPrimaryResponses.put(shardName, resp);
+        }
+
         private static Props props(final boolean found, final ActorRef actorRef){
             return Props.create(new MockShardManagerCreator(found, actorRef) );
         }
 
+        private static Props props(){
+            return Props.create(new MockShardManagerCreator() );
+        }
+
         @SuppressWarnings("serial")
         private static class MockShardManagerCreator implements Creator<MockShardManager> {
             final boolean found;
             final ActorRef actorRef;
 
+            MockShardManagerCreator() {
+                this.found = false;
+                this.actorRef = null;
+            }
+
             MockShardManagerCreator(boolean found, ActorRef actorRef) {
                 this.found = found;
                 this.actorRef = actorRef;
@@ -287,18 +326,15 @@ public class ActorContextTest extends AbstractActorTest{
 
     @Test
     public void testRateLimiting(){
-        DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
-        doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
-        doReturn("config").when(mockDataStoreContext).getDataStoreType();
-        doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+                transactionCreationInitialRateLimit(155L).build();
 
         ActorContext actorContext =
                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
-                        mock(Configuration.class), mockDataStoreContext);
+                        mock(Configuration.class), dataStoreContext);
 
         // Check that the initial value is being picked up from DataStoreContext
-        assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
+        assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
 
         actorContext.setTxCreationLimit(1.0);
 
@@ -320,16 +356,9 @@ public class ActorContextTest extends AbstractActorTest{
 
     @Test
     public void testClientDispatcherIsGlobalDispatcher(){
-
-        DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
-        doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
-        doReturn("config").when(mockDataStoreContext).getDataStoreType();
-        doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
-
         ActorContext actorContext =
                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
-                        mock(Configuration.class), mockDataStoreContext);
+                        mock(Configuration.class), DatastoreContext.newBuilder().build());
 
         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
 
@@ -337,18 +366,11 @@ public class ActorContextTest extends AbstractActorTest{
 
     @Test
     public void testClientDispatcherIsNotGlobalDispatcher(){
-
-        DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
-        doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
-        doReturn("config").when(mockDataStoreContext).getDataStoreType();
-        doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
-
         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
 
         ActorContext actorContext =
                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
-                        mock(Configuration.class), mockDataStoreContext);
+                        mock(Configuration.class), DatastoreContext.newBuilder().build());
 
         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
 
@@ -388,15 +410,12 @@ public class ActorContextTest extends AbstractActorTest{
             TestActorRef<MessageCollectorActor> shardManager =
                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
 
-            DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
-            doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
-            doReturn("config").when(mockDataStoreContext).getDataStoreType();
-            doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+            DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+                    shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
 
             ActorContext actorContext =
                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), mockDataStoreContext) {
+                            mock(Configuration.class), dataStoreContext) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
                             return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
@@ -431,15 +450,12 @@ public class ActorContextTest extends AbstractActorTest{
             TestActorRef<MessageCollectorActor> shardManager =
                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
 
-            DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
-            doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
-            doReturn("config").when(mockDataStoreContext).getDataStoreType();
-            doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+            DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+                    shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
 
             ActorContext actorContext =
                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), mockDataStoreContext) {
+                            mock(Configuration.class), dataStoreContext) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
                             return Futures.successful((Object) new PrimaryNotFound("foobar"));
@@ -459,7 +475,6 @@ public class ActorContextTest extends AbstractActorTest{
             Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
 
             assertNull(cached);
-
     }
 
     @Test
@@ -468,15 +483,12 @@ public class ActorContextTest extends AbstractActorTest{
             TestActorRef<MessageCollectorActor> shardManager =
                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
 
-            DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
-            doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
-            doReturn("config").when(mockDataStoreContext).getDataStoreType();
-            doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+            DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+                    shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
 
             ActorContext actorContext =
                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), mockDataStoreContext) {
+                            mock(Configuration.class), dataStoreContext) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
                             return Futures.successful((Object) new ActorNotInitialized());
@@ -496,7 +508,49 @@ public class ActorContextTest extends AbstractActorTest{
             Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
 
             assertNull(cached);
+    }
+
+    @Test
+    public void testBroadcast() {
+        new JavaTestKit(getSystem()) {{
+            ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+            ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+            TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
+            MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
+            shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()).toSerializable());
+            shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()).toSerializable());
+            shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
+
+            Configuration mockConfig = mock(Configuration.class);
+            doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
+                    when(mockConfig).getAllShardNames();
 
+            ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+                    mock(ClusterWrapper.class), mockConfig,
+                    DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
+
+            actorContext.broadcast(new TestMessage());
+
+            expectFirstMatching(shardActorRef1, TestMessage.class);
+            expectFirstMatching(shardActorRef2, TestMessage.class);
+        }};
     }
 
+    private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
+        int count = 5000 / 50;
+        for(int i = 0; i < count; i++) {
+            try {
+                T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
+                if(message != null) {
+                    return message;
+                }
+            } catch (Exception e) {}
+
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
+
+        Assert.fail("Did not receive message of type " + clazz);
+        return null;
+    }
 }
index 4bd0ad8..d62c9db 100644 (file)
@@ -10,13 +10,14 @@ package org.opendaylight.controller.cluster.datastore.utils;
 
 import akka.actor.ActorRef;
 import akka.actor.UntypedActor;
-
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
@@ -31,7 +32,7 @@ import scala.concurrent.duration.FiniteDuration;
  * </p>
  */
 public class MessageCollectorActor extends UntypedActor {
-    private List<Object> messages = new ArrayList<>();
+    private final List<Object> messages = new ArrayList<>();
 
     @Override public void onReceive(Object message) throws Exception {
         if(message instanceof String){
@@ -43,6 +44,10 @@ public class MessageCollectorActor extends UntypedActor {
         }
     }
 
+    public void clear() {
+        messages.clear();
+    }
+
     public static List<Object> getAllMessages(ActorRef actor) throws Exception {
         FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
         Timeout operationTimeout = new Timeout(operationDuration);
@@ -87,4 +92,20 @@ public class MessageCollectorActor extends UntypedActor {
         return output;
     }
 
+    public static <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
+        int count = 5000 / 50;
+        for(int i = 0; i < count; i++) {
+            try {
+                T message = (T) getFirstMatching(actor, clazz);
+                if(message != null) {
+                    return message;
+                }
+            } catch (Exception e) {}
+
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
+
+        Assert.fail("Did not receive message of type " + clazz);
+        return null;
+    }
 }
index 81b6bcc..63878df 100644 (file)
@@ -12,7 +12,6 @@ import static org.junit.Assert.assertNotNull;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
-import com.google.common.base.Optional;
 
 public class MockActorContext extends ActorContext {
 
@@ -36,10 +35,6 @@ public class MockActorContext extends ActorContext {
         return executeRemoteOperationResponse;
     }
 
-    @Override public Optional<ActorSelection> findPrimaryShard(String shardName) {
-        return Optional.absent();
-    }
-
     public void setExecuteShardOperationResponse(Object response){
         executeShardOperationResponse = response;
     }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.