X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=6ce7a8e83ab8b2355afd9d0f0c7f5caf6cf704f9;hp=8003e2b4d3163d92db8e588458f71bf8dbd2d0d2;hb=e84f63ee098fff5b02cbce1281ca0d1208f966fa;hpb=817d0efe25becd8d457550b11bf985298e169954 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 8003e2b4d3..6ce7a8e83a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -16,18 +16,21 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; import akka.actor.ExtendedActorSystem; +import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Status; import akka.actor.Status.Failure; +import akka.persistence.RecoveryCompleted; +import akka.persistence.SnapshotOffer; import akka.serialization.JavaSerializer; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Ticker; -import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Range; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -64,6 +67,7 @@ import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherTy import org.opendaylight.controller.cluster.common.actor.MessageTracker; import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; +import org.opendaylight.controller.cluster.datastore.actors.JsonExportActor; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; @@ -74,6 +78,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionCh import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.GetKnownClients; import org.opendaylight.controller.cluster.datastore.messages.GetKnownClientsReply; @@ -99,16 +104,18 @@ import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery; import org.opendaylight.yangtools.concepts.Identifier; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; +import org.opendaylight.yangtools.yang.data.tree.api.DataTree; +import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.tree.api.TreeType; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContextProvider; import scala.concurrent.duration.FiniteDuration; @@ -119,6 +126,7 @@ import scala.concurrent.duration.FiniteDuration; *
* Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it
*/
+// FIXME: non-final for testing?
public class Shard extends RaftActor {
@VisibleForTesting
@@ -206,15 +214,32 @@ public class Shard extends RaftActor {
private final MessageAssembler requestMessageAssembler;
- protected Shard(final AbstractBuilder, ?> builder) {
+ private final ExportOnRecovery exportOnRecovery;
+
+ private final ActorRef exportActor;
+
+ @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design")
+ Shard(final AbstractBuilder, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
- this.name = builder.getId().toString();
- this.shardName = builder.getId().getShardName();
- this.datastoreContext = builder.getDatastoreContext();
- this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
- this.frontendMetadata = new FrontendMetadata(name);
+ name = builder.getId().toString();
+ shardName = builder.getId().getShardName();
+ datastoreContext = builder.getDatastoreContext();
+ restoreFromSnapshot = builder.getRestoreFromSnapshot();
+ frontendMetadata = new FrontendMetadata(name);
+ exportOnRecovery = datastoreContext.getExportOnRecovery();
+
+ switch (exportOnRecovery) {
+ case Json:
+ exportActor = getContext().actorOf(JsonExportActor.props(builder.getSchemaContext(),
+ datastoreContext.getRecoveryExportBaseDir()));
+ break;
+ case Off:
+ default:
+ exportActor = null;
+ break;
+ }
setPersistence(datastoreContext.isPersistent());
@@ -232,13 +257,13 @@ public class Shard extends RaftActor {
frontendMetadata);
}
- shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this);
+ shardMBean = ShardStats.create(name, datastoreContext.getDataStoreMXBeanType(), this);
if (isMetricsCaptureEnabled()) {
getContext().become(new MeteringBehavior(this));
}
- commitCoordinator = new ShardCommitCoordinator(store, LOG, this.name);
+ commitCoordinator = new ShardCommitCoordinator(store, LOG, name);
setTransactionCommitTimeout();
@@ -254,16 +279,16 @@ public class Shard extends RaftActor {
self(), getContext(), shardMBean, builder.getId().getShardName());
snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
- this.name, datastoreContext);
+ name, datastoreContext);
messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
- responseMessageSlicer = MessageSlicer.builder().logContext(this.name)
+ responseMessageSlicer = MessageSlicer.builder().logContext(name)
.messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
.fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
.expireStateAfterInactivity(2, TimeUnit.MINUTES).build();
- requestMessageAssembler = MessageAssembler.builder().logContext(this.name)
+ requestMessageAssembler = MessageAssembler.builder().logContext(name)
.fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
.assembledMessageCallback((message, sender) -> self().tell(message, sender))
.expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build();
@@ -285,7 +310,7 @@ public class Shard extends RaftActor {
}
@Override
- public void postStop() throws Exception {
+ public final void postStop() throws Exception {
LOG.info("Stopping Shard {}", persistenceId());
super.postStop();
@@ -303,17 +328,37 @@ public class Shard extends RaftActor {
}
@Override
- protected void handleRecover(final Object message) {
+ protected final void handleRecover(final Object message) {
LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(), message.getClass(),
getSender());
super.handleRecover(message);
+
+ switch (exportOnRecovery) {
+ case Json:
+ if (message instanceof SnapshotOffer) {
+ exportActor.tell(new JsonExportActor.ExportSnapshot(store.readCurrentData().get(), name),
+ ActorRef.noSender());
+ } else if (message instanceof ReplicatedLogEntry) {
+ exportActor.tell(new JsonExportActor.ExportJournal((ReplicatedLogEntry) message),
+ ActorRef.noSender());
+ } else if (message instanceof RecoveryCompleted) {
+ exportActor.tell(new JsonExportActor.FinishExport(name), ActorRef.noSender());
+ exportActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+ break;
+ case Off:
+ default:
+ break;
+ }
+
if (LOG.isTraceEnabled()) {
appendEntriesReplyTracker.begin();
}
}
@Override
+ // non-final for TestShard
protected void handleNonRaftCommand(final Object message) {
try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
final Optional