X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=3021080758b80b5c1c3e4ba2eef9d7f7621863fd;hb=cc7ef3a4cc3eb2027be5558c1564e580fd153087;hp=a46bb3dae8e5887202d66ad0ddfda51e5774537b;hpb=5c5c980e564d2b5f6cd26821ffd26997f59af260;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
index a46bb3dae8..3021080758 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
@@ -5,936 +5,29 @@
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorPath;
-import akka.actor.ActorRef;
-import akka.actor.Address;
-import akka.actor.Cancellable;
-import akka.actor.OneForOneStrategy;
-import akka.actor.Props;
-import akka.actor.SupervisorStrategy;
-import akka.cluster.ClusterEvent;
-import akka.japi.Creator;
-import akka.japi.Function;
-import akka.persistence.RecoveryCompleted;
-import akka.serialization.Serialization;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Sets;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
-import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
-import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
-import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
-import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
-import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
-import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
-import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
-import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
-import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
-import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
-import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
-import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
-import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
-import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
/**
- * The ShardManager has the following jobs,
- *
- * - Create all the local shard replicas that belong on this cluster member
- *
- Find the address of the local shard
- *
- Find the primary replica for any given shard
- *
- Monitor the cluster members and store their addresses
- *
+ * Manages shards.
+ *
+ * @deprecated This is a deprecated placeholder to keep its inner class present. It serves no other purpose.
*/
-public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
-
- private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
-
- // Stores a mapping between a member name and the address of the member
- // Member names look like "member-1", "member-2" etc and are as specified
- // in configuration
- private final Map memberNameToAddress = new HashMap<>();
-
- // Stores a mapping between a shard name and it's corresponding information
- // Shard names look like inventory, topology etc and are as specified in
- // configuration
- private final Map localShards = new HashMap<>();
-
- // The type of a ShardManager reflects the type of the datastore itself
- // A data store could be of type config/operational
- private final String type;
-
- private final String shardManagerIdentifierString;
-
- private final ClusterWrapper cluster;
-
- private final Configuration configuration;
-
- private final String shardDispatcherPath;
-
- private ShardManagerInfo mBean;
-
- private DatastoreContext datastoreContext;
-
- private final CountDownLatch waitTillReadyCountdownLatch;
-
- private final PrimaryShardInfoFutureCache primaryShardInfoCache;
-
- /**
- */
- protected ShardManager(ClusterWrapper cluster, Configuration configuration,
- DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch,
- PrimaryShardInfoFutureCache primaryShardInfoCache) {
-
- this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
- this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
- this.datastoreContext = datastoreContext;
- this.type = datastoreContext.getDataStoreType();
- this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString();
- this.shardDispatcherPath =
- new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
- this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
- this.primaryShardInfoCache = primaryShardInfoCache;
-
- // Subscribe this actor to cluster member events
- cluster.subscribeToMemberEvents(getSelf());
-
- createLocalShards();
- }
-
- public static Props props(
- final ClusterWrapper cluster,
- final Configuration configuration,
- final DatastoreContext datastoreContext,
- final CountDownLatch waitTillReadyCountdownLatch,
- final PrimaryShardInfoFutureCache primaryShardInfoCache) {
-
- Preconditions.checkNotNull(cluster, "cluster should not be null");
- Preconditions.checkNotNull(configuration, "configuration should not be null");
- Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
- Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
-
- return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext,
- waitTillReadyCountdownLatch, primaryShardInfoCache));
- }
-
- @Override
- public void postStop() {
- LOG.info("Stopping ShardManager");
-
- mBean.unregisterMBean();
- }
-
- @Override
- public void handleCommand(Object message) throws Exception {
- if (message instanceof FindPrimary) {
- findPrimary((FindPrimary)message);
- } else if(message instanceof FindLocalShard){
- findLocalShard((FindLocalShard) message);
- } else if (message instanceof UpdateSchemaContext) {
- updateSchemaContext(message);
- } else if(message instanceof ActorInitialized) {
- onActorInitialized(message);
- } else if (message instanceof ClusterEvent.MemberUp){
- memberUp((ClusterEvent.MemberUp) message);
- } else if(message instanceof ClusterEvent.MemberRemoved) {
- memberRemoved((ClusterEvent.MemberRemoved) message);
- } else if(message instanceof ClusterEvent.UnreachableMember) {
- memberUnreachable((ClusterEvent.UnreachableMember)message);
- } else if(message instanceof ClusterEvent.ReachableMember) {
- memberReachable((ClusterEvent.ReachableMember) message);
- } else if(message instanceof DatastoreContext) {
- onDatastoreContext((DatastoreContext)message);
- } else if(message instanceof RoleChangeNotification) {
- onRoleChangeNotification((RoleChangeNotification) message);
- } else if(message instanceof FollowerInitialSyncUpStatus){
- onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
- } else if(message instanceof ShardNotInitializedTimeout) {
- onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
- } else if(message instanceof ShardLeaderStateChanged) {
- onLeaderStateChanged((ShardLeaderStateChanged)message);
- } else {
- unknownMessage(message);
- }
-
- }
-
- private void checkReady(){
- if (isReadyWithLeaderId()) {
- LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
- persistenceId(), type, waitTillReadyCountdownLatch.getCount());
-
- waitTillReadyCountdownLatch.countDown();
- }
- }
-
- private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
- LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
-
- ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
- if(shardInformation != null) {
- shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
- shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
- if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
- primaryShardInfoCache.remove(shardInformation.getShardName());
- }
-
- checkReady();
- } else {
- LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
- }
- }
-
- private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
- ShardInformation shardInfo = message.getShardInfo();
-
- LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
- shardInfo.getShardName());
-
- shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
-
- if(!shardInfo.isShardInitialized()) {
- LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
- message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
- } else {
- LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
- message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
- }
- }
-
- private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
- LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
- status.getName(), status.isInitialSyncDone());
-
- ShardInformation shardInformation = findShardInformation(status.getName());
-
- if(shardInformation != null) {
- shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
-
- mBean.setSyncStatus(isInSync());
- }
-
- }
-
- private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
- LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
- roleChanged.getOldRole(), roleChanged.getNewRole());
-
- ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
- if(shardInformation != null) {
- shardInformation.setRole(roleChanged.getNewRole());
- checkReady();
- mBean.setSyncStatus(isInSync());
- }
- }
-
-
- private ShardInformation findShardInformation(String memberId) {
- for(ShardInformation info : localShards.values()){
- if(info.getShardId().toString().equals(memberId)){
- return info;
- }
- }
-
- return null;
- }
-
- private boolean isReadyWithLeaderId() {
- boolean isReady = true;
- for (ShardInformation info : localShards.values()) {
- if(!info.isShardReadyWithLeaderId()){
- isReady = false;
- break;
- }
- }
- return isReady;
- }
-
- private boolean isInSync(){
- for (ShardInformation info : localShards.values()) {
- if(!info.isInSync()){
- return false;
- }
- }
- return true;
- }
-
- private void onActorInitialized(Object message) {
- final ActorRef sender = getSender();
-
- if (sender == null) {
- return; //why is a non-actor sending this message? Just ignore.
- }
-
- String actorName = sender.path().name();
- //find shard name from actor name; actor name is stringified shardId
- ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
-
- if (shardId.getShardName() == null) {
- return;
- }
-
- markShardAsInitialized(shardId.getShardName());
- }
-
- private void markShardAsInitialized(String shardName) {
- LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
-
- ShardInformation shardInformation = localShards.get(shardName);
- if (shardInformation != null) {
- shardInformation.setActorInitialized();
-
- shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
- }
- }
-
- @Override
- protected void handleRecover(Object message) throws Exception {
- if (message instanceof RecoveryCompleted) {
- LOG.info("Recovery complete : {}", persistenceId());
-
- // We no longer persist SchemaContext modules so delete all the prior messages from the akka
- // journal on upgrade from Helium.
- deleteMessages(lastSequenceNr());
- }
- }
-
- private void findLocalShard(FindLocalShard message) {
- final ShardInformation shardInformation = localShards.get(message.getShardName());
-
- if(shardInformation == null){
- getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
- return;
- }
-
- sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier