import akka.actor.SupervisorStrategy;
import akka.cluster.ClusterEvent;
import akka.dispatch.OnComplete;
-import akka.japi.Creator;
import akka.japi.Function;
import akka.persistence.RecoveryCompleted;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotOffer;
import akka.serialization.Serialization;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
private final String shardDispatcherPath;
- private ShardManagerInfo mBean;
+ private final ShardManagerInfo mBean;
- private DatastoreContext datastoreContext;
+ private DatastoreContextFactory datastoreContextFactory;
private final CountDownLatch waitTillReadyCountdownLatch;
private SchemaContext schemaContext;
+ private DatastoreSnapshot restoreFromSnapshot;
+
/**
*/
- protected ShardManager(ClusterWrapper cluster, Configuration configuration,
- DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch,
- PrimaryShardInfoFutureCache primaryShardInfoCache) {
-
- this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
- this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
- this.datastoreContext = datastoreContext;
- this.type = datastoreContext.getDataStoreType();
+ protected ShardManager(Builder builder) {
+
+ this.cluster = builder.cluster;
+ this.configuration = builder.configuration;
+ this.datastoreContextFactory = builder.datastoreContextFactory;
+ this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
this.shardDispatcherPath =
new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
- this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
- this.primaryShardInfoCache = primaryShardInfoCache;
+ this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
+ this.primaryShardInfoCache = builder.primaryShardInfoCache;
+ this.restoreFromSnapshot = builder.restoreFromSnapshot;
peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
- this.datastoreContext = DatastoreContext.newBuilderFrom(datastoreContext).shardPeerAddressResolver(
- peerAddressResolver).build();
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
- createLocalShards();
- }
-
- public static Props props(
- final ClusterWrapper cluster,
- final Configuration configuration,
- final DatastoreContext datastoreContext,
- final CountDownLatch waitTillReadyCountdownLatch,
- final PrimaryShardInfoFutureCache primaryShardInfoCache) {
-
- Preconditions.checkNotNull(cluster, "cluster should not be null");
- Preconditions.checkNotNull(configuration, "configuration should not be null");
- Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
- Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
-
- return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext,
- waitTillReadyCountdownLatch, primaryShardInfoCache));
+ List<String> localShardActorNames = new ArrayList<>();
+ mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
+ "shard-manager-" + this.type,
+ datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
+ localShardActorNames);
+ mBean.setShardManager(this);
}
@Override
memberUnreachable((ClusterEvent.UnreachableMember)message);
} else if(message instanceof ClusterEvent.ReachableMember) {
memberReachable((ClusterEvent.ReachableMember) message);
- } else if(message instanceof DatastoreContext) {
- onDatastoreContext((DatastoreContext)message);
+ } else if(message instanceof DatastoreContextFactory) {
+ onDatastoreContextFactory((DatastoreContextFactory)message);
} else if(message instanceof RoleChangeNotification) {
onRoleChangeNotification((RoleChangeNotification) message);
} else if(message instanceof FollowerInitialSyncUpStatus){
onAddShardReplica((AddShardReplica)message);
} else if(message instanceof RemoveShardReplica){
onRemoveShardReplica((RemoveShardReplica)message);
+ } else if(message instanceof GetSnapshot) {
+ onGetSnapshot();
+ } else if (message instanceof SaveSnapshotSuccess) {
+ LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId());
+ } else if (message instanceof SaveSnapshotFailure) {
+ LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
+ persistenceId(), ((SaveSnapshotFailure)message).cause());
} else {
unknownMessage(message);
}
}
+ private void onGetSnapshot() {
+ LOG.debug("{}: onGetSnapshot", persistenceId());
+
+ List<String> notInitialized = null;
+ for(ShardInformation shardInfo: localShards.values()) {
+ if(!shardInfo.isShardInitialized()) {
+ if(notInitialized == null) {
+ notInitialized = new ArrayList<>();
+ }
+
+ notInitialized.add(shardInfo.getShardName());
+ }
+ }
+
+ if(notInitialized != null) {
+ getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
+ "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
+ return;
+ }
+
+ byte[] shardManagerSnapshot = null;
+ ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
+ new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
+ datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
+
+ for(ShardInformation shardInfo: localShards.values()) {
+ shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
+ }
+ }
+
private void onCreateShard(CreateShard createShard) {
Object reply;
try {
DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
if(shardDatastoreContext == null) {
- shardDatastoreContext = datastoreContext;
+ shardDatastoreContext = newShardDatastoreContext(moduleShardConfig.getShardName());
} else {
shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
peerAddressResolver).build();
}
ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
- shardDatastoreContext, createShard.getShardPropsCreator(), peerAddressResolver);
+ shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
localShards.put(info.getShardName(), info);
mBean.addLocalShard(shardId.toString());
}
}
+ private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
+ return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
+ shardPeerAddressResolver(peerAddressResolver);
+ }
+
+ private DatastoreContext newShardDatastoreContext(String shardName) {
+ return newShardDatastoreContextBuilder(shardName).build();
+ }
+
private void checkReady(){
if (isReadyWithLeaderId()) {
LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
// We no longer persist SchemaContext modules so delete all the prior messages from the akka
// journal on upgrade from Helium.
deleteMessages(lastSequenceNr());
+ createLocalShards();
+ } else if (message instanceof SnapshotOffer) {
+ handleShardRecovery((SnapshotOffer) message);
}
}
shardInformation.addOnShardInitialized(onShardInitialized);
- LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
-
- FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration();
+ FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
if(shardInformation.isShardInitialized()) {
// If the shard is already initialized then we'll wait enough time for the shard to
// elect a leader, ie 2 times the election timeout.
- timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig()
+ timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
.getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
}
+ LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
+ shardInformation.getShardName());
+
Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
timeout, getSelf(),
new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
}
}
- private void onDatastoreContext(DatastoreContext context) {
- datastoreContext = DatastoreContext.newBuilderFrom(context).shardPeerAddressResolver(
- peerAddressResolver).build();
+ private void onDatastoreContextFactory(DatastoreContextFactory factory) {
+ datastoreContextFactory = factory;
for (ShardInformation info : localShards.values()) {
- if (info.getActor() != null) {
- info.getActor().tell(datastoreContext, getSelf());
- }
+ info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
}
}
String memberName = this.cluster.getCurrentMemberName();
Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
- ShardPropsCreator shardPropsCreator = new DefaultShardPropsCreator();
- List<String> localShardActorNames = new ArrayList<>();
+ Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
+ if(restoreFromSnapshot != null)
+ {
+ for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
+ shardSnapshots.put(snapshot.getName(), snapshot);
+ }
+ }
+
+ restoreFromSnapshot = null; // null out to GC
+
for(String shardName : memberShardNames){
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
Map<String, String> peerAddresses = getPeerAddresses(shardName);
- localShardActorNames.add(shardId.toString());
- localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext,
- shardPropsCreator, peerAddressResolver));
+ localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
+ newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
+ shardSnapshots.get(shardName)), peerAddressResolver));
+ mBean.addLocalShard(shardId.toString());
}
-
- mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
- datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
-
- mBean.setShardManager(this);
}
/**
return mBean;
}
- private DatastoreContext getInitShardDataStoreContext() {
- return (DatastoreContext.newBuilderFrom(datastoreContext)
- .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
- .build());
- }
-
private void checkLocalShardExists(final String shardName, final ActorRef sender) {
if (localShards.containsKey(shardName)) {
String msg = String.format("Local shard %s already exists", shardName);
return;
}
- Timeout findPrimaryTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
+ Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+ getShardInitializationTimeout().duration().$times(2));
final ActorRef sender = getSender();
Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout);
ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+
+ DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
+ DisableElectionsRaftPolicy.class.getName()).build();
+
final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
- getPeerAddresses(shardName), getInitShardDataStoreContext(),
- new DefaultShardPropsCreator(), peerAddressResolver);
+ getPeerAddresses(shardName), datastoreContext,
+ Shard.builder(), peerAddressResolver);
+ shardInfo.setShardActiveMember(false);
localShards.put(shardName, shardInfo);
shardInfo.setActor(newShardActor(schemaContext, shardInfo));
LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
response.getPrimaryPath(), shardId);
- Timeout addServerTimeout = new Timeout(datastoreContext
- .getShardLeaderElectionTimeout().duration().$times(4));
+ Timeout addServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(4));
Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout);
LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
// Make the local shard voting capable
- shardInfo.setDatastoreContext(datastoreContext, getSelf());
+ shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
+ shardInfo.setShardActiveMember(true);
+ persistShardList();
mBean.addLocalShard(shardInfo.getShardId().toString());
sender.tell(new akka.actor.Status.Success(true), getSelf());
return;
}
+ private void persistShardList() {
+ List<String> shardList = new ArrayList(localShards.keySet());
+ for (ShardInformation shardInfo : localShards.values()) {
+ if (!shardInfo.isShardActiveMember()) {
+ shardList.remove(shardInfo.getShardName());
+ }
+ }
+ LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
+ saveSnapshot(new ShardManagerSnapshot(shardList));
+ }
+
+ private void handleShardRecovery(SnapshotOffer offer) {
+ LOG.debug ("{}: in handleShardRecovery", persistenceId());
+ ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
+ String currentMember = cluster.getCurrentMemberName();
+ Set<String> configuredShardList =
+ new HashSet<>(configuration.getMemberShardNames(currentMember));
+ for (String shard : snapshot.getShardList()) {
+ if (!configuredShardList.contains(shard)) {
+ // add the current member as a replica for the shard
+ LOG.debug ("{}: adding shard {}", persistenceId(), shard);
+ configuration.addMemberReplicaForShard(shard, currentMember);
+ } else {
+ configuredShardList.remove(shard);
+ }
+ }
+ for (String shard : configuredShardList) {
+ // remove the member as a replica for the shard
+ LOG.debug ("{}: removing shard {}", persistenceId(), shard);
+ configuration.removeMemberReplicaForShard(shard, currentMember);
+ }
+ }
+
@VisibleForTesting
protected static class ShardInformation {
private final ShardIdentifier shardId;
private short leaderVersion;
private DatastoreContext datastoreContext;
- private final ShardPropsCreator shardPropsCreator;
+ private Shard.AbstractBuilder<?, ?> builder;
private final ShardPeerAddressResolver addressResolver;
+ private boolean shardActiveStatus = true;
private ShardInformation(String shardName, ShardIdentifier shardId,
Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
- ShardPropsCreator shardPropsCreator, ShardPeerAddressResolver addressResolver) {
+ Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
this.shardName = shardName;
this.shardId = shardId;
this.initialPeerAddresses = initialPeerAddresses;
this.datastoreContext = datastoreContext;
- this.shardPropsCreator = shardPropsCreator;
+ this.builder = builder;
this.addressResolver = addressResolver;
}
Props newProps(SchemaContext schemaContext) {
- return shardPropsCreator.newProps(shardId, initialPeerAddresses, datastoreContext, schemaContext);
+ Preconditions.checkNotNull(builder);
+ Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
+ schemaContext(schemaContext).props();
+ builder = null;
+ return props;
}
String getShardName() {
return localShardDataTree;
}
+ DatastoreContext getDatastoreContext() {
+ return datastoreContext;
+ }
+
+ void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
+ this.datastoreContext = datastoreContext;
+ if (actor != null) {
+ LOG.debug ("Sending new DatastoreContext to {}", shardId);
+ actor.tell(this.datastoreContext, sender);
+ }
+ }
+
void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
this.leaderVersion = leaderVersion;
}
- void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
- this.datastoreContext = datastoreContext;
- //notify the datastoreContextchange
- LOG.debug ("Notifying RaftPolicy change via datastoreContextChange for {}",
- this.shardName);
- if (actor != null) {
- actor.tell(this.datastoreContext, sender);
- }
- }
- }
-
- private static class ShardManagerCreator implements Creator<ShardManager> {
- private static final long serialVersionUID = 1L;
-
- final ClusterWrapper cluster;
- final Configuration configuration;
- final DatastoreContext datastoreContext;
- private final CountDownLatch waitTillReadyCountdownLatch;
- private final PrimaryShardInfoFutureCache primaryShardInfoCache;
-
- ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext,
- CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) {
- this.cluster = cluster;
- this.configuration = configuration;
- this.datastoreContext = datastoreContext;
- this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
- this.primaryShardInfoCache = primaryShardInfoCache;
+ void setShardActiveMember(boolean flag) {
+ shardActiveStatus = flag;
}
- @Override
- public ShardManager create() throws Exception {
- return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch,
- primaryShardInfoCache);
+ boolean isShardActiveMember() {
+ return shardActiveStatus;
}
}
return modules;
}
}
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private ClusterWrapper cluster;
+ private Configuration configuration;
+ private DatastoreContextFactory datastoreContextFactory;
+ private CountDownLatch waitTillReadyCountdownLatch;
+ private PrimaryShardInfoFutureCache primaryShardInfoCache;
+ private DatastoreSnapshot restoreFromSnapshot;
+ private volatile boolean sealed;
+
+ protected void checkSealed() {
+ Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
+ }
+
+ public Builder cluster(ClusterWrapper cluster) {
+ checkSealed();
+ this.cluster = cluster;
+ return this;
+ }
+
+ public Builder configuration(Configuration configuration) {
+ checkSealed();
+ this.configuration = configuration;
+ return this;
+ }
+
+ public Builder datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
+ checkSealed();
+ this.datastoreContextFactory = datastoreContextFactory;
+ return this;
+ }
+
+ public Builder waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
+ checkSealed();
+ this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+ return this;
+ }
+
+ public Builder primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
+ checkSealed();
+ this.primaryShardInfoCache = primaryShardInfoCache;
+ return this;
+ }
+
+ public Builder restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
+ checkSealed();
+ this.restoreFromSnapshot = restoreFromSnapshot;
+ return this;
+ }
+
+ protected void verify() {
+ sealed = true;
+ Preconditions.checkNotNull(cluster, "cluster should not be null");
+ Preconditions.checkNotNull(configuration, "configuration should not be null");
+ Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
+ Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
+ Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
+ }
+
+ public Props props() {
+ verify();
+ return Props.create(ShardManager.class, this);
+ }
+ }
}