From: Tom Pantelis Date: Thu, 1 Oct 2015 13:27:40 +0000 (-0400) Subject: Remove peer address cache in ShardInformation X-Git-Tag: release/beryllium~237 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=2eddbcc5db186af5b5feacfe5d7056e2f3dccd38 Remove peer address cache in ShardInformation The ShardManager caches the peer addresses in the ShardInformation and uses it mainly to suppress PeerUp, PeerDown and PeerAddressResolved messages to the shard for peers that don't have replicas for the shard. This is fine with static config but With the upcoming work to dynamically add replicas, the shard will take ownership of persisting its peers so the ShardManager will not know about dynamic peers. I changed the semantics of the peer addresses to initial peer addresses. They are now only used to pass to the Shard on creation. As a result, PeerUp, PeerDown and PeerAddressResolved messages are now always sent to the Shard for all peers. The Shard/RaftActor decide ll whether or not to process the peer message. I changed RaftActorContextImpl#setPeerAddress to ignore a peerId it doesn't know about instead of throwing an ex. The other usages of the peerAddresses were to lookup the leader address. This is now done dynamically via the ShardPeerAddressResolver. Change-Id: Ida9738916a4a85d23198e7c095d5c73f17e2aa6c Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 74f02b5ef5..f0236a1abf 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.raft; -import static com.google.common.base.Preconditions.checkState; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; @@ -176,11 +175,12 @@ public class RaftActorContextImpl implements RaftActorContext { return null; } - @Override public void setPeerAddress(String peerId, String peerAddress) { - LOG.info("Peer address for peer {} set to {}", peerId, peerAddress); - checkState(peerAddresses.containsKey(peerId), peerId + " is unknown"); - - peerAddresses.put(peerId, peerAddress); + @Override + public void setPeerAddress(String peerId, String peerAddress) { + if(peerAddresses.containsKey(peerId)) { + LOG.info("Peer address for peer {} set to {}", peerId, peerAddress); + peerAddresses.put(peerId, peerAddress); + } } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java index 26fdf8f25d..5275a303d3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java @@ -65,4 +65,19 @@ public class RaftActorContextImplTest extends AbstractActorTest { assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1")); verify(mockResolver, never()).resolve(anyString()); } + + @Test + public void testSetPeerAddress() { + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(), + "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1, + Maps.newHashMap(ImmutableMap.of("peer1", "peerAddress1")), configParams, + new NonPersistentDataProvider(), log); + + context.setPeerAddress("peer1", "peerAddress1_1"); + assertEquals("getPeerAddress", "peerAddress1_1", context.getPeerAddress("peer1")); + + context.setPeerAddress("peer2", "peerAddress2"); + assertEquals("getPeerAddress", null, context.getPeerAddress("peer2")); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 24d808cd91..b48215d361 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorPath; import akka.actor.ActorRef; +import akka.actor.Address; import akka.actor.Cancellable; import akka.actor.OneForOneStrategy; import akka.actor.Props; @@ -230,7 +231,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses, - shardDatastoreContext, createShard.getShardPropsCreator()); + shardDatastoreContext, createShard.getShardPropsCreator(), peerAddressResolver); localShards.put(info.getShardName(), info); mBean.addLocalShard(shardId.toString()); @@ -499,7 +500,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); - peerAddressResolver.addPeerAddress(memberName, message.member().address()); + addPeerAddress(memberName, message.member().address()); + + checkReady(); + } + + private void addPeerAddress(String memberName, Address address) { + peerAddressResolver.addPeerAddress(memberName, address); for(ShardInformation info : localShards.values()){ String shardName = info.getShardName(); @@ -508,14 +515,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { info.peerUp(memberName, peerId, getSelf()); } - - checkReady(); } private void memberReachable(ClusterEvent.ReachableMember message) { String memberName = message.member().roles().head(); LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address()); + addPeerAddress(memberName, message.member().address()); + markMemberAvailable(memberName); } @@ -678,7 +685,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { Map peerAddresses = getPeerAddresses(shardName); localShardActorNames.add(shardId.toString()); localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext, - shardPropsCreator)); + shardPropsCreator, peerAddressResolver)); } mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type, @@ -739,7 +746,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final String shardName; private ActorRef actor; private ActorPath actorPath; - private final Map peerAddresses; + private final Map initialPeerAddresses; private Optional localShardDataTree; private boolean leaderAvailable = false; @@ -755,19 +762,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final DatastoreContext datastoreContext; private final ShardPropsCreator shardPropsCreator; + private final ShardPeerAddressResolver addressResolver; private ShardInformation(String shardName, ShardIdentifier shardId, - Map peerAddresses, DatastoreContext datastoreContext, - ShardPropsCreator shardPropsCreator) { + Map initialPeerAddresses, DatastoreContext datastoreContext, + ShardPropsCreator shardPropsCreator, ShardPeerAddressResolver addressResolver) { this.shardName = shardName; this.shardId = shardId; - this.peerAddresses = peerAddresses; + this.initialPeerAddresses = initialPeerAddresses; this.datastoreContext = datastoreContext; this.shardPropsCreator = shardPropsCreator; + this.addressResolver = addressResolver; } Props newProps(SchemaContext schemaContext) { - return shardPropsCreator.newProps(shardId, peerAddresses, datastoreContext, schemaContext); + return shardPropsCreator.newProps(shardId, initialPeerAddresses, datastoreContext, schemaContext); } String getShardName() { @@ -799,37 +808,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return localShardDataTree; } - Map getPeerAddresses() { - return peerAddresses; - } - void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){ - LOG.info("updatePeerAddress for peer {} with address {}", peerId, - peerAddress); - if(peerAddresses.containsKey(peerId)){ - peerAddresses.put(peerId, peerAddress); - - if(actor != null) { - if(LOG.isDebugEnabled()) { - LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}", - peerId, peerAddress, actor.path()); - } + LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress); - actor.tell(new PeerAddressResolved(peerId, peerAddress), sender); + if(actor != null) { + if(LOG.isDebugEnabled()) { + LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}", + peerId, peerAddress, actor.path()); } - notifyOnShardInitializedCallbacks(); + actor.tell(new PeerAddressResolved(peerId, peerAddress), sender); } + + notifyOnShardInitializedCallbacks(); } void peerDown(String memberName, String peerId, ActorRef sender) { - if(peerAddresses.containsKey(peerId) && actor != null) { + if(actor != null) { actor.tell(new PeerDown(memberName, peerId), sender); } } void peerUp(String memberName, String peerId, ActorRef sender) { - if(peerAddresses.containsKey(peerId) && actor != null) { + if(actor != null) { actor.tell(new PeerUp(memberName, peerId), sender); } } @@ -840,7 +841,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { boolean isShardReadyWithLeaderId() { return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) && - (isLeader() || peerAddresses.get(leaderId) != null); + (isLeader() || addressResolver.resolve(leaderId) != null); } boolean isShardInitialized() { @@ -855,7 +856,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { if(isLeader()) { return Serialization.serializedActorPath(getActor()); } else { - return peerAddresses.get(leaderId); + return addressResolver.resolve(leaderId); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolver.java index 12fabbb6d2..464fc7f53a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolver.java @@ -80,6 +80,10 @@ class ShardPeerAddressResolver implements PeerAddressResolver { @Override public String resolve(String peerId) { + if(peerId == null) { + return null; + } + ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build(); return getShardActorAddress(shardId.getShardName(), shardId.getMemberName()); }