X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=c6d7d82c34435e87ac2857d6f6091b02a4175257;hb=9681454e5a6acb2ce5e87c72c06e6a03e901c765;hp=616f56c466bbac02194460dca1d07b4a57b2569f;hpb=4680d02510a884b3a893345f423cedcc8c5af0f4;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 616f56c466..c6d7d82c34 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -24,6 +24,7 @@ import akka.persistence.RecoveryCompleted; import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; +import akka.persistence.SnapshotSelectionCriteria; import akka.serialization.Serialization; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; @@ -33,6 +34,8 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.collect.Sets; +import java.io.ByteArrayInputStream; +import java.io.ObjectInputStream; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -46,7 +49,9 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; @@ -138,13 +143,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private DatastoreSnapshot restoreFromSnapshot; + private ShardManagerSnapshot currentSnapshot; + private final Set shardReplicaOperationsInProgress = new HashSet<>(); private final String persistenceId; /** */ - protected ShardManager(Builder builder) { + protected ShardManager(AbstractBuilder builder) { this.cluster = builder.cluster; this.configuration = builder.configuration; @@ -222,9 +229,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if(message instanceof ForwardedAddServerFailure) { ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message; onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure); - } else if(message instanceof ForwardedAddServerPrimaryShardFound) { - ForwardedAddServerPrimaryShardFound msg = (ForwardedAddServerPrimaryShardFound)message; - addShard(msg.shardName, msg.primaryFound, getSender()); + } else if(message instanceof PrimaryShardFoundForContext) { + PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message; + onPrimaryShardFoundContext(primaryShardFoundContext); } else if(message instanceof RemoveShardReplica){ onRemoveShardReplica((RemoveShardReplica)message); } else if(message instanceof GetSnapshot) { @@ -232,15 +239,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if(message instanceof ServerRemoved){ onShardReplicaRemoved((ServerRemoved) message); } else if (message instanceof SaveSnapshotSuccess) { - LOG.debug("{} saved ShardManager snapshot successfully", persistenceId()); + onSaveSnapshotSuccess((SaveSnapshotSuccess)message); } else if (message instanceof SaveSnapshotFailure) { - LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards", - persistenceId(), ((SaveSnapshotFailure)message).cause()); + LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", + persistenceId(), ((SaveSnapshotFailure) message).cause()); } else { unknownMessage(message); } } + private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) { + if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) { + addShard(primaryShardFoundContext.shardName, primaryShardFoundContext.getRemotePrimaryShardFound(), getSender()); + } + } + private void onShardReplicaRemoved(ServerRemoved message) { final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build(); final ShardInformation shardInformation = localShards.remove(shardId.getShardName()); @@ -276,6 +289,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } byte[] shardManagerSnapshot = null; + if(currentSnapshot != null) { + shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot); + } + ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props( new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(), datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration())); @@ -286,17 +303,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void onCreateShard(CreateShard createShard) { + LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard); + Object reply; try { String shardName = createShard.getModuleShardConfig().getShardName(); if(localShards.containsKey(shardName)) { + LOG.debug("{}: Shard {} already exists", persistenceId(), shardName); reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName)); } else { doCreateShard(createShard); reply = new akka.actor.Status.Success(null); } } catch (Exception e) { - LOG.error("onCreateShard failed", e); + LOG.error("{}: onCreateShard failed", persistenceId(), e); reply = new akka.actor.Status.Failure(e); } @@ -321,13 +341,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); + boolean shardWasInRecoveredSnapshot = currentSnapshot != null && + currentSnapshot.getShardList().contains(shardName); + Map peerAddresses; boolean isActiveMember; - if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) { + if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName). + contains(cluster.getCurrentMemberName())) { peerAddresses = getPeerAddresses(shardName); isActiveMember = true; } else { - // The local member is not in the given shard member configuration. In this case we'll create + // The local member is not in the static shard member configuration and the shard did not + // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create // the shard with no peers and with elections disabled so it stays as follower. A // subsequent AddServer request will be needed to make it an active member. isActiveMember = false; @@ -336,8 +361,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build(); } - LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId, - moduleShardConfig.getShardMemberNames(), peerAddresses); + LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}", + persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses, + isActiveMember); ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses, shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver); @@ -492,15 +518,34 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override protected void handleRecover(Object message) throws Exception { if (message instanceof RecoveryCompleted) { - LOG.info("Recovery complete : {}", persistenceId()); - - // We no longer persist SchemaContext modules so delete all the prior messages from the akka - // journal on upgrade from Helium. - deleteMessages(lastSequenceNr()); - createLocalShards(); + onRecoveryCompleted(); } else if (message instanceof SnapshotOffer) { - handleShardRecovery((SnapshotOffer) message); + applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot()); + } + } + + private void onRecoveryCompleted() { + LOG.info("Recovery complete : {}", persistenceId()); + + // We no longer persist SchemaContext modules so delete all the prior messages from the akka + // journal on upgrade from Helium. + deleteMessages(lastSequenceNr()); + + if(currentSnapshot == null && restoreFromSnapshot != null && + restoreFromSnapshot.getShardManagerSnapshot() != null) { + try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream( + restoreFromSnapshot.getShardManagerSnapshot()))) { + ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject(); + + LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot); + + applyShardManagerSnapshot(snapshot); + } catch(Exception e) { + LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e); + } } + + createLocalShards(); } private void findLocalShard(FindLocalShard message) { @@ -752,12 +797,25 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } + Collection visitedAddresses; + if(message instanceof RemoteFindPrimary) { + visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses(); + } else { + visitedAddresses = new ArrayList<>(); + } + + visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString()); + for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) { + if(visitedAddresses.contains(address)) { + continue; + } + LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(), shardName, address); getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName, - message.isWaitUntilReady()), getContext()); + message.isWaitUntilReady(), visitedAddresses), getContext()); return; } @@ -800,6 +858,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { for(String shardName : memberShardNames){ ShardIdentifier shardId = getShardIdentifier(memberName, shardName); + + LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId); + Map peerAddresses = getPeerAddresses(shardName); localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot( @@ -865,7 +926,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return false; } - private void onAddShardReplica (AddShardReplica shardReplicaMsg) { + private void onAddShardReplica (final AddShardReplica shardReplicaMsg) { final String shardName = shardReplicaMsg.getShardName(); LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg); @@ -887,34 +948,19 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } - Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext(). - getShardInitializationTimeout().duration().$times(2)); + findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) { + @Override + public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { + getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + + } - final ActorRef sender = getSender(); - Future futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout); - futureObj.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, Object response) { - if (failure != null) { - LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure); - sender.tell(new akka.actor.Status.Failure(new RuntimeException( - String.format("Failed to find leader for shard %s", shardName), failure)), getSelf()); - } else { - if(response instanceof RemotePrimaryShardFound) { - self().tell(new ForwardedAddServerPrimaryShardFound(shardName, - (RemotePrimaryShardFound)response), sender); - } else if(response instanceof LocalPrimaryShardFound) { - sendLocalReplicaAlreadyExistsReply(shardName, sender); - } else { - String msg = String.format("Failed to find leader for shard %s: received response: %s", - shardName, response); - LOG.debug ("{}: {}", persistenceId(), msg); - sender.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable)response : - new RuntimeException(msg)), getSelf()); - } - } + public void onLocalPrimaryFound(LocalPrimaryShardFound message) { + sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor()); } - }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + + }); } private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) { @@ -1061,16 +1107,23 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList); - saveSnapshot(new ShardManagerSnapshot(shardList)); + saveSnapshot(updateShardManagerSnapshot(shardList)); } - private void handleShardRecovery(SnapshotOffer offer) { - LOG.debug ("{}: in handleShardRecovery", persistenceId()); - ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot(); + private ShardManagerSnapshot updateShardManagerSnapshot(List shardList) { + currentSnapshot = new ShardManagerSnapshot(shardList); + return currentSnapshot; + } + + private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) { + currentSnapshot = snapshot; + + LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot); + String currentMember = cluster.getCurrentMemberName(); Set configuredShardList = new HashSet<>(configuration.getMemberShardNames(currentMember)); - for (String shard : snapshot.getShardList()) { + for (String shard : currentSnapshot.getShardList()) { if (!configuredShardList.contains(shard)) { // add the current member as a replica for the shard LOG.debug ("{}: adding shard {}", persistenceId(), shard); @@ -1086,14 +1139,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private static class ForwardedAddServerPrimaryShardFound { - String shardName; - RemotePrimaryShardFound primaryFound; - - ForwardedAddServerPrimaryShardFound(String shardName, RemotePrimaryShardFound primaryFound) { - this.shardName = shardName; - this.primaryFound = primaryFound; - } + private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) { + LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available", + persistenceId()); + deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1))); } private static class ForwardedAddServerReply { @@ -1433,7 +1482,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return new Builder(); } - public static class Builder { + public static abstract class AbstractBuilder> { private ClusterWrapper cluster; private Configuration configuration; private DatastoreContextFactory datastoreContextFactory; @@ -1442,44 +1491,49 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private DatastoreSnapshot restoreFromSnapshot; private volatile boolean sealed; + @SuppressWarnings("unchecked") + private T self() { + return (T) this; + } + protected void checkSealed() { Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed"); } - public Builder cluster(ClusterWrapper cluster) { + public T cluster(ClusterWrapper cluster) { checkSealed(); this.cluster = cluster; - return this; + return self(); } - public Builder configuration(Configuration configuration) { + public T configuration(Configuration configuration) { checkSealed(); this.configuration = configuration; - return this; + return self(); } - public Builder datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) { + public T datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) { checkSealed(); this.datastoreContextFactory = datastoreContextFactory; - return this; + return self(); } - public Builder waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) { + public T waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) { checkSealed(); this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; - return this; + return self(); } - public Builder primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) { + public T primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) { checkSealed(); this.primaryShardInfoCache = primaryShardInfoCache; - return this; + return self(); } - public Builder restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) { + public T restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) { checkSealed(); this.restoreFromSnapshot = restoreFromSnapshot; - return this; + return self(); } protected void verify() { @@ -1496,6 +1550,171 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return Props.create(ShardManager.class, this); } } + + public static class Builder extends AbstractBuilder { + } + + private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) { + Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext(). + getShardInitializationTimeout().duration().$times(2)); + + + Future futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout); + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + handler.onFailure(failure); + } else { + if(response instanceof RemotePrimaryShardFound) { + handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response); + } else if(response instanceof LocalPrimaryShardFound) { + handler.onLocalPrimaryFound((LocalPrimaryShardFound) response); + } else { + handler.onUnknownResponse(response); + } + } + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + + /** + * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the + * a remote or local find primary message is processed + */ + private static interface FindPrimaryResponseHandler { + /** + * Invoked when a Failure message is received as a response + * + * @param failure + */ + void onFailure(Throwable failure); + + /** + * Invoked when a RemotePrimaryShardFound response is received + * + * @param response + */ + void onRemotePrimaryShardFound(RemotePrimaryShardFound response); + + /** + * Invoked when a LocalPrimaryShardFound response is received + * @param response + */ + void onLocalPrimaryFound(LocalPrimaryShardFound response); + + /** + * Invoked when an unknown response is received. This is another type of failure. + * + * @param response + */ + void onUnknownResponse(Object response); + } + + /** + * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary + * replica and sends a wrapped Failure response to some targetActor + */ + private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler { + private final ActorRef targetActor; + private final String shardName; + private final String persistenceId; + private final ActorRef shardManagerActor; + + /** + * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs + * @param shardName The name of the shard for which the primary replica had to be found + * @param persistenceId The persistenceId for the ShardManager + * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary + */ + protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){ + this.targetActor = Preconditions.checkNotNull(targetActor); + this.shardName = Preconditions.checkNotNull(shardName); + this.persistenceId = Preconditions.checkNotNull(persistenceId); + this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor); + } + + public ActorRef getTargetActor() { + return targetActor; + } + + public String getShardName() { + return shardName; + } + + public ActorRef getShardManagerActor() { + return shardManagerActor; + } + + @Override + public void onFailure(Throwable failure) { + LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure); + targetActor.tell(new akka.actor.Status.Failure(new RuntimeException( + String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor); + } + + @Override + public void onUnknownResponse(Object response) { + String msg = String.format("Failed to find leader for shard %s: received response: %s", + shardName, response); + LOG.debug ("{}: {}", persistenceId, msg); + targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response : + new RuntimeException(msg)), shardManagerActor); + } + } + + + /** + * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be + * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received + * as a successful response to find primary. + */ + private static class PrimaryShardFoundForContext { + private final String shardName; + private final Object contextMessage; + private final RemotePrimaryShardFound remotePrimaryShardFound; + private final LocalPrimaryShardFound localPrimaryShardFound; + + public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, @Nonnull Object primaryFoundMessage) { + this.shardName = Preconditions.checkNotNull(shardName); + this.contextMessage = Preconditions.checkNotNull(contextMessage); + Preconditions.checkNotNull(primaryFoundMessage); + this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? (RemotePrimaryShardFound) primaryFoundMessage : null; + this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? (LocalPrimaryShardFound) primaryFoundMessage : null; + } + + @Nonnull + public String getPrimaryPath(){ + if(remotePrimaryShardFound != null){ + return remotePrimaryShardFound.getPrimaryPath(); + } + return localPrimaryShardFound.getPrimaryPath(); + } + + @Nonnull + public Object getContextMessage() { + return contextMessage; + } + + @Nullable + public RemotePrimaryShardFound getRemotePrimaryShardFound(){ + return remotePrimaryShardFound; + } + + @Nullable + public LocalPrimaryShardFound getLocalPrimaryShardFound(){ + return localPrimaryShardFound; + } + + boolean isPrimaryLocal(){ + return (remotePrimaryShardFound == null); + } + + @Nonnull + public String getShardName() { + return shardName; + } + } }