From 207c7ca08028fc86e06ec0ac761208d6d3190742 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Sun, 27 Sep 2015 20:14:45 -0400 Subject: [PATCH] Add ShardPeerAddressResolver Added a ShardPeerAddressResolver implementation that is passed to Shard RaftActors to resolve addresses for shard peer ids. I refactored ShardManager a bit to move the memberNameToAddress map and related code to the ShardPeerAddressResolver. Change-Id: I5cbef5816d9bf13a339e43008144f44fd55fc606 Signed-off-by: Tom Pantelis --- .../cluster/datastore/DatastoreContext.java | 21 +++- .../cluster/datastore/ShardManager.java | 107 +++++++----------- .../datastore/ShardPeerAddressResolver.java | 86 ++++++++++++++ .../cluster/datastore/ShardManagerTest.java | 3 + .../ShardPeerAddressResolverTest.java | 96 ++++++++++++++++ 5 files changed, 243 insertions(+), 70 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolver.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolverTest.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index e0c243b5b4..4d952a725f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -17,6 +17,7 @@ import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader; import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader; import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; +import org.opendaylight.controller.cluster.raft.PeerAddressResolver; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -111,7 +112,7 @@ public class DatastoreContext { setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor()); setCustomRaftPolicyImplementation(other.customRaftPolicyImplementation); setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize()); - + setPeerAddressResolver(other.raftConfig.getPeerAddressResolver()); } public static Builder newBuilder() { @@ -178,6 +179,10 @@ public class DatastoreContext { return transactionCreationInitialRateLimit; } + private void setPeerAddressResolver(PeerAddressResolver resolver) { + raftConfig.setPeerAddressResolver(resolver); + } + private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis){ raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis, TimeUnit.MILLISECONDS)); @@ -201,7 +206,6 @@ public class DatastoreContext { raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation); } - private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) { raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage); } @@ -210,6 +214,10 @@ public class DatastoreContext { raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount); } + private void setShardSnapshotChunkSize(int shardSnapshotChunkSize) { + raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize); + } + public int getShardBatchedModificationCount() { return shardBatchedModificationCount; } @@ -230,10 +238,6 @@ public class DatastoreContext { return raftConfig.getSnapshotChunkSize(); } - public void setShardSnapshotChunkSize(int shardSnapshotChunkSize) { - raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize); - } - public static class Builder { private final DatastoreContext datastoreContext; private int maxShardDataChangeExecutorPoolSize = @@ -441,5 +445,10 @@ public class DatastoreContext { datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize); return this; } + + public Builder shardPeerAddressResolver(PeerAddressResolver resolver) { + datastoreContext.setPeerAddressResolver(resolver); + return this; + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 60894f5bc9..24d808cd91 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorPath; import akka.actor.ActorRef; -import akka.actor.Address; import akka.actor.Cancellable; import akka.actor.OneForOneStrategy; import akka.actor.Props; @@ -44,7 +43,6 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderExc import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; -import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; @@ -90,11 +88,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class); - // Stores a mapping between a member name and the address of the member - // Member names look like "member-1", "member-2" etc and are as specified - // in configuration - private final Map memberNameToAddress = new HashMap<>(); - // Stores a mapping between a shard name and it's corresponding information // Shard names look like inventory, topology etc and are as specified in // configuration @@ -104,8 +97,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // A data store could be of type config/operational private final String type; - private final String shardManagerIdentifierString; - private final ClusterWrapper cluster; private final Configuration configuration; @@ -120,6 +111,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final PrimaryShardInfoFutureCache primaryShardInfoCache; + private final ShardPeerAddressResolver peerAddressResolver; + private SchemaContext schemaContext; /** @@ -132,12 +125,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); this.datastoreContext = datastoreContext; this.type = datastoreContext.getDataStoreType(); - this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString(); this.shardDispatcherPath = new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; this.primaryShardInfoCache = primaryShardInfoCache; + peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName()); + this.datastoreContext = DatastoreContext.newBuilderFrom(datastoreContext).shardPeerAddressResolver( + peerAddressResolver).build(); + // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); @@ -145,11 +141,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } public static Props props( - final ClusterWrapper cluster, - final Configuration configuration, - final DatastoreContext datastoreContext, - final CountDownLatch waitTillReadyCountdownLatch, - final PrimaryShardInfoFutureCache primaryShardInfoCache) { + final ClusterWrapper cluster, + final Configuration configuration, + final DatastoreContext datastoreContext, + final CountDownLatch waitTillReadyCountdownLatch, + final PrimaryShardInfoFutureCache primaryShardInfoCache) { Preconditions.checkNotNull(cluster, "cluster should not be null"); Preconditions.checkNotNull(configuration, "configuration should not be null"); @@ -228,6 +224,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { DatastoreContext shardDatastoreContext = createShard.getDatastoreContext(); if(shardDatastoreContext == null) { shardDatastoreContext = datastoreContext; + } else { + shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver( + peerAddressResolver).build(); } ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses, @@ -474,7 +473,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); - memberNameToAddress.remove(memberName); + peerAddressResolver.removePeerAddress(memberName); for(ShardInformation info : localShards.values()){ info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); @@ -487,7 +486,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); - memberNameToAddress.remove(memberName); + peerAddressResolver.removePeerAddress(memberName); for(ShardInformation info : localShards.values()){ info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); @@ -500,12 +499,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); - memberNameToAddress.put(memberName, message.member().address()); + peerAddressResolver.addPeerAddress(memberName, message.member().address()); for(ShardInformation info : localShards.values()){ String shardName = info.getShardName(); String peerId = getShardIdentifier(memberName, shardName).toString(); - info.updatePeerAddress(peerId, getShardActorPath(shardName, memberName), getSelf()); + info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf()); info.peerUp(memberName, peerId, getSelf()); } @@ -554,7 +553,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void onDatastoreContext(DatastoreContext context) { - datastoreContext = context; + datastoreContext = DatastoreContext.newBuilderFrom(context).shardPeerAddressResolver( + peerAddressResolver).build(); for (ShardInformation info : localShards.values()) { if (info.getActor() != null) { info.getActor().tell(datastoreContext, getSelf()); @@ -604,7 +604,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @VisibleForTesting protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) { return getContext().actorOf(info.newProps(schemaContext) - .withDispatcher(shardDispatcherPath), info.getShardId().toString()); + .withDispatcher(shardDispatcherPath), info.getShardId().toString()); } private void findPrimary(FindPrimary message) { @@ -624,28 +624,24 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) : new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion()); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); - } + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); + } - return found; + return found; } }); return; } - for(Map.Entry entry: memberNameToAddress.entrySet()) { - if(!cluster.getCurrentMemberName().equals(entry.getKey())) { - String path = getShardManagerActorPathBuilder(entry.getValue()).toString(); - - LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(), - shardName, path); + for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) { + LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(), + shardName, address); - getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName, - message.isWaitUntilReady()), getContext()); - return; - } + getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName, + message.isWaitUntilReady()), getContext()); + return; } LOG.debug("{}: No shard found for {}", persistenceId(), shardName); @@ -654,23 +650,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { String.format("No primary shard found for %s.", shardName)), getSelf()); } - private StringBuilder getShardManagerActorPathBuilder(Address address) { - StringBuilder builder = new StringBuilder(); - builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString); - return builder; - } - - private String getShardActorPath(String shardName, String memberName) { - Address address = memberNameToAddress.get(memberName); - if(address != null) { - StringBuilder builder = getShardManagerActorPathBuilder(address); - builder.append("/") - .append(getShardIdentifier(memberName, shardName)); - return builder.toString(); - } - return null; - } - /** * Construct the name of the shard actor given the name of the member on * which the shard resides and the name of the shard @@ -680,7 +659,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @return */ private ShardIdentifier getShardIdentifier(String memberName, String shardName){ - return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build(); + return peerAddressResolver.getShardIdentifier(memberName, shardName); } /** @@ -703,7 +682,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type, - datastoreContext.getDataStoreMXBeanType(), localShardActorNames); + datastoreContext.getDataStoreMXBeanType(), localShardActorNames); mBean.setShardManager(this); } @@ -722,8 +701,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { for(String memberName : members) { if(!currentMemberName.equals(memberName)) { ShardIdentifier shardId = getShardIdentifier(memberName, shardName); - String path = getShardActorPath(shardName, memberName); - peerAddresses.put(shardId.toString(), path); + String address = peerAddressResolver.getShardActorAddress(shardName, memberName); + peerAddresses.put(shardId.toString(), address); } } return peerAddresses; @@ -733,14 +712,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { public SupervisorStrategy supervisorStrategy() { return new OneForOneStrategy(10, Duration.create("1 minute"), - new Function() { - @Override - public SupervisorStrategy.Directive apply(Throwable t) { - LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t); - return SupervisorStrategy.resume(); - } + new Function() { + @Override + public SupervisorStrategy.Directive apply(Throwable t) { + LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t); + return SupervisorStrategy.resume(); } - ); + } + ); } @@ -826,7 +805,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){ LOG.info("updatePeerAddress for peer {} with address {}", peerId, - peerAddress); + peerAddress); if(peerAddresses.containsKey(peerId)){ peerAddresses.put(peerId, peerAddress); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolver.java new file mode 100644 index 0000000000..12fabbb6d2 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolver.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.Address; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; +import org.opendaylight.controller.cluster.raft.PeerAddressResolver; + +/** + * Implementation PeerAddressResolver that resolves address for shard peer ids. This class is owned by the + * ShardMaanager and passed to Shard actors via the ConfigParams. + * + * @author Thomas Pantelis + */ +class ShardPeerAddressResolver implements PeerAddressResolver { + // Stores a mapping between a member name and the address of the member. The map is concurrent as it + // will be accessed by multiple threads via the public resolve method. + private final ConcurrentMap memberNameToAddress = new ConcurrentHashMap<>(); + private final String shardManagerIdentifier; + private final String shardManagerType; + private final String localMemberName; + + public ShardPeerAddressResolver(String shardManagerType, String localMemberName) { + this.shardManagerIdentifier = ShardManagerIdentifier.builder().type(shardManagerType).build().toString(); + this.shardManagerType = shardManagerType; + this.localMemberName = localMemberName; + } + + void addPeerAddress(String memberName, Address address) { + memberNameToAddress.put(memberName, address); + } + + void removePeerAddress(String memberName) { + memberNameToAddress.remove(memberName); + } + + Address getPeerAddress(String memberName) { + return memberNameToAddress.get(memberName); + } + + Collection getShardManagerPeerActorAddresses() { + Collection peerAddresses = new ArrayList<>(); + for(Map.Entry entry: memberNameToAddress.entrySet()) { + if(!localMemberName.equals(entry.getKey())) { + peerAddresses.add(getShardManagerActorPathBuilder(entry.getValue()).toString()); + } + } + + return peerAddresses; + } + + ShardIdentifier getShardIdentifier(String memberName, String shardName){ + return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(shardManagerType).build(); + } + + String getShardActorAddress(String shardName, String memberName) { + Address memberAddress = memberNameToAddress.get(memberName); + if(memberAddress != null) { + return getShardManagerActorPathBuilder(memberAddress).append("/").append( + getShardIdentifier(memberName, shardName)).toString(); + } + + return null; + } + + private StringBuilder getShardManagerActorPathBuilder(Address address) { + return new StringBuilder().append(address.toString()).append("/user/").append(shardManagerIdentifier); + } + + @Override + public String resolve(String peerId) { + ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build(); + return getShardActorAddress(shardId.getShardName(), shardId.getMemberName()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 82f4b3ccf9..e49221b06a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -961,6 +961,7 @@ public class ShardManagerTest extends AbstractActorTest { }}; } + @Test public void testOnReceiveCreateShard() { new JavaTestKit(getSystem()) {{ datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); @@ -986,6 +987,8 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("5 seconds"), LocalShardFound.class); assertEquals("isRecoveryApplicable", false, shardPropsCreator.datastoreContext.isPersistent()); + assertTrue("Epxected ShardPeerAddressResolver", shardPropsCreator.datastoreContext.getShardRaftConfig(). + getPeerAddressResolver() instanceof ShardPeerAddressResolver); assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(), new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()), shardPropsCreator.peerAddresses.keySet()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolverTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolverTest.java new file mode 100644 index 0000000000..5d01e36cc2 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolverTest.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static org.junit.Assert.assertEquals; +import akka.actor.Address; +import com.google.common.collect.Sets; +import java.util.Collection; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; + +/** + * Unit tests for ShardPeerAddressResolver. + * + * @author Thomas Pantelis + */ +public class ShardPeerAddressResolverTest { + + @Test + public void testGetShardActorAddress() { + ShardPeerAddressResolver resolver = new ShardPeerAddressResolver("config", "member-1"); + + assertEquals("getShardActorAddress", null, resolver.getShardActorAddress("default", "member-2")); + + Address address2 = new Address("tcp", "system2"); + resolver.addPeerAddress("member-2", address2); + assertEquals("getPeerAddress", address2, resolver.getPeerAddress("member-2")); + + Address address3 = new Address("tcp", "system3"); + resolver.addPeerAddress("member-3", address3); + assertEquals("getPeerAddress", address3, resolver.getPeerAddress("member-3")); + + assertEquals("getShardActorAddress", address2.toString() + + "/user/shardmanager-config/member-2-shard-default-config", + resolver.getShardActorAddress("default", "member-2")); + + assertEquals("getShardActorAddress", address3.toString() + + "/user/shardmanager-config/member-3-shard-default-config", + resolver.getShardActorAddress("default", "member-3")); + + assertEquals("getShardActorAddress", address2.toString() + + "/user/shardmanager-config/member-2-shard-topology-config", + resolver.getShardActorAddress("topology", "member-2")); + + resolver.removePeerAddress("member-2"); + assertEquals("getShardActorAddress", null, resolver.getShardActorAddress("default", "member-2")); + assertEquals("getShardActorAddress", null, resolver.getShardActorAddress("topology", "member-2")); + assertEquals("getShardActorAddress", address3.toString() + + "/user/shardmanager-config/member-3-shard-default-config", + resolver.getShardActorAddress("default", "member-3")); + } + + @Test + public void testResolve() { + String type = "config"; + ShardPeerAddressResolver resolver = new ShardPeerAddressResolver(type, "member-1"); + + String memberName = "member-2"; + String peerId = ShardIdentifier.builder().memberName(memberName ).shardName("default"). + type(type).build().toString(); + + assertEquals("resolve", null, resolver.resolve(peerId)); + + Address address = new Address("tcp", "system"); + resolver.addPeerAddress(memberName, address); + + String shardAddress = resolver.getShardActorAddress("default", memberName); + assertEquals("getShardActorAddress", address.toString() + + "/user/shardmanager-" + type + "/" + memberName + "-shard-default-" + type, shardAddress); + + assertEquals("resolve", shardAddress, resolver.resolve(peerId)); + } + + @Test + public void testGetShardManagerPeerActorAddresses() { + ShardPeerAddressResolver resolver = new ShardPeerAddressResolver("config", "member-1"); + + resolver.addPeerAddress("member-1", new Address("tcp", "system1")); + + Address address2 = new Address("tcp", "system2"); + resolver.addPeerAddress("member-2", address2); + + Address address3 = new Address("tcp", "system3"); + resolver.addPeerAddress("member-3", address3); + + Collection peerAddresses = resolver.getShardManagerPeerActorAddresses(); + assertEquals("getShardManagerPeerActorAddresses", Sets.newHashSet( + address2.toString() + "/user/shardmanager-config", + address3.toString() + "/user/shardmanager-config"), Sets.newHashSet(peerAddresses)); + } +} -- 2.36.6