X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=98a6090514c9549f2f506c82a85fce7376e35cf6;hb=refs%2Fchanges%2F70%2F29370%2F7;hp=0804a50e9b1f2b6d5587742e83b8efa2218ddd30;hpb=357eb2684de759f296d1315e587308db5f1c54d4;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 0804a50e9b..98a6090514 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -19,9 +19,11 @@ import akka.actor.Props; import akka.actor.SupervisorStrategy; import akka.cluster.ClusterEvent; import akka.dispatch.OnComplete; -import akka.japi.Creator; import akka.japi.Function; import akka.persistence.RecoveryCompleted; +import akka.persistence.SaveSnapshotFailure; +import akka.persistence.SaveSnapshotSuccess; +import akka.persistence.SnapshotOffer; import akka.serialization.Serialization; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; @@ -35,6 +37,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -54,6 +57,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; @@ -116,7 +120,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final String shardDispatcherPath; - private ShardManagerInfo mBean; + private final ShardManagerInfo mBean; private DatastoreContextFactory datastoreContextFactory; @@ -128,43 +132,33 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private SchemaContext schemaContext; + private DatastoreSnapshot restoreFromSnapshot; + /** */ - protected ShardManager(ClusterWrapper cluster, Configuration configuration, - DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch, - PrimaryShardInfoFutureCache primaryShardInfoCache) { - - this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); - this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); - this.datastoreContextFactory = datastoreContextFactory; - this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreType(); + protected ShardManager(Builder builder) { + + this.cluster = builder.cluster; + this.configuration = builder.configuration; + this.datastoreContextFactory = builder.datastoreContextFactory; + this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreType(); this.shardDispatcherPath = new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); - this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; - this.primaryShardInfoCache = primaryShardInfoCache; + this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch; + this.primaryShardInfoCache = builder.primaryShardInfoCache; + this.restoreFromSnapshot = builder.restoreFromSnapshot; peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName()); // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); - createLocalShards(); - } - - public static Props props( - final ClusterWrapper cluster, - final Configuration configuration, - final DatastoreContextFactory datastoreContextFactory, - final CountDownLatch waitTillReadyCountdownLatch, - final PrimaryShardInfoFutureCache primaryShardInfoCache) { - - Preconditions.checkNotNull(cluster, "cluster should not be null"); - Preconditions.checkNotNull(configuration, "configuration should not be null"); - Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null"); - Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null"); - - return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContextFactory, - waitTillReadyCountdownLatch, primaryShardInfoCache)); + List localShardActorNames = new ArrayList<>(); + mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(), + "shard-manager-" + this.type, + datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(), + localShardActorNames); + mBean.setShardManager(this); } @Override @@ -214,6 +208,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onRemoveShardReplica((RemoveShardReplica)message); } else if(message instanceof GetSnapshot) { onGetSnapshot(); + } else if (message instanceof SaveSnapshotSuccess) { + LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId()); + } else if (message instanceof SaveSnapshotFailure) { + LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards", + persistenceId(), ((SaveSnapshotFailure)message).cause()); } else { unknownMessage(message); } @@ -241,8 +240,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } byte[] shardManagerSnapshot = null; - ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(localShards.size(), - type, shardManagerSnapshot , getSender(), persistenceId(), + ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props( + new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(), datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration())); for(ShardInformation shardInfo: localShards.values()) { @@ -277,7 +276,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses, - shardDatastoreContext, createShard.getShardPropsCreator(), peerAddressResolver); + shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver); localShards.put(info.getShardName(), info); mBean.addLocalShard(shardId.toString()); @@ -443,6 +442,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // We no longer persist SchemaContext modules so delete all the prior messages from the akka // journal on upgrade from Helium. deleteMessages(lastSequenceNr()); + createLocalShards(); + } else if (message instanceof SnapshotOffer) { + handleShardRecovery((SnapshotOffer) message); } } @@ -731,20 +733,24 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { String memberName = this.cluster.getCurrentMemberName(); Collection memberShardNames = this.configuration.getMemberShardNames(memberName); - ShardPropsCreator shardPropsCreator = new DefaultShardPropsCreator(); - List localShardActorNames = new ArrayList<>(); + Map shardSnapshots = new HashMap<>(); + if(restoreFromSnapshot != null) + { + for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) { + shardSnapshots.put(snapshot.getName(), snapshot); + } + } + + restoreFromSnapshot = null; // null out to GC + for(String shardName : memberShardNames){ ShardIdentifier shardId = getShardIdentifier(memberName, shardName); Map peerAddresses = getPeerAddresses(shardName); - localShardActorNames.add(shardId.toString()); localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, - newShardDatastoreContext(shardName), shardPropsCreator, peerAddressResolver)); + newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot( + shardSnapshots.get(shardName)), peerAddressResolver)); + mBean.addLocalShard(shardId.toString()); } - - mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type, - datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(), localShardActorNames); - - mBean.setShardManager(this); } /** @@ -874,7 +880,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { final ShardInformation shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext, - new DefaultShardPropsCreator(), peerAddressResolver); + Shard.builder(), peerAddressResolver); + shardInfo.setShardActiveMember(false); localShards.put(shardName, shardInfo); shardInfo.setActor(newShardActor(schemaContext, shardInfo)); @@ -921,6 +928,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // Make the local shard voting capable shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf()); + shardInfo.setShardActiveMember(true); + persistShardList(); mBean.addLocalShard(shardInfo.getShardId().toString()); sender.tell(new akka.actor.Status.Success(true), getSelf()); @@ -966,6 +975,39 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } + private void persistShardList() { + List shardList = new ArrayList(localShards.keySet()); + for (ShardInformation shardInfo : localShards.values()) { + if (!shardInfo.isShardActiveMember()) { + shardList.remove(shardInfo.getShardName()); + } + } + LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList); + saveSnapshot(new ShardManagerSnapshot(shardList)); + } + + private void handleShardRecovery(SnapshotOffer offer) { + LOG.debug ("{}: in handleShardRecovery", persistenceId()); + ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot(); + String currentMember = cluster.getCurrentMemberName(); + Set configuredShardList = + new HashSet<>(configuration.getMemberShardNames(currentMember)); + for (String shard : snapshot.getShardList()) { + if (!configuredShardList.contains(shard)) { + // add the current member as a replica for the shard + LOG.debug ("{}: adding shard {}", persistenceId(), shard); + configuration.addMemberReplicaForShard(shard, currentMember); + } else { + configuredShardList.remove(shard); + } + } + for (String shard : configuredShardList) { + // remove the member as a replica for the shard + LOG.debug ("{}: removing shard {}", persistenceId(), shard); + configuration.removeMemberReplicaForShard(shard, currentMember); + } + } + @VisibleForTesting protected static class ShardInformation { private final ShardIdentifier shardId; @@ -987,22 +1029,27 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private short leaderVersion; private DatastoreContext datastoreContext; - private final ShardPropsCreator shardPropsCreator; + private Shard.AbstractBuilder builder; private final ShardPeerAddressResolver addressResolver; + private boolean shardActiveStatus = true; private ShardInformation(String shardName, ShardIdentifier shardId, Map initialPeerAddresses, DatastoreContext datastoreContext, - ShardPropsCreator shardPropsCreator, ShardPeerAddressResolver addressResolver) { + Shard.AbstractBuilder builder, ShardPeerAddressResolver addressResolver) { this.shardName = shardName; this.shardId = shardId; this.initialPeerAddresses = initialPeerAddresses; this.datastoreContext = datastoreContext; - this.shardPropsCreator = shardPropsCreator; + this.builder = builder; this.addressResolver = addressResolver; } Props newProps(SchemaContext schemaContext) { - return shardPropsCreator.newProps(shardId, initialPeerAddresses, datastoreContext, schemaContext); + Preconditions.checkNotNull(builder); + Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext). + schemaContext(schemaContext).props(); + builder = null; + return props; } String getShardName() { @@ -1183,31 +1230,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { void setLeaderVersion(short leaderVersion) { this.leaderVersion = leaderVersion; } - } - private static class ShardManagerCreator implements Creator { - private static final long serialVersionUID = 1L; - - final ClusterWrapper cluster; - final Configuration configuration; - final DatastoreContextFactory datastoreContextFactory; - private final CountDownLatch waitTillReadyCountdownLatch; - private final PrimaryShardInfoFutureCache primaryShardInfoCache; - - ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, - DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch, - PrimaryShardInfoFutureCache primaryShardInfoCache) { - this.cluster = cluster; - this.configuration = configuration; - this.datastoreContextFactory = datastoreContextFactory; - this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; - this.primaryShardInfoCache = primaryShardInfoCache; + void setShardActiveMember(boolean flag) { + shardActiveStatus = flag; } - @Override - public ShardManager create() throws Exception { - return new ShardManager(cluster, configuration, datastoreContextFactory, waitTillReadyCountdownLatch, - primaryShardInfoCache); + boolean isShardActiveMember() { + return shardActiveStatus; } } @@ -1280,6 +1309,74 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return modules; } } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private ClusterWrapper cluster; + private Configuration configuration; + private DatastoreContextFactory datastoreContextFactory; + private CountDownLatch waitTillReadyCountdownLatch; + private PrimaryShardInfoFutureCache primaryShardInfoCache; + private DatastoreSnapshot restoreFromSnapshot; + private volatile boolean sealed; + + protected void checkSealed() { + Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed"); + } + + public Builder cluster(ClusterWrapper cluster) { + checkSealed(); + this.cluster = cluster; + return this; + } + + public Builder configuration(Configuration configuration) { + checkSealed(); + this.configuration = configuration; + return this; + } + + public Builder datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) { + checkSealed(); + this.datastoreContextFactory = datastoreContextFactory; + return this; + } + + public Builder waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) { + checkSealed(); + this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; + return this; + } + + public Builder primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) { + checkSealed(); + this.primaryShardInfoCache = primaryShardInfoCache; + return this; + } + + public Builder restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) { + checkSealed(); + this.restoreFromSnapshot = restoreFromSnapshot; + return this; + } + + protected void verify() { + sealed = true; + Preconditions.checkNotNull(cluster, "cluster should not be null"); + Preconditions.checkNotNull(configuration, "configuration should not be null"); + Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null"); + Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null"); + Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null"); + } + + public Props props() { + verify(); + return Props.create(ShardManager.class, this); + } + } }