X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=959b37cb7b03c2d5cc0cf2d410965b3c39bb6223;hb=b5cb353e3553a39f576c284119af75ffa5ea66a9;hp=26f18bd932ec27ef81de5d08428d70b03ff9ef60;hpb=a81d98f692b80c45bce3fe6a87e731abfb012a9f;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 26f18bd932..959b37cb7b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -12,19 +12,21 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; import akka.actor.Props; +import akka.actor.Status; import akka.actor.Status.Failure; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Ticker; +import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Range; import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; @@ -37,6 +39,7 @@ import org.opendaylight.controller.cluster.access.commands.NotLeaderException; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; @@ -46,6 +49,7 @@ import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestExcepti import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; import org.opendaylight.controller.cluster.common.actor.CommonConfig; +import org.opendaylight.controller.cluster.common.actor.Dispatchers; import org.opendaylight.controller.cluster.common.actor.MessageTracker; import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; @@ -61,26 +65,32 @@ import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionCh import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; -import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree; +import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal; +import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; +import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; +import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; +import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; +import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; @@ -88,6 +98,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailed import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -115,6 +126,13 @@ public class Shard extends RaftActor { } }; + static final Object RESUME_NEXT_PENDING_TRANSACTION = new Object() { + @Override + public String toString() { + return "resumeNextPendingTransaction"; + } + }; + // FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant. public static final String DEFAULT_NAME = "default"; @@ -161,8 +179,8 @@ public class Shard extends RaftActor { private final ShardTransactionMessageRetrySupport messageRetrySupport; - private final FrontendMetadata frontendMetadata = new FrontendMetadata(); - private final Map knownFrontends = new HashMap<>(); + private final FrontendMetadata frontendMetadata; + private Map knownFrontends = ImmutableMap.of(); protected Shard(final AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), @@ -171,21 +189,23 @@ public class Shard extends RaftActor { this.name = builder.getId().toString(); this.datastoreContext = builder.getDatastoreContext(); this.restoreFromSnapshot = builder.getRestoreFromSnapshot(); + this.frontendMetadata = new FrontendMetadata(name); setPersistence(datastoreContext.isPersistent()); LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent()); ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher = - new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher"); + new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name); ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher = - new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher"); + new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name); if (builder.getDataTree() != null) { store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(), - treeChangeListenerPublisher, dataChangeListenerPublisher, name); + treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata); } else { store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(), - treeChangeListenerPublisher, dataChangeListenerPublisher, name); + builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, + dataChangeListenerPublisher, name, frontendMetadata); } shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this); @@ -205,8 +225,8 @@ public class Shard extends RaftActor { getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis()); transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext, - new Dispatchers(context().system().dispatchers()).getDispatcherPath( - Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean); + new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Transaction), + self(), getContext(), shardMBean, builder.getId().getShardName()); snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG, this.name); @@ -256,26 +276,31 @@ public class Shard extends RaftActor { @SuppressWarnings("checkstyle:IllegalCatch") @Override protected void handleNonRaftCommand(final Object message) { - try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) { + try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) { final Optional maybeError = context.error(); if (maybeError.isPresent()) { LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(), maybeError.get()); } + store.resetTransactionBatch(); + if (message instanceof RequestEnvelope) { + final long now = ticker().read(); final RequestEnvelope envelope = (RequestEnvelope)message; + try { - final RequestSuccess success = handleRequest(envelope); + final RequestSuccess success = handleRequest(envelope, now); if (success != null) { - envelope.sendSuccess(success); + envelope.sendSuccess(success, ticker().read() - now); } } catch (RequestException e) { LOG.debug("{}: request {} failed", persistenceId(), envelope, e); - envelope.sendFailure(e); + envelope.sendFailure(e, ticker().read() - now); } catch (Exception e) { LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e); - envelope.sendFailure(new RuntimeRequestException("Request failed to process", e)); + envelope.sendFailure(new RuntimeRequestException("Request failed to process", e), + ticker().read() - now); } } else if (message instanceof ConnectClientRequest) { handleConnectClient((ConnectClientRequest)message); @@ -325,12 +350,45 @@ public class Shard extends RaftActor { } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) { store.processCohortRegistryCommand(getSender(), (DataTreeCohortActorRegistry.CohortRegistryCommand) message); + } else if (message instanceof PersistAbortTransactionPayload) { + final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId(); + persistPayload(txId, AbortTransactionPayload.create(txId), true); + } else if (message instanceof MakeLeaderLocal) { + onMakeLeaderLocal(); + } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) { + store.resumeNextPendingTransaction(); } else { super.handleNonRaftCommand(message); } } } + private void onMakeLeaderLocal() { + LOG.debug("{}: onMakeLeaderLocal received", persistenceId()); + if (isLeader()) { + getSender().tell(new Status.Success(null), getSelf()); + return; + } + + final ActorSelection leader = getLeader(); + + if (leader == null) { + // Leader is not present. The cluster is most likely trying to + // elect a leader and we should let that run its normal course + + // TODO we can wait for the election to complete and retry the + // request. We can also let the caller retry by sending a flag + // in the response indicating the request is "reTryable". + getSender().tell(new Failure( + new LeadershipTransferFailedException("We cannot initiate leadership transfer to local node. " + + "Currently there is no leader for " + persistenceId())), + getSelf()); + return; + } + + leader.tell(new RequestLeadership(getId(), getSender()), getSelf()); + } + // Acquire our frontend tracking handle and verify generation matches private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException { final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId()); @@ -374,7 +432,9 @@ public class Shard extends RaftActor { private void handleConnectClient(final ConnectClientRequest message) { try { if (!isLeader() || !isLeaderActive()) { - LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), message); + LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {}," + + "isLeadershipTransferInProgress: {}.", + persistenceId(), message, isLeader(), isLeaderActive(), isLeadershipTransferInProgress()); throw new NotLeaderException(getSelf()); } @@ -389,10 +449,13 @@ public class Shard extends RaftActor { } } - private @Nullable RequestSuccess handleRequest(final RequestEnvelope envelope) throws RequestException { + private @Nullable RequestSuccess handleRequest(final RequestEnvelope envelope, final long now) + throws RequestException { // We are not the leader, hence we want to fail-fast. if (!isLeader() || !isLeaderActive()) { - LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope); + LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {}," + + "isLeadershipTransferInProgress: {}.", + persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress()); throw new NotLeaderException(getSelf()); } @@ -400,13 +463,13 @@ public class Shard extends RaftActor { if (request instanceof TransactionRequest) { final TransactionRequest txReq = (TransactionRequest)request; final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId(); - return getFrontend(clientId).handleTransactionRequest(txReq, envelope); + return getFrontend(clientId).handleTransactionRequest(txReq, envelope, now); } else if (request instanceof LocalHistoryRequest) { final LocalHistoryRequest lhReq = (LocalHistoryRequest)request; final ClientIdentifier clientId = lhReq.getTarget().getClientId(); - return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope); + return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now); } else { - LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request); + LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request); throw new UnsupportedRequestException(request); } } @@ -445,16 +508,15 @@ public class Shard extends RaftActor { updateConfigParams(datastoreContext.getShardRaftConfig()); } - boolean canSkipPayload() { - // If we do not have any followers and we are not using persistence we can apply modification to the state - // immediately - return !hasFollowers() && !persistence().isRecoveryApplicable(); - } - // applyState() will be invoked once consensus is reached on the payload - void persistPayload(final TransactionIdentifier transactionId, final Payload payload) { - // We are faking the sender - persistData(self(), transactionId, payload); + void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) { + boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable(); + if (canSkipPayload) { + applyState(self(), id, payload); + } else { + // We are faking the sender + persistData(self(), id, payload, batchHint); + } } private void handleCommitTransaction(final CommitTransaction commit) { @@ -608,7 +670,7 @@ public class Shard extends RaftActor { doAbortTransaction(abort.getTransactionId(), getSender()); } - void doAbortTransaction(final TransactionIdentifier transactionID, final ActorRef sender) { + void doAbortTransaction(final Identifier transactionID, final ActorRef sender) { commitCoordinator.handleAbort(transactionID, sender, this); } @@ -624,7 +686,9 @@ public class Shard extends RaftActor { } private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) { - store.closeTransactionChain(closeTransactionChain.getIdentifier()); + final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier(); + store.closeTransactionChain(id, null); + store.purgeTransactionChain(id, null); } @SuppressWarnings("checkstyle:IllegalCatch") @@ -724,7 +788,7 @@ public class Shard extends RaftActor { persistenceId(), getId()); } - store.closeAllTransactionChains(); + store.purgeLeaderState(); } if (hasLeader && !isIsolatedLeader()) { @@ -736,8 +800,18 @@ public class Shard extends RaftActor { protected void onLeaderChanged(final String oldLeader, final String newLeader) { shardMBean.incrementLeadershipChangeCount(); - boolean hasLeader = hasLeader(); - if (hasLeader && !isLeader()) { + final boolean hasLeader = hasLeader(); + if (!hasLeader) { + // No leader implies we are not the leader, lose frontend state if we have any. This also places + // an explicit guard so the map will not get modified accidentally. + if (!knownFrontends.isEmpty()) { + LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet()); + knownFrontends = ImmutableMap.of(); + } + return; + } + + if (!isLeader()) { // Another leader was elected. If we were the previous leader and had pending transactions, convert // them to transaction messages and send to the new leader. ActorSelection leader = getLeader(); @@ -756,9 +830,13 @@ public class Shard extends RaftActor { commitCoordinator.abortPendingTransactions("The transacton was aborted due to inflight leadership " + "change and the leader address isn't available.", this); } + } else { + // We have become the leader, we need to reconstruct frontend state + knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this)); + LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet()); } - if (hasLeader && !isIsolatedLeader()) { + if (!isIsolatedLeader()) { messageRetrySupport.retryMessages(); } } @@ -779,6 +857,13 @@ public class Shard extends RaftActor { store.setRunOnPendingTransactionsComplete(operation); } + @Override + protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { + return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors()) + .dataChangeListenerActors(changeSupport.getListenerActors()) + .commitCohortActors(store.getCohortActors()); + } + @Override public String persistenceId() { return this.name; @@ -812,7 +897,7 @@ public class Shard extends RaftActor { private ShardIdentifier id; private Map peerAddresses = Collections.emptyMap(); private DatastoreContext datastoreContext; - private SchemaContext schemaContext; + private SchemaContextProvider schemaContextProvider; private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot; private TipProducingDataTree dataTree; private volatile boolean sealed; @@ -848,9 +933,9 @@ public class Shard extends RaftActor { return self(); } - public T schemaContext(final SchemaContext newSchemaContext) { + public T schemaContextProvider(final SchemaContextProvider schemaContextProvider) { checkSealed(); - this.schemaContext = newSchemaContext; + this.schemaContextProvider = Preconditions.checkNotNull(schemaContextProvider); return self(); } @@ -879,7 +964,7 @@ public class Shard extends RaftActor { } public SchemaContext getSchemaContext() { - return schemaContext; + return Verify.verifyNotNull(schemaContextProvider.getSchemaContext()); } public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() { @@ -906,7 +991,7 @@ public class Shard extends RaftActor { Preconditions.checkNotNull(id, "id should not be null"); Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null"); Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null"); - Preconditions.checkNotNull(schemaContext, "schemaContext should not be null"); + Preconditions.checkNotNull(schemaContextProvider, "schemaContextProvider should not be null"); } public Props props() { @@ -925,4 +1010,8 @@ public class Shard extends RaftActor { Ticker ticker() { return Ticker.systemTicker(); } + + void scheduleNextPendingTransaction() { + self().tell(RESUME_NEXT_PENDING_TRANSACTION, ActorRef.noSender()); + } }