From 0b50c6a2012206c2853e2fc7c4e50d89a1fe35ff Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 23 Mar 2016 20:28:24 +0100 Subject: [PATCH 1/1] Split out ShardInformation ShardInformation is a large inner class, split it out for easier navigation. Change-Id: I1748f23ff0902c512251bf4aeeb65451f696ba78 Signed-off-by: Robert Varga --- .../shardmanager/ShardInformation.java | 264 +++++++++++++++ .../datastore/shardmanager/ShardManager.java | 314 ++---------------- 2 files changed, 299 insertions(+), 279 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java new file mode 100644 index 0000000000..a37a94fd0e --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java @@ -0,0 +1,264 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.shardmanager; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.serialization.Serialization; +import com.google.common.base.Objects; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.Shard; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; +import org.opendaylight.controller.cluster.datastore.messages.PeerDown; +import org.opendaylight.controller.cluster.datastore.messages.PeerUp; +import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager.OnShardInitialized; +import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager.OnShardReady; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class ShardInformation { + private static final Logger LOG = LoggerFactory.getLogger(ShardInformation.class); + + private final Set onShardInitializedSet = new HashSet<>(); + private final Map initialPeerAddresses; + private final ShardPeerAddressResolver addressResolver; + private final ShardIdentifier shardId; + private final String shardName; + private ActorRef actor; + private Optional localShardDataTree; + private boolean leaderAvailable = false; + + // flag that determines if the actor is ready for business + private boolean actorInitialized = false; + + private boolean followerSyncStatus = false; + + private String role ; + private String leaderId; + private short leaderVersion; + + private DatastoreContext datastoreContext; + private Shard.AbstractBuilder builder; + private boolean isActiveMember = true; + + ShardInformation(String shardName, ShardIdentifier shardId, + Map initialPeerAddresses, DatastoreContext datastoreContext, + Shard.AbstractBuilder builder, ShardPeerAddressResolver addressResolver) { + this.shardName = shardName; + this.shardId = shardId; + this.initialPeerAddresses = initialPeerAddresses; + this.datastoreContext = datastoreContext; + this.builder = builder; + this.addressResolver = addressResolver; + } + + Props newProps(SchemaContext schemaContext) { + Preconditions.checkNotNull(builder); + Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext). + schemaContext(schemaContext).props(); + builder = null; + return props; + } + + String getShardName() { + return shardName; + } + + @Nullable + ActorRef getActor(){ + return actor; + } + + void setActor(ActorRef actor) { + this.actor = actor; + } + + ShardIdentifier getShardId() { + return shardId; + } + + void setLocalDataTree(Optional localShardDataTree) { + this.localShardDataTree = localShardDataTree; + } + + Optional getLocalShardDataTree() { + return localShardDataTree; + } + + DatastoreContext getDatastoreContext() { + return datastoreContext; + } + + void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) { + this.datastoreContext = datastoreContext; + if (actor != null) { + LOG.debug("Sending new DatastoreContext to {}", shardId); + actor.tell(this.datastoreContext, sender); + } + } + + void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){ + LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress); + + if(actor != null) { + if(LOG.isDebugEnabled()) { + LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}", + peerId, peerAddress, actor.path()); + } + + actor.tell(new PeerAddressResolved(peerId, peerAddress), sender); + } + + notifyOnShardInitializedCallbacks(); + } + + void peerDown(String memberName, String peerId, ActorRef sender) { + if(actor != null) { + actor.tell(new PeerDown(memberName, peerId), sender); + } + } + + void peerUp(String memberName, String peerId, ActorRef sender) { + if(actor != null) { + actor.tell(new PeerUp(memberName, peerId), sender); + } + } + + boolean isShardReady() { + return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role); + } + + boolean isShardReadyWithLeaderId() { + return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) && + (isLeader() || addressResolver.resolve(leaderId) != null); + } + + boolean isShardInitialized() { + return getActor() != null && actorInitialized; + } + + boolean isLeader() { + return Objects.equal(leaderId, shardId.toString()); + } + + String getSerializedLeaderActor() { + if(isLeader()) { + return Serialization.serializedActorPath(getActor()); + } else { + return addressResolver.resolve(leaderId); + } + } + + void setActorInitialized() { + LOG.debug("Shard {} is initialized", shardId); + + this.actorInitialized = true; + + notifyOnShardInitializedCallbacks(); + } + + private void notifyOnShardInitializedCallbacks() { + if(onShardInitializedSet.isEmpty()) { + return; + } + + boolean ready = isShardReadyWithLeaderId(); + + LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId, + ready ? "ready" : "initialized", onShardInitializedSet.size()); + + Iterator iter = onShardInitializedSet.iterator(); + while(iter.hasNext()) { + OnShardInitialized onShardInitialized = iter.next(); + if (!(onShardInitialized instanceof OnShardReady) || ready) { + iter.remove(); + onShardInitialized.getTimeoutSchedule().cancel(); + onShardInitialized.getReplyRunnable().run(); + } + } + } + + void addOnShardInitialized(OnShardInitialized onShardInitialized) { + onShardInitializedSet.add(onShardInitialized); + } + + void removeOnShardInitialized(OnShardInitialized onShardInitialized) { + onShardInitializedSet.remove(onShardInitialized); + } + + void setRole(String newRole) { + this.role = newRole; + + notifyOnShardInitializedCallbacks(); + } + + void setFollowerSyncStatus(boolean syncStatus){ + this.followerSyncStatus = syncStatus; + } + + boolean isInSync(){ + if(RaftState.Follower.name().equals(this.role)){ + return followerSyncStatus; + } else if(RaftState.Leader.name().equals(this.role)){ + return true; + } + + return false; + } + + boolean setLeaderId(String leaderId) { + boolean changed = !Objects.equal(this.leaderId, leaderId); + this.leaderId = leaderId; + if(leaderId != null) { + this.leaderAvailable = true; + } + notifyOnShardInitializedCallbacks(); + + return changed; + } + + String getLeaderId() { + return leaderId; + } + + void setLeaderAvailable(boolean leaderAvailable) { + this.leaderAvailable = leaderAvailable; + + if(leaderAvailable) { + notifyOnShardInitializedCallbacks(); + } + } + + short getLeaderVersion() { + return leaderVersion; + } + + void setLeaderVersion(short leaderVersion) { + this.leaderVersion = leaderVersion; + } + + boolean isActiveMember() { + return isActiveMember; + } + + void setActiveMember(boolean isActiveMember) { + this.isActiveMember = isActiveMember; + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index 4c9fcc1dba..be9a389153 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -14,7 +14,6 @@ import akka.actor.Address; import akka.actor.Cancellable; import akka.actor.OneForOneStrategy; import akka.actor.PoisonPill; -import akka.actor.Props; import akka.actor.Status; import akka.actor.SupervisorStrategy; import akka.cluster.ClusterEvent; @@ -27,15 +26,10 @@ import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; -import akka.serialization.Serialization; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import com.google.common.base.Supplier; -import com.google.common.collect.Sets; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import java.util.ArrayList; @@ -43,7 +37,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -74,9 +67,6 @@ import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; -import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; -import org.opendaylight.controller.cluster.datastore.messages.PeerDown; -import org.opendaylight.controller.cluster.datastore.messages.PeerUp; import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary; import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; @@ -86,7 +76,6 @@ import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; -import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; @@ -98,7 +87,6 @@ import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -537,10 +525,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { if(!shardInfo.isShardInitialized()) { LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName()); - message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf()); + message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf()); } else { LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName()); - message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf()); + message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf()); } } @@ -719,11 +707,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if (!shardInformation.isShardInitialized()) { LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInformation.getShardName()); - getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf()); + getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf()); } else { LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInformation.getShardName()); - getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf()); + getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf()); } return; @@ -876,7 +864,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { actor.tell(switchBehavior, getSelf()); } else { LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available", - info.shardName, switchBehavior.getNewState()); + info.getShardName(), switchBehavior.getNewState()); } } @@ -1297,7 +1285,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { 0, 0)); } - private static class ForwardedAddServerReply { + private static final class ForwardedAddServerReply { ShardInformation shardInfo; AddServerReply addServerReply; String leaderPath; @@ -1312,7 +1300,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private static class ForwardedAddServerFailure { + private static final class ForwardedAddServerFailure { String shardName; String failureMessage; Throwable failure; @@ -1327,238 +1315,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - @VisibleForTesting - protected static class ShardInformation { - private final ShardIdentifier shardId; - private final String shardName; - private ActorRef actor; - private final Map initialPeerAddresses; - private Optional localShardDataTree; - private boolean leaderAvailable = false; - - // flag that determines if the actor is ready for business - private boolean actorInitialized = false; - - private boolean followerSyncStatus = false; - - private final Set onShardInitializedSet = Sets.newHashSet(); - private String role ; - private String leaderId; - private short leaderVersion; - - private DatastoreContext datastoreContext; - private Shard.AbstractBuilder builder; - private final ShardPeerAddressResolver addressResolver; - private boolean isActiveMember = true; - - private ShardInformation(String shardName, ShardIdentifier shardId, - Map initialPeerAddresses, DatastoreContext datastoreContext, - Shard.AbstractBuilder builder, ShardPeerAddressResolver addressResolver) { - this.shardName = shardName; - this.shardId = shardId; - this.initialPeerAddresses = initialPeerAddresses; - this.datastoreContext = datastoreContext; - this.builder = builder; - this.addressResolver = addressResolver; - } - - Props newProps(SchemaContext schemaContext) { - Preconditions.checkNotNull(builder); - Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext). - schemaContext(schemaContext).props(); - builder = null; - return props; - } - - String getShardName() { - return shardName; - } - - @Nullable - ActorRef getActor(){ - return actor; - } - - void setActor(ActorRef actor) { - this.actor = actor; - } - - ShardIdentifier getShardId() { - return shardId; - } - - void setLocalDataTree(Optional localShardDataTree) { - this.localShardDataTree = localShardDataTree; - } - - Optional getLocalShardDataTree() { - return localShardDataTree; - } - - DatastoreContext getDatastoreContext() { - return datastoreContext; - } - - void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) { - this.datastoreContext = datastoreContext; - if (actor != null) { - LOG.debug ("Sending new DatastoreContext to {}", shardId); - actor.tell(this.datastoreContext, sender); - } - } - - void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){ - LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress); - - if(actor != null) { - if(LOG.isDebugEnabled()) { - LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}", - peerId, peerAddress, actor.path()); - } - - actor.tell(new PeerAddressResolved(peerId, peerAddress), sender); - } - - notifyOnShardInitializedCallbacks(); - } - - void peerDown(String memberName, String peerId, ActorRef sender) { - if(actor != null) { - actor.tell(new PeerDown(memberName, peerId), sender); - } - } - - void peerUp(String memberName, String peerId, ActorRef sender) { - if(actor != null) { - actor.tell(new PeerUp(memberName, peerId), sender); - } - } - - boolean isShardReady() { - return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role); - } - - boolean isShardReadyWithLeaderId() { - return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) && - (isLeader() || addressResolver.resolve(leaderId) != null); - } - - boolean isShardInitialized() { - return getActor() != null && actorInitialized; - } - - boolean isLeader() { - return Objects.equal(leaderId, shardId.toString()); - } - - String getSerializedLeaderActor() { - if(isLeader()) { - return Serialization.serializedActorPath(getActor()); - } else { - return addressResolver.resolve(leaderId); - } - } - - void setActorInitialized() { - LOG.debug("Shard {} is initialized", shardId); - - this.actorInitialized = true; - - notifyOnShardInitializedCallbacks(); - } - - private void notifyOnShardInitializedCallbacks() { - if(onShardInitializedSet.isEmpty()) { - return; - } - - boolean ready = isShardReadyWithLeaderId(); - - if(LOG.isDebugEnabled()) { - LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId, - ready ? "ready" : "initialized", onShardInitializedSet.size()); - } - - Iterator iter = onShardInitializedSet.iterator(); - while(iter.hasNext()) { - OnShardInitialized onShardInitialized = iter.next(); - if(!(onShardInitialized instanceof OnShardReady) || ready) { - iter.remove(); - onShardInitialized.getTimeoutSchedule().cancel(); - onShardInitialized.getReplyRunnable().run(); - } - } - } - - void addOnShardInitialized(OnShardInitialized onShardInitialized) { - onShardInitializedSet.add(onShardInitialized); - } - - void removeOnShardInitialized(OnShardInitialized onShardInitialized) { - onShardInitializedSet.remove(onShardInitialized); - } - - void setRole(String newRole) { - this.role = newRole; - - notifyOnShardInitializedCallbacks(); - } - - void setFollowerSyncStatus(boolean syncStatus){ - this.followerSyncStatus = syncStatus; - } - - boolean isInSync(){ - if(RaftState.Follower.name().equals(this.role)){ - return followerSyncStatus; - } else if(RaftState.Leader.name().equals(this.role)){ - return true; - } - - return false; - } - - boolean setLeaderId(String leaderId) { - boolean changed = !Objects.equal(this.leaderId, leaderId); - this.leaderId = leaderId; - if(leaderId != null) { - this.leaderAvailable = true; - } - notifyOnShardInitializedCallbacks(); - - return changed; - } - - String getLeaderId() { - return leaderId; - } - - void setLeaderAvailable(boolean leaderAvailable) { - this.leaderAvailable = leaderAvailable; - - if(leaderAvailable) { - notifyOnShardInitializedCallbacks(); - } - } - - short getLeaderVersion() { - return leaderVersion; - } - - void setLeaderVersion(short leaderVersion) { - this.leaderVersion = leaderVersion; - } - - boolean isActiveMember() { - return isActiveMember; - } - - void setActiveMember(boolean isActiveMember) { - this.isActiveMember = isActiveMember; - } - } - - private static class OnShardInitialized { + static class OnShardInitialized { private final Runnable replyRunnable; private Cancellable timeoutSchedule; @@ -1579,36 +1336,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private static class OnShardReady extends OnShardInitialized { + static class OnShardReady extends OnShardInitialized { OnShardReady(Runnable replyRunnable) { super(replyRunnable); } } - private static class ShardNotInitializedTimeout { - private final ActorRef sender; - private final ShardInformation shardInfo; - private final OnShardInitialized onShardInitialized; - - ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) { - this.sender = sender; - this.shardInfo = shardInfo; - this.onShardInitialized = onShardInitialized; - } - - ActorRef getSender() { - return sender; - } - - ShardInformation getShardInfo() { - return shardInfo; - } - - OnShardInitialized getOnShardInitialized() { - return onShardInitialized; - } - } - private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) { Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext(). getShardInitializationTimeout().duration().$times(2)); @@ -1714,7 +1447,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - /** * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received @@ -1764,12 +1496,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { /** * The WrappedShardResponse class wraps a response from a Shard. */ - private static class WrappedShardResponse { + private static final class WrappedShardResponse { private final ShardIdentifier shardId; private final Object response; private final String leaderPath; - private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) { + WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) { this.shardId = shardId; this.response = response; this.leaderPath = leaderPath; @@ -1787,6 +1519,30 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return leaderPath; } } + + private static final class ShardNotInitializedTimeout { + private final ActorRef sender; + private final ShardInformation shardInfo; + private final OnShardInitialized onShardInitialized; + + ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) { + this.sender = sender; + this.shardInfo = shardInfo; + this.onShardInitialized = onShardInitialized; + } + + ActorRef getSender() { + return sender; + } + + ShardInformation getShardInfo() { + return shardInfo; + } + + OnShardInitialized getOnShardInitialized() { + return onShardInitialized; + } + } } -- 2.36.6