import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.actor.ExtendedActorSystem;
+import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Failure;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SnapshotOffer;
import akka.serialization.JavaSerializer;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import org.opendaylight.controller.cluster.common.actor.MessageTracker;
import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
+import org.opendaylight.controller.cluster.datastore.actors.JsonExportActor;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
private final MessageAssembler requestMessageAssembler;
+ private final ExportOnRecovery exportOnRecovery;
+
+ private final ActorRef exportActor;
+
protected Shard(final AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
this.datastoreContext = builder.getDatastoreContext();
this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
this.frontendMetadata = new FrontendMetadata(name);
+ this.exportOnRecovery = datastoreContext.getExportOnRecovery();
+
+ switch (exportOnRecovery) {
+ case Json:
+ exportActor = getContext().actorOf(JsonExportActor.props(builder.getSchemaContext(),
+ datastoreContext.getRecoveryExportBaseDir()));
+ break;
+ case Off:
+ default:
+ exportActor = null;
+ break;
+ }
setPersistence(datastoreContext.isPersistent());
getSender());
super.handleRecover(message);
+
+ switch (exportOnRecovery) {
+ case Json:
+ if (message instanceof SnapshotOffer) {
+ exportActor.tell(new JsonExportActor.ExportSnapshot(store.readCurrentData().get(), name),
+ ActorRef.noSender());
+ } else if (message instanceof ReplicatedLogEntry) {
+ exportActor.tell(new JsonExportActor.ExportJournal((ReplicatedLogEntry) message),
+ ActorRef.noSender());
+ } else if (message instanceof RecoveryCompleted) {
+ exportActor.tell(new JsonExportActor.FinishExport(name), ActorRef.noSender());
+ exportActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+ break;
+ case Off:
+ default:
+ break;
+ }
+
if (LOG.isTraceEnabled()) {
appendEntriesReplyTracker.begin();
}