Bug -3221 : Adding a new DataStoreUnavailableException for external applications. 78/20678/6
authorHarman Singh <harmasin@cisco.com>
Mon, 18 May 2015 21:51:46 +0000 (14:51 -0700)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 27 May 2015 18:21:05 +0000 (18:21 +0000)
If cluster member, which is a leader of one or multiple shards becomes unavailable,
Shard re-election happens after certain time period and anyone tries to access datastore,
receives akka.timeout exception. No application can act upon such exception.

DataStoreUnavailableException will help external applications to retry if they see this exception.

Change-Id: Iceb10580bbe73ae91dc8abb4bc6a183cb4fad6f8
Signed-off-by: Harman Singh <harmasin@cisco.com>
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataStoreUnavailableException.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java

diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataStoreUnavailableException.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataStoreUnavailableException.java
new file mode 100644 (file)
index 0000000..ebbd116
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.api.data;
+
+/**
+ * This exception occurs if the datastore is temporarily unavailable.
+ * A retry of the transaction may succeed after a period of time
+ */
+
+public class DataStoreUnavailableException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public DataStoreUnavailableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+
+}
index aa5040fb11215711af2a511621d0831b8cd0c138..c16a84933572d436a14c0506858822039fdc8a77 100644 (file)
@@ -20,6 +20,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 import org.opendaylight.controller.cluster.databroker.AbstractDOMBroker;
 import java.util.Map;
 import java.util.concurrent.Executor;
 import org.opendaylight.controller.cluster.databroker.AbstractDOMBroker;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
@@ -195,7 +197,9 @@ public class ConcurrentDOMDataBroker extends AbstractDOMBroker {
 
         LOG.warn("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, t);
         final Exception e;
 
         LOG.warn("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, t);
         final Exception e;
-        if (t instanceof Exception) {
+        if(t instanceof NoShardLeaderException) {
+            e = new DataStoreUnavailableException(t.getMessage(), t);
+        } else if (t instanceof Exception) {
             e = (Exception)t;
         } else {
             e = new RuntimeException("Unexpected error occurred", t);
             e = (Exception)t;
         } else {
             e = new RuntimeException("Unexpected error occurred", t);
index 197cd9fa83c00b569b25d59d982328774fb44436..450716800066a291d00e2b22b71a4c6306dfadaf 100644 (file)
@@ -11,7 +11,10 @@ import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.Semaphore;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.Semaphore;
+
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -77,7 +80,13 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
         LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
         operationLimiter.release();
     public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
         LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
         operationLimiter.release();
-        proxyFuture.setException(new ReadFailedException("Error reading data for path " + path, failure));
+        Throwable t;
+        if(failure instanceof NoShardLeaderException) {
+            t = new DataStoreUnavailableException(failure.getMessage(), failure);
+        } else {
+            t = failure;
+        }
+        proxyFuture.setException(new ReadFailedException("Error reading data for path " + path, t));
     }
 
     @Override
     }
 
     @Override
index 63d4c9826b0835de095103c3c463c1d811e5ecae..d33576d495fa66d2306ebb7d7378691269292e21 100644 (file)
@@ -188,7 +188,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if(message instanceof ClusterEvent.MemberRemoved) {
             memberRemoved((ClusterEvent.MemberRemoved) message);
         } else if(message instanceof ClusterEvent.UnreachableMember) {
         } else if(message instanceof ClusterEvent.MemberRemoved) {
             memberRemoved((ClusterEvent.MemberRemoved) message);
         } else if(message instanceof ClusterEvent.UnreachableMember) {
-            ignoreMessage(message);
+            memberUnreachable((ClusterEvent.UnreachableMember)message);
+        } else if(message instanceof ClusterEvent.ReachableMember) {
+            memberReachable((ClusterEvent.ReachableMember) message);
         } else if(message instanceof DatastoreContext) {
             onDatastoreContext((DatastoreContext)message);
         } else if(message instanceof RoleChangeNotification) {
         } else if(message instanceof DatastoreContext) {
             onDatastoreContext((DatastoreContext)message);
         } else if(message instanceof RoleChangeNotification) {
@@ -455,6 +457,40 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         checkReady();
     }
 
         checkReady();
     }
 
+    private void memberReachable(ClusterEvent.ReachableMember message) {
+        String memberName = message.member().roles().head();
+        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
+
+        markMemberAvailable(memberName);
+    }
+
+    private void memberUnreachable(ClusterEvent.UnreachableMember message) {
+        String memberName = message.member().roles().head();
+        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
+
+        markMemberUnavailable(memberName);
+    }
+
+    private void markMemberUnavailable(final String memberName) {
+        for(ShardInformation info : localShards.values()){
+            String leaderId = info.getLeaderId();
+            if(leaderId != null && leaderId.contains(memberName)) {
+                LOG.debug("Marking Leader {} as unavailable.", leaderId);
+                info.setLeaderAvailable(false);
+            }
+        }
+    }
+
+    private void markMemberAvailable(final String memberName) {
+        for(ShardInformation info : localShards.values()){
+            String leaderId = info.getLeaderId();
+            if(leaderId != null && leaderId.contains(memberName)) {
+                LOG.debug("Marking Leader {} as available.", leaderId);
+                info.setLeaderAvailable(true);
+            }
+        }
+    }
+
     private void onDatastoreContext(DatastoreContext context) {
         datastoreContext = context;
         for (ShardInformation info : localShards.values()) {
     private void onDatastoreContext(DatastoreContext context) {
         datastoreContext = context;
         for (ShardInformation info : localShards.values()) {
@@ -685,6 +721,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private ActorPath actorPath;
         private final Map<String, String> peerAddresses;
         private Optional<DataTree> localShardDataTree;
         private ActorPath actorPath;
         private final Map<String, String> peerAddresses;
         private Optional<DataTree> localShardDataTree;
+        private boolean leaderAvailable = false;
 
         // flag that determines if the actor is ready for business
         private boolean actorInitialized = false;
 
         // flag that determines if the actor is ready for business
         private boolean actorInitialized = false;
@@ -759,7 +796,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         boolean isShardReadyWithLeaderId() {
         }
 
         boolean isShardReadyWithLeaderId() {
-            return isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
+            return leaderAvailable && isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
         }
 
         boolean isShardInitialized() {
         }
 
         boolean isShardInitialized() {
@@ -840,11 +877,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         boolean setLeaderId(String leaderId) {
             boolean changed = !Objects.equal(this.leaderId, leaderId);
             this.leaderId = leaderId;
         boolean setLeaderId(String leaderId) {
             boolean changed = !Objects.equal(this.leaderId, leaderId);
             this.leaderId = leaderId;
-
+            if(leaderId != null) {
+                this.leaderAvailable = true;
+            }
             notifyOnShardInitializedCallbacks();
 
             return changed;
         }
             notifyOnShardInitializedCallbacks();
 
             return changed;
         }
+
+        public String getLeaderId() {
+            return leaderId;
+        }
+
+        public void setLeaderAvailable(boolean leaderAvailable) {
+            this.leaderAvailable = leaderAvailable;
+        }
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {
index f2536bfc2c2fc8c1fde4c22c423946ba2c7688b2..7a7dde74afd4fbaec582173e0f35762bc00caeb8 100644 (file)
@@ -44,6 +44,8 @@ import org.junit.Test;
 import org.mockito.InOrder;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.mockito.InOrder;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
@@ -194,7 +196,7 @@ public class ConcurrentDOMDataBrokerTest {
             fail("Expected TransactionCommitFailedException");
         } catch (TransactionCommitFailedException e) {
             if(expCause != null) {
             fail("Expected TransactionCommitFailedException");
         } catch (TransactionCommitFailedException e) {
             if(expCause != null) {
-                assertSame("Expected cause", expCause, e.getCause());
+                assertSame("Expected cause", expCause.getClass(), e.getCause().getClass());
             }
 
             InOrder inOrder = inOrder((Object[])mockCohorts);
             }
 
             InOrder inOrder = inOrder((Object[])mockCohorts);
@@ -221,6 +223,21 @@ public class ConcurrentDOMDataBrokerTest {
         assertFailure(future, cause, mockCohort1, mockCohort2);
     }
 
         assertFailure(future, cause, mockCohort1, mockCohort2);
     }
 
+    @Test
+    public void testSubmitWithCanCommitDataStoreUnavailableException() throws Exception {
+        doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
+        NoShardLeaderException rootCause = new NoShardLeaderException("mock");
+        DataStoreUnavailableException cause = new DataStoreUnavailableException(rootCause.getMessage(), rootCause);
+        doReturn(Futures.immediateFailedFuture(rootCause)).when(mockCohort2).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
+
+        CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
+            transaction, Arrays.asList(mockCohort1, mockCohort2));
+
+        assertFailure(future, cause, mockCohort1, mockCohort2);
+    }
+
     @Test
     public void testSubmitWithPreCommitException() throws Exception {
         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
     @Test
     public void testSubmitWithPreCommitException() throws Exception {
         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
index 5f6973f5dbff3cb23d798c6740c1ff6aeae77850..1ffe387a199c3988e2ce214f31b9121167cfc79c 100644 (file)
@@ -424,6 +424,169 @@ public class ShardManagerTest extends AbstractActorTest {
         JavaTestKit.shutdownActorSystem(system2);
     }
 
         JavaTestKit.shutdownActorSystem(system2);
     }
 
+    @Test
+    public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
+        String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+        // Create an ActorSystem ShardManager actor for member-1.
+
+        final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
+
+        final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
+            newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
+                new MockConfiguration()), shardManagerID);
+
+        // Create an ActorSystem ShardManager actor for member-2.
+
+        final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+
+        Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
+
+        MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+            put("default", Arrays.asList("member-1", "member-2")).build());
+
+        final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
+            newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
+                mockConfig2), shardManagerID);
+
+        new JavaTestKit(system1) {{
+
+            shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager1.tell(new ActorInitialized(), mockShardActor1);
+            shardManager2.tell(new ActorInitialized(), mockShardActor2);
+
+            String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+            String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
+                Optional.of(mock(DataTree.class))), mockShardActor1);
+            shardManager1.tell(new RoleChangeNotification(memberId1,
+                RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
+            shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))),
+                mockShardActor2);
+            shardManager2.tell(new RoleChangeNotification(memberId2,
+                RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
+            shardManager1.underlyingActor().waitForMemberUp();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
+            String path = found.getPrimaryPath();
+            assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
+
+            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
+                createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+
+            shardManager1.underlyingActor().waitForUnreachableMember();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+
+            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
+                createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+
+            shardManager1.underlyingActor().waitForReachableMember();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
+            String path1 = found1.getPrimaryPath();
+            assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
+
+        }};
+
+        JavaTestKit.shutdownActorSystem(system1);
+        JavaTestKit.shutdownActorSystem(system2);
+    }
+
+    @Test
+    public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
+        String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+        // Create an ActorSystem ShardManager actor for member-1.
+
+        final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
+
+        final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
+            newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
+                new MockConfiguration()), shardManagerID);
+
+        // Create an ActorSystem ShardManager actor for member-2.
+
+        final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+
+        Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
+
+        MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+            put("default", Arrays.asList("member-1", "member-2")).build());
+
+        final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
+            newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
+                mockConfig2), shardManagerID);
+
+        new JavaTestKit(system1) {{
+
+            shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager1.tell(new ActorInitialized(), mockShardActor1);
+            shardManager2.tell(new ActorInitialized(), mockShardActor2);
+
+            String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+            String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
+                Optional.of(mock(DataTree.class))), mockShardActor1);
+            shardManager1.tell(new RoleChangeNotification(memberId1,
+                RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
+            shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))),
+                mockShardActor2);
+            shardManager2.tell(new RoleChangeNotification(memberId2,
+                RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
+            shardManager1.underlyingActor().waitForMemberUp();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
+            String path = found.getPrimaryPath();
+            assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
+
+            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
+                createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+
+            shardManager1.underlyingActor().waitForUnreachableMember();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+
+            shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class))),
+                mockShardActor1);
+            shardManager1.tell(new RoleChangeNotification(memberId1,
+                RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
+            String path1 = found1.getPrimaryPath();
+            assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
+
+        }};
+
+        JavaTestKit.shutdownActorSystem(system1);
+        JavaTestKit.shutdownActorSystem(system2);
+    }
+
+
     @Test
     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
         new JavaTestKit(getSystem()) {{
     @Test
     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -927,6 +1090,8 @@ public class ShardManagerTest extends AbstractActorTest {
         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
         private CountDownLatch memberUpReceived = new CountDownLatch(1);
         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
         private CountDownLatch memberUpReceived = new CountDownLatch(1);
         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
+        private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
+        private CountDownLatch memberReachableReceived = new CountDownLatch(1);
         private final ActorRef shardActor;
         private final String name;
 
         private final ActorRef shardActor;
         private final String name;
 
@@ -955,6 +1120,16 @@ public class ShardManagerTest extends AbstractActorTest {
                     if(!getCluster().getCurrentMemberName().equals(role)) {
                         memberRemovedReceived.countDown();
                     }
                     if(!getCluster().getCurrentMemberName().equals(role)) {
                         memberRemovedReceived.countDown();
                     }
+                } else if(message instanceof ClusterEvent.UnreachableMember) {
+                    String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
+                    if(!getCluster().getCurrentMemberName().equals(role)) {
+                        memberUnreachableReceived.countDown();
+                    }
+                } else if(message instanceof ClusterEvent.ReachableMember) {
+                    String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
+                    if(!getCluster().getCurrentMemberName().equals(role)) {
+                        memberReachableReceived.countDown();
+                    }
                 }
             }
         }
                 }
             }
         }
@@ -981,6 +1156,19 @@ public class ShardManagerTest extends AbstractActorTest {
             memberRemovedReceived = new CountDownLatch(1);
         }
 
             memberRemovedReceived = new CountDownLatch(1);
         }
 
+        void waitForUnreachableMember() {
+            assertEquals("UnreachableMember received", true,
+                Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
+                ));
+            memberUnreachableReceived = new CountDownLatch(1);
+        }
+
+        void waitForReachableMember() {
+            assertEquals("ReachableMember received", true,
+                Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
+            memberReachableReceived = new CountDownLatch(1);
+        }
+
         void verifyFindPrimary() {
             assertEquals("FindPrimary received", true,
                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
         void verifyFindPrimary() {
             assertEquals("FindPrimary received", true,
                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
index 4ee89ca76d4866d616b97ba14b1e268fa1aeaa82..1cc89f18af960557ba510814e6fe7c5326fa34ea 100644 (file)
@@ -461,68 +461,44 @@ public class ActorContextTest extends AbstractActorTest{
 
     @Test
     public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
 
     @Test
     public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
-
-            TestActorRef<MessageCollectorActor> shardManager =
-                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
-
-            DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
-                    shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
-
-            ActorContext actorContext =
-                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
-                        @Override
-                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
-                            return Futures.successful((Object) new PrimaryNotFoundException("not found"));
-                        }
-                    };
-
-
-            Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
-
-            try {
-                Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
-                fail("Expected PrimaryNotFoundException");
-            } catch(PrimaryNotFoundException e){
-
-            }
-
-            Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
-
-            assertNull(cached);
+        testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
     }
 
     @Test
     public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
     }
 
     @Test
     public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
+        testFindPrimaryExceptions(new NotInitializedException("not initialized"));
+    }
 
 
-            TestActorRef<MessageCollectorActor> shardManager =
-                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
-
-            DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
-                    shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
-
-            ActorContext actorContext =
-                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
-                        @Override
-                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
-                            return Futures.successful((Object) new NotInitializedException("not iniislized"));
-                        }
-                    };
+    private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
+        TestActorRef<MessageCollectorActor> shardManager =
+            TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
 
 
+        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+            shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
 
 
-            Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+        ActorContext actorContext =
+            new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+                mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
+                @Override
+                protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+                    return Futures.successful(expectedException);
+                }
+            };
 
 
-            try {
-                Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
-                fail("Expected NotInitializedException");
-            } catch(NotInitializedException e){
+        Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
 
 
+        try {
+            Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+            fail("Expected" + expectedException.getClass().toString());
+        } catch(Exception e){
+            if(!expectedException.getClass().isInstance(e)) {
+                fail("Expected Exception of type " + expectedException.getClass().toString());
             }
             }
+        }
 
 
-            Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+        Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
 
-            assertNull(cached);
+        assertNull(cached);
     }
 
     @Test
     }
 
     @Test
index 5d44033cd609cf97a3ac14522e6cd201a46cbea6..756ed07c45618f4400bdb26f494ef2d16eaf9bbd 100644 (file)
@@ -87,4 +87,32 @@ public class MockClusterWrapper implements ClusterWrapper{
 
         return new ClusterEvent.MemberUp(member);
     }
 
         return new ClusterEvent.MemberUp(member);
     }
+
+    public static ClusterEvent.UnreachableMember createUnreachableMember(String memberName, String address) {
+        akka.cluster.UniqueAddress uniqueAddress = new UniqueAddress(
+            AddressFromURIString.parse(address), 55);
+
+        Set<String> roles = new HashSet<>();
+
+        roles.add(memberName);
+
+        akka.cluster.Member member = new akka.cluster.Member(uniqueAddress, 1, MemberStatus.up(),
+            JavaConversions.asScalaSet(roles).<String>toSet());
+
+        return new ClusterEvent.UnreachableMember(member);
+    }
+
+    public static ClusterEvent.ReachableMember createReachableMember(String memberName, String address) {
+        akka.cluster.UniqueAddress uniqueAddress = new UniqueAddress(
+            AddressFromURIString.parse(address), 55);
+
+        Set<String> roles = new HashSet<>();
+
+        roles.add(memberName);
+
+        akka.cluster.Member member = new akka.cluster.Member(uniqueAddress, 1, MemberStatus.up(),
+            JavaConversions.asScalaSet(roles).<String>toSet());
+
+        return new ClusterEvent.ReachableMember(member);
+    }
 }
 }