X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=6ce7a8e83ab8b2355afd9d0f0c7f5caf6cf704f9;hp=8003e2b4d3163d92db8e588458f71bf8dbd2d0d2;hb=e84f63ee098fff5b02cbce1281ca0d1208f966fa;hpb=817d0efe25becd8d457550b11bf985298e169954 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 8003e2b4d3..6ce7a8e83a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -16,18 +16,21 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; import akka.actor.ExtendedActorSystem; +import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Status; import akka.actor.Status.Failure; +import akka.persistence.RecoveryCompleted; +import akka.persistence.SnapshotOffer; import akka.serialization.JavaSerializer; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Ticker; -import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Range; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -64,6 +67,7 @@ import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherTy import org.opendaylight.controller.cluster.common.actor.MessageTracker; import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; +import org.opendaylight.controller.cluster.datastore.actors.JsonExportActor; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; @@ -74,6 +78,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionCh import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.GetKnownClients; import org.opendaylight.controller.cluster.datastore.messages.GetKnownClientsReply; @@ -99,16 +104,18 @@ import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery; import org.opendaylight.yangtools.concepts.Identifier; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; +import org.opendaylight.yangtools.yang.data.tree.api.DataTree; +import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.tree.api.TreeType; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContextProvider; import scala.concurrent.duration.FiniteDuration; @@ -119,6 +126,7 @@ import scala.concurrent.duration.FiniteDuration; *

* Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it */ +// FIXME: non-final for testing? public class Shard extends RaftActor { @VisibleForTesting @@ -206,15 +214,32 @@ public class Shard extends RaftActor { private final MessageAssembler requestMessageAssembler; - protected Shard(final AbstractBuilder builder) { + private final ExportOnRecovery exportOnRecovery; + + private final ActorRef exportActor; + + @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design") + Shard(final AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION); - this.name = builder.getId().toString(); - this.shardName = builder.getId().getShardName(); - this.datastoreContext = builder.getDatastoreContext(); - this.restoreFromSnapshot = builder.getRestoreFromSnapshot(); - this.frontendMetadata = new FrontendMetadata(name); + name = builder.getId().toString(); + shardName = builder.getId().getShardName(); + datastoreContext = builder.getDatastoreContext(); + restoreFromSnapshot = builder.getRestoreFromSnapshot(); + frontendMetadata = new FrontendMetadata(name); + exportOnRecovery = datastoreContext.getExportOnRecovery(); + + switch (exportOnRecovery) { + case Json: + exportActor = getContext().actorOf(JsonExportActor.props(builder.getSchemaContext(), + datastoreContext.getRecoveryExportBaseDir())); + break; + case Off: + default: + exportActor = null; + break; + } setPersistence(datastoreContext.isPersistent()); @@ -232,13 +257,13 @@ public class Shard extends RaftActor { frontendMetadata); } - shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this); + shardMBean = ShardStats.create(name, datastoreContext.getDataStoreMXBeanType(), this); if (isMetricsCaptureEnabled()) { getContext().become(new MeteringBehavior(this)); } - commitCoordinator = new ShardCommitCoordinator(store, LOG, this.name); + commitCoordinator = new ShardCommitCoordinator(store, LOG, name); setTransactionCommitTimeout(); @@ -254,16 +279,16 @@ public class Shard extends RaftActor { self(), getContext(), shardMBean, builder.getId().getShardName()); snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG, - this.name, datastoreContext); + name, datastoreContext); messageRetrySupport = new ShardTransactionMessageRetrySupport(this); - responseMessageSlicer = MessageSlicer.builder().logContext(this.name) + responseMessageSlicer = MessageSlicer.builder().logContext(name) .messageSliceSize(datastoreContext.getMaximumMessageSliceSize()) .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory()) .expireStateAfterInactivity(2, TimeUnit.MINUTES).build(); - requestMessageAssembler = MessageAssembler.builder().logContext(this.name) + requestMessageAssembler = MessageAssembler.builder().logContext(name) .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory()) .assembledMessageCallback((message, sender) -> self().tell(message, sender)) .expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build(); @@ -285,7 +310,7 @@ public class Shard extends RaftActor { } @Override - public void postStop() throws Exception { + public final void postStop() throws Exception { LOG.info("Stopping Shard {}", persistenceId()); super.postStop(); @@ -303,17 +328,37 @@ public class Shard extends RaftActor { } @Override - protected void handleRecover(final Object message) { + protected final void handleRecover(final Object message) { LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(), message.getClass(), getSender()); super.handleRecover(message); + + switch (exportOnRecovery) { + case Json: + if (message instanceof SnapshotOffer) { + exportActor.tell(new JsonExportActor.ExportSnapshot(store.readCurrentData().get(), name), + ActorRef.noSender()); + } else if (message instanceof ReplicatedLogEntry) { + exportActor.tell(new JsonExportActor.ExportJournal((ReplicatedLogEntry) message), + ActorRef.noSender()); + } else if (message instanceof RecoveryCompleted) { + exportActor.tell(new JsonExportActor.FinishExport(name), ActorRef.noSender()); + exportActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + break; + case Off: + default: + break; + } + if (LOG.isTraceEnabled()) { appendEntriesReplyTracker.begin(); } } @Override + // non-final for TestShard protected void handleNonRaftCommand(final Object message) { try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) { final Optional maybeError = context.error(); @@ -346,6 +391,8 @@ public class Shard extends RaftActor { handleAbortTransaction(AbortTransaction.fromSerializable(message)); } else if (CloseTransactionChain.isSerializedType(message)) { closeTransactionChain(CloseTransactionChain.fromSerializable(message)); + } else if (message instanceof DataTreeChangedReply) { + // Ignore reply } else if (message instanceof RegisterDataTreeChangeListener) { treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader()); } else if (message instanceof UpdateSchemaContext) { @@ -621,31 +668,31 @@ public class Shard extends RaftActor { return getLeaderId() != null; } - public int getPendingTxCommitQueueSize() { + final int getPendingTxCommitQueueSize() { return store.getQueueSize(); } - public int getCohortCacheSize() { + final int getCohortCacheSize() { return commitCoordinator.getCohortCacheSize(); } @Override - protected Optional getRoleChangeNotifier() { + protected final Optional getRoleChangeNotifier() { return roleChangeNotifier; } - String getShardName() { + final String getShardName() { return shardName; } @Override - protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, + protected final LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, final short leaderPayloadVersion) { return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion) : new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion); } - protected void onDatastoreContext(final DatastoreContext context) { + private void onDatastoreContext(final DatastoreContext context) { datastoreContext = verifyNotNull(context); setTransactionCommitTimeout(); @@ -656,8 +703,9 @@ public class Shard extends RaftActor { } // applyState() will be invoked once consensus is reached on the payload + // non-final for mocking void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) { - boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable(); + final boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable(); if (canSkipPayload) { applyState(self(), id, payload); } else { @@ -702,7 +750,7 @@ public class Shard extends RaftActor { } @SuppressWarnings("checkstyle:IllegalCatch") - protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) { + private void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) { askProtocolEncountered(batched.getTransactionId()); try { @@ -772,22 +820,23 @@ public class Shard extends RaftActor { @SuppressWarnings("checkstyle:IllegalCatch") private void handleReadyLocalTransaction(final ReadyLocalTransaction message) { - LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionId()); + final TransactionIdentifier txId = message.getTransactionId(); + LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), txId); boolean isLeaderActive = isLeaderActive(); if (isLeader() && isLeaderActive) { + askProtocolEncountered(txId); try { commitCoordinator.handleReadyLocalTransaction(message, getSender(), this); } catch (Exception e) { - LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(), - message.getTransactionId(), e); + LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(), txId, e); getSender().tell(new Failure(e), getSelf()); } } else { ActorSelection leader = getLeader(); if (!isLeaderActive || leader == null) { messageRetrySupport.addMessageToRetry(message, getSender(), - "Could not process ready local transaction " + message.getTransactionId()); + "Could not process ready local transaction " + txId); } else { LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader); message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); @@ -826,7 +875,7 @@ public class Shard extends RaftActor { doAbortTransaction(transactionId, getSender()); } - void doAbortTransaction(final Identifier transactionID, final ActorRef sender) { + final void doAbortTransaction(final Identifier transactionID, final ActorRef sender) { commitCoordinator.handleAbort(transactionID, sender, this); } @@ -915,13 +964,12 @@ public class Shard extends RaftActor { } @Override - @VisibleForTesting - public RaftActorSnapshotCohort getRaftActorSnapshotCohort() { + protected final RaftActorSnapshotCohort getRaftActorSnapshotCohort() { return snapshotCohort; } @Override - protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { + protected final RaftActorRecoveryCohort getRaftActorRecoveryCohort() { if (restoreFromSnapshot == null) { return ShardRecoveryCoordinator.create(store, persistenceId(), LOG); } @@ -930,6 +978,7 @@ public class Shard extends RaftActor { } @Override + // non-final for testing protected void onRecoveryComplete() { restoreFromSnapshot = null; @@ -948,7 +997,7 @@ public class Shard extends RaftActor { } @Override - protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { + protected final void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { if (data instanceof Payload) { if (data instanceof DisableTrackingPayload) { disableTracking((DisableTrackingPayload) data); @@ -966,7 +1015,7 @@ public class Shard extends RaftActor { } @Override - protected void onStateChanged() { + protected final void onStateChanged() { boolean isLeader = isLeader(); boolean hasLeader = hasLeader(); treeChangeSupport.onLeadershipChange(isLeader, hasLeader); @@ -989,7 +1038,7 @@ public class Shard extends RaftActor { } @Override - protected void onLeaderChanged(final String oldLeader, final String newLeader) { + protected final void onLeaderChanged(final String oldLeader, final String newLeader) { shardMBean.incrementLeadershipChangeCount(); paused = false; @@ -1010,7 +1059,9 @@ public class Shard extends RaftActor { // them to transaction messages and send to the new leader. ActorSelection leader = getLeader(); if (leader != null) { - Collection messagesToForward = convertPendingTransactionsToMessages(); + // Clears all pending transactions and converts them to messages to be forwarded to a new leader. + Collection messagesToForward = commitCoordinator.convertPendingTransactionsToMessages( + datastoreContext.getShardBatchedModificationCount()); if (!messagesToForward.isEmpty()) { LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(), @@ -1028,7 +1079,7 @@ public class Shard extends RaftActor { } } else { // We have become the leader, we need to reconstruct frontend state - knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this)); + knownFrontends = verifyNotNull(frontendMetadata.toLeaderState(this)); LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet()); } @@ -1037,18 +1088,8 @@ public class Shard extends RaftActor { } } - /** - * Clears all pending transactions and converts them to messages to be forwarded to a new leader. - * - * @return the converted messages - */ - public Collection convertPendingTransactionsToMessages() { - return commitCoordinator.convertPendingTransactionsToMessages( - datastoreContext.getShardBatchedModificationCount()); - } - @Override - protected void pauseLeader(final Runnable operation) { + protected final void pauseLeader(final Runnable operation) { LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation); paused = true; @@ -1062,29 +1103,30 @@ public class Shard extends RaftActor { } @Override - protected void unpauseLeader() { + protected final void unpauseLeader() { LOG.debug("{}: In unpauseLeader", persistenceId()); paused = false; store.setRunOnPendingTransactionsComplete(null); // Restore tell-based protocol state as if we were becoming the leader - knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this)); + knownFrontends = verifyNotNull(frontendMetadata.toLeaderState(this)); } @Override - protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { - return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors()) - .commitCohortActors(store.getCohortActors()); + protected final OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { + return OnDemandShardState.newBuilder() + .treeChangeListenerActors(treeChangeSupport.getListenerActors()) + .commitCohortActors(store.getCohortActors()); } @Override - public String persistenceId() { - return this.name; + public final String persistenceId() { + return name; } @Override - public String journalPluginId() { + public final String journalPluginId() { // This method may be invoked from super constructor (wonderful), hence we also need to handle the case of // the field being uninitialized because our constructor is not finished. if (datastoreContext != null && !datastoreContext.isPersistent()) { @@ -1094,20 +1136,22 @@ public class Shard extends RaftActor { } @VisibleForTesting - ShardCommitCoordinator getCommitCoordinator() { + final ShardCommitCoordinator getCommitCoordinator() { return commitCoordinator; } - public DatastoreContext getDatastoreContext() { + // non-final for mocking + DatastoreContext getDatastoreContext() { return datastoreContext; } @VisibleForTesting - public ShardDataTree getDataStore() { + final ShardDataTree getDataStore() { return store; } @VisibleForTesting + // non-final for mocking ShardStats getShardMBean() { return shardMBean; } @@ -1127,12 +1171,12 @@ public class Shard extends RaftActor { private volatile boolean sealed; - protected AbstractBuilder(final Class shardClass) { + AbstractBuilder(final Class shardClass) { this.shardClass = shardClass; } - protected void checkSealed() { - checkState(!sealed, "Builder isalready sealed - further modifications are not allowed"); + final void checkSealed() { + checkState(!sealed, "Builder is already sealed - further modifications are not allowed"); } @SuppressWarnings("unchecked") @@ -1189,7 +1233,7 @@ public class Shard extends RaftActor { } public EffectiveModelContext getSchemaContext() { - return Verify.verifyNotNull(schemaContextProvider.getEffectiveModelContext()); + return verifyNotNull(schemaContextProvider.getEffectiveModelContext()); } public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {