X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=d61e12e1cb2fd2bc97e7f6b34429c22adaf72669;hb=4c1877fd363bf0663c91b85b2bf5be64d22e56aa;hp=fabfd096104dcab91ee6a4ffabd5758a50466e64;hpb=733636ec5f1b4caecd130a6a26f6d196af6ff854;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index fabfd09610..d61e12e1cb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -24,6 +24,7 @@ import akka.persistence.RecoveryCompleted; import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; +import akka.persistence.SnapshotSelectionCriteria; import akka.serialization.Serialization; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; @@ -33,6 +34,8 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.collect.Sets; +import java.io.ByteArrayInputStream; +import java.io.ObjectInputStream; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -48,6 +51,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; @@ -139,6 +143,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private DatastoreSnapshot restoreFromSnapshot; + private ShardManagerSnapshot currentSnapshot; + private final Set shardReplicaOperationsInProgress = new HashSet<>(); private final String persistenceId; @@ -233,7 +239,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if(message instanceof ServerRemoved){ onShardReplicaRemoved((ServerRemoved) message); } else if (message instanceof SaveSnapshotSuccess) { - LOG.debug("{} saved ShardManager snapshot successfully", persistenceId()); + onSaveSnapshotSuccess((SaveSnapshotSuccess)message); } else if (message instanceof SaveSnapshotFailure) { LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(), ((SaveSnapshotFailure) message).cause()); @@ -283,6 +289,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } byte[] shardManagerSnapshot = null; + if(currentSnapshot != null) { + shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot); + } + ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props( new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(), datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration())); @@ -293,17 +303,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void onCreateShard(CreateShard createShard) { + LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard); + Object reply; try { String shardName = createShard.getModuleShardConfig().getShardName(); if(localShards.containsKey(shardName)) { + LOG.debug("{}: Shard {} already exists", persistenceId(), shardName); reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName)); } else { doCreateShard(createShard); reply = new akka.actor.Status.Success(null); } } catch (Exception e) { - LOG.error("onCreateShard failed", e); + LOG.error("{}: onCreateShard failed", persistenceId(), e); reply = new akka.actor.Status.Failure(e); } @@ -328,13 +341,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); + boolean shardWasInRecoveredSnapshot = currentSnapshot != null && + currentSnapshot.getShardList().contains(shardName); + Map peerAddresses; boolean isActiveMember; - if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) { + if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName). + contains(cluster.getCurrentMemberName())) { peerAddresses = getPeerAddresses(shardName); isActiveMember = true; } else { - // The local member is not in the given shard member configuration. In this case we'll create + // The local member is not in the static shard member configuration and the shard did not + // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create // the shard with no peers and with elections disabled so it stays as follower. A // subsequent AddServer request will be needed to make it an active member. isActiveMember = false; @@ -343,8 +361,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build(); } - LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId, - moduleShardConfig.getShardMemberNames(), peerAddresses); + LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}", + persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses, + isActiveMember); ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses, shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver); @@ -499,17 +518,36 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override protected void handleRecover(Object message) throws Exception { if (message instanceof RecoveryCompleted) { - LOG.info("Recovery complete : {}", persistenceId()); - - // We no longer persist SchemaContext modules so delete all the prior messages from the akka - // journal on upgrade from Helium. - deleteMessages(lastSequenceNr()); - createLocalShards(); + onRecoveryCompleted(); } else if (message instanceof SnapshotOffer) { - handleShardRecovery((SnapshotOffer) message); + applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot()); } } + private void onRecoveryCompleted() { + LOG.info("Recovery complete : {}", persistenceId()); + + // We no longer persist SchemaContext modules so delete all the prior messages from the akka + // journal on upgrade from Helium. + deleteMessages(lastSequenceNr()); + + if(currentSnapshot == null && restoreFromSnapshot != null && + restoreFromSnapshot.getShardManagerSnapshot() != null) { + try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream( + restoreFromSnapshot.getShardManagerSnapshot()))) { + ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject(); + + LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot); + + applyShardManagerSnapshot(snapshot); + } catch(Exception e) { + LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e); + } + } + + createLocalShards(); + } + private void findLocalShard(FindLocalShard message) { final ShardInformation shardInformation = localShards.get(message.getShardName()); @@ -807,6 +845,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { for(String shardName : memberShardNames){ ShardIdentifier shardId = getShardIdentifier(memberName, shardName); + + LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId); + Map peerAddresses = getPeerAddresses(shardName); localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot( @@ -1053,16 +1094,23 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList); - saveSnapshot(new ShardManagerSnapshot(shardList)); + saveSnapshot(updateShardManagerSnapshot(shardList)); } - private void handleShardRecovery(SnapshotOffer offer) { - LOG.debug ("{}: in handleShardRecovery", persistenceId()); - ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot(); + private ShardManagerSnapshot updateShardManagerSnapshot(List shardList) { + currentSnapshot = new ShardManagerSnapshot(shardList); + return currentSnapshot; + } + + private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) { + currentSnapshot = snapshot; + + LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot); + String currentMember = cluster.getCurrentMemberName(); Set configuredShardList = new HashSet<>(configuration.getMemberShardNames(currentMember)); - for (String shard : snapshot.getShardList()) { + for (String shard : currentSnapshot.getShardList()) { if (!configuredShardList.contains(shard)) { // add the current member as a replica for the shard LOG.debug ("{}: adding shard {}", persistenceId(), shard); @@ -1078,6 +1126,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) { + LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available", + persistenceId()); + deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1))); + } + private static class ForwardedAddServerReply { ShardInformation shardInfo; AddServerReply addServerReply;