X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=d9483d7b2b61915295eef094fc8c8570f0b63eaa;hb=refs%2Fchanges%2F36%2F60436%2F1;hp=aca69c2a5f5f5956893d247ac9fa01f2e5277d89;hpb=ec870dee9bacb971f11bc747b69e84ac37f5d746;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index aca69c2a5f..d9483d7b2b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -46,9 +46,12 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException; import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; +import org.opendaylight.controller.cluster.access.concepts.SliceableMessage; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; import org.opendaylight.controller.cluster.common.actor.CommonConfig; +import org.opendaylight.controller.cluster.common.actor.Dispatchers; +import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherType; import org.opendaylight.controller.cluster.common.actor.MessageTracker; import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; @@ -78,7 +81,8 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot; -import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; +import org.opendaylight.controller.cluster.messaging.MessageSlicer; +import org.opendaylight.controller.cluster.messaging.SliceOptions; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; @@ -98,6 +102,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailed import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -125,6 +130,13 @@ public class Shard extends RaftActor { } }; + static final Object RESUME_NEXT_PENDING_TRANSACTION = new Object() { + @Override + public String toString() { + return "resumeNextPendingTransaction"; + } + }; + // FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant. public static final String DEFAULT_NAME = "default"; @@ -173,6 +185,10 @@ public class Shard extends RaftActor { private final FrontendMetadata frontendMetadata; private Map knownFrontends = ImmutableMap.of(); + private boolean paused; + + private final MessageSlicer responseMessageSlicer; + private final Dispatchers dispatchers; protected Shard(final AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), @@ -216,14 +232,20 @@ public class Shard extends RaftActor { appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class, getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis()); + dispatchers = new Dispatchers(context().system().dispatchers()); transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext, - new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Transaction), + dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean, builder.getId().getShardName()); snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG, this.name); messageRetrySupport = new ShardTransactionMessageRetrySupport(this); + + responseMessageSlicer = MessageSlicer.builder().logContext(this.name) + .messageSliceSize(datastoreContext.getMaximumMessageSliceSize()) + .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory()) + .expireStateAfterInactivity(2, TimeUnit.MINUTES).build(); } private void setTransactionCommitTimeout() { @@ -265,7 +287,6 @@ public class Shard extends RaftActor { } } - @SuppressWarnings("checkstyle:IllegalCatch") @Override protected void handleNonRaftCommand(final Object message) { try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) { @@ -275,23 +296,10 @@ public class Shard extends RaftActor { maybeError.get()); } - if (message instanceof RequestEnvelope) { - final long now = ticker().read(); - final RequestEnvelope envelope = (RequestEnvelope)message; + store.resetTransactionBatch(); - try { - final RequestSuccess success = handleRequest(envelope, now); - if (success != null) { - envelope.sendSuccess(success, ticker().read() - now); - } - } catch (RequestException e) { - LOG.debug("{}: request {} failed", persistenceId(), envelope, e); - envelope.sendFailure(e, ticker().read() - now); - } catch (Exception e) { - LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e); - envelope.sendFailure(new RuntimeRequestException("Request failed to process", e), - ticker().read() - now); - } + if (message instanceof RequestEnvelope) { + handleRequestEnvelope((RequestEnvelope)message); } else if (message instanceof ConnectClientRequest) { handleConnectClient((ConnectClientRequest)message); } else if (CreateTransaction.isSerializedType(message)) { @@ -345,12 +353,43 @@ public class Shard extends RaftActor { persistPayload(txId, AbortTransactionPayload.create(txId), true); } else if (message instanceof MakeLeaderLocal) { onMakeLeaderLocal(); - } else { + } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) { + store.resumeNextPendingTransaction(); + } else if (!responseMessageSlicer.handleMessage(message)) { super.handleNonRaftCommand(message); } } } + @SuppressWarnings("checkstyle:IllegalCatch") + private void handleRequestEnvelope(final RequestEnvelope envelope) { + final long now = ticker().read(); + try { + final RequestSuccess success = handleRequest(envelope, now); + if (success != null) { + final long executionTimeNanos = ticker().read() - now; + if (success instanceof SliceableMessage) { + dispatchers.getDispatcher(DispatcherType.Serialization).execute(() -> + responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget()) + .message(envelope.newSuccessEnvelope(success, executionTimeNanos)) + .sendTo(envelope.getMessage().getReplyTo()).replyTo(self()) + .onFailureCallback(t -> { + LOG.warn("Error slicing response {}", success, t); + }).build())); + } else { + envelope.sendSuccess(success, executionTimeNanos); + } + } + } catch (RequestException e) { + LOG.debug("{}: request {} failed", persistenceId(), envelope, e); + envelope.sendFailure(e, ticker().read() - now); + } catch (Exception e) { + LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e); + envelope.sendFailure(new RuntimeRequestException("Request failed to process", e), + ticker().read() - now); + } + } + private void onMakeLeaderLocal() { LOG.debug("{}: onMakeLeaderLocal received", persistenceId()); if (isLeader()) { @@ -420,7 +459,9 @@ public class Shard extends RaftActor { private void handleConnectClient(final ConnectClientRequest message) { try { if (!isLeader() || !isLeaderActive()) { - LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), message); + LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {}," + + "isLeadershipTransferInProgress: {}.", + persistenceId(), message, isLeader(), isLeaderActive(), isLeadershipTransferInProgress()); throw new NotLeaderException(getSelf()); } @@ -438,8 +479,10 @@ public class Shard extends RaftActor { private @Nullable RequestSuccess handleRequest(final RequestEnvelope envelope, final long now) throws RequestException { // We are not the leader, hence we want to fail-fast. - if (!isLeader() || !isLeaderActive()) { - LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope); + if (!isLeader() || paused || !isLeaderActive()) { + LOG.debug("{}: not currently active leader, rejecting request {}. isLeader: {}, isLeaderActive: {}," + + "isLeadershipTransferInProgress: {}, paused: {}", + persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress(), paused); throw new NotLeaderException(getSelf()); } @@ -453,7 +496,7 @@ public class Shard extends RaftActor { final ClientIdentifier clientId = lhReq.getTarget().getClientId(); return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now); } else { - LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request); + LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request); throw new UnsupportedRequestException(request); } } @@ -772,6 +815,7 @@ public class Shard extends RaftActor { persistenceId(), getId()); } + paused = false; store.purgeLeaderState(); } @@ -783,19 +827,19 @@ public class Shard extends RaftActor { @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) { shardMBean.incrementLeadershipChangeCount(); + paused = false; - final boolean hasLeader = hasLeader(); - if (!hasLeader) { - // No leader implies we are not the leader, lose frontend state if we have any. This also places - // an explicit guard so the map will not get modified accidentally. + if (!isLeader()) { if (!knownFrontends.isEmpty()) { LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet()); knownFrontends = ImmutableMap.of(); } - return; - } - if (!isLeader()) { + if (!hasLeader()) { + // No leader anywhere, nothing else to do + return; + } + // Another leader was elected. If we were the previous leader and had pending transactions, convert // them to transaction messages and send to the new leader. ActorSelection leader = getLeader(); @@ -838,9 +882,26 @@ public class Shard extends RaftActor { @Override protected void pauseLeader(final Runnable operation) { LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation); + paused = true; + + // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused. + knownFrontends.values().forEach(LeaderFrontendState::retire); + knownFrontends = ImmutableMap.of(); + store.setRunOnPendingTransactionsComplete(operation); } + @Override + protected void unpauseLeader() { + LOG.debug("{}: In unpauseLeader", persistenceId()); + paused = false; + + store.setRunOnPendingTransactionsComplete(null); + + // Restore tell-based protocol state as if we were becoming the leader + knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this)); + } + @Override protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors()) @@ -881,7 +942,7 @@ public class Shard extends RaftActor { private ShardIdentifier id; private Map peerAddresses = Collections.emptyMap(); private DatastoreContext datastoreContext; - private SchemaContext schemaContext; + private SchemaContextProvider schemaContextProvider; private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot; private TipProducingDataTree dataTree; private volatile boolean sealed; @@ -917,9 +978,9 @@ public class Shard extends RaftActor { return self(); } - public T schemaContext(final SchemaContext newSchemaContext) { + public T schemaContextProvider(final SchemaContextProvider schemaContextProvider) { checkSealed(); - this.schemaContext = newSchemaContext; + this.schemaContextProvider = Preconditions.checkNotNull(schemaContextProvider); return self(); } @@ -948,7 +1009,7 @@ public class Shard extends RaftActor { } public SchemaContext getSchemaContext() { - return schemaContext; + return Verify.verifyNotNull(schemaContextProvider.getSchemaContext()); } public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() { @@ -975,7 +1036,7 @@ public class Shard extends RaftActor { Preconditions.checkNotNull(id, "id should not be null"); Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null"); Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null"); - Preconditions.checkNotNull(schemaContext, "schemaContext should not be null"); + Preconditions.checkNotNull(schemaContextProvider, "schemaContextProvider should not be null"); } public Props props() { @@ -994,4 +1055,8 @@ public class Shard extends RaftActor { Ticker ticker() { return Ticker.systemTicker(); } + + void scheduleNextPendingTransaction() { + self().tell(RESUME_NEXT_PENDING_TRANSACTION, ActorRef.noSender()); + } }