X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=a0c3e7f5709d8f3fbc0bbcbcda7b34736ae3b881;hb=4842cd28aa49cdd492d7c49ed06fef00208e1cd7;hp=47a25d7ea339af7f9dd87933879148f3c06b8f6e;hpb=402dbc040ddb5dfc488320356b5a36c66d59c36e;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 47a25d7ea3..a0c3e7f570 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -5,9 +5,10 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Verify.verify; + import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; @@ -29,6 +30,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; @@ -76,14 +78,13 @@ import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree; import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal; import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; -import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.DisableTrackingPayload; import org.opendaylight.controller.cluster.messaging.MessageAssembler; import org.opendaylight.controller.cluster.messaging.MessageSlicer; import org.opendaylight.controller.cluster.messaging.SliceOptions; @@ -107,7 +108,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailed import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider; -import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; /** @@ -161,6 +161,8 @@ public class Shard extends RaftActor { /// The name of this shard private final String name; + private final String shardName; + private final ShardStats shardMBean; private final ShardDataTreeListenerInfoMXBeanImpl listenerInfoMXBean; @@ -201,6 +203,7 @@ public class Shard extends RaftActor { Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION); this.name = builder.getId().toString(); + this.shardName = builder.getId().getShardName(); this.datastoreContext = builder.getDatastoreContext(); this.restoreFromSnapshot = builder.getRestoreFromSnapshot(); this.frontendMetadata = new FrontendMetadata(name); @@ -360,9 +363,6 @@ public class Shard extends RaftActor { } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) { store.processCohortRegistryCommand(getSender(), (DataTreeCohortActorRegistry.CohortRegistryCommand) message); - } else if (message instanceof PersistAbortTransactionPayload) { - final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId(); - persistPayload(txId, AbortTransactionPayload.create(txId), true); } else if (message instanceof MakeLeaderLocal) { onMakeLeaderLocal(); } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) { @@ -392,9 +392,7 @@ public class Shard extends RaftActor { responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget()) .message(envelope.newSuccessEnvelope(success, executionTimeNanos)) .sendTo(envelope.getMessage().getReplyTo()).replyTo(self()) - .onFailureCallback(t -> { - LOG.warn("Error slicing response {}", success, t); - }).build())); + .onFailureCallback(t -> LOG.warn("Error slicing response {}", success, t)).build())); } else { envelope.sendSuccess(success, executionTimeNanos); } @@ -433,6 +431,34 @@ public class Shard extends RaftActor { return Optional.of(state.getLastConnectTicks()); } + private void disableTracking(final DisableTrackingPayload payload) { + final ClientIdentifier clientId = payload.getIdentifier(); + LOG.debug("{}: disabling tracking of {}", persistenceId(), clientId); + frontendMetadata.disableTracking(clientId); + + if (isLeader()) { + final FrontendIdentifier frontendId = clientId.getFrontendId(); + final LeaderFrontendState frontend = knownFrontends.get(frontendId); + if (frontend != null) { + if (clientId.equals(frontend.getIdentifier())) { + if (!(frontend instanceof LeaderFrontendState.Disabled)) { + verify(knownFrontends.replace(frontendId, frontend, + new LeaderFrontendState.Disabled(persistenceId(), clientId, store))); + LOG.debug("{}: leader state for {} disabled", persistenceId(), clientId); + } else { + LOG.debug("{}: leader state {} is already disabled", persistenceId(), frontend); + } + } else { + LOG.debug("{}: leader state {} does not match {}", persistenceId(), frontend, clientId); + } + } else { + LOG.debug("{}: leader state for {} not found", persistenceId(), clientId); + knownFrontends.put(frontendId, new LeaderFrontendState.Disabled(persistenceId(), clientId, + getDataStore())); + } + } + } + private void onMakeLeaderLocal() { LOG.debug("{}: onMakeLeaderLocal received", persistenceId()); if (isLeader()) { @@ -471,7 +497,8 @@ public class Shard extends RaftActor { } if (cmp > 0) { LOG.debug("{}: rejecting request from outdated client {}", persistenceId(), clientId); - throw new RetiredGenerationException(existing.getIdentifier().getGeneration()); + throw new RetiredGenerationException(clientId.getGeneration(), + existing.getIdentifier().getGeneration()); } LOG.info("{}: retiring state {}, outdated by request from client {}", persistenceId(), existing, clientId); @@ -527,7 +554,7 @@ public class Shard extends RaftActor { final ABIVersion selectedVersion = selectVersion(message); final LeaderFrontendState frontend; if (existing == null) { - frontend = new LeaderFrontendState(persistenceId(), clientId, store); + frontend = new LeaderFrontendState.Enabled(persistenceId(), clientId, store); knownFrontends.put(clientId.getFrontendId(), frontend); LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId); } else { @@ -586,6 +613,10 @@ public class Shard extends RaftActor { return roleChangeNotifier; } + String getShardName() { + return shardName; + } + @Override protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, final short leaderPayloadVersion) { @@ -615,13 +646,14 @@ public class Shard extends RaftActor { } private void handleCommitTransaction(final CommitTransaction commit) { + final TransactionIdentifier txId = commit.getTransactionId(); if (isLeader()) { - commitCoordinator.handleCommit(commit.getTransactionId(), getSender(), this); + askProtocolEncountered(txId); + commitCoordinator.handleCommit(txId, getSender(), this); } else { ActorSelection leader = getLeader(); if (leader == null) { - messageRetrySupport.addMessageToRetry(commit, getSender(), - "Could not commit transaction " + commit.getTransactionId()); + messageRetrySupport.addMessageToRetry(commit, getSender(), "Could not commit transaction " + txId); } else { LOG.debug("{}: Forwarding CommitTransaction to leader {}", persistenceId(), leader); leader.forward(commit, getContext()); @@ -630,15 +662,17 @@ public class Shard extends RaftActor { } private void handleCanCommitTransaction(final CanCommitTransaction canCommit) { - LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionId()); + final TransactionIdentifier txId = canCommit.getTransactionId(); + LOG.debug("{}: Can committing transaction {}", persistenceId(), txId); if (isLeader()) { - commitCoordinator.handleCanCommit(canCommit.getTransactionId(), getSender(), this); + askProtocolEncountered(txId); + commitCoordinator.handleCanCommit(txId, getSender(), this); } else { ActorSelection leader = getLeader(); if (leader == null) { messageRetrySupport.addMessageToRetry(canCommit, getSender(), - "Could not canCommit transaction " + canCommit.getTransactionId()); + "Could not canCommit transaction " + txId); } else { LOG.debug("{}: Forwarding CanCommitTransaction to leader {}", persistenceId(), leader); leader.forward(canCommit, getContext()); @@ -648,6 +682,8 @@ public class Shard extends RaftActor { @SuppressWarnings("checkstyle:IllegalCatch") protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) { + askProtocolEncountered(batched.getTransactionId()); + try { commitCoordinator.handleBatchedModifications(batched, sender, this); } catch (Exception e) { @@ -744,6 +780,7 @@ public class Shard extends RaftActor { boolean isLeaderActive = isLeaderActive(); if (isLeader() && isLeaderActive) { + askProtocolEncountered(forwardedReady.getTransactionId()); commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this); } else { ActorSelection leader = getLeader(); @@ -754,7 +791,8 @@ public class Shard extends RaftActor { LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader); ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionId(), - forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit()); + forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit(), + forwardedReady.getParticipatingShardNames()); readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); leader.forward(readyLocal, getContext()); } @@ -762,7 +800,9 @@ public class Shard extends RaftActor { } private void handleAbortTransaction(final AbortTransaction abort) { - doAbortTransaction(abort.getTransactionId(), getSender()); + final TransactionIdentifier transactionId = abort.getTransactionId(); + askProtocolEncountered(transactionId); + doAbortTransaction(transactionId, getSender()); } void doAbortTransaction(final Identifier transactionID, final ActorRef sender) { @@ -781,13 +821,21 @@ public class Shard extends RaftActor { } private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) { - final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier(); - store.closeTransactionChain(id, null); - store.purgeTransactionChain(id, null); + if (isLeader()) { + final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier(); + askProtocolEncountered(id.getClientId()); + store.closeTransactionChain(id); + } else if (getLeader() != null) { + getLeader().forward(closeTransactionChain, getContext()); + } else { + LOG.warn("{}: Could not close transaction {}", persistenceId(), closeTransactionChain.getIdentifier()); + } } @SuppressWarnings("checkstyle:IllegalCatch") private void createTransaction(final CreateTransaction createTransaction) { + askProtocolEncountered(createTransaction.getTransactionId()); + try { if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY && failIfIsolatedLeader(getSender())) { @@ -810,6 +858,27 @@ public class Shard extends RaftActor { transactionId); } + // Called on leader only + private void askProtocolEncountered(final TransactionIdentifier transactionId) { + askProtocolEncountered(transactionId.getHistoryId().getClientId()); + } + + // Called on leader only + private void askProtocolEncountered(final ClientIdentifier clientId) { + final FrontendIdentifier frontend = clientId.getFrontendId(); + final LeaderFrontendState state = knownFrontends.get(frontend); + if (!(state instanceof LeaderFrontendState.Disabled)) { + LOG.debug("{}: encountered ask-based client {}, disabling transaction tracking", persistenceId(), clientId); + if (knownFrontends.isEmpty()) { + knownFrontends = new HashMap<>(); + } + knownFrontends.put(frontend, new LeaderFrontendState.Disabled(persistenceId(), clientId, getDataStore())); + + persistPayload(clientId, DisableTrackingPayload.create(clientId, + datastoreContext.getInitialPayloadSerializedBufferCapacity()), false); + } + } + private void updateSchemaContext(final UpdateSchemaContext message) { updateSchemaContext(message.getSchemaContext()); } @@ -851,7 +920,7 @@ public class Shard extends RaftActor { if (txCommitTimeoutCheckSchedule == null) { // Schedule a message to be periodically sent to check if the current in-progress // transaction should be expired and aborted. - FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS); + FiniteDuration period = FiniteDuration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS); txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule( period, period, getSelf(), TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender()); @@ -861,6 +930,11 @@ public class Shard extends RaftActor { @Override protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { if (data instanceof Payload) { + if (data instanceof DisableTrackingPayload) { + disableTracking((DisableTrackingPayload) data); + return; + } + try { store.applyReplicatedPayload(identifier, (Payload)data); } catch (DataValidationFailedException | IOException e) { @@ -923,6 +997,8 @@ public class Shard extends RaftActor { messagesToForward.size(), leader); for (Object message : messagesToForward) { + LOG.debug("{}: Forwarding pending transaction message {}", persistenceId(), message); + leader.tell(message, self()); } } @@ -957,8 +1033,10 @@ public class Shard extends RaftActor { paused = true; // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused. - knownFrontends.values().forEach(LeaderFrontendState::retire); - knownFrontends = ImmutableMap.of(); + if (datastoreContext.isUseTellBasedProtocol()) { + knownFrontends.values().forEach(LeaderFrontendState::retire); + knownFrontends = ImmutableMap.of(); + } store.setRunOnPendingTransactionsComplete(operation); }