X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=d7bbc1ca3c19a7ffb59f8424635fcfc153ecd0cc;hp=b3dc07b0e5c3128d25375cd8d209e906c1a29596;hb=20a32e6459fd1e27e7669bf1ebc7742b96787b94;hpb=b66d5a3c59525a1c7885c3d653d9657a99f4103d diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index b3dc07b0e5..d7bbc1ca3c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -11,20 +11,24 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; +import akka.actor.ExtendedActorSystem; import akka.actor.Props; +import akka.actor.Status; import akka.actor.Status.Failure; +import akka.serialization.JavaSerializer; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Ticker; +import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Range; import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; @@ -34,23 +38,29 @@ import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.NotLeaderException; +import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException; import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; +import org.opendaylight.controller.cluster.access.concepts.SliceableMessage; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; import org.opendaylight.controller.cluster.common.actor.CommonConfig; +import org.opendaylight.controller.cluster.common.actor.Dispatchers; +import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherType; import org.opendaylight.controller.cluster.common.actor.MessageTracker; import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardDataTreeListenerInfoMXBeanImpl; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; @@ -61,21 +71,27 @@ import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionCh import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; -import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree; +import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal; import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; +import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; +import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; +import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot; +import org.opendaylight.controller.cluster.messaging.MessageAssembler; +import org.opendaylight.controller.cluster.messaging.MessageSlicer; +import org.opendaylight.controller.cluster.messaging.SliceOptions; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; +import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; @@ -83,13 +99,15 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -117,6 +135,13 @@ public class Shard extends RaftActor { } }; + static final Object RESUME_NEXT_PENDING_TRANSACTION = new Object() { + @Override + public String toString() { + return "resumeNextPendingTransaction"; + } + }; + // FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant. public static final String DEFAULT_NAME = "default"; @@ -139,6 +164,8 @@ public class Shard extends RaftActor { private final ShardStats shardMBean; + private final ShardDataTreeListenerInfoMXBeanImpl listenerInfoMXBean; + private DatastoreContext datastoreContext; private final ShardCommitCoordinator commitCoordinator; @@ -163,8 +190,14 @@ public class Shard extends RaftActor { private final ShardTransactionMessageRetrySupport messageRetrySupport; - private final FrontendMetadata frontendMetadata = new FrontendMetadata(); - private final Map knownFrontends = new HashMap<>(); + private final FrontendMetadata frontendMetadata; + private Map knownFrontends = ImmutableMap.of(); + private boolean paused; + + private final MessageSlicer responseMessageSlicer; + private final Dispatchers dispatchers; + + private final MessageAssembler requestMessageAssembler; protected Shard(final AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), @@ -173,22 +206,23 @@ public class Shard extends RaftActor { this.name = builder.getId().toString(); this.datastoreContext = builder.getDatastoreContext(); this.restoreFromSnapshot = builder.getRestoreFromSnapshot(); + this.frontendMetadata = new FrontendMetadata(name); setPersistence(datastoreContext.isPersistent()); LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent()); ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher = - new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher"); + new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name); ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher = - new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher"); + new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name); if (builder.getDataTree() != null) { store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(), - treeChangeListenerPublisher, dataChangeListenerPublisher, name); + treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata); } else { store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(), builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, - dataChangeListenerPublisher, name); + dataChangeListenerPublisher, name, frontendMetadata); } shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this); @@ -207,14 +241,29 @@ public class Shard extends RaftActor { appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class, getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis()); + dispatchers = new Dispatchers(context().system().dispatchers()); transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext, - new Dispatchers(context().system().dispatchers()).getDispatcherPath( - Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean); + dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Transaction), + self(), getContext(), shardMBean, builder.getId().getShardName()); snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG, this.name); messageRetrySupport = new ShardTransactionMessageRetrySupport(this); + + responseMessageSlicer = MessageSlicer.builder().logContext(this.name) + .messageSliceSize(datastoreContext.getMaximumMessageSliceSize()) + .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory()) + .expireStateAfterInactivity(2, TimeUnit.MINUTES).build(); + + requestMessageAssembler = MessageAssembler.builder().logContext(this.name) + .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory()) + .assembledMessageCallback((message, sender) -> self().tell(message, sender)) + .expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build(); + + listenerInfoMXBean = new ShardDataTreeListenerInfoMXBeanImpl(name, datastoreContext.getDataStoreMXBeanType(), + self()); + listenerInfoMXBean.register(); } private void setTransactionCommitTimeout() { @@ -243,6 +292,7 @@ public class Shard extends RaftActor { commitCoordinator.abortPendingTransactions("Transaction aborted due to shutdown.", this); shardMBean.unregisterMBean(); + listenerInfoMXBean.unregister(); } @Override @@ -256,7 +306,6 @@ public class Shard extends RaftActor { } } - @SuppressWarnings("checkstyle:IllegalCatch") @Override protected void handleNonRaftCommand(final Object message) { try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) { @@ -266,23 +315,12 @@ public class Shard extends RaftActor { maybeError.get()); } - if (message instanceof RequestEnvelope) { - final long now = ticker().read(); - final RequestEnvelope envelope = (RequestEnvelope)message; + store.resetTransactionBatch(); - try { - final RequestSuccess success = handleRequest(envelope, now); - if (success != null) { - envelope.sendSuccess(success, ticker().read() - now); - } - } catch (RequestException e) { - LOG.debug("{}: request {} failed", persistenceId(), envelope, e); - envelope.sendFailure(e, ticker().read() - now); - } catch (Exception e) { - LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e); - envelope.sendFailure(new RuntimeRequestException("Request failed to process", e), - ticker().read() - now); - } + if (message instanceof RequestEnvelope) { + handleRequestEnvelope((RequestEnvelope)message); + } else if (MessageAssembler.isHandledMessage(message)) { + handleRequestAssemblerMessage(message); } else if (message instanceof ConnectClientRequest) { handleConnectClient((ConnectClientRequest)message); } else if (CreateTransaction.isSerializedType(message)) { @@ -311,8 +349,7 @@ public class Shard extends RaftActor { PeerAddressResolved resolved = (PeerAddressResolved) message; setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress()); } else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) { - store.checkForExpiredTransactions(transactionCommitTimeout); - commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this); + commitTimeoutCheck(); } else if (message instanceof DatastoreContext) { onDatastoreContext((DatastoreContext)message); } else if (message instanceof RegisterRoleChangeListener) { @@ -331,18 +368,113 @@ public class Shard extends RaftActor { } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) { store.processCohortRegistryCommand(getSender(), (DataTreeCohortActorRegistry.CohortRegistryCommand) message); - } else { + } else if (message instanceof PersistAbortTransactionPayload) { + final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId(); + persistPayload(txId, AbortTransactionPayload.create(txId), true); + } else if (message instanceof MakeLeaderLocal) { + onMakeLeaderLocal(); + } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) { + store.resumeNextPendingTransaction(); + } else if (!responseMessageSlicer.handleMessage(message)) { super.handleNonRaftCommand(message); } } } + private void handleRequestAssemblerMessage(final Object message) { + dispatchers.getDispatcher(DispatcherType.Serialization).execute(() -> { + JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) context().system()); + requestMessageAssembler.handleMessage(message, self()); + }); + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private void handleRequestEnvelope(final RequestEnvelope envelope) { + final long now = ticker().read(); + try { + final RequestSuccess success = handleRequest(envelope, now); + if (success != null) { + final long executionTimeNanos = ticker().read() - now; + if (success instanceof SliceableMessage) { + dispatchers.getDispatcher(DispatcherType.Serialization).execute(() -> + responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget()) + .message(envelope.newSuccessEnvelope(success, executionTimeNanos)) + .sendTo(envelope.getMessage().getReplyTo()).replyTo(self()) + .onFailureCallback(t -> { + LOG.warn("Error slicing response {}", success, t); + }).build())); + } else { + envelope.sendSuccess(success, executionTimeNanos); + } + } + } catch (RequestException e) { + LOG.debug("{}: request {} failed", persistenceId(), envelope, e); + envelope.sendFailure(e, ticker().read() - now); + } catch (Exception e) { + LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e); + envelope.sendFailure(new RuntimeRequestException("Request failed to process", e), + ticker().read() - now); + } + } + + private void commitTimeoutCheck() { + store.checkForExpiredTransactions(transactionCommitTimeout, this::updateAccess); + commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this); + requestMessageAssembler.checkExpiredAssembledMessageState(); + } + + private Optional updateAccess(final SimpleShardDataTreeCohort cohort) { + final FrontendIdentifier frontend = cohort.getIdentifier().getHistoryId().getClientId().getFrontendId(); + final LeaderFrontendState state = knownFrontends.get(frontend); + if (state == null) { + // Not tell-based protocol, do nothing + return Optional.absent(); + } + + if (isIsolatedLeader()) { + // We are isolated and no new request can come through until we emerge from it. We are still updating + // liveness of frontend when we see it attempting to communicate. Use the last access timer. + return Optional.of(state.getLastSeenTicks()); + } + + // If this frontend has freshly connected, give it some time to catch up before killing its transactions. + return Optional.of(state.getLastConnectTicks()); + } + + private void onMakeLeaderLocal() { + LOG.debug("{}: onMakeLeaderLocal received", persistenceId()); + if (isLeader()) { + getSender().tell(new Status.Success(null), getSelf()); + return; + } + + final ActorSelection leader = getLeader(); + + if (leader == null) { + // Leader is not present. The cluster is most likely trying to + // elect a leader and we should let that run its normal course + + // TODO we can wait for the election to complete and retry the + // request. We can also let the caller retry by sending a flag + // in the response indicating the request is "reTryable". + getSender().tell(new Failure( + new LeadershipTransferFailedException("We cannot initiate leadership transfer to local node. " + + "Currently there is no leader for " + persistenceId())), + getSelf()); + return; + } + + leader.tell(new RequestLeadership(getId(), getSender()), getSelf()); + } + // Acquire our frontend tracking handle and verify generation matches - private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException { + @Nullable + private LeaderFrontendState findFrontend(final ClientIdentifier clientId) throws RequestException { final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId()); if (existing != null) { final int cmp = Long.compareUnsigned(existing.getIdentifier().getGeneration(), clientId.getGeneration()); if (cmp == 0) { + existing.touch(); return existing; } if (cmp > 0) { @@ -357,13 +489,21 @@ public class Shard extends RaftActor { LOG.debug("{}: client {} is not yet known", persistenceId(), clientId); } - final LeaderFrontendState ret = new LeaderFrontendState(persistenceId(), clientId, store); - knownFrontends.put(clientId.getFrontendId(), ret); - LOG.debug("{}: created state {} for client {}", persistenceId(), ret, clientId); - return ret; + return null; } - private static @Nonnull ABIVersion selectVersion(final ConnectClientRequest message) { + private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException { + final LeaderFrontendState ret = findFrontend(clientId); + if (ret != null) { + return ret; + } + + // TODO: a dedicated exception would be better, but this is technically true, too + throw new OutOfSequenceEnvelopeException(0); + } + + @Nonnull + private static ABIVersion selectVersion(final ConnectClientRequest message) { final Range clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion()); for (ABIVersion v : SUPPORTED_ABIVERSIONS) { if (clientRange.contains(v)) { @@ -379,13 +519,29 @@ public class Shard extends RaftActor { @SuppressWarnings("checkstyle:IllegalCatch") private void handleConnectClient(final ConnectClientRequest message) { try { + final ClientIdentifier clientId = message.getTarget(); + final LeaderFrontendState existing = findFrontend(clientId); + if (existing != null) { + existing.touch(); + } + if (!isLeader() || !isLeaderActive()) { - LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), message); + LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {}," + + "isLeadershipTransferInProgress: {}.", + persistenceId(), message, isLeader(), isLeaderActive(), isLeadershipTransferInProgress()); throw new NotLeaderException(getSelf()); } final ABIVersion selectedVersion = selectVersion(message); - final LeaderFrontendState frontend = getFrontend(message.getTarget()); + final LeaderFrontendState frontend; + if (existing == null) { + frontend = new LeaderFrontendState(persistenceId(), clientId, store); + knownFrontends.put(clientId.getFrontendId(), frontend); + LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId); + } else { + frontend = existing; + } + frontend.reconnect(); message.getReplyTo().tell(new ConnectClientSuccess(message.getTarget(), message.getSequence(), getSelf(), ImmutableList.of(), store.getDataTree(), CLIENT_MAX_MESSAGES).toVersion(selectedVersion), @@ -395,11 +551,14 @@ public class Shard extends RaftActor { } } - private @Nullable RequestSuccess handleRequest(final RequestEnvelope envelope, final long now) + @Nullable + private RequestSuccess handleRequest(final RequestEnvelope envelope, final long now) throws RequestException { // We are not the leader, hence we want to fail-fast. - if (!isLeader() || !isLeaderActive()) { - LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope); + if (!isLeader() || paused || !isLeaderActive()) { + LOG.debug("{}: not currently active leader, rejecting request {}. isLeader: {}, isLeaderActive: {}," + + "isLeadershipTransferInProgress: {}, paused: {}", + persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress(), paused); throw new NotLeaderException(getSelf()); } @@ -413,7 +572,7 @@ public class Shard extends RaftActor { final ClientIdentifier clientId = lhReq.getTarget().getClientId(); return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now); } else { - LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request); + LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request); throw new UnsupportedRequestException(request); } } @@ -453,13 +612,13 @@ public class Shard extends RaftActor { } // applyState() will be invoked once consensus is reached on the payload - void persistPayload(final TransactionIdentifier transactionId, final Payload payload, boolean batchHint) { + void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) { boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable(); if (canSkipPayload) { - applyState(self(), transactionId, payload); + applyState(self(), id, payload); } else { // We are faking the sender - persistData(self(), transactionId, payload, batchHint); + persistData(self(), id, payload, batchHint); } } @@ -526,7 +685,7 @@ public class Shard extends RaftActor { ActorSelection leader = getLeader(); if (!isLeaderActive || leader == null) { messageRetrySupport.addMessageToRetry(batched, getSender(), - "Could not commit transaction " + batched.getTransactionId()); + "Could not process BatchedModifications " + batched.getTransactionId()); } else { // If this is not the first batch and leadership changed in between batched messages, // we need to reconstruct previous BatchedModifications from the transaction @@ -579,7 +738,7 @@ public class Shard extends RaftActor { ActorSelection leader = getLeader(); if (!isLeaderActive || leader == null) { messageRetrySupport.addMessageToRetry(message, getSender(), - "Could not commit transaction " + message.getTransactionId()); + "Could not process ready local transaction " + message.getTransactionId()); } else { LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader); message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); @@ -598,7 +757,7 @@ public class Shard extends RaftActor { ActorSelection leader = getLeader(); if (!isLeaderActive || leader == null) { messageRetrySupport.addMessageToRetry(forwardedReady, getSender(), - "Could not commit transaction " + forwardedReady.getTransactionId()); + "Could not process forwarded ready transaction " + forwardedReady.getTransactionId()); } else { LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader); @@ -614,7 +773,7 @@ public class Shard extends RaftActor { doAbortTransaction(abort.getTransactionId(), getSender()); } - void doAbortTransaction(final TransactionIdentifier transactionID, final ActorRef sender) { + void doAbortTransaction(final Identifier transactionID, final ActorRef sender) { commitCoordinator.handleAbort(transactionID, sender, this); } @@ -630,7 +789,9 @@ public class Shard extends RaftActor { } private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) { - store.closeTransactionChain(closeTransactionChain.getIdentifier()); + final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier(); + store.closeTransactionChain(id, null); + store.purgeTransactionChain(id, null); } @SuppressWarnings("checkstyle:IllegalCatch") @@ -680,8 +841,11 @@ public class Shard extends RaftActor { @Override @Nonnull protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { - return new ShardRecoveryCoordinator(store, - restoreFromSnapshot != null ? restoreFromSnapshot.getSnapshot() : null, persistenceId(), LOG); + if (restoreFromSnapshot == null) { + return ShardRecoveryCoordinator.create(store, persistenceId(), LOG); + } + + return ShardRecoveryCoordinator.forSnapshot(store, persistenceId(), LOG, restoreFromSnapshot.getSnapshot()); } @Override @@ -730,7 +894,8 @@ public class Shard extends RaftActor { persistenceId(), getId()); } - store.closeAllTransactionChains(); + paused = false; + store.purgeLeaderState(); } if (hasLeader && !isIsolatedLeader()) { @@ -741,9 +906,21 @@ public class Shard extends RaftActor { @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) { shardMBean.incrementLeadershipChangeCount(); + paused = false; + + if (!isLeader()) { + if (!knownFrontends.isEmpty()) { + LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet()); + knownFrontends = ImmutableMap.of(); + } + + requestMessageAssembler.close(); + + if (!hasLeader()) { + // No leader anywhere, nothing else to do + return; + } - boolean hasLeader = hasLeader(); - if (hasLeader && !isLeader()) { // Another leader was elected. If we were the previous leader and had pending transactions, convert // them to transaction messages and send to the new leader. ActorSelection leader = getLeader(); @@ -762,9 +939,13 @@ public class Shard extends RaftActor { commitCoordinator.abortPendingTransactions("The transacton was aborted due to inflight leadership " + "change and the leader address isn't available.", this); } + } else { + // We have become the leader, we need to reconstruct frontend state + knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this)); + LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet()); } - if (hasLeader && !isIsolatedLeader()) { + if (!isIsolatedLeader()) { messageRetrySupport.retryMessages(); } } @@ -782,11 +963,28 @@ public class Shard extends RaftActor { @Override protected void pauseLeader(final Runnable operation) { LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation); + paused = true; + + // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused. + knownFrontends.values().forEach(LeaderFrontendState::retire); + knownFrontends = ImmutableMap.of(); + store.setRunOnPendingTransactionsComplete(operation); } @Override - protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { + protected void unpauseLeader() { + LOG.debug("{}: In unpauseLeader", persistenceId()); + paused = false; + + store.setRunOnPendingTransactionsComplete(null); + + // Restore tell-based protocol state as if we were becoming the leader + knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this)); + } + + @Override + protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors()) .dataChangeListenerActors(changeSupport.getListenerActors()) .commitCohortActors(store.getCohortActors()); @@ -825,9 +1023,9 @@ public class Shard extends RaftActor { private ShardIdentifier id; private Map peerAddresses = Collections.emptyMap(); private DatastoreContext datastoreContext; - private SchemaContext schemaContext; + private SchemaContextProvider schemaContextProvider; private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot; - private TipProducingDataTree dataTree; + private DataTree dataTree; private volatile boolean sealed; protected AbstractBuilder(final Class shardClass) { @@ -861,9 +1059,9 @@ public class Shard extends RaftActor { return self(); } - public T schemaContext(final SchemaContext newSchemaContext) { + public T schemaContextProvider(final SchemaContextProvider newSchemaContextProvider) { checkSealed(); - this.schemaContext = newSchemaContext; + this.schemaContextProvider = Preconditions.checkNotNull(newSchemaContextProvider); return self(); } @@ -873,7 +1071,7 @@ public class Shard extends RaftActor { return self(); } - public T dataTree(final TipProducingDataTree newDataTree) { + public T dataTree(final DataTree newDataTree) { checkSealed(); this.dataTree = newDataTree; return self(); @@ -892,14 +1090,14 @@ public class Shard extends RaftActor { } public SchemaContext getSchemaContext() { - return schemaContext; + return Verify.verifyNotNull(schemaContextProvider.getSchemaContext()); } public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() { return restoreFromSnapshot; } - public TipProducingDataTree getDataTree() { + public DataTree getDataTree() { return dataTree; } @@ -919,7 +1117,7 @@ public class Shard extends RaftActor { Preconditions.checkNotNull(id, "id should not be null"); Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null"); Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null"); - Preconditions.checkNotNull(schemaContext, "schemaContext should not be null"); + Preconditions.checkNotNull(schemaContextProvider, "schemaContextProvider should not be null"); } public Props props() { @@ -930,7 +1128,7 @@ public class Shard extends RaftActor { } public static class Builder extends AbstractBuilder { - private Builder() { + Builder() { super(Shard.class); } } @@ -938,4 +1136,8 @@ public class Shard extends RaftActor { Ticker ticker() { return Ticker.systemTicker(); } + + void scheduleNextPendingTransaction() { + self().tell(RESUME_NEXT_PENDING_TRANSACTION, ActorRef.noSender()); + } }