From 6823ef1fd8f0a2f7ea39a2e85276a73e5de0bf72 Mon Sep 17 00:00:00 2001 From: Harman Singh Date: Mon, 18 May 2015 14:51:46 -0700 Subject: [PATCH] Bug -3221 : Adding a new DataStoreUnavailableException for external applications. If cluster member, which is a leader of one or multiple shards becomes unavailable, Shard re-election happens after certain time period and anyone tries to access datastore, receives akka.timeout exception. No application can act upon such exception. DataStoreUnavailableException will help external applications to retry if they see this exception. Change-Id: Iceb10580bbe73ae91dc8abb4bc6a183cb4fad6f8 Signed-off-by: Harman Singh --- .../data/DataStoreUnavailableException.java | 23 +++ .../datastore/ConcurrentDOMDataBroker.java | 6 +- .../datastore/NoOpTransactionContext.java | 11 +- .../cluster/datastore/ShardManager.java | 53 ++++- .../ConcurrentDOMDataBrokerTest.java | 19 +- .../cluster/datastore/ShardManagerTest.java | 188 ++++++++++++++++++ .../datastore/utils/ActorContextTest.java | 76 +++---- .../datastore/utils/MockClusterWrapper.java | 28 +++ 8 files changed, 348 insertions(+), 56 deletions(-) create mode 100644 opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataStoreUnavailableException.java diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataStoreUnavailableException.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataStoreUnavailableException.java new file mode 100644 index 0000000000..ebbd116bc6 --- /dev/null +++ b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataStoreUnavailableException.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.common.api.data; + +/** + * This exception occurs if the datastore is temporarily unavailable. + * A retry of the transaction may succeed after a period of time + */ + +public class DataStoreUnavailableException extends Exception { + private static final long serialVersionUID = 1L; + + public DataStoreUnavailableException(String message, Throwable cause) { + super(message, cause); + } + + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java index aa5040fb11..c16a849335 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java @@ -20,6 +20,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import org.opendaylight.controller.cluster.databroker.AbstractDOMBroker; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; @@ -195,7 +197,9 @@ public class ConcurrentDOMDataBroker extends AbstractDOMBroker { LOG.warn("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, t); final Exception e; - if (t instanceof Exception) { + if(t instanceof NoShardLeaderException) { + e = new DataStoreUnavailableException(t.getMessage(), t); + } else if (t instanceof Exception) { e = (Exception)t; } else { e = new RuntimeException("Unexpected error occurred", t); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java index 197cd9fa83..4507168000 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java @@ -11,7 +11,10 @@ import akka.actor.ActorSelection; import com.google.common.base.Optional; import com.google.common.util.concurrent.SettableFuture; import java.util.concurrent.Semaphore; + +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -77,7 +80,13 @@ final class NoOpTransactionContext extends AbstractTransactionContext { public void readData(final YangInstanceIdentifier path, SettableFuture>> proxyFuture) { LOG.debug("Tx {} readData called path = {}", getIdentifier(), path); operationLimiter.release(); - proxyFuture.setException(new ReadFailedException("Error reading data for path " + path, failure)); + Throwable t; + if(failure instanceof NoShardLeaderException) { + t = new DataStoreUnavailableException(failure.getMessage(), failure); + } else { + t = failure; + } + proxyFuture.setException(new ReadFailedException("Error reading data for path " + path, t)); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 63d4c9826b..d33576d495 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -188,7 +188,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if(message instanceof ClusterEvent.MemberRemoved) { memberRemoved((ClusterEvent.MemberRemoved) message); } else if(message instanceof ClusterEvent.UnreachableMember) { - ignoreMessage(message); + memberUnreachable((ClusterEvent.UnreachableMember)message); + } else if(message instanceof ClusterEvent.ReachableMember) { + memberReachable((ClusterEvent.ReachableMember) message); } else if(message instanceof DatastoreContext) { onDatastoreContext((DatastoreContext)message); } else if(message instanceof RoleChangeNotification) { @@ -455,6 +457,40 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { checkReady(); } + private void memberReachable(ClusterEvent.ReachableMember message) { + String memberName = message.member().roles().head(); + LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address()); + + markMemberAvailable(memberName); + } + + private void memberUnreachable(ClusterEvent.UnreachableMember message) { + String memberName = message.member().roles().head(); + LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address()); + + markMemberUnavailable(memberName); + } + + private void markMemberUnavailable(final String memberName) { + for(ShardInformation info : localShards.values()){ + String leaderId = info.getLeaderId(); + if(leaderId != null && leaderId.contains(memberName)) { + LOG.debug("Marking Leader {} as unavailable.", leaderId); + info.setLeaderAvailable(false); + } + } + } + + private void markMemberAvailable(final String memberName) { + for(ShardInformation info : localShards.values()){ + String leaderId = info.getLeaderId(); + if(leaderId != null && leaderId.contains(memberName)) { + LOG.debug("Marking Leader {} as available.", leaderId); + info.setLeaderAvailable(true); + } + } + } + private void onDatastoreContext(DatastoreContext context) { datastoreContext = context; for (ShardInformation info : localShards.values()) { @@ -685,6 +721,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private ActorPath actorPath; private final Map peerAddresses; private Optional localShardDataTree; + private boolean leaderAvailable = false; // flag that determines if the actor is ready for business private boolean actorInitialized = false; @@ -759,7 +796,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } boolean isShardReadyWithLeaderId() { - return isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null); + return leaderAvailable && isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null); } boolean isShardInitialized() { @@ -840,11 +877,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { boolean setLeaderId(String leaderId) { boolean changed = !Objects.equal(this.leaderId, leaderId); this.leaderId = leaderId; - + if(leaderId != null) { + this.leaderAvailable = true; + } notifyOnShardInitializedCallbacks(); return changed; } + + public String getLeaderId() { + return leaderId; + } + + public void setLeaderAvailable(boolean leaderAvailable) { + this.leaderAvailable = leaderAvailable; + } } private static class ShardManagerCreator implements Creator { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java index f2536bfc2c..7a7dde74af 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java @@ -44,6 +44,8 @@ import org.junit.Test; import org.mockito.InOrder; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; @@ -194,7 +196,7 @@ public class ConcurrentDOMDataBrokerTest { fail("Expected TransactionCommitFailedException"); } catch (TransactionCommitFailedException e) { if(expCause != null) { - assertSame("Expected cause", expCause, e.getCause()); + assertSame("Expected cause", expCause.getClass(), e.getCause().getClass()); } InOrder inOrder = inOrder((Object[])mockCohorts); @@ -221,6 +223,21 @@ public class ConcurrentDOMDataBrokerTest { assertFailure(future, cause, mockCohort1, mockCohort2); } + @Test + public void testSubmitWithCanCommitDataStoreUnavailableException() throws Exception { + doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort(); + NoShardLeaderException rootCause = new NoShardLeaderException("mock"); + DataStoreUnavailableException cause = new DataStoreUnavailableException(rootCause.getMessage(), rootCause); + doReturn(Futures.immediateFailedFuture(rootCause)).when(mockCohort2).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort(); + + CheckedFuture future = coordinator.submit( + transaction, Arrays.asList(mockCohort1, mockCohort2)); + + assertFailure(future, cause, mockCohort1, mockCohort2); + } + @Test public void testSubmitWithPreCommitException() throws Exception { doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 5f6973f5db..1ffe387a19 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -424,6 +424,169 @@ public class ShardManagerTest extends AbstractActorTest { JavaTestKit.shutdownActorSystem(system2); } + @Test + public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception { + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + + // Create an ActorSystem ShardManager actor for member-1. + + final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); + + final TestActorRef shardManager1 = TestActorRef.create(system1, + newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1), + new MockConfiguration()), shardManagerID); + + // Create an ActorSystem ShardManager actor for member-2. + + final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); + + Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2"); + + MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")).build()); + + final TestActorRef shardManager2 = TestActorRef.create(system2, + newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2), + mockConfig2), shardManagerID); + + new JavaTestKit(system1) {{ + + shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager1.tell(new ActorInitialized(), mockShardActor1); + shardManager2.tell(new ActorInitialized(), mockShardActor2); + + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, + Optional.of(mock(DataTree.class))), mockShardActor1); + shardManager1.tell(new RoleChangeNotification(memberId1, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1); + shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))), + mockShardActor2); + shardManager2.tell(new RoleChangeNotification(memberId2, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); + shardManager1.underlyingActor().waitForMemberUp(); + + shardManager1.tell(new FindPrimary("default", true), getRef()); + + RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); + String path = found.getPrimaryPath(); + assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config")); + + shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. + createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + + shardManager1.underlyingActor().waitForUnreachableMember(); + + shardManager1.tell(new FindPrimary("default", true), getRef()); + + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); + + shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. + createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + + shardManager1.underlyingActor().waitForReachableMember(); + + shardManager1.tell(new FindPrimary("default", true), getRef()); + + RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); + String path1 = found1.getPrimaryPath(); + assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config")); + + }}; + + JavaTestKit.shutdownActorSystem(system1); + JavaTestKit.shutdownActorSystem(system2); + } + + @Test + public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception { + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + + // Create an ActorSystem ShardManager actor for member-1. + + final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); + + final TestActorRef shardManager1 = TestActorRef.create(system1, + newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1), + new MockConfiguration()), shardManagerID); + + // Create an ActorSystem ShardManager actor for member-2. + + final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); + + Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2"); + + MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")).build()); + + final TestActorRef shardManager2 = TestActorRef.create(system2, + newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2), + mockConfig2), shardManagerID); + + new JavaTestKit(system1) {{ + + shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager1.tell(new ActorInitialized(), mockShardActor1); + shardManager2.tell(new ActorInitialized(), mockShardActor2); + + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, + Optional.of(mock(DataTree.class))), mockShardActor1); + shardManager1.tell(new RoleChangeNotification(memberId1, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1); + shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))), + mockShardActor2); + shardManager2.tell(new RoleChangeNotification(memberId2, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); + shardManager1.underlyingActor().waitForMemberUp(); + + shardManager1.tell(new FindPrimary("default", true), getRef()); + + RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); + String path = found.getPrimaryPath(); + assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config")); + + shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. + createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + + shardManager1.underlyingActor().waitForUnreachableMember(); + + shardManager1.tell(new FindPrimary("default", true), getRef()); + + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); + + shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class))), + mockShardActor1); + shardManager1.tell(new RoleChangeNotification(memberId1, + RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1); + + shardManager1.tell(new FindPrimary("default", true), getRef()); + + LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); + String path1 = found1.getPrimaryPath(); + assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config")); + + }}; + + JavaTestKit.shutdownActorSystem(system1); + JavaTestKit.shutdownActorSystem(system2); + } + + @Test public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ @@ -927,6 +1090,8 @@ public class ShardManagerTest extends AbstractActorTest { private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1); private CountDownLatch memberUpReceived = new CountDownLatch(1); private CountDownLatch memberRemovedReceived = new CountDownLatch(1); + private CountDownLatch memberUnreachableReceived = new CountDownLatch(1); + private CountDownLatch memberReachableReceived = new CountDownLatch(1); private final ActorRef shardActor; private final String name; @@ -955,6 +1120,16 @@ public class ShardManagerTest extends AbstractActorTest { if(!getCluster().getCurrentMemberName().equals(role)) { memberRemovedReceived.countDown(); } + } else if(message instanceof ClusterEvent.UnreachableMember) { + String role = ((ClusterEvent.UnreachableMember)message).member().roles().head(); + if(!getCluster().getCurrentMemberName().equals(role)) { + memberUnreachableReceived.countDown(); + } + } else if(message instanceof ClusterEvent.ReachableMember) { + String role = ((ClusterEvent.ReachableMember)message).member().roles().head(); + if(!getCluster().getCurrentMemberName().equals(role)) { + memberReachableReceived.countDown(); + } } } } @@ -981,6 +1156,19 @@ public class ShardManagerTest extends AbstractActorTest { memberRemovedReceived = new CountDownLatch(1); } + void waitForUnreachableMember() { + assertEquals("UnreachableMember received", true, + Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS + )); + memberUnreachableReceived = new CountDownLatch(1); + } + + void waitForReachableMember() { + assertEquals("ReachableMember received", true, + Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS)); + memberReachableReceived = new CountDownLatch(1); + } + void verifyFindPrimary() { assertEquals("FindPrimary received", true, Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 4ee89ca76d..1cc89f18af 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -461,68 +461,44 @@ public class ActorContextTest extends AbstractActorTest{ @Test public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception { - - TestActorRef shardManager = - TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); - - DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). - shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); - - ActorContext actorContext = - new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) { - @Override - protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { - return Futures.successful((Object) new PrimaryNotFoundException("not found")); - } - }; - - - Future foobar = actorContext.findPrimaryShardAsync("foobar"); - - try { - Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); - fail("Expected PrimaryNotFoundException"); - } catch(PrimaryNotFoundException e){ - - } - - Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); - - assertNull(cached); + testFindPrimaryExceptions(new PrimaryNotFoundException("not found")); } @Test public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception { + testFindPrimaryExceptions(new NotInitializedException("not initialized")); + } - TestActorRef shardManager = - TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); - - DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). - shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); - - ActorContext actorContext = - new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) { - @Override - protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { - return Futures.successful((Object) new NotInitializedException("not iniislized")); - } - }; + private void testFindPrimaryExceptions(final Object expectedException) throws Exception { + TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); - Future foobar = actorContext.findPrimaryShardAsync("foobar"); + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) { + @Override + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { + return Futures.successful(expectedException); + } + }; - try { - Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); - fail("Expected NotInitializedException"); - } catch(NotInitializedException e){ + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + try { + Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); + fail("Expected" + expectedException.getClass().toString()); + } catch(Exception e){ + if(!expectedException.getClass().isInstance(e)) { + fail("Expected Exception of type " + expectedException.getClass().toString()); } + } - Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); - assertNull(cached); + assertNull(cached); } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java index 5d44033cd6..756ed07c45 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java @@ -87,4 +87,32 @@ public class MockClusterWrapper implements ClusterWrapper{ return new ClusterEvent.MemberUp(member); } + + public static ClusterEvent.UnreachableMember createUnreachableMember(String memberName, String address) { + akka.cluster.UniqueAddress uniqueAddress = new UniqueAddress( + AddressFromURIString.parse(address), 55); + + Set roles = new HashSet<>(); + + roles.add(memberName); + + akka.cluster.Member member = new akka.cluster.Member(uniqueAddress, 1, MemberStatus.up(), + JavaConversions.asScalaSet(roles).toSet()); + + return new ClusterEvent.UnreachableMember(member); + } + + public static ClusterEvent.ReachableMember createReachableMember(String memberName, String address) { + akka.cluster.UniqueAddress uniqueAddress = new UniqueAddress( + AddressFromURIString.parse(address), 55); + + Set roles = new HashSet<>(); + + roles.add(memberName); + + akka.cluster.Member member = new akka.cluster.Member(uniqueAddress, 1, MemberStatus.up(), + JavaConversions.asScalaSet(roles).toSet()); + + return new ClusterEvent.ReachableMember(member); + } } -- 2.36.6