Merge "Remove redundant features: odl-adsal-controller-northbound and odl-nsf-control...
authorEd Warnicke <eaw@cisco.com>
Wed, 25 Mar 2015 17:04:39 +0000 (17:04 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 25 Mar 2015 17:04:40 +0000 (17:04 +0000)
23 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java

index 9faffb9395dcdb98d02951e7e9530d1ac602b3f5..aa72485187cc9143fbcf6eac5f1adb0b7815b27e 100644 (file)
@@ -17,6 +17,7 @@ import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
@@ -30,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
@@ -128,6 +130,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private int currentRecoveryBatchCount;
 
+    private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
+
     public RaftActor(String id, Map<String, String> peerAddresses) {
         this(id, peerAddresses, Optional.<ConfigParams>absent());
     }
@@ -306,9 +310,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
-        RaftActorBehavior oldBehavior = currentBehavior;
+        reusableBehaviorStateHolder.init(currentBehavior);
         currentBehavior = newBehavior;
-        handleBehaviorChange(oldBehavior, currentBehavior);
+        handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
     }
 
     @Override public void handleCommand(Object message) {
@@ -396,10 +400,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if(message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
         } else {
-            RaftActorBehavior oldBehavior = currentBehavior;
+            reusableBehaviorStateHolder.init(currentBehavior);
+
             currentBehavior = currentBehavior.handleMessage(getSender(), message);
 
-            handleBehaviorChange(oldBehavior, currentBehavior);
+            handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
         }
     }
 
@@ -446,22 +451,30 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     }
 
-    private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
+    private void handleBehaviorChange(BehaviorStateHolder oldBehaviorState, RaftActorBehavior currentBehavior) {
+        RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
+
         if (oldBehavior != currentBehavior){
             onStateChanged();
         }
 
-        String oldBehaviorLeaderId = oldBehavior == null? null : oldBehavior.getLeaderId();
-        String oldBehaviorState = oldBehavior == null? null : oldBehavior.state().name();
+        String oldBehaviorLeaderId = oldBehavior == null ? null : oldBehaviorState.getLeaderId();
+        String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name();
 
         // it can happen that the state has not changed but the leader has changed.
-        onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
+        Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
+        if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId())) {
+            if(roleChangeNotifier.isPresent()) {
+                roleChangeNotifier.get().tell(new LeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf());
+            }
 
-        if (getRoleChangeNotifier().isPresent() &&
+            onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
+        }
+
+        if (roleChangeNotifier.isPresent() &&
                 (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
-            getRoleChangeNotifier().get().tell(
-                    new RoleChanged(getId(), oldBehaviorState , currentBehavior.state().name()),
-                    getSelf());
+            roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
+                    currentBehavior.state().name()), getSelf());
         }
     }
 
@@ -1051,4 +1064,21 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return currentBehavior;
     }
 
+    private static class BehaviorStateHolder {
+        private RaftActorBehavior behavior;
+        private String leaderId;
+
+        void init(RaftActorBehavior behavior) {
+            this.behavior = behavior;
+            this.leaderId = behavior != null ? behavior.getLeaderId() : null;
+        }
+
+        RaftActorBehavior getBehavior() {
+            return behavior;
+        }
+
+        String getLeaderId() {
+            return leaderId;
+        }
+    }
 }
index b192b7cd242918a6735a85ad5a22e913493578ac..34932c7249a38f753856cdb99addf72d221f0722 100644 (file)
@@ -54,6 +54,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
@@ -64,6 +65,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotRep
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -944,7 +946,8 @@ public class RaftActorTest extends AbstractActorTest {
     @Test
     public void testRaftRoleChangeNotifier() throws Exception {
         new JavaTestKit(getSystem()) {{
-            ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
+            TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
+                    Props.create(MessageCollectorActor.class));
             MessageCollectorActor.waitUntilReady(notifierActor);
 
             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
@@ -954,20 +957,10 @@ public class RaftActorTest extends AbstractActorTest {
 
             String persistenceId = factory.generateActorId("notifier-");
 
-            factory.createTestActor(MockRaftActor.props(persistenceId,
+            TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
 
-            List<RoleChanged> matches =  null;
-            for(int i = 0; i < 5000 / heartBeatInterval; i++) {
-                matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
-                assertNotNull(matches);
-                if(matches.size() == 3) {
-                    break;
-                }
-                Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
-            }
-
-            assertEquals(3, matches.size());
+            List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
 
             // check if the notifier got a role change from null to Follower
             RoleChanged raftRoleChanged = matches.get(0);
@@ -986,6 +979,41 @@ public class RaftActorTest extends AbstractActorTest {
             assertEquals(persistenceId, raftRoleChanged.getMemberId());
             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
+
+            LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
+                    notifierActor, LeaderStateChanged.class);
+
+            assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
+
+            notifierActor.underlyingActor().clear();
+
+            MockRaftActor raftActor = raftActorRef.underlyingActor();
+            final String newLeaderId = "new-leader";
+            Follower follower = new Follower(raftActor.getRaftActorContext()) {
+                @Override
+                public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+                    leaderId = newLeaderId;
+                    return this;
+                }
+            };
+
+            raftActor.changeCurrentBehavior(follower);
+
+            leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+            assertEquals(persistenceId, leaderStateChange.getMemberId());
+            assertEquals(null, leaderStateChange.getLeaderId());
+
+            raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
+            assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
+            assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
+
+            notifierActor.underlyingActor().clear();
+
+            raftActor.handleCommand("any");
+
+            leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+            assertEquals(persistenceId, leaderStateChange.getMemberId());
+            assertEquals(newLeaderId, leaderStateChange.getLeaderId());
         }};
     }
 
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java
new file mode 100644 (file)
index 0000000..ec35b03
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * A message initiated internally from the RaftActor when some state of a leader has changed
+ *
+ * @author Thomas Pantelis
+ */
+public class LeaderStateChanged implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String memberId;
+    private final String leaderId;
+
+    public LeaderStateChanged(String memberId, String leaderId) {
+        this.memberId = memberId;
+        this.leaderId = leaderId;
+    }
+
+    public String getMemberId() {
+        return memberId;
+    }
+
+    public String getLeaderId() {
+        return leaderId;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("LeaderStateChanged [memberId=").append(memberId).append(", leaderId=").append(leaderId)
+                .append("]");
+        return builder.toString();
+    }
+}
index d065f6d211be3ecba0bfc1364eb0693e68e42679..598dfb1fe827fe8b03234003dc5d0bdf7c750223 100644 (file)
@@ -17,16 +17,17 @@ import java.util.Map;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 
 /**
- * The RoleChangeNotifier is responsible for receiving Raft role change messages and notifying
+ * The RoleChangeNotifier is responsible for receiving Raft role and leader state change messages and notifying
  * the listeners (within the same node), which are registered with it.
  * <p/>
  * The RoleChangeNotifier is instantiated by the Shard and injected into the RaftActor.
  */
 public class RoleChangeNotifier extends AbstractUntypedActor implements AutoCloseable {
 
-    private String memberId;
-    private Map<ActorPath, ActorRef> registeredListeners = Maps.newHashMap();
+    private final String memberId;
+    private final Map<ActorPath, ActorRef> registeredListeners = Maps.newHashMap();
     private RoleChangeNotification latestRoleChangeNotification = null;
+    private LeaderStateChanged latestLeaderStateChanged;
 
     public RoleChangeNotifier(String memberId) {
         this.memberId = memberId;
@@ -62,6 +63,10 @@ public class RoleChangeNotifier extends AbstractUntypedActor implements AutoClos
 
             getSender().tell(new RegisterRoleChangeListenerReply(), getSelf());
 
+            if(latestLeaderStateChanged != null) {
+                getSender().tell(latestLeaderStateChanged, getSelf());
+            }
+
             if (latestRoleChangeNotification != null) {
                 getSender().tell(latestRoleChangeNotification, getSelf());
             }
@@ -81,6 +86,12 @@ public class RoleChangeNotifier extends AbstractUntypedActor implements AutoClos
             for (ActorRef listener: registeredListeners.values()) {
                 listener.tell(latestRoleChangeNotification, getSelf());
             }
+        } else if (message instanceof LeaderStateChanged) {
+            latestLeaderStateChanged = (LeaderStateChanged)message;
+
+            for (ActorRef listener: registeredListeners.values()) {
+                listener.tell(latestLeaderStateChanged, getSelf());
+            }
         }
     }
 
index 99bc9de6a23f0a7c637ff1f6c168048f257ef857..66467af1303d9f86e043bd16c5cf604800a72410 100644 (file)
@@ -146,10 +146,9 @@ public class Shard extends RaftActor {
 
     private final String txnDispatcherPath;
 
-    protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+    protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
             final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
-        super(name.toString(), mapPeerAddresses(peerAddresses),
-                Optional.of(datastoreContext.getShardRaftConfig()));
+        super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
 
         this.name = name.toString();
         this.datastoreContext = datastoreContext;
@@ -198,20 +197,8 @@ public class Shard extends RaftActor {
                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
     }
 
-    private static Map<String, String> mapPeerAddresses(
-        final Map<ShardIdentifier, String> peerAddresses) {
-        Map<String, String> map = new HashMap<>();
-
-        for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
-            .entrySet()) {
-            map.put(entry.getKey().toString(), entry.getValue());
-        }
-
-        return map;
-    }
-
     public static Props props(final ShardIdentifier name,
-        final Map<ShardIdentifier, String> peerAddresses,
+        final Map<String, String> peerAddresses,
         final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
         Preconditions.checkNotNull(name, "name should not be null");
         Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
@@ -950,11 +937,11 @@ public class Shard extends RaftActor {
         private static final long serialVersionUID = 1L;
 
         final ShardIdentifier name;
-        final Map<ShardIdentifier, String> peerAddresses;
+        final Map<String, String> peerAddresses;
         final DatastoreContext datastoreContext;
         final SchemaContext schemaContext;
 
-        ShardCreator(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+        ShardCreator(final ShardIdentifier name, final Map<String, String> peerAddresses,
                 final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
             this.name = name;
             this.peerAddresses = peerAddresses;
index 136c6813eaba9d7d116b4ba0b4609bbd4848fb13..bc4c825351cc72148f5276fc28d5a94e2e64f79d 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.Address;
+import akka.actor.Cancellable;
 import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
@@ -20,24 +21,28 @@ import akka.japi.Function;
 import akka.japi.Procedure;
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.RecoveryFailure;
+import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
@@ -53,6 +58,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -74,7 +80,7 @@ import scala.concurrent.duration.Duration;
  */
 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
-    private final Logger LOG = LoggerFactory.getLogger(getClass());
+    private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
 
     // Stores a mapping between a member name and the address of the member
     // Member names look like "member-1", "member-2" etc and are as specified
@@ -172,15 +178,45 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onRoleChangeNotification((RoleChangeNotification) message);
         } else if(message instanceof FollowerInitialSyncUpStatus){
             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
-        } else{
+        } else if(message instanceof ShardNotInitializedTimeout) {
+            onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
+        } else if(message instanceof LeaderStateChanged) {
+            onLeaderStateChanged((LeaderStateChanged)message);
+        } else {
             unknownMessage(message);
         }
 
     }
 
+    private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) {
+        LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
+
+        ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
+        if(shardInformation != null) {
+            shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
+        } else {
+            LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
+        }
+    }
+
+    private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
+        ShardInformation shardInfo = message.getShardInfo();
+
+        LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
+                shardInfo.getShardId());
+
+        shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
+
+        if(!shardInfo.isShardInitialized()) {
+            message.getSender().tell(new ActorNotInitialized(), getSelf());
+        } else {
+            message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
+        }
+    }
+
     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
-        LOG.info("Received follower initial sync status for {} status sync done {}", status.getName(),
-                status.isInitialSyncDone());
+        LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
+                status.getName(), status.isInitialSyncDone());
 
         ShardInformation shardInformation = findShardInformation(status.getName());
 
@@ -193,7 +229,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
-        LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(),
+        LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
                 roleChanged.getOldRole(), roleChanged.getNewRole());
 
         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
@@ -201,8 +237,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             shardInformation.setRole(roleChanged.getNewRole());
 
             if (isReady()) {
-                LOG.info("All Shards are ready - data store {} is ready, available count is {}", type,
-                        waitTillReadyCountdownLatch.getCount());
+                LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+                        persistenceId(), type, waitTillReadyCountdownLatch.getCount());
 
                 waitTillReadyCountdownLatch.countDown();
             }
@@ -225,7 +261,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private boolean isReady() {
         boolean isReady = true;
         for (ShardInformation info : localShards.values()) {
-            if(RaftState.Candidate.name().equals(info.getRole()) || Strings.isNullOrEmpty(info.getRole())){
+            if(!info.isShardReady()){
                 isReady = false;
                 break;
             }
@@ -256,14 +292,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         if (shardId.getShardName() == null) {
             return;
         }
+
         markShardAsInitialized(shardId.getShardName());
     }
 
     private void markShardAsInitialized(String shardName) {
         LOG.debug("Initializing shard [{}]", shardName);
+
         ShardInformation shardInformation = localShards.get(shardName);
         if (shardInformation != null) {
             shardInformation.setActorInitialized();
+
+            shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
         }
     }
 
@@ -300,7 +340,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
-        sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier<Object>() {
+        sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
             @Override
             public Object get() {
                 return new LocalShardFound(shardInformation.getActor());
@@ -308,20 +348,36 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         });
     }
 
-    private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized,
-            final Supplier<Object> messageSupplier) {
-        if (!shardInformation.isShardInitialized()) {
-            if(waitUntilInitialized) {
+    private void sendResponse(ShardInformation shardInformation, boolean doWait,
+            boolean wantShardReady, final Supplier<Object> messageSupplier) {
+        if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
+            if(doWait) {
                 final ActorRef sender = getSender();
                 final ActorRef self = self();
-                shardInformation.addRunnableOnInitialized(new Runnable() {
+
+                Runnable replyRunnable = new Runnable() {
                     @Override
                     public void run() {
                         sender.tell(messageSupplier.get(), self);
                     }
-                });
-            } else {
+                };
+
+                OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
+                    new OnShardInitialized(replyRunnable);
+
+                shardInformation.addOnShardInitialized(onShardInitialized);
+
+                Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
+                        datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
+                        new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
+                        getContext().dispatcher(), getSelf());
+
+                onShardInitialized.setTimeoutSchedule(timeoutSchedule);
+
+            } else if (!shardInformation.isShardInitialized()) {
                 getSender().tell(new ActorNotInitialized(), getSelf());
+            } else {
+                getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
             }
 
             return;
@@ -330,6 +386,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         getSender().tell(messageSupplier.get(), getSelf());
     }
 
+    private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
+        return new NoShardLeaderException(String.format(
+                "Could not find a leader for shard %s. This typically happens when the system is coming up or " +
+                "recovering and a leader is being elected. Try again later.", shardId));
+    }
+
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
         memberNameToAddress.remove(message.member().roles().head());
     }
@@ -341,8 +403,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         for(ShardInformation info : localShards.values()){
             String shardName = info.getShardName();
-            info.updatePeerAddress(getShardIdentifier(memberName, shardName),
-                getShardActorPath(shardName, memberName));
+            info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(),
+                getShardActorPath(shardName, memberName), getSelf());
         }
     }
 
@@ -384,13 +446,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     LOG.debug("Sending new SchemaContext to Shards");
                     for (ShardInformation info : localShards.values()) {
                         if (info.getActor() == null) {
-                            info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
-                                    info.getPeerAddresses(), datastoreContext, schemaContext)
-                                            .withDispatcher(shardDispatcherPath), info.getShardId().toString()));
+                            info.setActor(newShardActor(schemaContext, info));
                         } else {
                             info.getActor().tell(message, getSelf());
                         }
-                        info.getActor().tell(new RegisterRoleChangeListener(), self());
                     }
                 }
 
@@ -402,16 +461,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     }
 
+    @VisibleForTesting
+    protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
+        return getContext().actorOf(Shard.props(info.getShardId(),
+                info.getPeerAddresses(), datastoreContext, schemaContext)
+                        .withDispatcher(shardDispatcherPath), info.getShardId().toString());
+    }
+
     private void findPrimary(FindPrimary message) {
-        String shardName = message.getShardName();
+        final String shardName = message.getShardName();
 
         // First see if the there is a local replica for the shard
         final ShardInformation info = localShards.get(shardName);
         if (info != null) {
-            sendResponse(info, message.isWaitUntilInitialized(), new Supplier<Object>() {
+            sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
                 @Override
                 public Object get() {
-                    return new PrimaryFound(info.getActorPath().toString()).toSerializable();
+                    Object found = new PrimaryFound(info.getSerializedLeaderActor()).toSerializable();
+
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("{}: Found primary for {}: {}", shardName, found);
+                    }
+
+                    return found;
                 }
             });
 
@@ -481,7 +553,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         List<String> localShardActorNames = new ArrayList<>();
         for(String shardName : memberShardNames){
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
-            Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
+            Map<String, String> peerAddresses = getPeerAddresses(shardName);
             localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
         }
@@ -496,22 +568,19 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * @param shardName
      * @return
      */
-    private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
+    private Map<String, String> getPeerAddresses(String shardName){
 
-        Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
+        Map<String, String> peerAddresses = new HashMap<>();
 
-        List<String> members =
-            this.configuration.getMembersFromShardName(shardName);
+        List<String> members = this.configuration.getMembersFromShardName(shardName);
 
         String currentMemberName = this.cluster.getCurrentMemberName();
 
         for(String memberName : members){
             if(!currentMemberName.equals(memberName)){
-                ShardIdentifier shardId = getShardIdentifier(memberName,
-                    shardName);
-                String path =
-                    getShardActorPath(shardName, currentMemberName);
-                peerAddresses.put(shardId, path);
+                ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+                String path = getShardActorPath(shardName, currentMemberName);
+                peerAddresses.put(shardId.toString(), path);
             }
         }
         return peerAddresses;
@@ -552,23 +621,25 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return mBean;
     }
 
-    private class ShardInformation {
+    @VisibleForTesting
+    protected static class ShardInformation {
         private final ShardIdentifier shardId;
         private final String shardName;
         private ActorRef actor;
         private ActorPath actorPath;
-        private final Map<ShardIdentifier, String> peerAddresses;
+        private final Map<String, String> peerAddresses;
 
         // flag that determines if the actor is ready for business
         private boolean actorInitialized = false;
 
         private boolean followerSyncStatus = false;
 
-        private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
+        private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
         private String role ;
+        private String leaderId;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
-                Map<ShardIdentifier, String> peerAddresses) {
+                Map<String, String> peerAddresses) {
             this.shardName = shardName;
             this.shardId = shardId;
             this.peerAddresses = peerAddresses;
@@ -595,11 +666,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return shardId;
         }
 
-        Map<ShardIdentifier, String> getPeerAddresses() {
+        Map<String, String> getPeerAddresses() {
             return peerAddresses;
         }
 
-        void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
+        void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
             LOG.info("updatePeerAddress for peer {} with address {}", peerId,
                 peerAddress);
             if(peerAddresses.containsKey(peerId)){
@@ -611,42 +682,87 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                                 peerId, peerAddress, actor.path());
                     }
 
-                    actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf());
+                    actor.tell(new PeerAddressResolved(peerId.toString(), peerAddress), sender);
                 }
+
+                notifyOnShardInitializedCallbacks();
             }
         }
 
+        boolean isShardReady() {
+            return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
+        }
+
+        boolean isShardReadyWithLeaderId() {
+            return isShardReady() && (isLeader() || peerAddresses.containsKey(leaderId));
+        }
+
         boolean isShardInitialized() {
             return getActor() != null && actorInitialized;
         }
 
+        boolean isLeader() {
+            return Objects.equal(leaderId, shardId.toString());
+        }
+
+        String getSerializedLeaderActor() {
+            if(isLeader()) {
+                return Serialization.serializedActorPath(getActor());
+            } else {
+                return peerAddresses.get(leaderId);
+            }
+        }
+
         void setActorInitialized() {
+            LOG.debug("Shard {} is initialized", shardId);
+
             this.actorInitialized = true;
 
-            for(Runnable runnable: runnablesOnInitialized) {
-                runnable.run();
+            notifyOnShardInitializedCallbacks();
+        }
+
+        private void notifyOnShardInitializedCallbacks() {
+            if(onShardInitializedSet.isEmpty()) {
+                return;
             }
 
-            runnablesOnInitialized.clear();
+            boolean ready = isShardReadyWithLeaderId();
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
+                        ready ? "ready" : "initialized", onShardInitializedSet.size());
+            }
+
+            Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
+            while(iter.hasNext()) {
+                OnShardInitialized onShardInitialized = iter.next();
+                if(!(onShardInitialized instanceof OnShardReady) || ready) {
+                    iter.remove();
+                    onShardInitialized.getTimeoutSchedule().cancel();
+                    onShardInitialized.getReplyRunnable().run();
+                }
+            }
         }
 
-        void addRunnableOnInitialized(Runnable runnable) {
-            runnablesOnInitialized.add(runnable);
+        void addOnShardInitialized(OnShardInitialized onShardInitialized) {
+            onShardInitializedSet.add(onShardInitialized);
         }
 
-        public void setRole(String newRole) {
-            this.role = newRole;
+        void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
+            onShardInitializedSet.remove(onShardInitialized);
         }
 
-        public String getRole(){
-            return this.role;
+        void setRole(String newRole) {
+            this.role = newRole;
+
+            notifyOnShardInitializedCallbacks();
         }
 
-        public void setFollowerSyncStatus(boolean syncStatus){
+        void setFollowerSyncStatus(boolean syncStatus){
             this.followerSyncStatus = syncStatus;
         }
 
-        public boolean isInSync(){
+        boolean isInSync(){
             if(RaftState.Follower.name().equals(this.role)){
                 return followerSyncStatus;
             } else if(RaftState.Leader.name().equals(this.role)){
@@ -656,6 +772,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return false;
         }
 
+        void setLeaderId(String leaderId) {
+            this.leaderId = leaderId;
+
+            notifyOnShardInitializedCallbacks();
+        }
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {
@@ -680,6 +801,57 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private static class OnShardInitialized {
+        private final Runnable replyRunnable;
+        private Cancellable timeoutSchedule;
+
+        OnShardInitialized(Runnable replyRunnable) {
+            this.replyRunnable = replyRunnable;
+        }
+
+        Runnable getReplyRunnable() {
+            return replyRunnable;
+        }
+
+        Cancellable getTimeoutSchedule() {
+            return timeoutSchedule;
+        }
+
+        void setTimeoutSchedule(Cancellable timeoutSchedule) {
+            this.timeoutSchedule = timeoutSchedule;
+        }
+    }
+
+    private static class OnShardReady extends OnShardInitialized {
+        OnShardReady(Runnable replyRunnable) {
+            super(replyRunnable);
+        }
+    }
+
+    private static class ShardNotInitializedTimeout {
+        private final ActorRef sender;
+        private final ShardInformation shardInfo;
+        private final OnShardInitialized onShardInitialized;
+
+        ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
+            this.sender = sender;
+            this.shardInfo = shardInfo;
+            this.onShardInitialized = onShardInitialized;
+        }
+
+        ActorRef getSender() {
+            return sender;
+        }
+
+        ShardInformation getShardInfo() {
+            return shardInfo;
+        }
+
+        OnShardInitialized getOnShardInitialized() {
+            return onShardInitialized;
+        }
+    }
+
     static class SchemaContextModules implements Serializable {
         private static final long serialVersionUID = -8884620101025936590L;
 
index 7f2f3281353bb689fdf5b74a12bb119d87ef8e59..64f914b19fbebfa517afb2a7f23deb473a46a1a5 100644 (file)
@@ -485,7 +485,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 @Override
                 public void onComplete(Throwable failure, ActorSelection primaryShard) {
                     if(failure != null) {
-                        newTxFutureCallback.onComplete(failure, null);
+                        newTxFutureCallback.createTransactionContext(failure, null);
                     } else {
                         newTxFutureCallback.setPrimaryShard(primaryShard);
                     }
@@ -566,7 +566,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
             if(transactionType == TransactionType.WRITE_ONLY &&
                     actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
-                LOG.debug("Tx {} Primary shard found - creating WRITE_ONLY transaction context", identifier);
+                LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
+                        identifier, primaryShard);
 
                 // For write-only Tx's we prepare the transaction modifications directly on the shard actor
                 // to avoid the overhead of creating a separate transaction actor.
@@ -612,7 +613,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Performs a CreateTransaction try async.
          */
         private void tryCreateTransaction() {
-            LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} Primary shard {} found - trying create transaction", identifier, primaryShard);
+            }
 
             Object serializedCreateMessage = new CreateTransaction(identifier.toString(),
                     TransactionProxy.this.transactionType.ordinal(),
@@ -645,6 +648,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
             }
 
+            createTransactionContext(failure, response);
+        }
+
+        private void createTransactionContext(Throwable failure, Object response) {
             // Mainly checking for state violation here to perform a volatile read of "initialized" to
             // ensure updates to operationLimter et al are visible to this thread (ie we're doing
             // "piggy-back" synchronization here).
index a34330bcf6864c26799aae0c6a48990dee1f6f82..d51d6800a23b44f2c14ff932a1be5c21421d5c5d 100644 (file)
@@ -18,22 +18,22 @@ public class FindPrimary implements SerializableMessage{
     public static final Class<FindPrimary> SERIALIZABLE_CLASS = FindPrimary.class;
 
     private final String shardName;
-    private final boolean waitUntilInitialized;
+    private final boolean waitUntilReady;
 
-    public FindPrimary(String shardName, boolean waitUntilInitialized){
+    public FindPrimary(String shardName, boolean waitUntilReady){
 
         Preconditions.checkNotNull(shardName, "shardName should not be null");
 
         this.shardName = shardName;
-        this.waitUntilInitialized = waitUntilInitialized;
+        this.waitUntilReady = waitUntilReady;
     }
 
     public String getShardName() {
         return shardName;
     }
 
-    public boolean isWaitUntilInitialized() {
-        return waitUntilInitialized;
+    public boolean isWaitUntilReady() {
+        return waitUntilReady;
     }
 
     @Override
@@ -44,4 +44,12 @@ public class FindPrimary implements SerializableMessage{
     public static FindPrimary fromSerializable(Object message){
         return (FindPrimary) message;
     }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("FindPrimary [shardName=").append(shardName).append(", waitUntilReady=").append(waitUntilReady)
+                .append("]");
+        return builder.toString();
+    }
 }
index 346519ed5aeea3f7b82a60d263052f0a06a60c8e..82f36499395ba885008838cce7e8b9c9b64e33fe 100644 (file)
@@ -8,18 +8,17 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 
 public class PeerAddressResolved {
-    private final ShardIdentifier peerId;
+    private final String peerId;
     private final String peerAddress;
 
-    public PeerAddressResolved(ShardIdentifier peerId, String peerAddress) {
+    public PeerAddressResolved(String peerId, String peerAddress) {
         this.peerId = peerId;
         this.peerAddress = peerAddress;
     }
 
-    public ShardIdentifier getPeerId() {
+    public String getPeerId() {
         return peerId;
     }
 
index 0fb09d8231903bbc9b530f488039adc6b8672b90..6f9bb7fc9feb4ede3a00d06b07d689d615f51458 100644 (file)
@@ -17,6 +17,7 @@ import akka.actor.Address;
 import akka.actor.PoisonPill;
 import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
+import akka.dispatch.OnComplete;
 import akka.pattern.AskTimeoutException;
 import akka.util.Timeout;
 import com.codahale.metrics.JmxReporter;
@@ -35,6 +36,7 @@ import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
@@ -98,8 +100,9 @@ public class ActorContext {
     private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
     private final int transactionOutstandingOperationLimit;
     private Timeout transactionCommitOperationTimeout;
+    private Timeout shardInitializationTimeout;
     private final Dispatchers dispatchers;
-    private final Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
+    private Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
 
     private volatile SchemaContext schemaContext;
     private volatile boolean updated;
@@ -121,14 +124,6 @@ public class ActorContext {
         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
 
         setCachedProperties();
-        primaryShardActorSelectionCache = CacheBuilder.newBuilder()
-                .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
-                .build();
-
-        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
-        operationTimeout = new Timeout(operationDuration);
-        transactionCommitOperationTimeout =  new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
-                TimeUnit.SECONDS));
 
         Address selfAddress = clusterWrapper.getSelfAddress();
         if (selfAddress != null && !selfAddress.host().isEmpty()) {
@@ -150,6 +145,12 @@ public class ActorContext {
 
         transactionCommitOperationTimeout =  new Timeout(Duration.create(
                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
+
+        shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
+
+        primaryShardActorSelectionCache = CacheBuilder.newBuilder()
+                .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
+                .build();
     }
 
     public DatastoreContext getDatastoreContext() {
@@ -202,28 +203,13 @@ public class ActorContext {
         return schemaContext;
     }
 
-    /**
-     * Finds the primary shard for the given shard name
-     *
-     * @param shardName
-     * @return
-     */
-    public Optional<ActorSelection> findPrimaryShard(String shardName) {
-        String path = findPrimaryPathOrNull(shardName);
-        if (path == null){
-            return Optional.absent();
-        }
-        return Optional.of(actorSystem.actorSelection(path));
-    }
-
     public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
         Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
         if(ret != null){
             return ret;
         }
         Future<Object> future = executeOperationAsync(shardManager,
-                new FindPrimary(shardName, true).toSerializable(),
-                datastoreContext.getShardInitializationTimeout());
+                new FindPrimary(shardName, true).toSerializable(), shardInitializationTimeout);
 
         return future.transform(new Mapper<Object, ActorSelection>() {
             @Override
@@ -242,6 +228,8 @@ public class ActorContext {
                 } else if(response instanceof PrimaryNotFound) {
                     throw new PrimaryNotFoundException(
                             String.format("No primary shard found for %S.", shardName));
+                } else if(response instanceof NoShardLeaderException) {
+                    throw (NoShardLeaderException)response;
                 }
 
                 throw new UnknownMessageException(String.format(
@@ -277,7 +265,7 @@ public class ActorContext {
      */
     public Future<ActorRef> findLocalShardAsync( final String shardName) {
         Future<Object> future = executeOperationAsync(shardManager,
-                new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
+                new FindLocalShard(shardName, true), shardInitializationTimeout);
 
         return future.map(new Mapper<Object, ActorRef>() {
             @Override
@@ -301,26 +289,6 @@ public class ActorContext {
         }, getClientDispatcher());
     }
 
-    private String findPrimaryPathOrNull(String shardName) {
-        Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
-
-        if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
-            PrimaryFound found = PrimaryFound.fromSerializable(result);
-
-            LOG.debug("Primary found {}", found.getPrimaryPath());
-            return found.getPrimaryPath();
-
-        } else if (result.getClass().equals(ActorNotInitialized.class)){
-            throw new NotInitializedException(
-                String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
-            );
-
-        } else {
-            return null;
-        }
-    }
-
-
     /**
      * Executes an operation on a local actor and wait for it's response
      *
@@ -428,16 +396,21 @@ public class ActorContext {
      *
      * @param message
      */
-    public void broadcast(Object message){
-        for(String shardName : configuration.getAllShardNames()){
-
-            Optional<ActorSelection> primary = findPrimaryShard(shardName);
-            if (primary.isPresent()) {
-                primary.get().tell(message, ActorRef.noSender());
-            } else {
-                LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
-                        message.getClass().getSimpleName(), shardName);
-            }
+    public void broadcast(final Object message){
+        for(final String shardName : configuration.getAllShardNames()){
+
+            Future<ActorSelection> primaryFuture = findPrimaryShardAsync(shardName);
+            primaryFuture.onComplete(new OnComplete<ActorSelection>() {
+                @Override
+                public void onComplete(Throwable failure, ActorSelection primaryShard) {
+                    if(failure != null) {
+                        LOG.warn("broadcast failed to send message {} to shard {}:  {}",
+                                message.getClass().getSimpleName(), shardName, failure);
+                    } else {
+                        primaryShard.tell(message, ActorRef.noSender());
+                    }
+                }
+            }, getClientDispatcher());
         }
     }
 
index 8cafb46528e4ceddfe39e96234ead00988552597..378bc717f4da3b22f2516a2c303c43abc9144fab 100644 (file)
@@ -87,7 +87,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{
     }
 
     protected Props newShardProps() {
-        return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+        return Shard.props(shardID, Collections.<String,String>emptyMap(),
                 newDatastoreContext(), SCHEMA_CONTEXT);
     }
 
@@ -102,7 +102,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{
         Creator<Shard> creator = new Creator<Shard>() {
             @Override
             public Shard create() throws Exception {
-                return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+                return new Shard(shardID, Collections.<String,String>emptyMap(),
                         newDatastoreContext(), SCHEMA_CONTEXT) {
                     @Override
                     protected void onRecoveryComplete() {
index a3c5eb4b003c666d9388e4f70ba51790c09db9dd..fdc7e664c2b6cd63b5071e504bebf70c5f0dc05d 100644 (file)
@@ -147,8 +147,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
         }};
     }
 
-    @Test
-    public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{
+    private void testTransactionWritesWithShardNotInitiallyReady(final boolean writeOnly) throws Exception {
         new IntegrationTestKit(getSystem()) {{
             String testName = "testTransactionWritesWithShardNotInitiallyReady";
             String shardName = "test-1";
@@ -163,8 +162,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
             // Create the write Tx
 
-            // TODO - we'll want to test this with write-only as well when FindPrimary returns the leader shard.
-            final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction();
+            final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
+                    dataStore.newReadWriteTransaction();
             assertNotNull("newReadWriteTransaction returned null", writeTx);
 
             // Do some modification operations and ready the Tx on a separate thread.
@@ -240,7 +239,18 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
     }
 
     @Test
-    public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{
+    public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
+        datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+        testTransactionWritesWithShardNotInitiallyReady(true);
+    }
+
+    @Test
+    public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
+        testTransactionWritesWithShardNotInitiallyReady(false);
+    }
+
+    @Test
+    public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
         new IntegrationTestKit(getSystem()) {{
             String testName = "testTransactionReadsWithShardNotInitiallyReady";
             String shardName = "test-1";
@@ -455,8 +465,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
         }};
     }
 
-    @Test(expected=NoShardLeaderException.class)
-    public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{
+    private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
         new IntegrationTestKit(getSystem()) {{
             String testName = "testTransactionCommitFailureWithNoShardLeader";
             String shardName = "test-1";
@@ -465,6 +474,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
             // by setting the election timeout, which is based on the heartbeat interval, really high.
 
             datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
+            datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
 
             // Set the leader election timeout low for the test.
 
@@ -474,7 +484,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
             // Create the write Tx.
 
-            final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction();
+            final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
+                dataStore.newReadWriteTransaction();
             assertNotNull("newReadWriteTransaction returned null", writeTx);
 
             // Do some modifications and ready the Tx on a separate thread.
@@ -523,6 +534,17 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
         }};
     }
 
+    @Test(expected=NoShardLeaderException.class)
+    public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
+        datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+        testTransactionCommitFailureWithNoShardLeader(true);
+    }
+
+    @Test(expected=NoShardLeaderException.class)
+    public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
+        testTransactionCommitFailureWithNoShardLeader(false);
+    }
+
     @Test
     public void testTransactionAbort() throws Exception{
         System.setProperty("shard.persistent", "true");
index 4e61260550cdc9ab4f5b7e9409e295337e0d47b8..1ab03b216cdf09075d5747b7a76bf7e579bb9c38 100644 (file)
@@ -1,21 +1,22 @@
 package org.opendaylight.controller.cluster.datastore;
 
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.RaftState;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 
 public class RoleChangeNotifierTest extends AbstractActorTest  {
 
@@ -51,8 +52,6 @@ public class RoleChangeNotifierTest extends AbstractActorTest  {
             TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
                 getSystem(), RoleChangeNotifier.getProps(memberId), memberId);
 
-            RoleChangeNotifier roleChangeNotifier = notifierTestActorRef.underlyingActor();
-
             notifierTestActorRef.tell(new RoleChanged(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), shardActor);
 
             // no notification should be sent as listener has not yet registered
@@ -74,6 +73,32 @@ public class RoleChangeNotifierTest extends AbstractActorTest  {
         }};
 
     }
+
+    @Test
+    public void testHandleLeaderStateChanged() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            String actorId = "testHandleLeaderStateChanged";
+            TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
+                getSystem(), RoleChangeNotifier.getProps(actorId), actorId);
+
+            notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader1"), ActorRef.noSender());
+
+            // listener registers after the sate has been changed, ensure we sent the latest state change after a reply
+            notifierTestActorRef.tell(new RegisterRoleChangeListener(), getRef());
+
+            expectMsgClass(RegisterRoleChangeListenerReply.class);
+
+            LeaderStateChanged leaderStateChanged = expectMsgClass(LeaderStateChanged.class);
+            assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId());
+            assertEquals("getLeaderId", "leader1", leaderStateChanged.getLeaderId());
+
+            notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader2"), ActorRef.noSender());
+
+            leaderStateChanged = expectMsgClass(LeaderStateChanged.class);
+            assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId());
+            assertEquals("getLeaderId", "leader2", leaderStateChanged.getLeaderId());
+        }};
+    }
 }
 
 
index 99417076bfa3087a38f57f5f6a7cc5b6f0676bb1..ae7a4f96c53fec04dbadbb612c9bc0369952f654 100644 (file)
@@ -34,6 +34,7 @@ import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
@@ -44,9 +45,11 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
@@ -56,6 +59,7 @@ import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 public class ShardManagerTest extends AbstractActorTest {
     private static int ID_COUNTER = 1;
@@ -66,7 +70,10 @@ public class ShardManagerTest extends AbstractActorTest {
     @Mock
     private static CountDownLatch ready;
 
-    private static ActorRef mockShardActor;
+    private static TestActorRef<MessageCollectorActor> mockShardActor;
+
+    private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
+            dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
 
     @Before
     public void setUp() {
@@ -75,9 +82,11 @@ public class ShardManagerTest extends AbstractActorTest {
         InMemoryJournal.clear();
 
         if(mockShardActor == null) {
-            String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
-            mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
+            String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
+            mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name);
         }
+
+        mockShardActor.underlyingActor().clear();
     }
 
     @After
@@ -86,44 +95,93 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     private Props newShardMgrProps() {
-        DatastoreContext.Builder builder = DatastoreContext.newBuilder();
-        builder.dataStoreType(shardMrgIDSuffix);
         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
-                builder.build(), ready);
+                datastoreContextBuilder.build(), ready);
+    }
+
+    private Props newPropsShardMgrWithMockShardActor() {
+        Creator<ShardManager> creator = new Creator<ShardManager>() {
+            private static final long serialVersionUID = 1L;
+            @Override
+            public ShardManager create() throws Exception {
+                return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
+                        datastoreContextBuilder.build(), ready) {
+                    @Override
+                    protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+                        return mockShardActor;
+                    }
+                };
+            }
+        };
+
+        return Props.create(new DelegatingShardManagerCreator(creator));
     }
 
     @Test
     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
             shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
 
-            expectMsgEquals(duration("5 seconds"),
-                    new PrimaryNotFound("non-existent").toSerializable());
+            expectMsgEquals(duration("5 seconds"), new PrimaryNotFound("non-existent").toSerializable());
         }};
     }
 
     @Test
-    public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
+    public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+            String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
+            shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef());
+
+            MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
+            shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
+                    RaftState.Leader.name())), mockShardActor);
+
             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
 
-            expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
+                    primaryFound.getPrimaryPath().contains("member-1-shard-default"));
         }};
     }
 
     @Test
-    public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
+    public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+
+            String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+            MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
+
+            String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager.tell(new RoleChangeNotification(memberId1,
+                    RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+            shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
+
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
+                    primaryFound.getPrimaryPath().contains("member-2-shard-default"));
+        }};
+    }
+
+    @Test
+    public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
 
@@ -132,28 +190,129 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
+    public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+
+            expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+        }};
+    }
+
+    @Test
+    public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+
+            String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager.tell(new RoleChangeNotification(memberId,
+                    RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+
+            expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+
+            shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
+
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
+                    primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+        }};
+    }
+
+    @Test
+    public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
-            // delayed until we send ActorInitialized.
-            Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
-                    new Timeout(5, TimeUnit.SECONDS));
+            // delayed until we send ActorInitialized and RoleChangeNotification.
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+
+            expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
 
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
-            Object resp = Await.result(future, duration("5 seconds"));
-            assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
+            expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+
+            String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager.tell(new RoleChangeNotification(memberId,
+                    RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
+
+            expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+
+            shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
+
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
+                    primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+
+            expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
+        }};
+    }
+
+    @Test
+    public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+
+            expectMsgClass(duration("2 seconds"), ActorNotInitialized.class);
+
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+
+            expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
+        }};
+    }
+
+    @Test
+    public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+            shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
+                    null, RaftState.Candidate.name()), mockShardActor);
+
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+
+            expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
+        }};
+    }
+
+    @Test
+    public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+
+            expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
         }};
     }
 
     @Test
     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
@@ -168,7 +327,7 @@ public class ShardManagerTest extends AbstractActorTest {
     @Test
     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), mockShardActor);
@@ -185,7 +344,7 @@ public class ShardManagerTest extends AbstractActorTest {
     @Test
     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
 
@@ -196,7 +355,7 @@ public class ShardManagerTest extends AbstractActorTest {
     @Test
     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
@@ -439,14 +598,11 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRoleChangeNotificationReleaseReady() throws Exception {
         new JavaTestKit(getSystem()) {
             {
-                final Props persistentProps = ShardManager.props(
-                        new MockClusterWrapper(),
-                        new MockConfiguration(),
-                        DatastoreContext.newBuilder().persistent(true).build(), ready);
-                final TestActorRef<ShardManager> shardManager =
-                        TestActorRef.create(getSystem(), persistentProps);
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
 
-                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
+                String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+                        memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
 
                 verify(ready, times(1)).countDown();
 
@@ -457,14 +613,10 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
         new JavaTestKit(getSystem()) {
             {
-                final Props persistentProps = ShardManager.props(
-                        new MockClusterWrapper(),
-                        new MockConfiguration(),
-                        DatastoreContext.newBuilder().persistent(true).build(), ready);
-                final TestActorRef<ShardManager> shardManager =
-                        TestActorRef.create(getSystem(), persistentProps);
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
 
-                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
+                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+                        "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
 
                 verify(ready, never()).countDown();
 
index 0fbe68665e057689bf608c5502ee2ebe9c58c4fc..adc7f4706c1e871f6ddab28bf8364e06a975d0fc 100644 (file)
@@ -41,7 +41,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Test;
 import org.mockito.InOrder;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@@ -159,7 +158,7 @@ public class ShardTest extends AbstractShardTest {
 
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+                    return new Shard(shardID, Collections.<String,String>emptyMap(),
                             newDatastoreContext(), SCHEMA_CONTEXT) {
                         @Override
                         public void onReceiveCommand(final Object message) throws Exception {
@@ -287,7 +286,7 @@ public class ShardTest extends AbstractShardTest {
             final CountDownLatch recoveryComplete = new CountDownLatch(1);
             class TestShard extends Shard {
                 TestShard() {
-                    super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
+                    super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
                             newDatastoreContext(), SCHEMA_CONTEXT);
                 }
 
@@ -318,7 +317,7 @@ public class ShardTest extends AbstractShardTest {
                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
 
             String address = "akka://foobar";
-            shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
+            shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
 
             assertEquals("getPeerAddresses", address,
                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
@@ -763,7 +762,7 @@ public class ShardTest extends AbstractShardTest {
             Creator<Shard> creator = new Creator<Shard>() {
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+                    return new Shard(shardID, Collections.<String,String>emptyMap(),
                             newDatastoreContext(), SCHEMA_CONTEXT) {
                         @Override
                         protected boolean isLeader() {
@@ -1427,7 +1426,7 @@ public class ShardTest extends AbstractShardTest {
             Creator<Shard> creator = new Creator<Shard>() {
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+                    return new Shard(shardID, Collections.<String,String>emptyMap(),
                             newDatastoreContext(), SCHEMA_CONTEXT) {
 
                         DelegatingPersistentDataProvider delegating;
@@ -1528,13 +1527,13 @@ public class ShardTest extends AbstractShardTest {
         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
 
-        final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+        final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
                 persistentContext, SCHEMA_CONTEXT);
 
         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
 
-        final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+        final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
                 nonPersistentContext, SCHEMA_CONTEXT);
 
         new ShardTestKit(getSystem()) {{
index 09a4532b53906ab1b803aae880c1e69660537f07..c3fef611e348a2a446ef2bfff87024425063f3ba 100644 (file)
@@ -60,7 +60,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
     }
 
     private ActorRef createShard(){
-        return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.<ShardIdentifier, String>emptyMap(), datastoreContext,
+        return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.<String, String>emptyMap(), datastoreContext,
                 TestModel.createTestContext()));
     }
 
index 8ebb145728fc63fac9ba55375a8a8698d16bab62..e63ace3e2cc5abaff32acc4012f583cfbbdde6cc 100644 (file)
@@ -79,7 +79,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     private ActorRef createShard(){
         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-            Collections.<ShardIdentifier, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
+            Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
     }
 
     private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
index ac2c07964179192f86b3123f5a9f8a24a8cd2e9d..265ec59f1cd324889627281850ffae9ed456c52a 100644 (file)
@@ -33,6 +33,8 @@ import org.junit.Test;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@@ -659,11 +661,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         verifyCohortFutures(proxy, TestException.class);
     }
 
-    @Test
-    public void testReadyWithInitialCreateTransactionFailure() throws Exception {
-
-        doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
-                mockActorContext).findPrimaryShardAsync(anyString());
+    private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
+        doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
@@ -681,7 +680,22 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
-        verifyCohortFutures(proxy, PrimaryNotFoundException.class);
+        verifyCohortFutures(proxy, toThrow.getClass());
+    }
+
+    @Test
+    public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
+        testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
+    }
+
+    @Test
+    public void testWriteOnlyTxWithNotInitializedException() throws Exception {
+        testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
+    }
+
+    @Test
+    public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
+        testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
     }
 
     @Test
index 2a29d2c08934d0fdfd6a0f9690ea379bd231fd7b..e206e69cdaaa88d04ca38ab530865ba9e7e37320 100644 (file)
@@ -59,7 +59,7 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc
             SchemaContext schemaContext = TestModel.createTestContext();
             Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
                     shardName("inventory").type("config").build(),
-                    Collections.<ShardIdentifier,String>emptyMap(),
+                    Collections.<String,String>emptyMap(),
                     DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
                     schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
 
@@ -133,7 +133,7 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc
             SchemaContext schemaContext = TestModel.createTestContext();
             Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
                     shardName("inventory").type("config").build(),
-                    Collections.<ShardIdentifier,String>emptyMap(),
+                    Collections.<String,String>emptyMap(),
                     DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
                     schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
 
index 6bd732e038a00055bb2407ccc416c7f192059405..2746bcf982906af7046c8c8e3cb930711929df43 100644 (file)
@@ -20,8 +20,12 @@ import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
+import java.util.Arrays;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.time.StopWatch;
 import org.junit.Assert;
@@ -30,14 +34,18 @@ import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
@@ -45,10 +53,16 @@ import scala.concurrent.duration.FiniteDuration;
 
 public class ActorContextTest extends AbstractActorTest{
 
+    static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
+
+    private static class TestMessage {
+    }
+
     private static class MockShardManager extends UntypedActor {
 
         private final boolean found;
         private final ActorRef actorRef;
+        private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
 
         private MockShardManager(boolean found, ActorRef actorRef){
 
@@ -57,6 +71,18 @@ public class ActorContextTest extends AbstractActorTest{
         }
 
         @Override public void onReceive(Object message) throws Exception {
+            if(message instanceof FindPrimary) {
+                FindPrimary fp = (FindPrimary)message;
+                Object resp = findPrimaryResponses.get(fp.getShardName());
+                if(resp == null) {
+                    log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
+                } else {
+                    getSender().tell(resp, getSelf());
+                }
+
+                return;
+            }
+
             if(found){
                 getSender().tell(new LocalShardFound(actorRef), getSelf());
             } else {
@@ -64,15 +90,28 @@ public class ActorContextTest extends AbstractActorTest{
             }
         }
 
+        void addFindPrimaryResp(String shardName, Object resp) {
+            findPrimaryResponses.put(shardName, resp);
+        }
+
         private static Props props(final boolean found, final ActorRef actorRef){
             return Props.create(new MockShardManagerCreator(found, actorRef) );
         }
 
+        private static Props props(){
+            return Props.create(new MockShardManagerCreator() );
+        }
+
         @SuppressWarnings("serial")
         private static class MockShardManagerCreator implements Creator<MockShardManager> {
             final boolean found;
             final ActorRef actorRef;
 
+            MockShardManagerCreator() {
+                this.found = false;
+                this.actorRef = null;
+            }
+
             MockShardManagerCreator(boolean found, ActorRef actorRef) {
                 this.found = found;
                 this.actorRef = actorRef;
@@ -287,18 +326,15 @@ public class ActorContextTest extends AbstractActorTest{
 
     @Test
     public void testRateLimiting(){
-        DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
-        doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
-        doReturn("config").when(mockDataStoreContext).getDataStoreType();
-        doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+                transactionCreationInitialRateLimit(155L).build();
 
         ActorContext actorContext =
                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
-                        mock(Configuration.class), mockDataStoreContext);
+                        mock(Configuration.class), dataStoreContext);
 
         // Check that the initial value is being picked up from DataStoreContext
-        assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
+        assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
 
         actorContext.setTxCreationLimit(1.0);
 
@@ -320,16 +356,9 @@ public class ActorContextTest extends AbstractActorTest{
 
     @Test
     public void testClientDispatcherIsGlobalDispatcher(){
-
-        DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
-        doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
-        doReturn("config").when(mockDataStoreContext).getDataStoreType();
-        doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
-
         ActorContext actorContext =
                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
-                        mock(Configuration.class), mockDataStoreContext);
+                        mock(Configuration.class), DatastoreContext.newBuilder().build());
 
         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
 
@@ -337,18 +366,11 @@ public class ActorContextTest extends AbstractActorTest{
 
     @Test
     public void testClientDispatcherIsNotGlobalDispatcher(){
-
-        DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
-        doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
-        doReturn("config").when(mockDataStoreContext).getDataStoreType();
-        doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
-
         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
 
         ActorContext actorContext =
                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
-                        mock(Configuration.class), mockDataStoreContext);
+                        mock(Configuration.class), DatastoreContext.newBuilder().build());
 
         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
 
@@ -388,15 +410,12 @@ public class ActorContextTest extends AbstractActorTest{
             TestActorRef<MessageCollectorActor> shardManager =
                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
 
-            DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
-            doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
-            doReturn("config").when(mockDataStoreContext).getDataStoreType();
-            doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+            DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+                    shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
 
             ActorContext actorContext =
                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), mockDataStoreContext) {
+                            mock(Configuration.class), dataStoreContext) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
                             return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
@@ -431,15 +450,12 @@ public class ActorContextTest extends AbstractActorTest{
             TestActorRef<MessageCollectorActor> shardManager =
                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
 
-            DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
-            doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
-            doReturn("config").when(mockDataStoreContext).getDataStoreType();
-            doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+            DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+                    shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
 
             ActorContext actorContext =
                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), mockDataStoreContext) {
+                            mock(Configuration.class), dataStoreContext) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
                             return Futures.successful((Object) new PrimaryNotFound("foobar"));
@@ -459,7 +475,6 @@ public class ActorContextTest extends AbstractActorTest{
             Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
 
             assertNull(cached);
-
     }
 
     @Test
@@ -468,15 +483,12 @@ public class ActorContextTest extends AbstractActorTest{
             TestActorRef<MessageCollectorActor> shardManager =
                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
 
-            DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
-            doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
-            doReturn("config").when(mockDataStoreContext).getDataStoreType();
-            doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+            DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+                    shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
 
             ActorContext actorContext =
                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), mockDataStoreContext) {
+                            mock(Configuration.class), dataStoreContext) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
                             return Futures.successful((Object) new ActorNotInitialized());
@@ -496,7 +508,49 @@ public class ActorContextTest extends AbstractActorTest{
             Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
 
             assertNull(cached);
+    }
+
+    @Test
+    public void testBroadcast() {
+        new JavaTestKit(getSystem()) {{
+            ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+            ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+            TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
+            MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
+            shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()).toSerializable());
+            shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()).toSerializable());
+            shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
+
+            Configuration mockConfig = mock(Configuration.class);
+            doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
+                    when(mockConfig).getAllShardNames();
 
+            ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+                    mock(ClusterWrapper.class), mockConfig,
+                    DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
+
+            actorContext.broadcast(new TestMessage());
+
+            expectFirstMatching(shardActorRef1, TestMessage.class);
+            expectFirstMatching(shardActorRef2, TestMessage.class);
+        }};
     }
 
+    private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
+        int count = 5000 / 50;
+        for(int i = 0; i < count; i++) {
+            try {
+                T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
+                if(message != null) {
+                    return message;
+                }
+            } catch (Exception e) {}
+
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
+
+        Assert.fail("Did not receive message of type " + clazz);
+        return null;
+    }
 }
index 4bd0ad818fff437ba8fbddac035ba9b4075b60b0..d62c9dbc28713eb5cb35bb434c9bf8e7b733e2c4 100644 (file)
@@ -10,13 +10,14 @@ package org.opendaylight.controller.cluster.datastore.utils;
 
 import akka.actor.ActorRef;
 import akka.actor.UntypedActor;
-
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
@@ -31,7 +32,7 @@ import scala.concurrent.duration.FiniteDuration;
  * </p>
  */
 public class MessageCollectorActor extends UntypedActor {
-    private List<Object> messages = new ArrayList<>();
+    private final List<Object> messages = new ArrayList<>();
 
     @Override public void onReceive(Object message) throws Exception {
         if(message instanceof String){
@@ -43,6 +44,10 @@ public class MessageCollectorActor extends UntypedActor {
         }
     }
 
+    public void clear() {
+        messages.clear();
+    }
+
     public static List<Object> getAllMessages(ActorRef actor) throws Exception {
         FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
         Timeout operationTimeout = new Timeout(operationDuration);
@@ -87,4 +92,20 @@ public class MessageCollectorActor extends UntypedActor {
         return output;
     }
 
+    public static <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
+        int count = 5000 / 50;
+        for(int i = 0; i < count; i++) {
+            try {
+                T message = (T) getFirstMatching(actor, clazz);
+                if(message != null) {
+                    return message;
+                }
+            } catch (Exception e) {}
+
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
+
+        Assert.fail("Did not receive message of type " + clazz);
+        return null;
+    }
 }
index 81b6bccaf08ea0f35b5d4ed8c8a0fea8f3524796..63878df23cae2d0f5e42fd86feb8f423b2a31d7e 100644 (file)
@@ -12,7 +12,6 @@ import static org.junit.Assert.assertNotNull;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
-import com.google.common.base.Optional;
 
 public class MockActorContext extends ActorContext {
 
@@ -36,10 +35,6 @@ public class MockActorContext extends ActorContext {
         return executeRemoteOperationResponse;
     }
 
-    @Override public Optional<ActorSelection> findPrimaryShard(String shardName) {
-        return Optional.absent();
-    }
-
     public void setExecuteShardOperationResponse(Object response){
         executeShardOperationResponse = response;
     }
index 360ac68a519f29695410ef50115ddc98c90f2343..a6fdfd3ff97445a2f28cd48e077d66e22fd24f38 100644 (file)
@@ -53,7 +53,16 @@ public class RemoteRpcImplementation implements DOMRpcImplementation {
             @Override
             public void onComplete(final Throwable failure, final Object reply) throws Throwable {
                 if(failure != null) {
-                    LOG.error("InvokeRpc failed", failure);
+
+                    // When we return a failure to the caller they can choose to log it if they like
+                    // so here we just do basic warn logging by default and log the stack trace only when debug
+                    // is enabled
+
+                    LOG.warn("InvokeRpc failed rpc = {}, identifier = {}", rpcMsg.getRpc(), rpcMsg.getIdentifier());
+
+                    if(LOG.isDebugEnabled()){
+                        LOG.debug("Detailed Error", failure);
+                    }
 
                     final String message = String.format("Execution of RPC %s failed",  rpcMsg.getRpc());
                     Collection<RpcError> errors = ((RpcErrorsException)failure).getRpcErrors();