X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=eeacdd9b6ffa9cb247c698d64c68936f8e594caa;hp=51ee4d7b80ef7c9396930984b22e44c18bbb392c;hb=e7e69069ae5ecaacc9ea0e47cb40cdf68237d636;hpb=9905bf0575ff196a531eb114e89b1bdb7226bc6c diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 51ee4d7b80..eeacdd9b6f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -7,6 +7,10 @@ */ package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; @@ -17,21 +21,23 @@ import akka.actor.Status.Failure; import akka.serialization.JavaSerializer; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.google.common.base.Ticker; import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Range; import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.ABIVersion; import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; @@ -71,18 +77,19 @@ import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; +import org.opendaylight.controller.cluster.datastore.messages.GetKnownClients; +import org.opendaylight.controller.cluster.datastore.messages.GetKnownClientsReply; import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree; import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal; import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; -import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.DisableTrackingPayload; import org.opendaylight.controller.cluster.messaging.MessageAssembler; import org.opendaylight.controller.cluster.messaging.MessageSlicer; import org.opendaylight.controller.cluster.messaging.SliceOptions; @@ -104,8 +111,8 @@ import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider; +import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; +import org.opendaylight.yangtools.yang.model.api.EffectiveModelContextProvider; import scala.concurrent.duration.FiniteDuration; /** @@ -187,7 +194,9 @@ public class Shard extends RaftActor { private final ShardTransactionMessageRetrySupport messageRetrySupport; - private final FrontendMetadata frontendMetadata; + @VisibleForTesting + final FrontendMetadata frontendMetadata; + private Map knownFrontends = ImmutableMap.of(); private boolean paused; @@ -214,10 +223,12 @@ public class Shard extends RaftActor { new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name); if (builder.getDataTree() != null) { store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(), - treeChangeListenerPublisher, name, frontendMetadata); + treeChangeListenerPublisher, name, + frontendMetadata); } else { store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(), - builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name, frontendMetadata); + builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name, + frontendMetadata); } shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this); @@ -242,7 +253,7 @@ public class Shard extends RaftActor { self(), getContext(), shardMBean, builder.getId().getShardName()); snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG, - this.name); + this.name, datastoreContext); messageRetrySupport = new ShardTransactionMessageRetrySupport(this); @@ -273,7 +284,7 @@ public class Shard extends RaftActor { } @Override - public void postStop() { + public void postStop() throws Exception { LOG.info("Stopping Shard {}", persistenceId()); super.postStop(); @@ -361,14 +372,12 @@ public class Shard extends RaftActor { } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) { store.processCohortRegistryCommand(getSender(), (DataTreeCohortActorRegistry.CohortRegistryCommand) message); - } else if (message instanceof PersistAbortTransactionPayload) { - final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId(); - persistPayload(txId, AbortTransactionPayload.create( - txId, datastoreContext.getInitialPayloadSerializedBufferCapacity()), true); } else if (message instanceof MakeLeaderLocal) { onMakeLeaderLocal(); } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) { store.resumeNextPendingTransaction(); + } else if (GetKnownClients.INSTANCE.equals(message)) { + handleGetKnownClients(); } else if (!responseMessageSlicer.handleMessage(message)) { super.handleNonRaftCommand(message); } @@ -415,22 +424,50 @@ public class Shard extends RaftActor { requestMessageAssembler.checkExpiredAssembledMessageState(); } - private Optional updateAccess(final SimpleShardDataTreeCohort cohort) { + private OptionalLong updateAccess(final SimpleShardDataTreeCohort cohort) { final FrontendIdentifier frontend = cohort.getIdentifier().getHistoryId().getClientId().getFrontendId(); final LeaderFrontendState state = knownFrontends.get(frontend); if (state == null) { // Not tell-based protocol, do nothing - return Optional.absent(); + return OptionalLong.empty(); } if (isIsolatedLeader()) { // We are isolated and no new request can come through until we emerge from it. We are still updating // liveness of frontend when we see it attempting to communicate. Use the last access timer. - return Optional.of(state.getLastSeenTicks()); + return OptionalLong.of(state.getLastSeenTicks()); } // If this frontend has freshly connected, give it some time to catch up before killing its transactions. - return Optional.of(state.getLastConnectTicks()); + return OptionalLong.of(state.getLastConnectTicks()); + } + + private void disableTracking(final DisableTrackingPayload payload) { + final ClientIdentifier clientId = payload.getIdentifier(); + LOG.debug("{}: disabling tracking of {}", persistenceId(), clientId); + frontendMetadata.disableTracking(clientId); + + if (isLeader()) { + final FrontendIdentifier frontendId = clientId.getFrontendId(); + final LeaderFrontendState frontend = knownFrontends.get(frontendId); + if (frontend != null) { + if (clientId.equals(frontend.getIdentifier())) { + if (!(frontend instanceof LeaderFrontendState.Disabled)) { + verify(knownFrontends.replace(frontendId, frontend, + new LeaderFrontendState.Disabled(persistenceId(), clientId, store))); + LOG.debug("{}: leader state for {} disabled", persistenceId(), clientId); + } else { + LOG.debug("{}: leader state {} is already disabled", persistenceId(), frontend); + } + } else { + LOG.debug("{}: leader state {} does not match {}", persistenceId(), frontend, clientId); + } + } else { + LOG.debug("{}: leader state for {} not found", persistenceId(), clientId); + knownFrontends.put(frontendId, new LeaderFrontendState.Disabled(persistenceId(), clientId, + getDataStore())); + } + } } private void onMakeLeaderLocal() { @@ -460,8 +497,7 @@ public class Shard extends RaftActor { } // Acquire our frontend tracking handle and verify generation matches - @Nullable - private LeaderFrontendState findFrontend(final ClientIdentifier clientId) throws RequestException { + private @Nullable LeaderFrontendState findFrontend(final ClientIdentifier clientId) throws RequestException { final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId()); if (existing != null) { final int cmp = Long.compareUnsigned(existing.getIdentifier().getGeneration(), clientId.getGeneration()); @@ -495,8 +531,7 @@ public class Shard extends RaftActor { throw new OutOfSequenceEnvelopeException(0); } - @Nonnull - private static ABIVersion selectVersion(final ConnectClientRequest message) { + private static @NonNull ABIVersion selectVersion(final ConnectClientRequest message) { final Range clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion()); for (ABIVersion v : SUPPORTED_ABIVERSIONS) { if (clientRange.contains(v)) { @@ -528,7 +563,7 @@ public class Shard extends RaftActor { final ABIVersion selectedVersion = selectVersion(message); final LeaderFrontendState frontend; if (existing == null) { - frontend = new LeaderFrontendState(persistenceId(), clientId, store); + frontend = new LeaderFrontendState.Enabled(persistenceId(), clientId, store); knownFrontends.put(clientId.getFrontendId(), frontend); LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId); } else { @@ -544,8 +579,7 @@ public class Shard extends RaftActor { } } - @Nullable - private RequestSuccess handleRequest(final RequestEnvelope envelope, final long now) + private @Nullable RequestSuccess handleRequest(final RequestEnvelope envelope, final long now) throws RequestException { // We are not the leader, hence we want to fail-fast. if (!isLeader() || paused || !isLeaderActive()) { @@ -570,6 +604,18 @@ public class Shard extends RaftActor { } } + private void handleGetKnownClients() { + final ImmutableSet clients; + if (isLeader()) { + clients = knownFrontends.values().stream() + .map(LeaderFrontendState::getIdentifier) + .collect(ImmutableSet.toImmutableSet()); + } else { + clients = frontendMetadata.getClients(); + } + sender().tell(new GetKnownClientsReply(clients), self()); + } + private boolean hasLeader() { return getLeaderId() != null; } @@ -620,13 +666,14 @@ public class Shard extends RaftActor { } private void handleCommitTransaction(final CommitTransaction commit) { + final TransactionIdentifier txId = commit.getTransactionId(); if (isLeader()) { - commitCoordinator.handleCommit(commit.getTransactionId(), getSender(), this); + askProtocolEncountered(txId); + commitCoordinator.handleCommit(txId, getSender(), this); } else { ActorSelection leader = getLeader(); if (leader == null) { - messageRetrySupport.addMessageToRetry(commit, getSender(), - "Could not commit transaction " + commit.getTransactionId()); + messageRetrySupport.addMessageToRetry(commit, getSender(), "Could not commit transaction " + txId); } else { LOG.debug("{}: Forwarding CommitTransaction to leader {}", persistenceId(), leader); leader.forward(commit, getContext()); @@ -635,15 +682,17 @@ public class Shard extends RaftActor { } private void handleCanCommitTransaction(final CanCommitTransaction canCommit) { - LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionId()); + final TransactionIdentifier txId = canCommit.getTransactionId(); + LOG.debug("{}: Can committing transaction {}", persistenceId(), txId); if (isLeader()) { - commitCoordinator.handleCanCommit(canCommit.getTransactionId(), getSender(), this); + askProtocolEncountered(txId); + commitCoordinator.handleCanCommit(txId, getSender(), this); } else { ActorSelection leader = getLeader(); if (leader == null) { messageRetrySupport.addMessageToRetry(canCommit, getSender(), - "Could not canCommit transaction " + canCommit.getTransactionId()); + "Could not canCommit transaction " + txId); } else { LOG.debug("{}: Forwarding CanCommitTransaction to leader {}", persistenceId(), leader); leader.forward(canCommit, getContext()); @@ -653,6 +702,8 @@ public class Shard extends RaftActor { @SuppressWarnings("checkstyle:IllegalCatch") protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) { + askProtocolEncountered(batched.getTransactionId()); + try { commitCoordinator.handleBatchedModifications(batched, sender, this); } catch (Exception e) { @@ -749,6 +800,7 @@ public class Shard extends RaftActor { boolean isLeaderActive = isLeaderActive(); if (isLeader() && isLeaderActive) { + askProtocolEncountered(forwardedReady.getTransactionId()); commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this); } else { ActorSelection leader = getLeader(); @@ -768,7 +820,9 @@ public class Shard extends RaftActor { } private void handleAbortTransaction(final AbortTransaction abort) { - doAbortTransaction(abort.getTransactionId(), getSender()); + final TransactionIdentifier transactionId = abort.getTransactionId(); + askProtocolEncountered(transactionId); + doAbortTransaction(transactionId, getSender()); } void doAbortTransaction(final Identifier transactionID, final ActorRef sender) { @@ -789,9 +843,8 @@ public class Shard extends RaftActor { private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) { if (isLeader()) { final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier(); - // FIXME: CONTROLLER-1628: stage purge once no transactions are present - store.closeTransactionChain(id, null); - store.purgeTransactionChain(id, null); + askProtocolEncountered(id.getClientId()); + store.closeTransactionChain(id); } else if (getLeader() != null) { getLeader().forward(closeTransactionChain, getContext()); } else { @@ -801,6 +854,8 @@ public class Shard extends RaftActor { @SuppressWarnings("checkstyle:IllegalCatch") private void createTransaction(final CreateTransaction createTransaction) { + askProtocolEncountered(createTransaction.getTransactionId()); + try { if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY && failIfIsolatedLeader(getSender())) { @@ -823,12 +878,33 @@ public class Shard extends RaftActor { transactionId); } + // Called on leader only + private void askProtocolEncountered(final TransactionIdentifier transactionId) { + askProtocolEncountered(transactionId.getHistoryId().getClientId()); + } + + // Called on leader only + private void askProtocolEncountered(final ClientIdentifier clientId) { + final FrontendIdentifier frontend = clientId.getFrontendId(); + final LeaderFrontendState state = knownFrontends.get(frontend); + if (!(state instanceof LeaderFrontendState.Disabled)) { + LOG.debug("{}: encountered ask-based client {}, disabling transaction tracking", persistenceId(), clientId); + if (knownFrontends.isEmpty()) { + knownFrontends = new HashMap<>(); + } + knownFrontends.put(frontend, new LeaderFrontendState.Disabled(persistenceId(), clientId, getDataStore())); + + persistPayload(clientId, DisableTrackingPayload.create(clientId, + datastoreContext.getInitialPayloadSerializedBufferCapacity()), false); + } + } + private void updateSchemaContext(final UpdateSchemaContext message) { - updateSchemaContext(message.getSchemaContext()); + updateSchemaContext(message.getEffectiveModelContext()); } @VisibleForTesting - void updateSchemaContext(final SchemaContext schemaContext) { + void updateSchemaContext(final @NonNull EffectiveModelContext schemaContext) { store.updateSchemaContext(schemaContext); } @@ -844,7 +920,6 @@ public class Shard extends RaftActor { } @Override - @Nonnull protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { if (restoreFromSnapshot == null) { return ShardRecoveryCoordinator.create(store, persistenceId(), LOG); @@ -874,6 +949,11 @@ public class Shard extends RaftActor { @Override protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { if (data instanceof Payload) { + if (data instanceof DisableTrackingPayload) { + disableTracking((DisableTrackingPayload) data); + return; + } + try { store.applyReplicatedPayload(identifier, (Payload)data); } catch (DataValidationFailedException | IOException e) { @@ -972,8 +1052,10 @@ public class Shard extends RaftActor { paused = true; // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused. - knownFrontends.values().forEach(LeaderFrontendState::retire); - knownFrontends = ImmutableMap.of(); + if (datastoreContext.isUseTellBasedProtocol()) { + knownFrontends.values().forEach(LeaderFrontendState::retire); + knownFrontends = ImmutableMap.of(); + } store.setRunOnPendingTransactionsComplete(operation); } @@ -1024,21 +1106,22 @@ public class Shard extends RaftActor { } public abstract static class AbstractBuilder, S extends Shard> { - private final Class shardClass; + private final Class shardClass; private ShardIdentifier id; private Map peerAddresses = Collections.emptyMap(); private DatastoreContext datastoreContext; - private SchemaContextProvider schemaContextProvider; + private EffectiveModelContextProvider schemaContextProvider; private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot; private DataTree dataTree; + private volatile boolean sealed; - protected AbstractBuilder(final Class shardClass) { + protected AbstractBuilder(final Class shardClass) { this.shardClass = shardClass; } protected void checkSealed() { - Preconditions.checkState(!sealed, "Builder isalready sealed - further modifications are not allowed"); + checkState(!sealed, "Builder isalready sealed - further modifications are not allowed"); } @SuppressWarnings("unchecked") @@ -1064,9 +1147,9 @@ public class Shard extends RaftActor { return self(); } - public T schemaContextProvider(final SchemaContextProvider newSchemaContextProvider) { + public T schemaContextProvider(final EffectiveModelContextProvider newSchemaContextProvider) { checkSealed(); - this.schemaContextProvider = Preconditions.checkNotNull(newSchemaContextProvider); + this.schemaContextProvider = requireNonNull(newSchemaContextProvider); return self(); } @@ -1094,8 +1177,8 @@ public class Shard extends RaftActor { return datastoreContext; } - public SchemaContext getSchemaContext() { - return Verify.verifyNotNull(schemaContextProvider.getSchemaContext()); + public EffectiveModelContext getSchemaContext() { + return Verify.verifyNotNull(schemaContextProvider.getEffectiveModelContext()); } public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() { @@ -1119,10 +1202,10 @@ public class Shard extends RaftActor { } protected void verify() { - Preconditions.checkNotNull(id, "id should not be null"); - Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null"); - Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null"); - Preconditions.checkNotNull(schemaContextProvider, "schemaContextProvider should not be null"); + requireNonNull(id, "id should not be null"); + requireNonNull(peerAddresses, "peerAddresses should not be null"); + requireNonNull(datastoreContext, "dataStoreContext should not be null"); + requireNonNull(schemaContextProvider, "schemaContextProvider should not be null"); } public Props props() { @@ -1134,7 +1217,11 @@ public class Shard extends RaftActor { public static class Builder extends AbstractBuilder { Builder() { - super(Shard.class); + this(Shard.class); + } + + Builder(final Class shardClass) { + super(shardClass); } }