X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=aa2c524fe7cf5acd3c0b49fc8f210df013ff6e44;hb=refs%2Fchanges%2F02%2F38802%2F16;hp=7a41021475186b4d320d7b773a14bbac9e650f86;hpb=849db3eff0c710410aaed61a709ba69b491a75bd;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
index 7a41021475..aa2c524fe7 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
@@ -14,10 +14,11 @@ import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.PoisonPill;
-import akka.actor.Props;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
+import akka.actor.SupervisorStrategy.Directive;
import akka.cluster.ClusterEvent;
+import akka.cluster.Member;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.japi.Function;
@@ -27,15 +28,9 @@ import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
-import akka.serialization.Serialization;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Sets;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
@@ -43,22 +38,22 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.lang3.SerializationUtils;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.Shard;
-import org.opendaylight.controller.cluster.datastore.ShardManagerSnapshot;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
@@ -66,7 +61,6 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderExc
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
@@ -76,20 +70,15 @@ import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
-import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
-import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
-import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
@@ -101,7 +90,6 @@ import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -119,7 +107,7 @@ import scala.concurrent.duration.FiniteDuration;
*
Monitor the cluster members and store their addresses
*
*/
-public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
+class ShardManager extends AbstractUntypedPersistentActorWithMetering {
private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
@@ -158,19 +146,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
private final String persistenceId;
- /**
- */
- protected ShardManager(AbstractBuilder> builder) {
-
- this.cluster = builder.cluster;
- this.configuration = builder.configuration;
- this.datastoreContextFactory = builder.datastoreContextFactory;
- this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
+ ShardManager(AbstractShardManagerCreator> builder) {
+ this.cluster = builder.getCluster();
+ this.configuration = builder.getConfiguration();
+ this.datastoreContextFactory = builder.getDatastoreContextFactory();
+ this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
this.shardDispatcherPath =
new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
- this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
- this.primaryShardInfoCache = builder.primaryShardInfoCache;
- this.restoreFromSnapshot = builder.restoreFromSnapshot;
+ this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountdownLatch();
+ this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
+ this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
@@ -180,12 +165,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
- List localShardActorNames = new ArrayList<>();
- mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
- "shard-manager-" + this.type,
- datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
- localShardActorNames);
- mBean.setShardManager(this);
+ mBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), "shard-manager-" + this.type,
+ datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
+ mBean.registerMBean();
}
@Override
@@ -256,6 +238,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
persistenceId(), ((SaveSnapshotFailure) message).cause());
} else if(message instanceof Shutdown) {
onShutDown();
+ } else if (message instanceof GetLocalShardIds) {
+ onGetLocalShardIds();
} else {
unknownMessage(message);
}
@@ -318,14 +302,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
shardId.getShardName());
- originalSender.tell(new akka.actor.Status.Success(null), getSelf());
+ originalSender.tell(new Status.Success(null), getSelf());
} else {
LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
persistenceId(), shardId, replyMsg.getStatus());
- Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
- leaderPath, shardId);
- originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
+ Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
+ originalSender.tell(new Status.Failure(failure), getSelf());
}
}
@@ -408,7 +391,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
}
if(notInitialized != null) {
- getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
+ getSender().tell(new Status.Failure(new IllegalStateException(String.format(
"%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
return;
}
@@ -435,14 +418,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
String shardName = createShard.getModuleShardConfig().getShardName();
if(localShards.containsKey(shardName)) {
LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
- reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
+ reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
} else {
doCreateShard(createShard);
- reply = new akka.actor.Status.Success(null);
+ reply = new Status.Success(null);
}
} catch (Exception e) {
LOG.error("{}: onCreateShard failed", persistenceId(), e);
- reply = new akka.actor.Status.Failure(e);
+ reply = new Status.Failure(e);
}
if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
@@ -495,8 +478,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
info.setActiveMember(isActiveMember);
localShards.put(info.getShardName(), info);
- mBean.addLocalShard(shardId.toString());
-
if(schemaContext != null) {
info.setActor(newShardActor(schemaContext, info));
}
@@ -547,10 +528,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
if(!shardInfo.isShardInitialized()) {
LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
- message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
+ message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
} else {
LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
- message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
+ message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
}
}
@@ -681,12 +662,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
return;
}
- sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier