From b5db7d0971de9d84289bc4e46ed7aad1f014a41a Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Thu, 22 Jun 2017 10:20:11 -0400 Subject: [PATCH] Bug 7449: Slice ReadTransactionSuccess response Added slicing of the ReadTransactionSuccess message. The slicing is initiated by the Shard usung a MessageSlicer and re-assembly is done by the ClientActorBehavior on the FE. Introduced a SliceableMessage interface implemented by ReadTransactionSuccess which Shard checks for to determine if the response message should be sliced. Change-Id: Ie55e35aa82a9d2bc21f7a8f24396cb4df467252e Signed-off-by: Tom Pantelis Signed-off-by: Robert Varga --- .../commands/ReadTransactionSuccess.java | 4 +- .../access/concepts/RequestEnvelope.java | 13 +++- .../access/concepts/SliceableMessage.java | 20 ++++++ .../access/client/AbstractClientActor.java | 10 +++ .../client/AbstractClientActorBehavior.java | 6 +- .../access/client/ClientActorBehavior.java | 23 +++++++ .../access/client/ClientActorContext.java | 17 +++-- .../client/InitialClientActorContext.java | 4 +- .../client/AbstractClientConnectionTest.java | 4 +- .../access/client/AccessClientUtil.java | 3 +- .../access/client/ClientActorContextTest.java | 4 +- .../controller/cluster/datastore/Shard.java | 64 +++++++++++++------ ...butedDataStoreRemotingIntegrationTest.java | 16 +++++ .../test/resources/simplelogger.properties | 1 + 14 files changed, 155 insertions(+), 34 deletions(-) create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SliceableMessage.java diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccess.java index 733ce71d6c..77bd430fc5 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccess.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccess.java @@ -11,6 +11,7 @@ import com.google.common.annotations.Beta; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.SliceableMessage; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -21,7 +22,8 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; * @author Robert Varga */ @Beta -public final class ReadTransactionSuccess extends TransactionSuccess { +public final class ReadTransactionSuccess extends TransactionSuccess + implements SliceableMessage { private static final long serialVersionUID = 1L; private final Optional> data; diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelope.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelope.java index cb9034d324..46d5d1f996 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelope.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelope.java @@ -40,7 +40,18 @@ public final class RequestEnvelope extends Envelope> { * @throws NullPointerException if success is null */ public void sendSuccess(final RequestSuccess success, final long executionTimeNanos) { - sendResponse(new SuccessEnvelope(success, getSessionId(), getTxSequence(), executionTimeNanos)); + sendResponse(newSuccessEnvelope(success, executionTimeNanos)); + } + + /** + * Creates a successful ResponseEnvelope that wraps the given successful Request response message. + * + * @param success the successful Request response message + * @param executionTimeNanos the execution time of the request + * @return a {@link ResponseEnvelope} instance + */ + public ResponseEnvelope newSuccessEnvelope(final RequestSuccess success, final long executionTimeNanos) { + return new SuccessEnvelope(success, getSessionId(), getTxSequence(), executionTimeNanos); } private void sendResponse(final ResponseEnvelope envelope) { diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SliceableMessage.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SliceableMessage.java new file mode 100644 index 0000000000..cd3e2608d6 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SliceableMessage.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.concepts; + +import com.google.common.annotations.Beta; + +/** + * A tagging interface that specifies a message whose serialized size can be large and thus should be sliced into + * smaller chunks when transporting over the wire. + * + * @author Thomas Pantelis + */ +@Beta +public interface SliceableMessage { +} diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActor.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActor.java index 2fb4c94307..7b592fbdb2 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActor.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActor.java @@ -35,6 +35,15 @@ public abstract class AbstractClientActor extends UntypedPersistentActor { return currentBehavior.persistenceId(); } + @Override + public void postStop() { + if (currentBehavior != null) { + currentBehavior.close(); + } + + super.postStop(); + } + private void switchBehavior(final AbstractClientActorBehavior nextBehavior) { if (!currentBehavior.equals(nextBehavior)) { if (nextBehavior == null) { @@ -44,6 +53,7 @@ public abstract class AbstractClientActor extends UntypedPersistentActor { LOG.debug("{}: switched from {} to {}", persistenceId(), currentBehavior, nextBehavior); } + currentBehavior.close(); currentBehavior = nextBehavior; } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActorBehavior.java index 0a49fc03a1..f80ae7c6aa 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActorBehavior.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActorBehavior.java @@ -21,7 +21,7 @@ import javax.annotation.Nullable; * @author Robert Varga */ @Beta -public abstract class AbstractClientActorBehavior { +public abstract class AbstractClientActorBehavior implements AutoCloseable { private final C context; AbstractClientActorBehavior(@Nonnull final C context) { @@ -60,6 +60,10 @@ public abstract class AbstractClientActorBehavior extends private final Map> connections = new ConcurrentHashMap<>(); private final InversibleLock connectionsLock = new InversibleLock(); private final BackendInfoResolver resolver; + private final MessageAssembler responseMessageAssembler; protected ClientActorBehavior(@Nonnull final ClientActorContext context, @Nonnull final BackendInfoResolver resolver) { super(context); this.resolver = Preconditions.checkNotNull(resolver); + + final ClientActorConfig config = context.config(); + responseMessageAssembler = MessageAssembler.builder().logContext(persistenceId()) + .fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(), + config.getTempFileDirectory())) + .assembledMessageCallback((message, sender) -> context.self().tell(message, sender)).build(); } @Override @@ -92,6 +102,11 @@ public abstract class ClientActorBehavior extends return context().getIdentifier(); } + @Override + public void close() { + responseMessageAssembler.close(); + } + /** * Get a connection to a shard. * @@ -121,13 +136,21 @@ public abstract class ClientActorBehavior extends if (command instanceof InternalCommand) { return ((InternalCommand) command).execute(this); } + if (command instanceof SuccessEnvelope) { return onRequestSuccess((SuccessEnvelope) command); } + if (command instanceof FailureEnvelope) { return internalOnRequestFailure((FailureEnvelope) command); } + if (MessageAssembler.isHandledMessage(command)) { + context().dispatchers().getDispatcher(DispatcherType.Serialization).execute( + () -> responseMessageAssembler.handleMessage(command, context().self())); + return this; + } + return onCommand(command); } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorContext.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorContext.java index 5520359802..3ed207ef6f 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorContext.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorContext.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.access.client; import akka.actor.ActorRef; +import akka.actor.ActorSystem; import akka.actor.Cancellable; import akka.actor.Scheduler; import com.google.common.annotations.Beta; @@ -16,6 +17,7 @@ import com.google.common.base.Ticker; import javax.annotation.Nonnull; import javax.annotation.concurrent.ThreadSafe; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.common.actor.Dispatchers; import org.opendaylight.yangtools.concepts.Identifiable; import scala.concurrent.ExecutionContext; import scala.concurrent.duration.FiniteDuration; @@ -36,15 +38,17 @@ public class ClientActorContext extends AbstractClientActorContext implements Id private final ExecutionContext executionContext; private final ClientIdentifier identifier; private final Scheduler scheduler; + private final Dispatchers dispatchers; private final ClientActorConfig config; // Hidden to avoid subclassing - ClientActorContext(final ActorRef self, final Scheduler scheduler, final ExecutionContext executionContext, - final String persistenceId, final ClientIdentifier identifier, final ClientActorConfig config) { + ClientActorContext(final ActorRef self, final String persistenceId, final ActorSystem system, + final ClientIdentifier identifier, final ClientActorConfig config) { super(self, persistenceId); this.identifier = Preconditions.checkNotNull(identifier); - this.scheduler = Preconditions.checkNotNull(scheduler); - this.executionContext = Preconditions.checkNotNull(executionContext); + this.scheduler = Preconditions.checkNotNull(system).scheduler(); + this.executionContext = system.dispatcher(); + this.dispatchers = new Dispatchers(system.dispatchers()); this.config = Preconditions.checkNotNull(config); } @@ -59,6 +63,11 @@ public class ClientActorContext extends AbstractClientActorContext implements Id return config; } + @Nonnull + public Dispatchers dispatchers() { + return dispatchers; + } + /** * Return the time ticker for this {@link ClientActorContext}. This should be used for in all time-tracking * done within a client actor. Subclasses of {@link ClientActorBehavior} are encouraged to use diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InitialClientActorContext.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InitialClientActorContext.java index 91675c1895..4449dd2fb9 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InitialClientActorContext.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InitialClientActorContext.java @@ -35,8 +35,8 @@ final class InitialClientActorContext extends AbstractClientActorContext { ClientActorBehavior createBehavior(final ClientIdentifier clientId) { final ActorSystem system = actor.getContext().system(); - final ClientActorContext context = new ClientActorContext(self(), system.scheduler(), system.dispatcher(), - persistenceId(), clientId, actor.getClientActorConfig()); + final ClientActorContext context = new ClientActorContext(self(), persistenceId(), system, + clientId, actor.getClientActorConfig()); return actor.initialBehavior(context); } diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java index f2c42e18d4..9cd2d1cb5e 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java @@ -65,8 +65,8 @@ public abstract class AbstractClientConnectionTest knownFrontends = ImmutableMap.of(); + private final MessageSlicer responseMessageSlicer; + private final Dispatchers dispatchers; + protected Shard(final AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION); @@ -224,14 +231,20 @@ public class Shard extends RaftActor { appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class, getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis()); + dispatchers = new Dispatchers(context().system().dispatchers()); transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext, - new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Transaction), + dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean, builder.getId().getShardName()); snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG, this.name); messageRetrySupport = new ShardTransactionMessageRetrySupport(this); + + responseMessageSlicer = MessageSlicer.builder().logContext(this.name) + .messageSliceSize(datastoreContext.getMaximumMessageSliceSize()) + .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory()) + .expireStateAfterInactivity(2, TimeUnit.MINUTES).build(); } private void setTransactionCommitTimeout() { @@ -273,7 +286,6 @@ public class Shard extends RaftActor { } } - @SuppressWarnings("checkstyle:IllegalCatch") @Override protected void handleNonRaftCommand(final Object message) { try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) { @@ -286,22 +298,7 @@ public class Shard extends RaftActor { store.resetTransactionBatch(); if (message instanceof RequestEnvelope) { - final long now = ticker().read(); - final RequestEnvelope envelope = (RequestEnvelope)message; - - try { - final RequestSuccess success = handleRequest(envelope, now); - if (success != null) { - envelope.sendSuccess(success, ticker().read() - now); - } - } catch (RequestException e) { - LOG.debug("{}: request {} failed", persistenceId(), envelope, e); - envelope.sendFailure(e, ticker().read() - now); - } catch (Exception e) { - LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e); - envelope.sendFailure(new RuntimeRequestException("Request failed to process", e), - ticker().read() - now); - } + handleRequestEnvelope((RequestEnvelope)message); } else if (message instanceof ConnectClientRequest) { handleConnectClient((ConnectClientRequest)message); } else if (CreateTransaction.isSerializedType(message)) { @@ -357,12 +354,41 @@ public class Shard extends RaftActor { onMakeLeaderLocal(); } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) { store.resumeNextPendingTransaction(); - } else { + } else if (!responseMessageSlicer.handleMessage(message)) { super.handleNonRaftCommand(message); } } } + @SuppressWarnings("checkstyle:IllegalCatch") + private void handleRequestEnvelope(final RequestEnvelope envelope) { + final long now = ticker().read(); + try { + final RequestSuccess success = handleRequest(envelope, now); + if (success != null) { + final long executionTimeNanos = ticker().read() - now; + if (success instanceof SliceableMessage) { + dispatchers.getDispatcher(DispatcherType.Serialization).execute(() -> + responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget()) + .message(envelope.newSuccessEnvelope(success, executionTimeNanos)) + .sendTo(envelope.getMessage().getReplyTo()).replyTo(self()) + .onFailureCallback(t -> { + LOG.warn("Error slicing response {}", success, t); + }).build())); + } else { + envelope.sendSuccess(success, executionTimeNanos); + } + } + } catch (RequestException e) { + LOG.debug("{}: request {} failed", persistenceId(), envelope, e); + envelope.sendFailure(e, ticker().read() - now); + } catch (Exception e) { + LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e); + envelope.sendFailure(new RuntimeRequestException("Request failed to process", e), + ticker().read() - now); + } + } + private void onMakeLeaderLocal() { LOG.debug("{}: onMakeLeaderLocal received", persistenceId()); if (isLeader()) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index a2a5e63a2b..f6a026aaa9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -1152,6 +1152,22 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { initialSnapshot, snapshotRoot); } + @Test + public void testLargeReadReplySlicing() throws Exception { + // The slicing is only implemented for tell-based protocol + Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class)); + + leaderDatastoreContextBuilder.maximumMessageSliceSize(50); + initDatastoresWithCars("testLargeReadReplySlicing"); + + final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); + + final NormalizedNode carsNode = CarsModel.create(); + rwTx.write(CarsModel.BASE_PATH, carsNode); + + verifyNode(rwTx, CarsModel.BASE_PATH, carsNode); + } + private static void verifySnapshot(final Snapshot actual, final Snapshot expected, final NormalizedNode expRoot) { assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties index a9f623b036..528cd3f79b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties @@ -10,3 +10,4 @@ org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.databroker.actors org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.sharding=debug org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.access.client=debug +org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.messaging=debug -- 2.36.6