X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=3eba622c84c459bdb9abd8fbd484da5b4876a902;hb=18ecb07132281d3152a49baf50e8c020403588f9;hp=a91109c64b3fa5aac674677b95d2a31c83477fbd;hpb=d4d59200f8c56551755c36fbbd2b4aa52defa5cb;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
index a91109c64b..3eba622c84 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
@@ -5,1546 +5,27 @@
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
+package org.opendaylight.controller.cluster.datastore;
-package org.opendaylight.controller.cluster.datastore;
-
-import static akka.pattern.Patterns.ask;
-import akka.actor.ActorPath;
-import akka.actor.ActorRef;
-import akka.actor.Address;
-import akka.actor.Cancellable;
-import akka.actor.OneForOneStrategy;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.actor.Status;
-import akka.actor.SupervisorStrategy;
-import akka.cluster.ClusterEvent;
-import akka.dispatch.OnComplete;
-import akka.japi.Function;
-import akka.persistence.RecoveryCompleted;
-import akka.persistence.SaveSnapshotFailure;
-import akka.persistence.SaveSnapshotSuccess;
-import akka.persistence.SnapshotOffer;
-import akka.persistence.SnapshotSelectionCriteria;
-import akka.serialization.Serialization;
-import akka.util.Timeout;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Sets;
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.apache.commons.lang3.SerializationUtils;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
-import org.opendaylight.controller.cluster.datastore.config.Configuration;
-import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
-import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
-import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
-import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
-import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
-import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
-import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
-import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
-import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
-import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
-import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
-import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
-import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
-import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
-import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
-import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
-import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
-import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
-import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
-import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
-import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
-import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
-import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
-import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
-import org.opendaylight.controller.cluster.raft.messages.AddServer;
-import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
-import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
-import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
-import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
-import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
-import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * The ShardManager has the following jobs,
- *
- * - Create all the local shard replicas that belong on this cluster member
- *
- Find the address of the local shard
- *
- Find the primary replica for any given shard
- *
- Monitor the cluster members and store their addresses
- *
- */
-public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
-
- private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
-
- // Stores a mapping between a shard name and it's corresponding information
- // Shard names look like inventory, topology etc and are as specified in
- // configuration
- private final Map localShards = new HashMap<>();
-
- // The type of a ShardManager reflects the type of the datastore itself
- // A data store could be of type config/operational
- private final String type;
-
- private final ClusterWrapper cluster;
-
- private final Configuration configuration;
-
- private final String shardDispatcherPath;
-
- private final ShardManagerInfo mBean;
-
- private DatastoreContextFactory datastoreContextFactory;
-
- private final CountDownLatch waitTillReadyCountdownLatch;
-
- private final PrimaryShardInfoFutureCache primaryShardInfoCache;
-
- private final ShardPeerAddressResolver peerAddressResolver;
-
- private SchemaContext schemaContext;
-
- private DatastoreSnapshot restoreFromSnapshot;
-
- private ShardManagerSnapshot currentSnapshot;
-
- private final Set shardReplicaOperationsInProgress = new HashSet<>();
-
- private final String persistenceId;
-
- /**
- */
- protected ShardManager(AbstractBuilder> builder) {
-
- this.cluster = builder.cluster;
- this.configuration = builder.configuration;
- this.datastoreContextFactory = builder.datastoreContextFactory;
- this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
- this.shardDispatcherPath =
- new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
- this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
- this.primaryShardInfoCache = builder.primaryShardInfoCache;
- this.restoreFromSnapshot = builder.restoreFromSnapshot;
-
- String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
- persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
-
- peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
-
- // Subscribe this actor to cluster member events
- cluster.subscribeToMemberEvents(getSelf());
-
- List localShardActorNames = new ArrayList<>();
- mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
- "shard-manager-" + this.type,
- datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
- localShardActorNames);
- mBean.setShardManager(this);
- }
-
- @Override
- public void postStop() {
- LOG.info("Stopping ShardManager");
-
- mBean.unregisterMBean();
- }
-
- @Override
- public void handleCommand(Object message) throws Exception {
- if (message instanceof FindPrimary) {
- findPrimary((FindPrimary)message);
- } else if(message instanceof FindLocalShard){
- findLocalShard((FindLocalShard) message);
- } else if (message instanceof UpdateSchemaContext) {
- updateSchemaContext(message);
- } else if(message instanceof ActorInitialized) {
- onActorInitialized(message);
- } else if (message instanceof ClusterEvent.MemberUp){
- memberUp((ClusterEvent.MemberUp) message);
- } else if (message instanceof ClusterEvent.MemberExited){
- memberExited((ClusterEvent.MemberExited) message);
- } else if(message instanceof ClusterEvent.MemberRemoved) {
- memberRemoved((ClusterEvent.MemberRemoved) message);
- } else if(message instanceof ClusterEvent.UnreachableMember) {
- memberUnreachable((ClusterEvent.UnreachableMember)message);
- } else if(message instanceof ClusterEvent.ReachableMember) {
- memberReachable((ClusterEvent.ReachableMember) message);
- } else if(message instanceof DatastoreContextFactory) {
- onDatastoreContextFactory((DatastoreContextFactory)message);
- } else if(message instanceof RoleChangeNotification) {
- onRoleChangeNotification((RoleChangeNotification) message);
- } else if(message instanceof FollowerInitialSyncUpStatus){
- onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
- } else if(message instanceof ShardNotInitializedTimeout) {
- onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
- } else if(message instanceof ShardLeaderStateChanged) {
- onLeaderStateChanged((ShardLeaderStateChanged) message);
- } else if(message instanceof SwitchShardBehavior){
- onSwitchShardBehavior((SwitchShardBehavior) message);
- } else if(message instanceof CreateShard) {
- onCreateShard((CreateShard)message);
- } else if(message instanceof AddShardReplica){
- onAddShardReplica((AddShardReplica)message);
- } else if(message instanceof ForwardedAddServerReply) {
- ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
- onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
- msg.removeShardOnFailure);
- } else if(message instanceof ForwardedAddServerFailure) {
- ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
- onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
- } else if(message instanceof PrimaryShardFoundForContext) {
- PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
- onPrimaryShardFoundContext(primaryShardFoundContext);
- } else if(message instanceof RemoveShardReplica) {
- onRemoveShardReplica((RemoveShardReplica) message);
- } else if(message instanceof WrappedShardResponse){
- onWrappedShardResponse((WrappedShardResponse) message);
- } else if(message instanceof GetSnapshot) {
- onGetSnapshot();
- } else if(message instanceof ServerRemoved){
- onShardReplicaRemoved((ServerRemoved) message);
- } else if (message instanceof SaveSnapshotSuccess) {
- onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
- } else if (message instanceof SaveSnapshotFailure) {
- LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
- persistenceId(), ((SaveSnapshotFailure) message).cause());
- } else {
- unknownMessage(message);
- }
- }
-
- private void onWrappedShardResponse(WrappedShardResponse message) {
- if (message.getResponse() instanceof RemoveServerReply) {
- onRemoveServerReply(getSender(), message.getShardName(), (RemoveServerReply) message.getResponse());
- }
- }
-
- private void onRemoveServerReply(ActorRef originalSender, String shardName, RemoveServerReply response) {
- shardReplicaOperationsInProgress.remove(shardName);
- originalSender.tell(new Status.Success(null), self());
- }
-
- private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
- if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
- addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
- getSender());
- } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
- removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
- primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
- }
- }
-
- private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
- final ActorRef sender) {
- if(isShardReplicaOperationInProgress(shardName, sender)) {
- return;
- }
-
- shardReplicaOperationsInProgress.add(shardName);
-
- final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
-
- final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
-
- //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
- LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
- primaryPath, shardId);
-
- Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
- duration());
- Future