X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=6a05d25fe5b91adbb58a5539c8b5e09613a6e101;hp=3719f749df6bce26cbe469250284ffd834bba8a0;hb=555663eec40d16fbc622bb5de1de37f2253c359b;hpb=2d60632f7cf63712e8357a3cf3fc40d83366e5e6 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 3719f749df..6a05d25fe5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -5,9 +5,10 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Verify.verify; + import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; @@ -29,10 +30,12 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Map; +import java.util.OptionalLong; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.ABIVersion; import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; @@ -76,15 +79,13 @@ import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree; import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal; import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; -import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; -import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.DisableTrackingPayload; import org.opendaylight.controller.cluster.messaging.MessageAssembler; import org.opendaylight.controller.cluster.messaging.MessageSlicer; import org.opendaylight.controller.cluster.messaging.SliceOptions; @@ -103,12 +104,11 @@ import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider; -import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; /** @@ -162,6 +162,8 @@ public class Shard extends RaftActor { /// The name of this shard private final String name; + private final String shardName; + private final ShardStats shardMBean; private final ShardDataTreeListenerInfoMXBeanImpl listenerInfoMXBean; @@ -183,8 +185,6 @@ public class Shard extends RaftActor { private final ShardSnapshotCohort snapshotCohort; private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this); - private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this); - private ShardSnapshot restoreFromSnapshot; @@ -204,6 +204,7 @@ public class Shard extends RaftActor { Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION); this.name = builder.getId().toString(); + this.shardName = builder.getId().getShardName(); this.datastoreContext = builder.getDatastoreContext(); this.restoreFromSnapshot = builder.getRestoreFromSnapshot(); this.frontendMetadata = new FrontendMetadata(name); @@ -214,15 +215,12 @@ public class Shard extends RaftActor { ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher = new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name); - ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher = - new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name); if (builder.getDataTree() != null) { store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(), - treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata); + treeChangeListenerPublisher, name, frontendMetadata); } else { store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(), - builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, - dataChangeListenerPublisher, name, frontendMetadata); + builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name, frontendMetadata); } shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this); @@ -309,7 +307,7 @@ public class Shard extends RaftActor { @Override protected void handleNonRaftCommand(final Object message) { try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) { - final Optional maybeError = context.error(); + final java.util.Optional maybeError = context.error(); if (maybeError.isPresent()) { LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(), maybeError.get()); @@ -339,8 +337,6 @@ public class Shard extends RaftActor { handleAbortTransaction(AbortTransaction.fromSerializable(message)); } else if (CloseTransactionChain.isSerializedType(message)) { closeTransactionChain(CloseTransactionChain.fromSerializable(message)); - } else if (message instanceof RegisterChangeListener) { - changeSupport.onMessage((RegisterChangeListener) message, isLeader(), hasLeader()); } else if (message instanceof RegisterDataTreeChangeListener) { treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader()); } else if (message instanceof UpdateSchemaContext) { @@ -368,9 +364,6 @@ public class Shard extends RaftActor { } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) { store.processCohortRegistryCommand(getSender(), (DataTreeCohortActorRegistry.CohortRegistryCommand) message); - } else if (message instanceof PersistAbortTransactionPayload) { - final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId(); - persistPayload(txId, AbortTransactionPayload.create(txId), true); } else if (message instanceof MakeLeaderLocal) { onMakeLeaderLocal(); } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) { @@ -400,9 +393,7 @@ public class Shard extends RaftActor { responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget()) .message(envelope.newSuccessEnvelope(success, executionTimeNanos)) .sendTo(envelope.getMessage().getReplyTo()).replyTo(self()) - .onFailureCallback(t -> { - LOG.warn("Error slicing response {}", success, t); - }).build())); + .onFailureCallback(t -> LOG.warn("Error slicing response {}", success, t)).build())); } else { envelope.sendSuccess(success, executionTimeNanos); } @@ -423,22 +414,50 @@ public class Shard extends RaftActor { requestMessageAssembler.checkExpiredAssembledMessageState(); } - private Optional updateAccess(final SimpleShardDataTreeCohort cohort) { + private OptionalLong updateAccess(final SimpleShardDataTreeCohort cohort) { final FrontendIdentifier frontend = cohort.getIdentifier().getHistoryId().getClientId().getFrontendId(); final LeaderFrontendState state = knownFrontends.get(frontend); if (state == null) { // Not tell-based protocol, do nothing - return Optional.absent(); + return OptionalLong.empty(); } if (isIsolatedLeader()) { // We are isolated and no new request can come through until we emerge from it. We are still updating // liveness of frontend when we see it attempting to communicate. Use the last access timer. - return Optional.of(state.getLastSeenTicks()); + return OptionalLong.of(state.getLastSeenTicks()); } // If this frontend has freshly connected, give it some time to catch up before killing its transactions. - return Optional.of(state.getLastConnectTicks()); + return OptionalLong.of(state.getLastConnectTicks()); + } + + private void disableTracking(final DisableTrackingPayload payload) { + final ClientIdentifier clientId = payload.getIdentifier(); + LOG.debug("{}: disabling tracking of {}", persistenceId(), clientId); + frontendMetadata.disableTracking(clientId); + + if (isLeader()) { + final FrontendIdentifier frontendId = clientId.getFrontendId(); + final LeaderFrontendState frontend = knownFrontends.get(frontendId); + if (frontend != null) { + if (clientId.equals(frontend.getIdentifier())) { + if (!(frontend instanceof LeaderFrontendState.Disabled)) { + verify(knownFrontends.replace(frontendId, frontend, + new LeaderFrontendState.Disabled(persistenceId(), clientId, store))); + LOG.debug("{}: leader state for {} disabled", persistenceId(), clientId); + } else { + LOG.debug("{}: leader state {} is already disabled", persistenceId(), frontend); + } + } else { + LOG.debug("{}: leader state {} does not match {}", persistenceId(), frontend, clientId); + } + } else { + LOG.debug("{}: leader state for {} not found", persistenceId(), clientId); + knownFrontends.put(frontendId, new LeaderFrontendState.Disabled(persistenceId(), clientId, + getDataStore())); + } + } } private void onMakeLeaderLocal() { @@ -468,8 +487,7 @@ public class Shard extends RaftActor { } // Acquire our frontend tracking handle and verify generation matches - @Nullable - private LeaderFrontendState findFrontend(final ClientIdentifier clientId) throws RequestException { + private @Nullable LeaderFrontendState findFrontend(final ClientIdentifier clientId) throws RequestException { final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId()); if (existing != null) { final int cmp = Long.compareUnsigned(existing.getIdentifier().getGeneration(), clientId.getGeneration()); @@ -479,7 +497,8 @@ public class Shard extends RaftActor { } if (cmp > 0) { LOG.debug("{}: rejecting request from outdated client {}", persistenceId(), clientId); - throw new RetiredGenerationException(existing.getIdentifier().getGeneration()); + throw new RetiredGenerationException(clientId.getGeneration(), + existing.getIdentifier().getGeneration()); } LOG.info("{}: retiring state {}, outdated by request from client {}", persistenceId(), existing, clientId); @@ -502,8 +521,7 @@ public class Shard extends RaftActor { throw new OutOfSequenceEnvelopeException(0); } - @Nonnull - private static ABIVersion selectVersion(final ConnectClientRequest message) { + private static @NonNull ABIVersion selectVersion(final ConnectClientRequest message) { final Range clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion()); for (ABIVersion v : SUPPORTED_ABIVERSIONS) { if (clientRange.contains(v)) { @@ -535,7 +553,7 @@ public class Shard extends RaftActor { final ABIVersion selectedVersion = selectVersion(message); final LeaderFrontendState frontend; if (existing == null) { - frontend = new LeaderFrontendState(persistenceId(), clientId, store); + frontend = new LeaderFrontendState.Enabled(persistenceId(), clientId, store); knownFrontends.put(clientId.getFrontendId(), frontend); LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId); } else { @@ -551,8 +569,7 @@ public class Shard extends RaftActor { } } - @Nullable - private RequestSuccess handleRequest(final RequestEnvelope envelope, final long now) + private @Nullable RequestSuccess handleRequest(final RequestEnvelope envelope, final long now) throws RequestException { // We are not the leader, hence we want to fail-fast. if (!isLeader() || paused || !isLeaderActive()) { @@ -594,6 +611,10 @@ public class Shard extends RaftActor { return roleChangeNotifier; } + String getShardName() { + return shardName; + } + @Override protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, final short leaderPayloadVersion) { @@ -623,13 +644,14 @@ public class Shard extends RaftActor { } private void handleCommitTransaction(final CommitTransaction commit) { + final TransactionIdentifier txId = commit.getTransactionId(); if (isLeader()) { - commitCoordinator.handleCommit(commit.getTransactionId(), getSender(), this); + askProtocolEncountered(txId); + commitCoordinator.handleCommit(txId, getSender(), this); } else { ActorSelection leader = getLeader(); if (leader == null) { - messageRetrySupport.addMessageToRetry(commit, getSender(), - "Could not commit transaction " + commit.getTransactionId()); + messageRetrySupport.addMessageToRetry(commit, getSender(), "Could not commit transaction " + txId); } else { LOG.debug("{}: Forwarding CommitTransaction to leader {}", persistenceId(), leader); leader.forward(commit, getContext()); @@ -638,15 +660,17 @@ public class Shard extends RaftActor { } private void handleCanCommitTransaction(final CanCommitTransaction canCommit) { - LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionId()); + final TransactionIdentifier txId = canCommit.getTransactionId(); + LOG.debug("{}: Can committing transaction {}", persistenceId(), txId); if (isLeader()) { - commitCoordinator.handleCanCommit(canCommit.getTransactionId(), getSender(), this); + askProtocolEncountered(txId); + commitCoordinator.handleCanCommit(txId, getSender(), this); } else { ActorSelection leader = getLeader(); if (leader == null) { messageRetrySupport.addMessageToRetry(canCommit, getSender(), - "Could not canCommit transaction " + canCommit.getTransactionId()); + "Could not canCommit transaction " + txId); } else { LOG.debug("{}: Forwarding CanCommitTransaction to leader {}", persistenceId(), leader); leader.forward(canCommit, getContext()); @@ -656,6 +680,8 @@ public class Shard extends RaftActor { @SuppressWarnings("checkstyle:IllegalCatch") protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) { + askProtocolEncountered(batched.getTransactionId()); + try { commitCoordinator.handleBatchedModifications(batched, sender, this); } catch (Exception e) { @@ -752,6 +778,7 @@ public class Shard extends RaftActor { boolean isLeaderActive = isLeaderActive(); if (isLeader() && isLeaderActive) { + askProtocolEncountered(forwardedReady.getTransactionId()); commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this); } else { ActorSelection leader = getLeader(); @@ -762,7 +789,8 @@ public class Shard extends RaftActor { LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader); ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionId(), - forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit()); + forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit(), + forwardedReady.getParticipatingShardNames()); readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); leader.forward(readyLocal, getContext()); } @@ -770,7 +798,9 @@ public class Shard extends RaftActor { } private void handleAbortTransaction(final AbortTransaction abort) { - doAbortTransaction(abort.getTransactionId(), getSender()); + final TransactionIdentifier transactionId = abort.getTransactionId(); + askProtocolEncountered(transactionId); + doAbortTransaction(transactionId, getSender()); } void doAbortTransaction(final Identifier transactionID, final ActorRef sender) { @@ -789,13 +819,21 @@ public class Shard extends RaftActor { } private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) { - final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier(); - store.closeTransactionChain(id, null); - store.purgeTransactionChain(id, null); + if (isLeader()) { + final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier(); + askProtocolEncountered(id.getClientId()); + store.closeTransactionChain(id); + } else if (getLeader() != null) { + getLeader().forward(closeTransactionChain, getContext()); + } else { + LOG.warn("{}: Could not close transaction {}", persistenceId(), closeTransactionChain.getIdentifier()); + } } @SuppressWarnings("checkstyle:IllegalCatch") private void createTransaction(final CreateTransaction createTransaction) { + askProtocolEncountered(createTransaction.getTransactionId()); + try { if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY && failIfIsolatedLeader(getSender())) { @@ -818,6 +856,27 @@ public class Shard extends RaftActor { transactionId); } + // Called on leader only + private void askProtocolEncountered(final TransactionIdentifier transactionId) { + askProtocolEncountered(transactionId.getHistoryId().getClientId()); + } + + // Called on leader only + private void askProtocolEncountered(final ClientIdentifier clientId) { + final FrontendIdentifier frontend = clientId.getFrontendId(); + final LeaderFrontendState state = knownFrontends.get(frontend); + if (!(state instanceof LeaderFrontendState.Disabled)) { + LOG.debug("{}: encountered ask-based client {}, disabling transaction tracking", persistenceId(), clientId); + if (knownFrontends.isEmpty()) { + knownFrontends = new HashMap<>(); + } + knownFrontends.put(frontend, new LeaderFrontendState.Disabled(persistenceId(), clientId, getDataStore())); + + persistPayload(clientId, DisableTrackingPayload.create(clientId, + datastoreContext.getInitialPayloadSerializedBufferCapacity()), false); + } + } + private void updateSchemaContext(final UpdateSchemaContext message) { updateSchemaContext(message.getSchemaContext()); } @@ -839,7 +898,6 @@ public class Shard extends RaftActor { } @Override - @Nonnull protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { if (restoreFromSnapshot == null) { return ShardRecoveryCoordinator.create(store, persistenceId(), LOG); @@ -859,7 +917,7 @@ public class Shard extends RaftActor { if (txCommitTimeoutCheckSchedule == null) { // Schedule a message to be periodically sent to check if the current in-progress // transaction should be expired and aborted. - FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS); + FiniteDuration period = FiniteDuration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS); txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule( period, period, getSelf(), TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender()); @@ -869,6 +927,11 @@ public class Shard extends RaftActor { @Override protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { if (data instanceof Payload) { + if (data instanceof DisableTrackingPayload) { + disableTracking((DisableTrackingPayload) data); + return; + } + try { store.applyReplicatedPayload(identifier, (Payload)data); } catch (DataValidationFailedException | IOException e) { @@ -883,7 +946,6 @@ public class Shard extends RaftActor { protected void onStateChanged() { boolean isLeader = isLeader(); boolean hasLeader = hasLeader(); - changeSupport.onLeadershipChange(isLeader, hasLeader); treeChangeSupport.onLeadershipChange(isLeader, hasLeader); // If this actor is no longer the leader close all the transaction chains @@ -932,6 +994,8 @@ public class Shard extends RaftActor { messagesToForward.size(), leader); for (Object message : messagesToForward) { + LOG.debug("{}: Forwarding pending transaction message {}", persistenceId(), message); + leader.tell(message, self()); } } @@ -966,8 +1030,10 @@ public class Shard extends RaftActor { paused = true; // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused. - knownFrontends.values().forEach(LeaderFrontendState::retire); - knownFrontends = ImmutableMap.of(); + if (datastoreContext.isUseTellBasedProtocol()) { + knownFrontends.values().forEach(LeaderFrontendState::retire); + knownFrontends = ImmutableMap.of(); + } store.setRunOnPendingTransactionsComplete(operation); } @@ -986,7 +1052,6 @@ public class Shard extends RaftActor { @Override protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors()) - .dataChangeListenerActors(changeSupport.getListenerActors()) .commitCohortActors(store.getCohortActors()); } @@ -1025,7 +1090,7 @@ public class Shard extends RaftActor { private DatastoreContext datastoreContext; private SchemaContextProvider schemaContextProvider; private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot; - private TipProducingDataTree dataTree; + private DataTree dataTree; private volatile boolean sealed; protected AbstractBuilder(final Class shardClass) { @@ -1071,7 +1136,7 @@ public class Shard extends RaftActor { return self(); } - public T dataTree(final TipProducingDataTree newDataTree) { + public T dataTree(final DataTree newDataTree) { checkSealed(); this.dataTree = newDataTree; return self(); @@ -1097,7 +1162,7 @@ public class Shard extends RaftActor { return restoreFromSnapshot; } - public TipProducingDataTree getDataTree() { + public DataTree getDataTree() { return dataTree; }