X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=8386e669b83f46cb102771f7c12f1b9458f5e485;hp=4eb6dd01dc9316f7dbc1d11c0447cb530e5ada58;hb=4639f61a41a93d6a762af97b819d164781b0f9f8;hpb=f86f7e8c204fb19615c45e669a764c623576e1a3
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
index 4eb6dd01dc..8386e669b8 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
@@ -14,10 +14,12 @@ import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.PoisonPill;
-import akka.actor.Props;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
+import akka.actor.SupervisorStrategy.Directive;
import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.MemberWeaklyUp;
+import akka.cluster.Member;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.japi.Function;
@@ -27,15 +29,9 @@ import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
-import akka.serialization.Serialization;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Sets;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
@@ -43,22 +39,22 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
import org.apache.commons.lang3.SerializationUtils;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.Shard;
-import org.opendaylight.controller.cluster.datastore.ShardManagerSnapshot;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
@@ -66,42 +62,41 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderExc
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
-import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
-import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
-import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -119,7 +114,7 @@ import scala.concurrent.duration.FiniteDuration;
*
Monitor the cluster members and store their addresses
*
*/
-public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
+class ShardManager extends AbstractUntypedPersistentActorWithMetering {
private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
@@ -158,19 +153,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
private final String persistenceId;
- /**
- */
- protected ShardManager(AbstractBuilder> builder) {
-
- this.cluster = builder.cluster;
- this.configuration = builder.configuration;
- this.datastoreContextFactory = builder.datastoreContextFactory;
- this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
+ ShardManager(AbstractShardManagerCreator> builder) {
+ this.cluster = builder.getCluster();
+ this.configuration = builder.getConfiguration();
+ this.datastoreContextFactory = builder.getDatastoreContextFactory();
+ this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
this.shardDispatcherPath =
new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
- this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
- this.primaryShardInfoCache = builder.primaryShardInfoCache;
- this.restoreFromSnapshot = builder.restoreFromSnapshot;
+ this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountdownLatch();
+ this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
+ this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
@@ -180,12 +172,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
- List localShardActorNames = new ArrayList<>();
- mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
- "shard-manager-" + this.type,
- datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
- localShardActorNames);
- mBean.setShardManager(this);
+ mBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), "shard-manager-" + this.type,
+ datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
+ mBean.registerMBean();
}
@Override
@@ -207,6 +196,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
onActorInitialized(message);
} else if (message instanceof ClusterEvent.MemberUp){
memberUp((ClusterEvent.MemberUp) message);
+ } else if (message instanceof ClusterEvent.MemberWeaklyUp){
+ memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
} else if (message instanceof ClusterEvent.MemberExited){
memberExited((ClusterEvent.MemberExited) message);
} else if(message instanceof ClusterEvent.MemberRemoved) {
@@ -238,9 +229,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
} else if(message instanceof ForwardedAddServerFailure) {
ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
- } else if(message instanceof PrimaryShardFoundForContext) {
- PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
- onPrimaryShardFoundContext(primaryShardFoundContext);
} else if(message instanceof RemoveShardReplica) {
onRemoveShardReplica((RemoveShardReplica) message);
} else if(message instanceof WrappedShardResponse){
@@ -249,6 +237,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
onGetSnapshot();
} else if(message instanceof ServerRemoved){
onShardReplicaRemoved((ServerRemoved) message);
+ } else if(message instanceof ChangeShardMembersVotingStatus){
+ onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
+ } else if(message instanceof FlipShardMembersVotingStatus){
+ onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
} else if(message instanceof SaveSnapshotSuccess) {
onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
} else if(message instanceof SaveSnapshotFailure) {
@@ -256,6 +248,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
persistenceId(), ((SaveSnapshotFailure) message).cause());
} else if(message instanceof Shutdown) {
onShutDown();
+ } else if (message instanceof GetLocalShardIds) {
+ onGetLocalShardIds();
+ } else if(message instanceof RunnableMessage) {
+ ((RunnableMessage)message).run();
} else {
unknownMessage(message);
}
@@ -311,31 +307,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
String leaderPath) {
- shardReplicaOperationsInProgress.remove(shardId);
+ shardReplicaOperationsInProgress.remove(shardId.getShardName());
LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
shardId.getShardName());
- originalSender.tell(new akka.actor.Status.Success(null), getSelf());
+ originalSender.tell(new Status.Success(null), getSelf());
} else {
LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
persistenceId(), shardId, replyMsg.getStatus());
- Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
- leaderPath, shardId);
- originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
- }
- }
-
- private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
- if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
- addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
- getSender());
- } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
- removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
- primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
+ Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
+ originalSender.tell(new Status.Failure(failure), getSelf());
}
}
@@ -364,6 +349,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
@Override
public void onComplete(Throwable failure, Object response) {
if (failure != null) {
+ shardReplicaOperationsInProgress.remove(shardName);
String msg = String.format("RemoveServer request to leader %s for shard %s failed",
primaryPath, shardName);
@@ -408,7 +394,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
}
if(notInitialized != null) {
- getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
+ getSender().tell(new Status.Failure(new IllegalStateException(String.format(
"%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
return;
}
@@ -435,14 +421,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
String shardName = createShard.getModuleShardConfig().getShardName();
if(localShards.containsKey(shardName)) {
LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
- reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
+ reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
} else {
doCreateShard(createShard);
- reply = new akka.actor.Status.Success(null);
+ reply = new Status.Success(null);
}
} catch (Exception e) {
LOG.error("{}: onCreateShard failed", persistenceId(), e);
- reply = new akka.actor.Status.Failure(e);
+ reply = new Status.Failure(e);
}
if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
@@ -495,8 +481,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
info.setActiveMember(isActiveMember);
localShards.put(info.getShardName(), info);
- mBean.addLocalShard(shardId.toString());
-
if(schemaContext != null) {
info.setActor(newShardActor(schemaContext, info));
}
@@ -547,10 +531,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
if(!shardInfo.isShardInitialized()) {
LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
- message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
+ message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
} else {
LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
- message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
+ message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
}
}
@@ -620,9 +604,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
String actorName = sender.path().name();
//find shard name from actor name; actor name is stringified shardId
- ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
- if (shardId.getShardName() == null) {
+ final ShardIdentifier shardId;
+ try {
+ shardId = ShardIdentifier.fromShardIdString(actorName);
+ } catch (IllegalArgumentException e) {
+ LOG.debug("{}: ignoring actor {}", actorName, e);
return;
}
@@ -681,12 +668,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
return;
}
- sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier