X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=10cef862fce2e42f731ebe01ff6775348c65476a;hp=959b37cb7b03c2d5cc0cf2d410965b3c39bb6223;hb=de1ed2cb86a3c897d307a4b73a89384465c3ca6f;hpb=60b94e13e2c92ea7d3ef39d4aa6b96674a9415b2 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 959b37cb7b..10cef862fc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -46,10 +46,12 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException; import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; +import org.opendaylight.controller.cluster.access.concepts.SliceableMessage; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.common.actor.Dispatchers; +import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherType; import org.opendaylight.controller.cluster.common.actor.MessageTracker; import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; @@ -79,6 +81,8 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot; +import org.opendaylight.controller.cluster.messaging.MessageSlicer; +import org.opendaylight.controller.cluster.messaging.SliceOptions; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; @@ -182,6 +186,9 @@ public class Shard extends RaftActor { private final FrontendMetadata frontendMetadata; private Map knownFrontends = ImmutableMap.of(); + private final MessageSlicer responseMessageSlicer; + private final Dispatchers dispatchers; + protected Shard(final AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION); @@ -224,14 +231,20 @@ public class Shard extends RaftActor { appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class, getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis()); + dispatchers = new Dispatchers(context().system().dispatchers()); transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext, - new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Transaction), + dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean, builder.getId().getShardName()); snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG, this.name); messageRetrySupport = new ShardTransactionMessageRetrySupport(this); + + responseMessageSlicer = MessageSlicer.builder().logContext(this.name) + .messageSliceSize(datastoreContext.getMaximumMessageSliceSize()) + .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory()) + .expireStateAfterInactivity(2, TimeUnit.MINUTES).build(); } private void setTransactionCommitTimeout() { @@ -273,7 +286,6 @@ public class Shard extends RaftActor { } } - @SuppressWarnings("checkstyle:IllegalCatch") @Override protected void handleNonRaftCommand(final Object message) { try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) { @@ -286,22 +298,7 @@ public class Shard extends RaftActor { store.resetTransactionBatch(); if (message instanceof RequestEnvelope) { - final long now = ticker().read(); - final RequestEnvelope envelope = (RequestEnvelope)message; - - try { - final RequestSuccess success = handleRequest(envelope, now); - if (success != null) { - envelope.sendSuccess(success, ticker().read() - now); - } - } catch (RequestException e) { - LOG.debug("{}: request {} failed", persistenceId(), envelope, e); - envelope.sendFailure(e, ticker().read() - now); - } catch (Exception e) { - LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e); - envelope.sendFailure(new RuntimeRequestException("Request failed to process", e), - ticker().read() - now); - } + handleRequestEnvelope((RequestEnvelope)message); } else if (message instanceof ConnectClientRequest) { handleConnectClient((ConnectClientRequest)message); } else if (CreateTransaction.isSerializedType(message)) { @@ -357,12 +354,41 @@ public class Shard extends RaftActor { onMakeLeaderLocal(); } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) { store.resumeNextPendingTransaction(); - } else { + } else if (!responseMessageSlicer.handleMessage(message)) { super.handleNonRaftCommand(message); } } } + @SuppressWarnings("checkstyle:IllegalCatch") + private void handleRequestEnvelope(final RequestEnvelope envelope) { + final long now = ticker().read(); + try { + final RequestSuccess success = handleRequest(envelope, now); + if (success != null) { + final long executionTimeNanos = ticker().read() - now; + if (success instanceof SliceableMessage) { + dispatchers.getDispatcher(DispatcherType.Serialization).execute(() -> + responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget()) + .message(envelope.newSuccessEnvelope(success, executionTimeNanos)) + .sendTo(envelope.getMessage().getReplyTo()).replyTo(self()) + .onFailureCallback(t -> { + LOG.warn("Error slicing response {}", success, t); + }).build())); + } else { + envelope.sendSuccess(success, executionTimeNanos); + } + } + } catch (RequestException e) { + LOG.debug("{}: request {} failed", persistenceId(), envelope, e); + envelope.sendFailure(e, ticker().read() - now); + } catch (Exception e) { + LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e); + envelope.sendFailure(new RuntimeRequestException("Request failed to process", e), + ticker().read() - now); + } + } + private void onMakeLeaderLocal() { LOG.debug("{}: onMakeLeaderLocal received", persistenceId()); if (isLeader()) { @@ -857,6 +883,12 @@ public class Shard extends RaftActor { store.setRunOnPendingTransactionsComplete(operation); } + @Override + protected void unpauseLeader() { + LOG.debug("{}: In unpauseLeader", persistenceId()); + store.setRunOnPendingTransactionsComplete(null); + } + @Override protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())