Bug -3221 : Adding a new DataStoreUnavailableException for external applications. 78/20678/6
authorHarman Singh <harmasin@cisco.com>
Mon, 18 May 2015 21:51:46 +0000 (14:51 -0700)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 27 May 2015 18:21:05 +0000 (18:21 +0000)
If cluster member, which is a leader of one or multiple shards becomes unavailable,
Shard re-election happens after certain time period and anyone tries to access datastore,
receives akka.timeout exception. No application can act upon such exception.

DataStoreUnavailableException will help external applications to retry if they see this exception.

Change-Id: Iceb10580bbe73ae91dc8abb4bc6a183cb4fad6f8
Signed-off-by: Harman Singh <harmasin@cisco.com>
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataStoreUnavailableException.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java

diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataStoreUnavailableException.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataStoreUnavailableException.java
new file mode 100644 (file)
index 0000000..ebbd116
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.api.data;
+
+/**
+ * This exception occurs if the datastore is temporarily unavailable.
+ * A retry of the transaction may succeed after a period of time
+ */
+
+public class DataStoreUnavailableException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public DataStoreUnavailableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+
+}
index aa5040f..c16a849 100644 (file)
@@ -20,6 +20,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 import org.opendaylight.controller.cluster.databroker.AbstractDOMBroker;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
@@ -195,7 +197,9 @@ public class ConcurrentDOMDataBroker extends AbstractDOMBroker {
 
         LOG.warn("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, t);
         final Exception e;
-        if (t instanceof Exception) {
+        if(t instanceof NoShardLeaderException) {
+            e = new DataStoreUnavailableException(t.getMessage(), t);
+        } else if (t instanceof Exception) {
             e = (Exception)t;
         } else {
             e = new RuntimeException("Unexpected error occurred", t);
index 197cd9f..4507168 100644 (file)
@@ -11,7 +11,10 @@ import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.Semaphore;
+
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -77,7 +80,13 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
         LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
         operationLimiter.release();
-        proxyFuture.setException(new ReadFailedException("Error reading data for path " + path, failure));
+        Throwable t;
+        if(failure instanceof NoShardLeaderException) {
+            t = new DataStoreUnavailableException(failure.getMessage(), failure);
+        } else {
+            t = failure;
+        }
+        proxyFuture.setException(new ReadFailedException("Error reading data for path " + path, t));
     }
 
     @Override
index 63d4c98..d33576d 100644 (file)
@@ -188,7 +188,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if(message instanceof ClusterEvent.MemberRemoved) {
             memberRemoved((ClusterEvent.MemberRemoved) message);
         } else if(message instanceof ClusterEvent.UnreachableMember) {
-            ignoreMessage(message);
+            memberUnreachable((ClusterEvent.UnreachableMember)message);
+        } else if(message instanceof ClusterEvent.ReachableMember) {
+            memberReachable((ClusterEvent.ReachableMember) message);
         } else if(message instanceof DatastoreContext) {
             onDatastoreContext((DatastoreContext)message);
         } else if(message instanceof RoleChangeNotification) {
@@ -455,6 +457,40 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         checkReady();
     }
 
+    private void memberReachable(ClusterEvent.ReachableMember message) {
+        String memberName = message.member().roles().head();
+        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
+
+        markMemberAvailable(memberName);
+    }
+
+    private void memberUnreachable(ClusterEvent.UnreachableMember message) {
+        String memberName = message.member().roles().head();
+        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
+
+        markMemberUnavailable(memberName);
+    }
+
+    private void markMemberUnavailable(final String memberName) {
+        for(ShardInformation info : localShards.values()){
+            String leaderId = info.getLeaderId();
+            if(leaderId != null && leaderId.contains(memberName)) {
+                LOG.debug("Marking Leader {} as unavailable.", leaderId);
+                info.setLeaderAvailable(false);
+            }
+        }
+    }
+
+    private void markMemberAvailable(final String memberName) {
+        for(ShardInformation info : localShards.values()){
+            String leaderId = info.getLeaderId();
+            if(leaderId != null && leaderId.contains(memberName)) {
+                LOG.debug("Marking Leader {} as available.", leaderId);
+                info.setLeaderAvailable(true);
+            }
+        }
+    }
+
     private void onDatastoreContext(DatastoreContext context) {
         datastoreContext = context;
         for (ShardInformation info : localShards.values()) {
@@ -685,6 +721,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private ActorPath actorPath;
         private final Map<String, String> peerAddresses;
         private Optional<DataTree> localShardDataTree;
+        private boolean leaderAvailable = false;
 
         // flag that determines if the actor is ready for business
         private boolean actorInitialized = false;
@@ -759,7 +796,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         boolean isShardReadyWithLeaderId() {
-            return isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
+            return leaderAvailable && isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
         }
 
         boolean isShardInitialized() {
@@ -840,11 +877,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         boolean setLeaderId(String leaderId) {
             boolean changed = !Objects.equal(this.leaderId, leaderId);
             this.leaderId = leaderId;
-
+            if(leaderId != null) {
+                this.leaderAvailable = true;
+            }
             notifyOnShardInitializedCallbacks();
 
             return changed;
         }
+
+        public String getLeaderId() {
+            return leaderId;
+        }
+
+        public void setLeaderAvailable(boolean leaderAvailable) {
+            this.leaderAvailable = leaderAvailable;
+        }
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {
index f2536bf..7a7dde7 100644 (file)
@@ -44,6 +44,8 @@ import org.junit.Test;
 import org.mockito.InOrder;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
@@ -194,7 +196,7 @@ public class ConcurrentDOMDataBrokerTest {
             fail("Expected TransactionCommitFailedException");
         } catch (TransactionCommitFailedException e) {
             if(expCause != null) {
-                assertSame("Expected cause", expCause, e.getCause());
+                assertSame("Expected cause", expCause.getClass(), e.getCause().getClass());
             }
 
             InOrder inOrder = inOrder((Object[])mockCohorts);
@@ -221,6 +223,21 @@ public class ConcurrentDOMDataBrokerTest {
         assertFailure(future, cause, mockCohort1, mockCohort2);
     }
 
+    @Test
+    public void testSubmitWithCanCommitDataStoreUnavailableException() throws Exception {
+        doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
+        NoShardLeaderException rootCause = new NoShardLeaderException("mock");
+        DataStoreUnavailableException cause = new DataStoreUnavailableException(rootCause.getMessage(), rootCause);
+        doReturn(Futures.immediateFailedFuture(rootCause)).when(mockCohort2).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
+
+        CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
+            transaction, Arrays.asList(mockCohort1, mockCohort2));
+
+        assertFailure(future, cause, mockCohort1, mockCohort2);
+    }
+
     @Test
     public void testSubmitWithPreCommitException() throws Exception {
         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
index 5f6973f..1ffe387 100644 (file)
@@ -424,6 +424,169 @@ public class ShardManagerTest extends AbstractActorTest {
         JavaTestKit.shutdownActorSystem(system2);
     }
 
+    @Test
+    public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
+        String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+        // Create an ActorSystem ShardManager actor for member-1.
+
+        final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
+
+        final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
+            newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
+                new MockConfiguration()), shardManagerID);
+
+        // Create an ActorSystem ShardManager actor for member-2.
+
+        final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+
+        Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
+
+        MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+            put("default", Arrays.asList("member-1", "member-2")).build());
+
+        final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
+            newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
+                mockConfig2), shardManagerID);
+
+        new JavaTestKit(system1) {{
+
+            shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager1.tell(new ActorInitialized(), mockShardActor1);
+            shardManager2.tell(new ActorInitialized(), mockShardActor2);
+
+            String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+            String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
+                Optional.of(mock(DataTree.class))), mockShardActor1);
+            shardManager1.tell(new RoleChangeNotification(memberId1,
+                RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
+            shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))),
+                mockShardActor2);
+            shardManager2.tell(new RoleChangeNotification(memberId2,
+                RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
+            shardManager1.underlyingActor().waitForMemberUp();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
+            String path = found.getPrimaryPath();
+            assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
+
+            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
+                createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+
+            shardManager1.underlyingActor().waitForUnreachableMember();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+
+            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
+                createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+
+            shardManager1.underlyingActor().waitForReachableMember();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
+            String path1 = found1.getPrimaryPath();
+            assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
+
+        }};
+
+        JavaTestKit.shutdownActorSystem(system1);
+        JavaTestKit.shutdownActorSystem(system2);
+    }
+
+    @Test
+    public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
+        String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+        // Create an ActorSystem ShardManager actor for member-1.
+
+        final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
+
+        final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
+            newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
+                new MockConfiguration()), shardManagerID);
+
+        // Create an ActorSystem ShardManager actor for member-2.
+
+        final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+
+        Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
+
+        MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+            put("default", Arrays.asList("member-1", "member-2")).build());
+
+        final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
+            newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
+                mockConfig2), shardManagerID);
+
+        new JavaTestKit(system1) {{
+
+            shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager1.tell(new ActorInitialized(), mockShardActor1);
+            shardManager2.tell(new ActorInitialized(), mockShardActor2);
+
+            String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+            String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
+                Optional.of(mock(DataTree.class))), mockShardActor1);
+            shardManager1.tell(new RoleChangeNotification(memberId1,
+                RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
+            shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))),
+                mockShardActor2);
+            shardManager2.tell(new RoleChangeNotification(memberId2,
+                RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
+            shardManager1.underlyingActor().waitForMemberUp();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
+            String path = found.getPrimaryPath();
+            assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
+
+            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
+                createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+
+            shardManager1.underlyingActor().waitForUnreachableMember();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+
+            shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class))),
+                mockShardActor1);
+            shardManager1.tell(new RoleChangeNotification(memberId1,
+                RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
+            String path1 = found1.getPrimaryPath();
+            assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
+
+        }};
+
+        JavaTestKit.shutdownActorSystem(system1);
+        JavaTestKit.shutdownActorSystem(system2);
+    }
+
+
     @Test
     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -927,6 +1090,8 @@ public class ShardManagerTest extends AbstractActorTest {
         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
         private CountDownLatch memberUpReceived = new CountDownLatch(1);
         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
+        private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
+        private CountDownLatch memberReachableReceived = new CountDownLatch(1);
         private final ActorRef shardActor;
         private final String name;
 
@@ -955,6 +1120,16 @@ public class ShardManagerTest extends AbstractActorTest {
                     if(!getCluster().getCurrentMemberName().equals(role)) {
                         memberRemovedReceived.countDown();
                     }
+                } else if(message instanceof ClusterEvent.UnreachableMember) {
+                    String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
+                    if(!getCluster().getCurrentMemberName().equals(role)) {
+                        memberUnreachableReceived.countDown();
+                    }
+                } else if(message instanceof ClusterEvent.ReachableMember) {
+                    String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
+                    if(!getCluster().getCurrentMemberName().equals(role)) {
+                        memberReachableReceived.countDown();
+                    }
                 }
             }
         }
@@ -981,6 +1156,19 @@ public class ShardManagerTest extends AbstractActorTest {
             memberRemovedReceived = new CountDownLatch(1);
         }
 
+        void waitForUnreachableMember() {
+            assertEquals("UnreachableMember received", true,
+                Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
+                ));
+            memberUnreachableReceived = new CountDownLatch(1);
+        }
+
+        void waitForReachableMember() {
+            assertEquals("ReachableMember received", true,
+                Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
+            memberReachableReceived = new CountDownLatch(1);
+        }
+
         void verifyFindPrimary() {
             assertEquals("FindPrimary received", true,
                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
index 4ee89ca..1cc89f1 100644 (file)
@@ -461,68 +461,44 @@ public class ActorContextTest extends AbstractActorTest{
 
     @Test
     public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
-
-            TestActorRef<MessageCollectorActor> shardManager =
-                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
-
-            DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
-                    shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
-
-            ActorContext actorContext =
-                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
-                        @Override
-                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
-                            return Futures.successful((Object) new PrimaryNotFoundException("not found"));
-                        }
-                    };
-
-
-            Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
-
-            try {
-                Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
-                fail("Expected PrimaryNotFoundException");
-            } catch(PrimaryNotFoundException e){
-
-            }
-
-            Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
-
-            assertNull(cached);
+        testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
     }
 
     @Test
     public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
+        testFindPrimaryExceptions(new NotInitializedException("not initialized"));
+    }
 
-            TestActorRef<MessageCollectorActor> shardManager =
-                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
-
-            DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
-                    shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
-
-            ActorContext actorContext =
-                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
-                        @Override
-                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
-                            return Futures.successful((Object) new NotInitializedException("not iniislized"));
-                        }
-                    };
+    private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
+        TestActorRef<MessageCollectorActor> shardManager =
+            TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
 
+        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+            shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
 
-            Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+        ActorContext actorContext =
+            new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+                mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
+                @Override
+                protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+                    return Futures.successful(expectedException);
+                }
+            };
 
-            try {
-                Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
-                fail("Expected NotInitializedException");
-            } catch(NotInitializedException e){
+        Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
 
+        try {
+            Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+            fail("Expected" + expectedException.getClass().toString());
+        } catch(Exception e){
+            if(!expectedException.getClass().isInstance(e)) {
+                fail("Expected Exception of type " + expectedException.getClass().toString());
             }
+        }
 
-            Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+        Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
-            assertNull(cached);
+        assertNull(cached);
     }
 
     @Test
index 5d44033..756ed07 100644 (file)
@@ -87,4 +87,32 @@ public class MockClusterWrapper implements ClusterWrapper{
 
         return new ClusterEvent.MemberUp(member);
     }
+
+    public static ClusterEvent.UnreachableMember createUnreachableMember(String memberName, String address) {
+        akka.cluster.UniqueAddress uniqueAddress = new UniqueAddress(
+            AddressFromURIString.parse(address), 55);
+
+        Set<String> roles = new HashSet<>();
+
+        roles.add(memberName);
+
+        akka.cluster.Member member = new akka.cluster.Member(uniqueAddress, 1, MemberStatus.up(),
+            JavaConversions.asScalaSet(roles).<String>toSet());
+
+        return new ClusterEvent.UnreachableMember(member);
+    }
+
+    public static ClusterEvent.ReachableMember createReachableMember(String memberName, String address) {
+        akka.cluster.UniqueAddress uniqueAddress = new UniqueAddress(
+            AddressFromURIString.parse(address), 55);
+
+        Set<String> roles = new HashSet<>();
+
+        roles.add(memberName);
+
+        akka.cluster.Member member = new akka.cluster.Member(uniqueAddress, 1, MemberStatus.up(),
+            JavaConversions.asScalaSet(roles).<String>toSet());
+
+        return new ClusterEvent.ReachableMember(member);
+    }
 }