X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=5874c5296f0ebd8d1b5085abd3797f6c77907618;hb=d4d59200f8c56551755c36fbbd2b4aa52defa5cb;hp=64c6821120f94f99a389c12700757a7b8c7266f5;hpb=b584e686fdeba863643f80c0894d7fbd2dcaa540;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
index 64c6821120..a91109c64b 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
@@ -8,29 +8,101 @@
package org.opendaylight.controller.cluster.datastore;
+import static akka.pattern.Patterns.ask;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.Address;
+import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
+import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.cluster.ClusterEvent;
-import akka.japi.Creator;
+import akka.dispatch.OnComplete;
import akka.japi.Function;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.serialization.Serialization;
+import akka.util.Timeout;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Sets;
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.SerializationUtils;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
+import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
+import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
+import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
+import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.AddServer;
+import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
+import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import scala.concurrent.duration.FiniteDuration;
/**
* The ShardManager has the following jobs,
@@ -41,12 +113,9 @@ import java.util.Map;
* <li> Monitor the cluster members and store their addresses
* </ul>
*/
-public class ShardManager extends AbstractUntypedActor {
+public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
- // Stores a mapping between a member name and the address of the member
- // Member names look like "member-1", "member-2" etc and are as specified
- // in configuration
- private final Map memberNameToAddress = new HashMap<>();
+ private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
// Stores a mapping between a shard name and its corresponding information
// Shard names look like inventory, topology etc and are as specified in
@@ -61,263 +130,1679 @@ public class ShardManager extends AbstractUntypedActor {
private final Configuration configuration;
+ private final String shardDispatcherPath;
+
+ private final ShardManagerInfo mBean;
+
+ private DatastoreContextFactory datastoreContextFactory;
+
+ private final CountDownLatch waitTillReadyCountdownLatch;
+
+ private final PrimaryShardInfoFutureCache primaryShardInfoCache;
+
+ private final ShardPeerAddressResolver peerAddressResolver;
+
+ private SchemaContext schemaContext;
+
+ private DatastoreSnapshot restoreFromSnapshot;
+
+ private ShardManagerSnapshot currentSnapshot;
+
+ private final Set shardReplicaOperationsInProgress = new HashSet<>();
+
+ private final String persistenceId;
+
/**
- * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
- * configuration or operational
*/
- private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
+ protected ShardManager(AbstractBuilder> builder) { // Builds the manager from a builder: copies its collaborators, subscribes to cluster member events and creates the ShardManagerInfo MBean. NOTE(review): the old checkNotNull guards are gone - presumably the builder validates these fields; confirm.
+
+ this.cluster = builder.cluster;
+ this.configuration = builder.configuration;
+ this.datastoreContextFactory = builder.datastoreContextFactory;
+ this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(); // the type is now read from the base DatastoreContext instead of being a constructor argument
+ this.shardDispatcherPath =
+ new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
+ this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
+ this.primaryShardInfoCache = builder.primaryShardInfoCache;
+ this.restoreFromSnapshot = builder.restoreFromSnapshot;
+
+ String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
+ persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type; // falls back to "shard-manager-<type>" when the context supplies no persistence id
- this.type = Preconditions.checkNotNull(type, "type should not be null");
- this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
- this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
+ peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
- // Create all the local Shards and make them a child of the ShardManager
- // TODO: This may need to be initiated when we first get the schema context
- createLocalShards();
+ List localShardActorNames = new ArrayList<>(); // starts empty: local shards are no longer created from the constructor
+ mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
+ "shard-manager-" + this.type,
+ datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
+ localShardActorNames);
+ mBean.setShardManager(this); // NOTE(review): 'this' is handed out before the constructor has returned
}
- public static Props props(final String type,
- final ClusterWrapper cluster,
- final Configuration configuration) {
- return Props.create(new Creator() {
+ @Override
+ public void postStop() { // Overridden stop callback: logs the shutdown and unregisters the MBean held in mBean.
+ LOG.info("Stopping ShardManager");
- @Override
- public ShardManager create() throws Exception {
- return new ShardManager(type, cluster, configuration);
- }
- });
+ mBean.unregisterMBean(); // counterpart of the createShardManagerMBean call made in the constructor
}
-
@Override
- public void handleReceive(Object message) throws Exception {
- if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
- findPrimary(
- FindPrimary.fromSerializable(message));
+ public void handleCommand(Object message) throws Exception { // Dispatches each incoming message to its handler by runtime type (first matching instanceof wins); unmatched messages go to unknownMessage().
+ if (message instanceof FindPrimary) { // FindPrimary is now matched directly rather than through SERIALIZABLE_CLASS/fromSerializable
+ findPrimary((FindPrimary)message);
} else if(message instanceof FindLocalShard){
findLocalShard((FindLocalShard) message);
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext(message);
+ } else if(message instanceof ActorInitialized) {
+ onActorInitialized(message);
} else if (message instanceof ClusterEvent.MemberUp){
memberUp((ClusterEvent.MemberUp) message);
+ } else if (message instanceof ClusterEvent.MemberExited){
+ memberExited((ClusterEvent.MemberExited) message);
} else if(message instanceof ClusterEvent.MemberRemoved) {
memberRemoved((ClusterEvent.MemberRemoved) message);
} else if(message instanceof ClusterEvent.UnreachableMember) {
- ignoreMessage(message);
- } else{
- throw new Exception ("Not recognized message received, message="+message);
+ memberUnreachable((ClusterEvent.UnreachableMember)message); // unreachable members are now handled instead of being ignored
+ } else if(message instanceof ClusterEvent.ReachableMember) {
+ memberReachable((ClusterEvent.ReachableMember) message);
+ } else if(message instanceof DatastoreContextFactory) {
+ onDatastoreContextFactory((DatastoreContextFactory)message);
+ } else if(message instanceof RoleChangeNotification) {
+ onRoleChangeNotification((RoleChangeNotification) message);
+ } else if(message instanceof FollowerInitialSyncUpStatus){
+ onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
+ } else if(message instanceof ShardNotInitializedTimeout) {
+ onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
+ } else if(message instanceof ShardLeaderStateChanged) {
+ onLeaderStateChanged((ShardLeaderStateChanged) message);
+ } else if(message instanceof SwitchShardBehavior){
+ onSwitchShardBehavior((SwitchShardBehavior) message);
+ } else if(message instanceof CreateShard) {
+ onCreateShard((CreateShard)message);
+ } else if(message instanceof AddShardReplica){
+ onAddShardReplica((AddShardReplica)message);
+ } else if(message instanceof ForwardedAddServerReply) {
+ ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
+ onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
+ msg.removeShardOnFailure);
+ } else if(message instanceof ForwardedAddServerFailure) {
+ ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
+ onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
+ } else if(message instanceof PrimaryShardFoundForContext) {
+ PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
+ onPrimaryShardFoundContext(primaryShardFoundContext);
+ } else if(message instanceof RemoveShardReplica) {
+ onRemoveShardReplica((RemoveShardReplica) message);
+ } else if(message instanceof WrappedShardResponse){
+ onWrappedShardResponse((WrappedShardResponse) message);
+ } else if(message instanceof GetSnapshot) {
+ onGetSnapshot();
+ } else if(message instanceof ServerRemoved){
+ onShardReplicaRemoved((ServerRemoved) message);
+ } else if (message instanceof SaveSnapshotSuccess) {
+ onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
+ } else if (message instanceof SaveSnapshotFailure) {
+ LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", // a failed snapshot save is only logged, together with its cause
+ persistenceId(), ((SaveSnapshotFailure) message).cause());
+ } else {
+ unknownMessage(message); // replaces the old behaviour of throwing Exception for unrecognized messages
+ }
+ }
+
+ private void onWrappedShardResponse(WrappedShardResponse message) { // Unwraps a WrappedShardResponse; only a RemoveServerReply payload is acted on, any other response type is dropped without logging.
+ if (message.getResponse() instanceof RemoveServerReply) {
+ onRemoveServerReply(getSender(), message.getShardName(), (RemoveServerReply) message.getResponse()); // the current sender is passed on as the party to acknowledge
}
+ }
+ private void onRemoveServerReply(ActorRef originalSender, String shardName, RemoveServerReply response) { // Clears the in-progress marker for shardName and acknowledges originalSender.
+ shardReplicaOperationsInProgress.remove(shardName); // allows a new replica operation on this shard to start
+ originalSender.tell(new Status.Success(null), self()); // NOTE(review): 'response' is never inspected - Success is sent regardless of the reply's content; confirm this is intended
}
- private void findLocalShard(FindLocalShard message) {
- ShardInformation shardInformation =
- localShards.get(message.getShardName());
+ private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) { // Routes the message by the type of its context message: AddShardReplica -> addShard, RemoveShardReplica -> removeShardReplica; any other context type is ignored.
+ if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
+ addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
+ getSender());
+ } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
+ removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
+ primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
+ }
+ }
- if(shardInformation != null){
- getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf());
+ private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
+ final ActorRef sender) {
+ if(isShardReplicaOperationInProgress(shardName, sender)) {
return;
}
- getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
+ shardReplicaOperationsInProgress.add(shardName);
+
+ final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
+
+ final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+
+ //inform ShardLeader to remove this shard as a replica by sending a RemoveServer message
+ LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+ primaryPath, shardId);
+
+ Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
+ duration());
+ Future