If cluster member, which is a leader of one or multiple shards becomes unavailable,
Shard re-election happens after certain time period and anyone tries to access datastore,
receives akka.timeout exception. No application can act upon such exception.
DataStoreUnavailableException will help external applications to retry if they see this exception.
Change-Id: Iceb10580bbe73ae91dc8abb4bc6a183cb4fad6f8
Signed-off-by: Harman Singh <harmasin@cisco.com>
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.api.data;
+
+/**
+ * This exception occurs if the datastore is temporarily unavailable.
+ * A retry of the transaction may succeed after a period of time
+ */
+
+public class DataStoreUnavailableException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public DataStoreUnavailableException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+
+}
import java.util.Map;
import java.util.concurrent.Executor;
import org.opendaylight.controller.cluster.databroker.AbstractDOMBroker;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
LOG.warn("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, t);
final Exception e;
- if (t instanceof Exception) {
+ if(t instanceof NoShardLeaderException) {
+ e = new DataStoreUnavailableException(t.getMessage(), t);
+ } else if (t instanceof Exception) {
e = (Exception)t;
} else {
e = new RuntimeException("Unexpected error occurred", t);
import com.google.common.base.Optional;
import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.Semaphore;
+
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
operationLimiter.release();
- proxyFuture.setException(new ReadFailedException("Error reading data for path " + path, failure));
+ Throwable t;
+ if(failure instanceof NoShardLeaderException) {
+ t = new DataStoreUnavailableException(failure.getMessage(), failure);
+ } else {
+ t = failure;
+ }
+ proxyFuture.setException(new ReadFailedException("Error reading data for path " + path, t));
}
@Override
} else if(message instanceof ClusterEvent.MemberRemoved) {
memberRemoved((ClusterEvent.MemberRemoved) message);
} else if(message instanceof ClusterEvent.UnreachableMember) {
- ignoreMessage(message);
+ memberUnreachable((ClusterEvent.UnreachableMember)message);
+ } else if(message instanceof ClusterEvent.ReachableMember) {
+ memberReachable((ClusterEvent.ReachableMember) message);
} else if(message instanceof DatastoreContext) {
onDatastoreContext((DatastoreContext)message);
} else if(message instanceof RoleChangeNotification) {
checkReady();
}
+ private void memberReachable(ClusterEvent.ReachableMember message) {
+ String memberName = message.member().roles().head();
+ LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
+
+ markMemberAvailable(memberName);
+ }
+
+ private void memberUnreachable(ClusterEvent.UnreachableMember message) {
+ String memberName = message.member().roles().head();
+ LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
+
+ markMemberUnavailable(memberName);
+ }
+
+ private void markMemberUnavailable(final String memberName) {
+ for(ShardInformation info : localShards.values()){
+ String leaderId = info.getLeaderId();
+ if(leaderId != null && leaderId.contains(memberName)) {
+ LOG.debug("Marking Leader {} as unavailable.", leaderId);
+ info.setLeaderAvailable(false);
+ }
+ }
+ }
+
+ private void markMemberAvailable(final String memberName) {
+ for(ShardInformation info : localShards.values()){
+ String leaderId = info.getLeaderId();
+ if(leaderId != null && leaderId.contains(memberName)) {
+ LOG.debug("Marking Leader {} as available.", leaderId);
+ info.setLeaderAvailable(true);
+ }
+ }
+ }
+
private void onDatastoreContext(DatastoreContext context) {
datastoreContext = context;
for (ShardInformation info : localShards.values()) {
private ActorPath actorPath;
private final Map<String, String> peerAddresses;
private Optional<DataTree> localShardDataTree;
+ private boolean leaderAvailable = false;
// flag that determines if the actor is ready for business
private boolean actorInitialized = false;
}
boolean isShardReadyWithLeaderId() {
- return isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
+ return leaderAvailable && isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
}
boolean isShardInitialized() {
boolean setLeaderId(String leaderId) {
boolean changed = !Objects.equal(this.leaderId, leaderId);
this.leaderId = leaderId;
-
+ if(leaderId != null) {
+ this.leaderAvailable = true;
+ }
notifyOnShardInitializedCallbacks();
return changed;
}
+
+ public String getLeaderId() {
+ return leaderId;
+ }
+
+ public void setLeaderAvailable(boolean leaderAvailable) {
+ this.leaderAvailable = leaderAvailable;
+ }
}
private static class ShardManagerCreator implements Creator<ShardManager> {
import org.mockito.InOrder;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
fail("Expected TransactionCommitFailedException");
} catch (TransactionCommitFailedException e) {
if(expCause != null) {
- assertSame("Expected cause", expCause, e.getCause());
+ assertSame("Expected cause", expCause.getClass(), e.getCause().getClass());
}
InOrder inOrder = inOrder((Object[])mockCohorts);
assertFailure(future, cause, mockCohort1, mockCohort2);
}
+ @Test
+ public void testSubmitWithCanCommitDataStoreUnavailableException() throws Exception {
+ doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
+ doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
+ NoShardLeaderException rootCause = new NoShardLeaderException("mock");
+ DataStoreUnavailableException cause = new DataStoreUnavailableException(rootCause.getMessage(), rootCause);
+ doReturn(Futures.immediateFailedFuture(rootCause)).when(mockCohort2).canCommit();
+ doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
+
+ CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
+ transaction, Arrays.asList(mockCohort1, mockCohort2));
+
+ assertFailure(future, cause, mockCohort1, mockCohort2);
+ }
+
@Test
public void testSubmitWithPreCommitException() throws Exception {
doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
JavaTestKit.shutdownActorSystem(system2);
}
+ @Test
+ public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
+ String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+ // Create an ActorSystem ShardManager actor for member-1.
+
+ final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+ Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
+
+ final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
+ newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
+ new MockConfiguration()), shardManagerID);
+
+ // Create an ActorSystem ShardManager actor for member-2.
+
+ final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+
+ Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
+
+ MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("default", Arrays.asList("member-1", "member-2")).build());
+
+ final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
+ newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
+ mockConfig2), shardManagerID);
+
+ new JavaTestKit(system1) {{
+
+ shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager1.tell(new ActorInitialized(), mockShardActor1);
+ shardManager2.tell(new ActorInitialized(), mockShardActor2);
+
+ String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+ String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
+ Optional.of(mock(DataTree.class))), mockShardActor1);
+ shardManager1.tell(new RoleChangeNotification(memberId1,
+ RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
+ shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))),
+ mockShardActor2);
+ shardManager2.tell(new RoleChangeNotification(memberId2,
+ RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
+ shardManager1.underlyingActor().waitForMemberUp();
+
+ shardManager1.tell(new FindPrimary("default", true), getRef());
+
+ RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
+ String path = found.getPrimaryPath();
+ assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
+
+ shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
+ createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ shardManager1.underlyingActor().waitForUnreachableMember();
+
+ shardManager1.tell(new FindPrimary("default", true), getRef());
+
+ expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+
+ shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
+ createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ shardManager1.underlyingActor().waitForReachableMember();
+
+ shardManager1.tell(new FindPrimary("default", true), getRef());
+
+ RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
+ String path1 = found1.getPrimaryPath();
+ assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
+
+ }};
+
+ JavaTestKit.shutdownActorSystem(system1);
+ JavaTestKit.shutdownActorSystem(system2);
+ }
+
+ @Test
+ public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
+ String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+ // Create an ActorSystem ShardManager actor for member-1.
+
+ final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+ Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
+
+ final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
+ newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
+ new MockConfiguration()), shardManagerID);
+
+ // Create an ActorSystem ShardManager actor for member-2.
+
+ final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+
+ Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
+
+ MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("default", Arrays.asList("member-1", "member-2")).build());
+
+ final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
+ newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
+ mockConfig2), shardManagerID);
+
+ new JavaTestKit(system1) {{
+
+ shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager1.tell(new ActorInitialized(), mockShardActor1);
+ shardManager2.tell(new ActorInitialized(), mockShardActor2);
+
+ String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+ String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
+ Optional.of(mock(DataTree.class))), mockShardActor1);
+ shardManager1.tell(new RoleChangeNotification(memberId1,
+ RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
+ shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))),
+ mockShardActor2);
+ shardManager2.tell(new RoleChangeNotification(memberId2,
+ RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
+ shardManager1.underlyingActor().waitForMemberUp();
+
+ shardManager1.tell(new FindPrimary("default", true), getRef());
+
+ RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
+ String path = found.getPrimaryPath();
+ assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
+
+ shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
+ createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ shardManager1.underlyingActor().waitForUnreachableMember();
+
+ shardManager1.tell(new FindPrimary("default", true), getRef());
+
+ expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+
+ shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class))),
+ mockShardActor1);
+ shardManager1.tell(new RoleChangeNotification(memberId1,
+ RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
+
+ shardManager1.tell(new FindPrimary("default", true), getRef());
+
+ LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
+ String path1 = found1.getPrimaryPath();
+ assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
+
+ }};
+
+ JavaTestKit.shutdownActorSystem(system1);
+ JavaTestKit.shutdownActorSystem(system2);
+ }
+
+
@Test
public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{
private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
private CountDownLatch memberUpReceived = new CountDownLatch(1);
private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
+ private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
+ private CountDownLatch memberReachableReceived = new CountDownLatch(1);
private final ActorRef shardActor;
private final String name;
if(!getCluster().getCurrentMemberName().equals(role)) {
memberRemovedReceived.countDown();
}
+ } else if(message instanceof ClusterEvent.UnreachableMember) {
+ String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
+ if(!getCluster().getCurrentMemberName().equals(role)) {
+ memberUnreachableReceived.countDown();
+ }
+ } else if(message instanceof ClusterEvent.ReachableMember) {
+ String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
+ if(!getCluster().getCurrentMemberName().equals(role)) {
+ memberReachableReceived.countDown();
+ }
}
}
}
memberRemovedReceived = new CountDownLatch(1);
}
+ void waitForUnreachableMember() {
+ assertEquals("UnreachableMember received", true,
+ Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
+ ));
+ memberUnreachableReceived = new CountDownLatch(1);
+ }
+
+ void waitForReachableMember() {
+ assertEquals("ReachableMember received", true,
+ Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
+ memberReachableReceived = new CountDownLatch(1);
+ }
+
void verifyFindPrimary() {
assertEquals("FindPrimary received", true,
Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
@Test
public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
-
- TestActorRef<MessageCollectorActor> shardManager =
- TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
-
- DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
- shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
-
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
- @Override
- protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new PrimaryNotFoundException("not found"));
- }
- };
-
-
- Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
-
- try {
- Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
- fail("Expected PrimaryNotFoundException");
- } catch(PrimaryNotFoundException e){
-
- }
-
- Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
-
- assertNull(cached);
+ testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
}
@Test
public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
+ testFindPrimaryExceptions(new NotInitializedException("not initialized"));
+ }
- TestActorRef<MessageCollectorActor> shardManager =
- TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
-
- DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
- shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
-
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
- @Override
- protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new NotInitializedException("not iniislized"));
- }
- };
+ private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
+ TestActorRef<MessageCollectorActor> shardManager =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
- Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
+ @Override
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ return Futures.successful(expectedException);
+ }
+ };
- try {
- Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
- fail("Expected NotInitializedException");
- } catch(NotInitializedException e){
+ Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+ try {
+ Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+ fail("Expected" + expectedException.getClass().toString());
+ } catch(Exception e){
+ if(!expectedException.getClass().isInstance(e)) {
+ fail("Expected Exception of type " + expectedException.getClass().toString());
}
+ }
- Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+ Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
- assertNull(cached);
+ assertNull(cached);
}
@Test
return new ClusterEvent.MemberUp(member);
}
+
+ public static ClusterEvent.UnreachableMember createUnreachableMember(String memberName, String address) {
+ akka.cluster.UniqueAddress uniqueAddress = new UniqueAddress(
+ AddressFromURIString.parse(address), 55);
+
+ Set<String> roles = new HashSet<>();
+
+ roles.add(memberName);
+
+ akka.cluster.Member member = new akka.cluster.Member(uniqueAddress, 1, MemberStatus.up(),
+ JavaConversions.asScalaSet(roles).<String>toSet());
+
+ return new ClusterEvent.UnreachableMember(member);
+ }
+
+ public static ClusterEvent.ReachableMember createReachableMember(String memberName, String address) {
+ akka.cluster.UniqueAddress uniqueAddress = new UniqueAddress(
+ AddressFromURIString.parse(address), 55);
+
+ Set<String> roles = new HashSet<>();
+
+ roles.add(memberName);
+
+ akka.cluster.Member member = new akka.cluster.Member(uniqueAddress, 1, MemberStatus.up(),
+ JavaConversions.asScalaSet(roles).<String>toSet());
+
+ return new ClusterEvent.ReachableMember(member);
+ }
}