Add ShardPeerAddressResolver 06/27606/4
authorTom Pantelis <tpanteli@brocade.com>
Mon, 28 Sep 2015 00:14:45 +0000 (20:14 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 8 Oct 2015 17:19:09 +0000 (17:19 +0000)
Added a ShardPeerAddressResolver implementation that is passed to
Shard RaftActors to resolve addresses for shard peer ids. I refactored
ShardManager a bit to move the memberNameToAddress map and related code
to the ShardPeerAddressResolver.

Change-Id: I5cbef5816d9bf13a339e43008144f44fd55fc606
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolver.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolverTest.java [new file with mode: 0644]

index e0c243b5b41ddf3612248e8fc3fec6ab31fd84d3..4d952a725f5492bfd8cb92eb5fc36f315fe11e58 100644 (file)
@@ -17,6 +17,7 @@ import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
@@ -111,7 +112,7 @@ public class DatastoreContext {
         setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
         setCustomRaftPolicyImplementation(other.customRaftPolicyImplementation);
         setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
-
+        setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
     }
 
     public static Builder newBuilder() {
@@ -178,6 +179,10 @@ public class DatastoreContext {
         return transactionCreationInitialRateLimit;
     }
 
+    private void setPeerAddressResolver(PeerAddressResolver resolver) {
+        raftConfig.setPeerAddressResolver(resolver);
+    }
+
     private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis){
         raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
                 TimeUnit.MILLISECONDS));
@@ -201,7 +206,6 @@ public class DatastoreContext {
         raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
     }
 
-
     private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
     }
@@ -210,6 +214,10 @@ public class DatastoreContext {
         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
     }
 
+    private void setShardSnapshotChunkSize(int shardSnapshotChunkSize) {
+        raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
+    }
+
     public int getShardBatchedModificationCount() {
         return shardBatchedModificationCount;
     }
@@ -230,10 +238,6 @@ public class DatastoreContext {
         return raftConfig.getSnapshotChunkSize();
     }
 
-    public void setShardSnapshotChunkSize(int shardSnapshotChunkSize) {
-        raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
-    }
-
     public static class Builder {
         private final DatastoreContext datastoreContext;
         private int maxShardDataChangeExecutorPoolSize =
@@ -441,5 +445,10 @@ public class DatastoreContext {
             datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
             return this;
         }
+
+        public Builder shardPeerAddressResolver(PeerAddressResolver resolver) {
+            datastoreContext.setPeerAddressResolver(resolver);
+            return this;
+        }
     }
 }
index 60894f5bc9723059ef7441aa79357084a00f2345..24d808cd91bf76839bcb9bdbb08e61e49a17db9b 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
-import akka.actor.Address;
 import akka.actor.Cancellable;
 import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
@@ -44,7 +43,6 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderExc
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
@@ -90,11 +88,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
 
-    // Stores a mapping between a member name and the address of the member
-    // Member names look like "member-1", "member-2" etc and are as specified
-    // in configuration
-    private final Map<String, Address> memberNameToAddress = new HashMap<>();
-
     // Stores a mapping between a shard name and it's corresponding information
     // Shard names look like inventory, topology etc and are as specified in
     // configuration
@@ -104,8 +97,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     // A data store could be of type config/operational
     private final String type;
 
-    private final String shardManagerIdentifierString;
-
     private final ClusterWrapper cluster;
 
     private final Configuration configuration;
@@ -120,6 +111,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
+    private final ShardPeerAddressResolver peerAddressResolver;
+
     private SchemaContext schemaContext;
 
     /**
@@ -132,12 +125,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
         this.datastoreContext = datastoreContext;
         this.type = datastoreContext.getDataStoreType();
-        this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
         this.primaryShardInfoCache = primaryShardInfoCache;
 
+        peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
+        this.datastoreContext = DatastoreContext.newBuilderFrom(datastoreContext).shardPeerAddressResolver(
+                peerAddressResolver).build();
+
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
@@ -145,11 +141,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     public static Props props(
-        final ClusterWrapper cluster,
-        final Configuration configuration,
-        final DatastoreContext datastoreContext,
-        final CountDownLatch waitTillReadyCountdownLatch,
-        final PrimaryShardInfoFutureCache primaryShardInfoCache) {
+            final ClusterWrapper cluster,
+            final Configuration configuration,
+            final DatastoreContext datastoreContext,
+            final CountDownLatch waitTillReadyCountdownLatch,
+            final PrimaryShardInfoFutureCache primaryShardInfoCache) {
 
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
@@ -228,6 +224,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
             if(shardDatastoreContext == null) {
                 shardDatastoreContext = datastoreContext;
+            } else {
+                shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
+                        peerAddressResolver).build();
             }
 
             ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
@@ -474,7 +473,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
-        memberNameToAddress.remove(memberName);
+        peerAddressResolver.removePeerAddress(memberName);
 
         for(ShardInformation info : localShards.values()){
             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
@@ -487,7 +486,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
-        memberNameToAddress.remove(memberName);
+        peerAddressResolver.removePeerAddress(memberName);
 
         for(ShardInformation info : localShards.values()){
             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
@@ -500,12 +499,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
-        memberNameToAddress.put(memberName, message.member().address());
+        peerAddressResolver.addPeerAddress(memberName, message.member().address());
 
         for(ShardInformation info : localShards.values()){
             String shardName = info.getShardName();
             String peerId = getShardIdentifier(memberName, shardName).toString();
-            info.updatePeerAddress(peerId, getShardActorPath(shardName, memberName), getSelf());
+            info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
 
             info.peerUp(memberName, peerId, getSelf());
         }
@@ -554,7 +553,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void onDatastoreContext(DatastoreContext context) {
-        datastoreContext = context;
+        datastoreContext = DatastoreContext.newBuilderFrom(context).shardPeerAddressResolver(
+                peerAddressResolver).build();
         for (ShardInformation info : localShards.values()) {
             if (info.getActor() != null) {
                 info.getActor().tell(datastoreContext, getSelf());
@@ -604,7 +604,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     @VisibleForTesting
     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
         return getContext().actorOf(info.newProps(schemaContext)
-                        .withDispatcher(shardDispatcherPath), info.getShardId().toString());
+                .withDispatcher(shardDispatcherPath), info.getShardId().toString());
     }
 
     private void findPrimary(FindPrimary message) {
@@ -624,28 +624,24 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
                                 new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
 
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
-                    }
+                            if(LOG.isDebugEnabled()) {
+                                LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
+                            }
 
-                    return found;
+                            return found;
                 }
             });
 
             return;
         }
 
-        for(Map.Entry<String, Address> entry: memberNameToAddress.entrySet()) {
-            if(!cluster.getCurrentMemberName().equals(entry.getKey())) {
-                String path = getShardManagerActorPathBuilder(entry.getValue()).toString();
-
-                LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
-                        shardName, path);
+        for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
+            LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
+                    shardName, address);
 
-                getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName,
-                        message.isWaitUntilReady()), getContext());
-                return;
-            }
+            getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
+                    message.isWaitUntilReady()), getContext());
+            return;
         }
 
         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
@@ -654,23 +650,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 String.format("No primary shard found for %s.", shardName)), getSelf());
     }
 
-    private StringBuilder getShardManagerActorPathBuilder(Address address) {
-        StringBuilder builder = new StringBuilder();
-        builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString);
-        return builder;
-    }
-
-    private String getShardActorPath(String shardName, String memberName) {
-        Address address = memberNameToAddress.get(memberName);
-        if(address != null) {
-            StringBuilder builder = getShardManagerActorPathBuilder(address);
-            builder.append("/")
-                .append(getShardIdentifier(memberName, shardName));
-            return builder.toString();
-        }
-        return null;
-    }
-
     /**
      * Construct the name of the shard actor given the name of the member on
      * which the shard resides and the name of the shard
@@ -680,7 +659,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * @return
      */
     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
-        return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
+        return peerAddressResolver.getShardIdentifier(memberName, shardName);
     }
 
     /**
@@ -703,7 +682,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
-                    datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
+                datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
 
         mBean.setShardManager(this);
     }
@@ -722,8 +701,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         for(String memberName : members) {
             if(!currentMemberName.equals(memberName)) {
                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
-                String path = getShardActorPath(shardName, memberName);
-                peerAddresses.put(shardId.toString(), path);
+                String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
+                peerAddresses.put(shardId.toString(), address);
             }
         }
         return peerAddresses;
@@ -733,14 +712,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     public SupervisorStrategy supervisorStrategy() {
 
         return new OneForOneStrategy(10, Duration.create("1 minute"),
-            new Function<Throwable, SupervisorStrategy.Directive>() {
-                @Override
-                public SupervisorStrategy.Directive apply(Throwable t) {
-                    LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
-                    return SupervisorStrategy.resume();
-                }
+                new Function<Throwable, SupervisorStrategy.Directive>() {
+            @Override
+            public SupervisorStrategy.Directive apply(Throwable t) {
+                LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
+                return SupervisorStrategy.resume();
             }
-        );
+        }
+                );
 
     }
 
@@ -826,7 +805,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
             LOG.info("updatePeerAddress for peer {} with address {}", peerId,
-                peerAddress);
+                    peerAddress);
             if(peerAddresses.containsKey(peerId)){
                 peerAddresses.put(peerId, peerAddress);
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolver.java
new file mode 100644 (file)
index 0000000..12fabbb
--- /dev/null
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Address;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
+import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
+
+/**
+ * Implementation PeerAddressResolver that resolves address for shard peer ids. This class is owned by the
+ * ShardMaanager and passed to Shard actors via the ConfigParams.
+ *
+ * @author Thomas Pantelis
+ */
+class ShardPeerAddressResolver implements PeerAddressResolver {
+    // Stores a mapping between a member name and the address of the member. The map is concurrent as it
+    // will be accessed by multiple threads via the public resolve method.
+    private final ConcurrentMap<String, Address> memberNameToAddress = new ConcurrentHashMap<>();
+    private final String shardManagerIdentifier;
+    private final String shardManagerType;
+    private final String localMemberName;
+
+    public ShardPeerAddressResolver(String shardManagerType, String localMemberName) {
+        this.shardManagerIdentifier = ShardManagerIdentifier.builder().type(shardManagerType).build().toString();
+        this.shardManagerType = shardManagerType;
+        this.localMemberName = localMemberName;
+    }
+
+    void addPeerAddress(String memberName, Address address) {
+        memberNameToAddress.put(memberName, address);
+    }
+
+    void removePeerAddress(String memberName) {
+        memberNameToAddress.remove(memberName);
+    }
+
+    Address getPeerAddress(String memberName) {
+        return memberNameToAddress.get(memberName);
+    }
+
+    Collection<String> getShardManagerPeerActorAddresses() {
+        Collection<String> peerAddresses = new ArrayList<>();
+        for(Map.Entry<String, Address> entry: memberNameToAddress.entrySet()) {
+            if(!localMemberName.equals(entry.getKey())) {
+                peerAddresses.add(getShardManagerActorPathBuilder(entry.getValue()).toString());
+            }
+        }
+
+        return peerAddresses;
+    }
+
+    ShardIdentifier getShardIdentifier(String memberName, String shardName){
+        return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(shardManagerType).build();
+    }
+
+    String getShardActorAddress(String shardName, String memberName) {
+        Address memberAddress = memberNameToAddress.get(memberName);
+        if(memberAddress != null) {
+            return getShardManagerActorPathBuilder(memberAddress).append("/").append(
+                    getShardIdentifier(memberName, shardName)).toString();
+        }
+
+        return null;
+    }
+
+    private StringBuilder getShardManagerActorPathBuilder(Address address) {
+        return new StringBuilder().append(address.toString()).append("/user/").append(shardManagerIdentifier);
+    }
+
+    @Override
+    public String resolve(String peerId) {
+        ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build();
+        return getShardActorAddress(shardId.getShardName(), shardId.getMemberName());
+    }
+}
index 82f4b3ccf9edde056646cf823755570006001c48..e49221b06a5980c325ca304eb9dfa7a52f873235 100644 (file)
@@ -961,6 +961,7 @@ public class ShardManagerTest extends AbstractActorTest {
         }};
     }
 
+    @Test
     public void testOnReceiveCreateShard() {
         new JavaTestKit(getSystem()) {{
             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
@@ -986,6 +987,8 @@ public class ShardManagerTest extends AbstractActorTest {
             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
 
             assertEquals("isRecoveryApplicable", false, shardPropsCreator.datastoreContext.isPersistent());
+            assertTrue("Epxected ShardPeerAddressResolver", shardPropsCreator.datastoreContext.getShardRaftConfig().
+                    getPeerAddressResolver() instanceof ShardPeerAddressResolver);
             assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
                     new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
                     shardPropsCreator.peerAddresses.keySet());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolverTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolverTest.java
new file mode 100644 (file)
index 0000000..5d01e36
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import akka.actor.Address;
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+
+/**
+ * Unit tests for ShardPeerAddressResolver.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardPeerAddressResolverTest {
+
+    @Test
+    public void testGetShardActorAddress() {
+        ShardPeerAddressResolver resolver = new ShardPeerAddressResolver("config", "member-1");
+
+        assertEquals("getShardActorAddress", null, resolver.getShardActorAddress("default", "member-2"));
+
+        Address address2 = new Address("tcp", "system2");
+        resolver.addPeerAddress("member-2", address2);
+        assertEquals("getPeerAddress", address2, resolver.getPeerAddress("member-2"));
+
+        Address address3 = new Address("tcp", "system3");
+        resolver.addPeerAddress("member-3", address3);
+        assertEquals("getPeerAddress", address3, resolver.getPeerAddress("member-3"));
+
+        assertEquals("getShardActorAddress", address2.toString() +
+                "/user/shardmanager-config/member-2-shard-default-config",
+                resolver.getShardActorAddress("default", "member-2"));
+
+        assertEquals("getShardActorAddress", address3.toString() +
+                "/user/shardmanager-config/member-3-shard-default-config",
+                resolver.getShardActorAddress("default", "member-3"));
+
+        assertEquals("getShardActorAddress", address2.toString() +
+                "/user/shardmanager-config/member-2-shard-topology-config",
+                resolver.getShardActorAddress("topology", "member-2"));
+
+        resolver.removePeerAddress("member-2");
+        assertEquals("getShardActorAddress", null, resolver.getShardActorAddress("default", "member-2"));
+        assertEquals("getShardActorAddress", null, resolver.getShardActorAddress("topology", "member-2"));
+        assertEquals("getShardActorAddress", address3.toString() +
+                "/user/shardmanager-config/member-3-shard-default-config",
+                resolver.getShardActorAddress("default", "member-3"));
+    }
+
+    @Test
+    public void testResolve() {
+        String type = "config";
+        ShardPeerAddressResolver resolver = new ShardPeerAddressResolver(type, "member-1");
+
+        String memberName = "member-2";
+        String peerId = ShardIdentifier.builder().memberName(memberName ).shardName("default").
+                type(type).build().toString();
+
+        assertEquals("resolve", null, resolver.resolve(peerId));
+
+        Address address = new Address("tcp", "system");
+        resolver.addPeerAddress(memberName, address);
+
+        String shardAddress = resolver.getShardActorAddress("default", memberName);
+        assertEquals("getShardActorAddress", address.toString() +
+                "/user/shardmanager-" + type + "/" + memberName + "-shard-default-" + type, shardAddress);
+
+        assertEquals("resolve", shardAddress, resolver.resolve(peerId));
+    }
+
+    @Test
+    public void testGetShardManagerPeerActorAddresses() {
+        ShardPeerAddressResolver resolver = new ShardPeerAddressResolver("config", "member-1");
+
+        resolver.addPeerAddress("member-1", new Address("tcp", "system1"));
+
+        Address address2 = new Address("tcp", "system2");
+        resolver.addPeerAddress("member-2", address2);
+
+        Address address3 = new Address("tcp", "system3");
+        resolver.addPeerAddress("member-3", address3);
+
+        Collection<String> peerAddresses = resolver.getShardManagerPeerActorAddresses();
+        assertEquals("getShardManagerPeerActorAddresses", Sets.newHashSet(
+                address2.toString() + "/user/shardmanager-config",
+                address3.toString() + "/user/shardmanager-config"), Sets.newHashSet(peerAddresses));
+    }
+}