Bug 7449: Slice ReadTransactionSuccess response 07/59407/11
authorTom Pantelis <tompantelis@gmail.com>
Thu, 22 Jun 2017 14:20:11 +0000 (10:20 -0400)
committerTom Pantelis <tompantelis@gmail.com>
Fri, 14 Jul 2017 15:17:37 +0000 (11:17 -0400)
Added slicing of the ReadTransactionSuccess message. The slicing is
initiated by the Shard usung a MessageSlicer and re-assembly is done
by the ClientActorBehavior on the FE. Introduced a SliceableMessage
interface implemented by ReadTransactionSuccess which Shard checks for
to determine if the response message should be sliced.

Change-Id: Ie55e35aa82a9d2bc21f7a8f24396cb4df467252e
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
14 files changed:
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelope.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SliceableMessage.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActor.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActorBehavior.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorContext.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InitialClientActorContext.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AccessClientUtil.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ClientActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties

index 733ce71..77bd430 100644 (file)
@@ -11,6 +11,7 @@ import com.google.common.annotations.Beta;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
@@ -21,7 +22,8 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  * @author Robert Varga
  */
 @Beta
-public final class ReadTransactionSuccess extends TransactionSuccess<ReadTransactionSuccess> {
+public final class ReadTransactionSuccess extends TransactionSuccess<ReadTransactionSuccess>
+        implements SliceableMessage {
     private static final long serialVersionUID = 1L;
     private final Optional<NormalizedNode<?, ?>> data;
 
index cb9034d..46d5d1f 100644 (file)
@@ -40,7 +40,18 @@ public final class RequestEnvelope extends Envelope<Request<?, ?>> {
      * @throws NullPointerException if success is null
      */
     public void sendSuccess(final RequestSuccess<?, ?> success, final long executionTimeNanos) {
-        sendResponse(new SuccessEnvelope(success, getSessionId(), getTxSequence(), executionTimeNanos));
+        sendResponse(newSuccessEnvelope(success, executionTimeNanos));
+    }
+
+    /**
+     * Creates a successful ResponseEnvelope that wraps the given successful Request response message.
+     *
+     * @param success the successful Request response message
+     * @param executionTimeNanos the execution time of the request
+     * @return a {@link ResponseEnvelope} instance
+     */
+    public ResponseEnvelope<?> newSuccessEnvelope(final RequestSuccess<?, ?> success, final long executionTimeNanos) {
+        return new SuccessEnvelope(success, getSessionId(), getTxSequence(), executionTimeNanos);
     }
 
     private void sendResponse(final ResponseEnvelope<?> envelope) {
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SliceableMessage.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SliceableMessage.java
new file mode 100644 (file)
index 0000000..cd3e260
--- /dev/null
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * A tagging interface that specifies a message whose serialized size can be large and thus should be sliced into
+ * smaller chunks when transporting over the wire.
+ *
+ * @author Thomas Pantelis
+ */
+@Beta
+public interface SliceableMessage {
+}
index 2fb4c94..7b592fb 100644 (file)
@@ -35,6 +35,15 @@ public abstract class AbstractClientActor extends UntypedPersistentActor {
         return currentBehavior.persistenceId();
     }
 
+    @Override
+    public void postStop() {
+        if (currentBehavior != null) {
+            currentBehavior.close();
+        }
+
+        super.postStop();
+    }
+
     private void switchBehavior(final AbstractClientActorBehavior<?> nextBehavior) {
         if (!currentBehavior.equals(nextBehavior)) {
             if (nextBehavior == null) {
@@ -44,6 +53,7 @@ public abstract class AbstractClientActor extends UntypedPersistentActor {
                 LOG.debug("{}: switched from {} to {}", persistenceId(), currentBehavior, nextBehavior);
             }
 
+            currentBehavior.close();
             currentBehavior = nextBehavior;
         }
     }
index 0a49fc0..f80ae7c 100644 (file)
@@ -21,7 +21,7 @@ import javax.annotation.Nullable;
  * @author Robert Varga
  */
 @Beta
-public abstract class AbstractClientActorBehavior<C extends AbstractClientActorContext> {
+public abstract class AbstractClientActorBehavior<C extends AbstractClientActorContext> implements AutoCloseable {
     private final C context;
 
     AbstractClientActorBehavior(@Nonnull final C context) {
@@ -60,6 +60,10 @@ public abstract class AbstractClientActorBehavior<C extends AbstractClientActorC
         return context.self();
     }
 
+    @Override
+    public void close() {
+    }
+
     /**
      * Implementation-internal method for handling an incoming command message.
      *
index ccfbba6..33e0c56 100644 (file)
@@ -32,6 +32,9 @@ import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationExce
 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherType;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.concepts.WritableIdentifier;
 import org.slf4j.Logger;
@@ -79,11 +82,18 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
     private final Map<Long, AbstractClientConnection<T>> connections = new ConcurrentHashMap<>();
     private final InversibleLock connectionsLock = new InversibleLock();
     private final BackendInfoResolver<T> resolver;
+    private final MessageAssembler responseMessageAssembler;
 
     protected ClientActorBehavior(@Nonnull final ClientActorContext context,
             @Nonnull final BackendInfoResolver<T> resolver) {
         super(context);
         this.resolver = Preconditions.checkNotNull(resolver);
+
+        final ClientActorConfig config = context.config();
+        responseMessageAssembler = MessageAssembler.builder().logContext(persistenceId())
+                .fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(),
+                        config.getTempFileDirectory()))
+                .assembledMessageCallback((message, sender) -> context.self().tell(message, sender)).build();
     }
 
     @Override
@@ -92,6 +102,11 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
         return context().getIdentifier();
     }
 
+    @Override
+    public void close() {
+        responseMessageAssembler.close();
+    }
+
     /**
      * Get a connection to a shard.
      *
@@ -121,13 +136,21 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
         if (command instanceof InternalCommand) {
             return ((InternalCommand<T>) command).execute(this);
         }
+
         if (command instanceof SuccessEnvelope) {
             return onRequestSuccess((SuccessEnvelope) command);
         }
+
         if (command instanceof FailureEnvelope) {
             return internalOnRequestFailure((FailureEnvelope) command);
         }
 
+        if (MessageAssembler.isHandledMessage(command)) {
+            context().dispatchers().getDispatcher(DispatcherType.Serialization).execute(
+                () -> responseMessageAssembler.handleMessage(command, context().self()));
+            return this;
+        }
+
         return onCommand(command);
     }
 
index 5520359..3ed207e 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.access.client;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
 import akka.actor.Cancellable;
 import akka.actor.Scheduler;
 import com.google.common.annotations.Beta;
@@ -16,6 +17,7 @@ import com.google.common.base.Ticker;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
@@ -36,15 +38,17 @@ public class ClientActorContext extends AbstractClientActorContext implements Id
     private final ExecutionContext executionContext;
     private final ClientIdentifier identifier;
     private final Scheduler scheduler;
+    private final Dispatchers dispatchers;
     private final ClientActorConfig config;
 
     // Hidden to avoid subclassing
-    ClientActorContext(final ActorRef self, final Scheduler scheduler, final ExecutionContext executionContext,
-            final String persistenceId, final ClientIdentifier identifier, final ClientActorConfig config) {
+    ClientActorContext(final ActorRef self, final String persistenceId, final ActorSystem system,
+            final ClientIdentifier identifier, final ClientActorConfig config) {
         super(self, persistenceId);
         this.identifier = Preconditions.checkNotNull(identifier);
-        this.scheduler = Preconditions.checkNotNull(scheduler);
-        this.executionContext = Preconditions.checkNotNull(executionContext);
+        this.scheduler = Preconditions.checkNotNull(system).scheduler();
+        this.executionContext = system.dispatcher();
+        this.dispatchers = new Dispatchers(system.dispatchers());
         this.config = Preconditions.checkNotNull(config);
     }
 
@@ -59,6 +63,11 @@ public class ClientActorContext extends AbstractClientActorContext implements Id
         return config;
     }
 
+    @Nonnull
+    public Dispatchers dispatchers() {
+        return dispatchers;
+    }
+
     /**
      * Return the time ticker for this {@link ClientActorContext}. This should be used for in all time-tracking
      * done within a client actor. Subclasses of {@link ClientActorBehavior} are encouraged to use
index 91675c1..4449dd2 100644 (file)
@@ -35,8 +35,8 @@ final class InitialClientActorContext extends AbstractClientActorContext {
 
     ClientActorBehavior<?> createBehavior(final ClientIdentifier clientId) {
         final ActorSystem system = actor.getContext().system();
-        final ClientActorContext context = new ClientActorContext(self(), system.scheduler(), system.dispatcher(),
-            persistenceId(), clientId, actor.getClientActorConfig());
+        final ClientActorContext context = new ClientActorContext(self(), persistenceId(), system,
+            clientId, actor.getClientActorConfig());
 
         return actor.initialBehavior(context);
     }
index f2c42e1..9cd2d1c 100644 (file)
@@ -65,8 +65,8 @@ public abstract class AbstractClientConnectionTest<T extends AbstractClientConne
         system = ActorSystem.apply();
         backendProbe = new TestProbe(system);
         contextProbe = new TestProbe(system);
-        context = new ClientActorContext(contextProbe.ref(), system.scheduler(), system.dispatcher(),
-                PERSISTENCE_ID, CLIENT_ID, AccessClientUtil.newMockClientActorConfig());
+        context = new ClientActorContext(contextProbe.ref(), PERSISTENCE_ID, system,
+                CLIENT_ID, AccessClientUtil.newMockClientActorConfig());
         replyToProbe = new TestProbe(system);
         connection = createConnection();
     }
index 586fb59..77d5938 100644 (file)
@@ -27,8 +27,7 @@ public class AccessClientUtil {
     public static ClientActorContext createClientActorContext(final ActorSystem system, final ActorRef actor,
                                                               final ClientIdentifier id, final String persistenceId) {
 
-        return spy(new ClientActorContext(actor, system.scheduler(), system.dispatcher(), persistenceId, id,
-                newMockClientActorConfig()));
+        return spy(new ClientActorContext(actor, persistenceId, system, id, newMockClientActorConfig()));
     }
 
     public static ClientActorConfig newMockClientActorConfig() {
index afb7659..564a2ed 100644 (file)
@@ -45,8 +45,8 @@ public class ClientActorContextTest {
         MockitoAnnotations.initMocks(this);
         system = ActorSystem.apply();
         probe = new TestProbe(system);
-        ctx = new ClientActorContext(probe.ref(), system.scheduler(), system.dispatcher(),
-                PERSISTENCE_ID, CLIENT_ID, AccessClientUtil.newMockClientActorConfig());
+        ctx = new ClientActorContext(probe.ref(), PERSISTENCE_ID, system,
+                CLIENT_ID, AccessClientUtil.newMockClientActorConfig());
     }
 
     @Test
index 959b37c..de6c9de 100644 (file)
@@ -46,10 +46,12 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherType;
 import org.opendaylight.controller.cluster.common.actor.MessageTracker;
 import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
@@ -79,6 +81,8 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
@@ -182,6 +186,9 @@ public class Shard extends RaftActor {
     private final FrontendMetadata frontendMetadata;
     private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
 
+    private final MessageSlicer responseMessageSlicer;
+    private final Dispatchers dispatchers;
+
     protected Shard(final AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
@@ -224,14 +231,20 @@ public class Shard extends RaftActor {
         appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
 
+        dispatchers = new Dispatchers(context().system().dispatchers());
         transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext,
-            new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Transaction),
+            dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Transaction),
                 self(), getContext(), shardMBean, builder.getId().getShardName());
 
         snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
             this.name);
 
         messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
+
+        responseMessageSlicer = MessageSlicer.builder().logContext(this.name)
+                .messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
+                .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
+                .expireStateAfterInactivity(2, TimeUnit.MINUTES).build();
     }
 
     private void setTransactionCommitTimeout() {
@@ -273,7 +286,6 @@ public class Shard extends RaftActor {
         }
     }
 
-    @SuppressWarnings("checkstyle:IllegalCatch")
     @Override
     protected void handleNonRaftCommand(final Object message) {
         try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
@@ -286,22 +298,7 @@ public class Shard extends RaftActor {
             store.resetTransactionBatch();
 
             if (message instanceof RequestEnvelope) {
-                final long now = ticker().read();
-                final RequestEnvelope envelope = (RequestEnvelope)message;
-
-                try {
-                    final RequestSuccess<?, ?> success = handleRequest(envelope, now);
-                    if (success != null) {
-                        envelope.sendSuccess(success, ticker().read() - now);
-                    }
-                } catch (RequestException e) {
-                    LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
-                    envelope.sendFailure(e, ticker().read() - now);
-                } catch (Exception e) {
-                    LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
-                    envelope.sendFailure(new RuntimeRequestException("Request failed to process", e),
-                        ticker().read() - now);
-                }
+                handleRequestEnvelope((RequestEnvelope)message);
             } else if (message instanceof ConnectClientRequest) {
                 handleConnectClient((ConnectClientRequest)message);
             } else if (CreateTransaction.isSerializedType(message)) {
@@ -357,12 +354,41 @@ public class Shard extends RaftActor {
                 onMakeLeaderLocal();
             } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
                 store.resumeNextPendingTransaction();
-            } else {
+            } else if (!responseMessageSlicer.handleMessage(message)) {
                 super.handleNonRaftCommand(message);
             }
         }
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void handleRequestEnvelope(final RequestEnvelope envelope) {
+        final long now = ticker().read();
+        try {
+            final RequestSuccess<?, ?> success = handleRequest(envelope, now);
+            if (success != null) {
+                final long executionTimeNanos = ticker().read() - now;
+                if (success instanceof SliceableMessage) {
+                    dispatchers.getDispatcher(DispatcherType.Serialization).execute(() ->
+                        responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget())
+                            .message(envelope.newSuccessEnvelope(success, executionTimeNanos))
+                            .sendTo(envelope.getMessage().getReplyTo()).replyTo(self())
+                            .onFailureCallback(t -> {
+                                LOG.warn("Error slicing response {}", success, t);
+                            }).build()));
+                } else {
+                    envelope.sendSuccess(success, executionTimeNanos);
+                }
+            }
+        } catch (RequestException e) {
+            LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
+            envelope.sendFailure(e, ticker().read() - now);
+        } catch (Exception e) {
+            LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
+            envelope.sendFailure(new RuntimeRequestException("Request failed to process", e),
+                ticker().read() - now);
+        }
+    }
+
     private void onMakeLeaderLocal() {
         LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
         if (isLeader()) {
index a2a5e63..f6a026a 100644 (file)
@@ -1152,6 +1152,22 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 initialSnapshot, snapshotRoot);
     }
 
+    @Test
+    public void testLargeReadReplySlicing() throws Exception {
+        // The slicing is only implemented for tell-based protocol
+        Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class));
+
+        leaderDatastoreContextBuilder.maximumMessageSliceSize(50);
+        initDatastoresWithCars("testLargeReadReplySlicing");
+
+        final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
+
+        final NormalizedNode<?, ?> carsNode = CarsModel.create();
+        rwTx.write(CarsModel.BASE_PATH, carsNode);
+
+        verifyNode(rwTx, CarsModel.BASE_PATH, carsNode);
+    }
+
     private static void verifySnapshot(final Snapshot actual, final Snapshot expected,
                                        final NormalizedNode<?, ?> expRoot) {
         assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());
index a9f623b..528cd3f 100644 (file)
@@ -10,3 +10,4 @@ org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.databroker.actors
 org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off
 org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.sharding=debug
 org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.access.client=debug
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.messaging=debug

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.