Bug 7521: Convert byte[] to ShardManagerSnapshot in DatastoreSnapshot
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
index 7653aea3fb814b2c69e36684800902df013aab72..b2c9d30184079141e552c21edfafcf8ba8ba2c0b 100644 (file)
@@ -33,8 +33,6 @@ import akka.persistence.SnapshotSelectionCriteria;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -49,25 +47,27 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
-import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
 import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
@@ -79,6 +79,9 @@ import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShard
 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
@@ -98,6 +101,7 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -160,7 +164,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
-        this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountdownLatch();
+        this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
 
@@ -220,14 +224,19 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if (message instanceof SwitchShardBehavior) {
             onSwitchShardBehavior((SwitchShardBehavior) message);
         } else if (message instanceof CreateShard) {
-            onCreateShard((CreateShard) message);
+            onCreateShard((CreateShard)message);
         } else if (message instanceof AddShardReplica) {
             onAddShardReplica((AddShardReplica) message);
+        } else if (message instanceof CreatePrefixedShard) {
+            onCreatePrefixedShard((CreatePrefixedShard) message);
+        } else if (message instanceof AddPrefixShardReplica) {
+            onAddPrefixShardReplica((AddPrefixShardReplica) message);
         } else if (message instanceof ForwardedAddServerReply) {
-            ForwardedAddServerReply msg = (ForwardedAddServerReply) message;
-            onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, msg.removeShardOnFailure);
+            ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
+            onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
+                    msg.removeShardOnFailure);
         } else if (message instanceof ForwardedAddServerFailure) {
-            ForwardedAddServerFailure msg = (ForwardedAddServerFailure) message;
+            ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
         } else if (message instanceof RemoveShardReplica) {
             onRemoveShardReplica((RemoveShardReplica) message);
@@ -286,7 +295,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
                 } else {
                     int nfailed = 0;
-                    for (Boolean result: results) {
+                    for (Boolean result : results) {
                         if (!result) {
                             nfailed++;
                         }
@@ -400,13 +409,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
-        byte[] shardManagerSnapshot = null;
-        if (currentSnapshot != null) {
-            shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
-        }
-
         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
-                new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
+                new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(),
                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
 
         for (ShardInformation shardInfo: localShards.values()) {
@@ -414,6 +418,31 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void onCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
+        LOG.debug("{}: onCreatePrefixedShard: {}", persistenceId(), createPrefixedShard);
+
+        Object reply;
+        try {
+            final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
+                    createPrefixedShard.getConfig().getPrefix());
+            if (localShards.containsKey(shardId.getShardName())) {
+                LOG.debug("{}: Shard {} already exists", persistenceId(), shardId);
+                reply = new Status.Success(String.format("Shard with name %s already exists", shardId));
+            } else {
+                doCreatePrefixedShard(createPrefixedShard);
+                reply = new Status.Success(null);
+            }
+        } catch (final Exception e) {
+            LOG.error("{}: onCreateShard failed", persistenceId(), e);
+            reply = new Status.Failure(e);
+        }
+
+        if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+            getSender().tell(reply, getSelf());
+        }
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void onCreateShard(CreateShard createShard) {
         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
@@ -438,9 +467,48 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void doCreateShard(CreateShard createShard) {
-        ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
-        String shardName = moduleShardConfig.getShardName();
+    private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
+        final PrefixShardConfiguration config = createPrefixedShard.getConfig();
+
+        final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
+                createPrefixedShard.getConfig().getPrefix());
+        final String shardName = shardId.getShardName();
+
+        configuration.addPrefixShardConfiguration(config);
+
+        DatastoreContext shardDatastoreContext = createPrefixedShard.getContext();
+
+        if (shardDatastoreContext == null) {
+            final Builder builder = newShardDatastoreContextBuilder(shardName);
+            builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
+                    .storeRoot(config.getPrefix().getRootIdentifier());
+            shardDatastoreContext = builder.build();
+        } else {
+            shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
+                    peerAddressResolver).build();
+        }
+
+        final boolean shardWasInRecoveredSnapshot = currentSnapshot != null
+                && currentSnapshot.getShardList().contains(shardName);
+
+        final Map<String, String> peerAddresses = Collections.emptyMap();
+        final boolean isActiveMember = true;
+        LOG.debug("{} doCreatePrefixedShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+                persistenceId(), shardId, peerAddresses, isActiveMember);
+
+        final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
+                shardDatastoreContext, createPrefixedShard.getShardBuilder(), peerAddressResolver);
+        info.setActiveMember(isActiveMember);
+        localShards.put(info.getShardName(), info);
+
+        if (schemaContext != null) {
+            info.setActor(newShardActor(schemaContext, info));
+        }
+    }
+
+    private void doCreateShard(final CreateShard createShard) {
+        final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+        final String shardName = moduleShardConfig.getShardName();
 
         configuration.addModuleShardConfiguration(moduleShardConfig);
 
@@ -648,16 +716,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         if (currentSnapshot == null && restoreFromSnapshot != null
                 && restoreFromSnapshot.getShardManagerSnapshot() != null) {
-            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
-                    restoreFromSnapshot.getShardManagerSnapshot()))) {
-                ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
+            ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
 
-                LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
+            LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot);
 
-                applyShardManagerSnapshot(snapshot);
-            } catch (Exception e) {
-                LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
-            }
+            applyShardManagerSnapshot(snapshot);
         }
 
         createLocalShards();
@@ -1077,6 +1140,38 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return false;
     }
 
+
+    // With this message the shard does NOT have to be preconfigured
+    // do a dynamic lookup if the shard exists somewhere and replicate
+    private void onAddPrefixShardReplica(final AddPrefixShardReplica shardReplicaMsg) {
+        final String shardName = ClusterUtils.getCleanShardName(shardReplicaMsg.getPrefix());
+
+        LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), shardReplicaMsg);
+
+        if (schemaContext == null) {
+            final String msg = String.format(
+                    "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+            LOG.debug("{}: {}", persistenceId(), msg);
+            getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
+            return;
+        }
+
+        findPrimary(shardName,
+                new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
+                    @Override
+                    public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
+                        getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()),
+                                getTargetActor());
+                    }
+
+                    @Override
+                    public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
+                        sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
+                    }
+                }
+        );
+    }
+
     private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
         final String shardName = shardReplicaMsg.getShardName();
 
@@ -1352,7 +1447,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                     OnDemandRaftState raftState = (OnDemandRaftState) response;
                     Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
-                    for ( Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+                    for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
                         serverVotingStatusMap.put(e.getKey(), !e.getValue());
                     }