X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=dbcbd3d02eff96f393e389d6b92ee5487b83ce18;hp=10cef862fce2e42f731ebe01ff6775348c65476a;hb=a6af137c30470b86d4bc624d4c48cb686495a182;hpb=de1ed2cb86a3c897d307a4b73a89384465c3ca6f diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 10cef862fc..dbcbd3d02e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -11,9 +11,11 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; +import akka.actor.ExtendedActorSystem; import akka.actor.Props; import akka.actor.Status; import akka.actor.Status.Failure; +import akka.serialization.JavaSerializer; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; @@ -36,6 +38,7 @@ import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.NotLeaderException; +import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; @@ -57,6 +60,7 @@ import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardDataTreeListenerInfoMXBeanImpl; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; @@ -74,13 +78,13 @@ import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; -import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot; +import org.opendaylight.controller.cluster.messaging.MessageAssembler; import org.opendaylight.controller.cluster.messaging.MessageSlicer; import org.opendaylight.controller.cluster.messaging.SliceOptions; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; @@ -98,8 +102,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider; @@ -157,8 +161,12 @@ public class Shard extends RaftActor { /// The name of this shard private final String name; + private final String shardName; + private final ShardStats shardMBean; + private final ShardDataTreeListenerInfoMXBeanImpl listenerInfoMXBean; + private DatastoreContext datastoreContext; private final ShardCommitCoordinator commitCoordinator; @@ -176,8 +184,6 @@ public class Shard extends RaftActor { private final ShardSnapshotCohort snapshotCohort; private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this); - private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this); - private ShardSnapshot restoreFromSnapshot; @@ -185,15 +191,19 @@ public class Shard extends RaftActor { private final FrontendMetadata frontendMetadata; private Map knownFrontends = ImmutableMap.of(); + private boolean paused; private final MessageSlicer responseMessageSlicer; private final Dispatchers dispatchers; + private final MessageAssembler requestMessageAssembler; + protected Shard(final AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION); this.name = builder.getId().toString(); + this.shardName = builder.getId().getShardName(); this.datastoreContext = builder.getDatastoreContext(); this.restoreFromSnapshot = builder.getRestoreFromSnapshot(); this.frontendMetadata = new FrontendMetadata(name); @@ -204,15 +214,12 @@ public class Shard extends RaftActor { ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher = new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name); - ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher = - new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name); if (builder.getDataTree() != null) { store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(), - treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata); + treeChangeListenerPublisher, name, frontendMetadata); } else { store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(), - builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, - dataChangeListenerPublisher, name, frontendMetadata); + builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name, frontendMetadata); } shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this); @@ -245,6 +252,15 @@ public class Shard extends RaftActor { .messageSliceSize(datastoreContext.getMaximumMessageSliceSize()) .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory()) .expireStateAfterInactivity(2, TimeUnit.MINUTES).build(); + + requestMessageAssembler = MessageAssembler.builder().logContext(this.name) + .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory()) + .assembledMessageCallback((message, sender) -> self().tell(message, sender)) + .expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build(); + + listenerInfoMXBean = new ShardDataTreeListenerInfoMXBeanImpl(name, datastoreContext.getDataStoreMXBeanType(), + self()); + listenerInfoMXBean.register(); } private void setTransactionCommitTimeout() { @@ -273,6 +289,7 @@ public class Shard extends RaftActor { commitCoordinator.abortPendingTransactions("Transaction aborted due to shutdown.", this); shardMBean.unregisterMBean(); + listenerInfoMXBean.unregister(); } @Override @@ -299,6 +316,8 @@ public class Shard extends RaftActor { if (message instanceof RequestEnvelope) { handleRequestEnvelope((RequestEnvelope)message); + } else if (MessageAssembler.isHandledMessage(message)) { + handleRequestAssemblerMessage(message); } else if (message instanceof ConnectClientRequest) { handleConnectClient((ConnectClientRequest)message); } else if (CreateTransaction.isSerializedType(message)) { @@ -317,8 +336,6 @@ public class Shard extends RaftActor { handleAbortTransaction(AbortTransaction.fromSerializable(message)); } else if (CloseTransactionChain.isSerializedType(message)) { closeTransactionChain(CloseTransactionChain.fromSerializable(message)); - } else if (message instanceof RegisterChangeListener) { - changeSupport.onMessage((RegisterChangeListener) message, isLeader(), hasLeader()); } else if (message instanceof RegisterDataTreeChangeListener) { treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader()); } else if (message instanceof UpdateSchemaContext) { @@ -327,8 +344,7 @@ public class Shard extends RaftActor { PeerAddressResolved resolved = (PeerAddressResolved) message; setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress()); } else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) { - store.checkForExpiredTransactions(transactionCommitTimeout); - commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this); + commitTimeoutCheck(); } else if (message instanceof DatastoreContext) { onDatastoreContext((DatastoreContext)message); } else if (message instanceof RegisterRoleChangeListener) { @@ -360,6 +376,13 @@ public class Shard extends RaftActor { } } + private void handleRequestAssemblerMessage(final Object message) { + dispatchers.getDispatcher(DispatcherType.Serialization).execute(() -> { + JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) context().system()); + requestMessageAssembler.handleMessage(message, self()); + }); + } + @SuppressWarnings("checkstyle:IllegalCatch") private void handleRequestEnvelope(final RequestEnvelope envelope) { final long now = ticker().read(); @@ -372,9 +395,7 @@ public class Shard extends RaftActor { responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget()) .message(envelope.newSuccessEnvelope(success, executionTimeNanos)) .sendTo(envelope.getMessage().getReplyTo()).replyTo(self()) - .onFailureCallback(t -> { - LOG.warn("Error slicing response {}", success, t); - }).build())); + .onFailureCallback(t -> LOG.warn("Error slicing response {}", success, t)).build())); } else { envelope.sendSuccess(success, executionTimeNanos); } @@ -389,6 +410,30 @@ public class Shard extends RaftActor { } } + private void commitTimeoutCheck() { + store.checkForExpiredTransactions(transactionCommitTimeout, this::updateAccess); + commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this); + requestMessageAssembler.checkExpiredAssembledMessageState(); + } + + private Optional updateAccess(final SimpleShardDataTreeCohort cohort) { + final FrontendIdentifier frontend = cohort.getIdentifier().getHistoryId().getClientId().getFrontendId(); + final LeaderFrontendState state = knownFrontends.get(frontend); + if (state == null) { + // Not tell-based protocol, do nothing + return Optional.absent(); + } + + if (isIsolatedLeader()) { + // We are isolated and no new request can come through until we emerge from it. We are still updating + // liveness of frontend when we see it attempting to communicate. Use the last access timer. + return Optional.of(state.getLastSeenTicks()); + } + + // If this frontend has freshly connected, give it some time to catch up before killing its transactions. + return Optional.of(state.getLastConnectTicks()); + } + private void onMakeLeaderLocal() { LOG.debug("{}: onMakeLeaderLocal received", persistenceId()); if (isLeader()) { @@ -416,16 +461,19 @@ public class Shard extends RaftActor { } // Acquire our frontend tracking handle and verify generation matches - private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException { + @Nullable + private LeaderFrontendState findFrontend(final ClientIdentifier clientId) throws RequestException { final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId()); if (existing != null) { final int cmp = Long.compareUnsigned(existing.getIdentifier().getGeneration(), clientId.getGeneration()); if (cmp == 0) { + existing.touch(); return existing; } if (cmp > 0) { LOG.debug("{}: rejecting request from outdated client {}", persistenceId(), clientId); - throw new RetiredGenerationException(existing.getIdentifier().getGeneration()); + throw new RetiredGenerationException(clientId.getGeneration(), + existing.getIdentifier().getGeneration()); } LOG.info("{}: retiring state {}, outdated by request from client {}", persistenceId(), existing, clientId); @@ -435,13 +483,21 @@ public class Shard extends RaftActor { LOG.debug("{}: client {} is not yet known", persistenceId(), clientId); } - final LeaderFrontendState ret = new LeaderFrontendState(persistenceId(), clientId, store); - knownFrontends.put(clientId.getFrontendId(), ret); - LOG.debug("{}: created state {} for client {}", persistenceId(), ret, clientId); - return ret; + return null; + } + + private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException { + final LeaderFrontendState ret = findFrontend(clientId); + if (ret != null) { + return ret; + } + + // TODO: a dedicated exception would be better, but this is technically true, too + throw new OutOfSequenceEnvelopeException(0); } - private static @Nonnull ABIVersion selectVersion(final ConnectClientRequest message) { + @Nonnull + private static ABIVersion selectVersion(final ConnectClientRequest message) { final Range clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion()); for (ABIVersion v : SUPPORTED_ABIVERSIONS) { if (clientRange.contains(v)) { @@ -457,6 +513,12 @@ public class Shard extends RaftActor { @SuppressWarnings("checkstyle:IllegalCatch") private void handleConnectClient(final ConnectClientRequest message) { try { + final ClientIdentifier clientId = message.getTarget(); + final LeaderFrontendState existing = findFrontend(clientId); + if (existing != null) { + existing.touch(); + } + if (!isLeader() || !isLeaderActive()) { LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {}," + "isLeadershipTransferInProgress: {}.", @@ -465,7 +527,15 @@ public class Shard extends RaftActor { } final ABIVersion selectedVersion = selectVersion(message); - final LeaderFrontendState frontend = getFrontend(message.getTarget()); + final LeaderFrontendState frontend; + if (existing == null) { + frontend = new LeaderFrontendState(persistenceId(), clientId, store); + knownFrontends.put(clientId.getFrontendId(), frontend); + LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId); + } else { + frontend = existing; + } + frontend.reconnect(); message.getReplyTo().tell(new ConnectClientSuccess(message.getTarget(), message.getSequence(), getSelf(), ImmutableList.of(), store.getDataTree(), CLIENT_MAX_MESSAGES).toVersion(selectedVersion), @@ -475,13 +545,14 @@ public class Shard extends RaftActor { } } - private @Nullable RequestSuccess handleRequest(final RequestEnvelope envelope, final long now) + @Nullable + private RequestSuccess handleRequest(final RequestEnvelope envelope, final long now) throws RequestException { // We are not the leader, hence we want to fail-fast. - if (!isLeader() || !isLeaderActive()) { - LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {}," - + "isLeadershipTransferInProgress: {}.", - persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress()); + if (!isLeader() || paused || !isLeaderActive()) { + LOG.debug("{}: not currently active leader, rejecting request {}. isLeader: {}, isLeaderActive: {}," + + "isLeadershipTransferInProgress: {}, paused: {}", + persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress(), paused); throw new NotLeaderException(getSelf()); } @@ -517,6 +588,10 @@ public class Shard extends RaftActor { return roleChangeNotifier; } + String getShardName() { + return shardName; + } + @Override protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, final short leaderPayloadVersion) { @@ -608,7 +683,7 @@ public class Shard extends RaftActor { ActorSelection leader = getLeader(); if (!isLeaderActive || leader == null) { messageRetrySupport.addMessageToRetry(batched, getSender(), - "Could not commit transaction " + batched.getTransactionId()); + "Could not process BatchedModifications " + batched.getTransactionId()); } else { // If this is not the first batch and leadership changed in between batched messages, // we need to reconstruct previous BatchedModifications from the transaction @@ -661,7 +736,7 @@ public class Shard extends RaftActor { ActorSelection leader = getLeader(); if (!isLeaderActive || leader == null) { messageRetrySupport.addMessageToRetry(message, getSender(), - "Could not commit transaction " + message.getTransactionId()); + "Could not process ready local transaction " + message.getTransactionId()); } else { LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader); message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); @@ -680,12 +755,13 @@ public class Shard extends RaftActor { ActorSelection leader = getLeader(); if (!isLeaderActive || leader == null) { messageRetrySupport.addMessageToRetry(forwardedReady, getSender(), - "Could not commit transaction " + forwardedReady.getTransactionId()); + "Could not process forwarded ready transaction " + forwardedReady.getTransactionId()); } else { LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader); ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionId(), - forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit()); + forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit(), + forwardedReady.getParticipatingShardNames()); readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); leader.forward(readyLocal, getContext()); } @@ -764,8 +840,11 @@ public class Shard extends RaftActor { @Override @Nonnull protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { - return new ShardRecoveryCoordinator(store, - restoreFromSnapshot != null ? restoreFromSnapshot.getSnapshot() : null, persistenceId(), LOG); + if (restoreFromSnapshot == null) { + return ShardRecoveryCoordinator.create(store, persistenceId(), LOG); + } + + return ShardRecoveryCoordinator.forSnapshot(store, persistenceId(), LOG, restoreFromSnapshot.getSnapshot()); } @Override @@ -803,7 +882,6 @@ public class Shard extends RaftActor { protected void onStateChanged() { boolean isLeader = isLeader(); boolean hasLeader = hasLeader(); - changeSupport.onLeadershipChange(isLeader, hasLeader); treeChangeSupport.onLeadershipChange(isLeader, hasLeader); // If this actor is no longer the leader close all the transaction chains @@ -814,6 +892,7 @@ public class Shard extends RaftActor { persistenceId(), getId()); } + paused = false; store.purgeLeaderState(); } @@ -825,19 +904,21 @@ public class Shard extends RaftActor { @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) { shardMBean.incrementLeadershipChangeCount(); + paused = false; - final boolean hasLeader = hasLeader(); - if (!hasLeader) { - // No leader implies we are not the leader, lose frontend state if we have any. This also places - // an explicit guard so the map will not get modified accidentally. + if (!isLeader()) { if (!knownFrontends.isEmpty()) { LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet()); knownFrontends = ImmutableMap.of(); } - return; - } - if (!isLeader()) { + requestMessageAssembler.close(); + + if (!hasLeader()) { + // No leader anywhere, nothing else to do + return; + } + // Another leader was elected. If we were the previous leader and had pending transactions, convert // them to transaction messages and send to the new leader. ActorSelection leader = getLeader(); @@ -849,6 +930,8 @@ public class Shard extends RaftActor { messagesToForward.size(), leader); for (Object message : messagesToForward) { + LOG.debug("{}: Forwarding pending transaction message {}", persistenceId(), message); + leader.tell(message, self()); } } @@ -880,19 +963,29 @@ public class Shard extends RaftActor { @Override protected void pauseLeader(final Runnable operation) { LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation); + paused = true; + + // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused. + knownFrontends.values().forEach(LeaderFrontendState::retire); + knownFrontends = ImmutableMap.of(); + store.setRunOnPendingTransactionsComplete(operation); } @Override protected void unpauseLeader() { LOG.debug("{}: In unpauseLeader", persistenceId()); + paused = false; + store.setRunOnPendingTransactionsComplete(null); + + // Restore tell-based protocol state as if we were becoming the leader + knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this)); } @Override - protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { + protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors()) - .dataChangeListenerActors(changeSupport.getListenerActors()) .commitCohortActors(store.getCohortActors()); } @@ -931,7 +1024,7 @@ public class Shard extends RaftActor { private DatastoreContext datastoreContext; private SchemaContextProvider schemaContextProvider; private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot; - private TipProducingDataTree dataTree; + private DataTree dataTree; private volatile boolean sealed; protected AbstractBuilder(final Class shardClass) { @@ -965,9 +1058,9 @@ public class Shard extends RaftActor { return self(); } - public T schemaContextProvider(final SchemaContextProvider schemaContextProvider) { + public T schemaContextProvider(final SchemaContextProvider newSchemaContextProvider) { checkSealed(); - this.schemaContextProvider = Preconditions.checkNotNull(schemaContextProvider); + this.schemaContextProvider = Preconditions.checkNotNull(newSchemaContextProvider); return self(); } @@ -977,7 +1070,7 @@ public class Shard extends RaftActor { return self(); } - public T dataTree(final TipProducingDataTree newDataTree) { + public T dataTree(final DataTree newDataTree) { checkSealed(); this.dataTree = newDataTree; return self(); @@ -1003,7 +1096,7 @@ public class Shard extends RaftActor { return restoreFromSnapshot; } - public TipProducingDataTree getDataTree() { + public DataTree getDataTree() { return dataTree; } @@ -1034,7 +1127,7 @@ public class Shard extends RaftActor { } public static class Builder extends AbstractBuilder { - private Builder() { + Builder() { super(Shard.class); } }