Snapshot and journal export on recovery
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 9c548e55f7c2a57bd50ac7c37763798afac6dcc3..fb0ef03d1844c8e48b1c02fd77b880179306fbb0 100644 (file)
@@ -16,9 +16,12 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import akka.actor.ExtendedActorSystem;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.Status.Failure;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SnapshotOffer;
 import akka.serialization.JavaSerializer;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
@@ -64,6 +67,7 @@ import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherTy
 import org.opendaylight.controller.cluster.common.actor.MessageTracker;
 import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
+import org.opendaylight.controller.cluster.datastore.actors.JsonExportActor;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
@@ -99,12 +103,14 @@ import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
@@ -206,6 +212,10 @@ public class Shard extends RaftActor {
 
     private final MessageAssembler requestMessageAssembler;
 
+    private final ExportOnRecovery exportOnRecovery;
+
+    private final ActorRef exportActor;
+
     protected Shard(final AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
@@ -215,6 +225,18 @@ public class Shard extends RaftActor {
         this.datastoreContext = builder.getDatastoreContext();
         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
         this.frontendMetadata = new FrontendMetadata(name);
+        this.exportOnRecovery = datastoreContext.getExportOnRecovery();
+
+        switch (exportOnRecovery) {
+            case Json:
+                exportActor = getContext().actorOf(JsonExportActor.props(builder.getSchemaContext(),
+                        datastoreContext.getRecoveryExportBaseDir()));
+                break;
+            case Off:
+            default:
+                exportActor = null;
+                break;
+        }
 
         setPersistence(datastoreContext.isPersistent());
 
@@ -308,6 +330,25 @@ public class Shard extends RaftActor {
             getSender());
 
         super.handleRecover(message);
+
+        switch (exportOnRecovery) {
+            case Json:
+                if (message instanceof SnapshotOffer) {
+                    exportActor.tell(new JsonExportActor.ExportSnapshot(store.readCurrentData().get(), name),
+                            ActorRef.noSender());
+                } else if (message instanceof ReplicatedLogEntry) {
+                    exportActor.tell(new JsonExportActor.ExportJournal((ReplicatedLogEntry) message),
+                            ActorRef.noSender());
+                } else if (message instanceof RecoveryCompleted) {
+                    exportActor.tell(new JsonExportActor.FinishExport(name), ActorRef.noSender());
+                    exportActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+                }
+                break;
+            case Off:
+            default:
+                break;
+        }
+
         if (LOG.isTraceEnabled()) {
             appendEntriesReplyTracker.begin();
         }