Slice front-end request messages 74/60574/7
authorTom Pantelis <tompantelis@gmail.com>
Wed, 19 Jul 2017 20:47:28 +0000 (16:47 -0400)
committerRobert Varga <nite@hq.sk>
Sun, 30 Jul 2017 10:17:27 +0000 (10:17 +0000)
Added infrastructure to use the MessageSlicer to slice SliceableMessages in
the TransmitQueue.Transmitting class on the front-end. A MessageAssembler is
used on the Shard side to re-assemble. Currently only the
front-end ModifyTransactionRequest is a SliceableMessage as it contains a
NormalizedNode which can be arbitrarily large - the others are small and
don't require slicing.

Change-Id: I7b09e4864e19d3fdb215c2b9dbcb64c14b6a143c
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
14 files changed:
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequest.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorContext.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/AbstractMessagingTest.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicerTest.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java

index 30dac62ace0375010eb8cbc05b57b73f481fdadb..39b577cef26b67d9739fb32fe32b5d739a1c1908 100644 (file)
@@ -15,6 +15,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.List;
 import java.util.Optional;
 import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 
 /**
@@ -24,7 +25,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
  * @author Robert Varga
  */
 @Beta
-public final class ModifyTransactionRequest extends TransactionRequest<ModifyTransactionRequest> {
+public final class ModifyTransactionRequest extends TransactionRequest<ModifyTransactionRequest>
+        implements SliceableMessage {
     private static final long serialVersionUID = 1L;
 
     @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but this class "
index 3cc8d7073ae3467d04fed6468f1af943b7c15b0f..afee418fd6926e55abbafac2a4b3d6a120a58f67 100644 (file)
@@ -116,7 +116,8 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     // This constructor is only to be called (indirectly) by ConnectedClientConnection constructor.
     // Do not allow subclassing outside of this package
     AbstractClientConnection(final AbstractClientConnection<T> oldConn, final T newBackend, final int queueDepth) {
-        this(oldConn, new TransmitQueue.Transmitting(oldConn.queue, queueDepth, newBackend, oldConn.currentTime()));
+        this(oldConn, new TransmitQueue.Transmitting(oldConn.queue, queueDepth, newBackend, oldConn.currentTime(),
+                Preconditions.checkNotNull(oldConn.context).messageSlicer()));
     }
 
     public final ClientActorContext context() {
index 79d7eceb148d8d255ab1c9785e7349ebb5c72522..3f6515cbb8b20ce22183b857eccacd6acb50223c 100644 (file)
@@ -36,7 +36,7 @@ import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherTy
 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.concepts.WritableIdentifier;
+import org.opendaylight.yangtools.concepts.Identifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
@@ -151,10 +151,14 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
             return this;
         }
 
+        if (context().messageSlicer().handleMessage(command)) {
+            return this;
+        }
+
         return onCommand(command);
     }
 
-    private static long extractCookie(final WritableIdentifier id) {
+    private static long extractCookie(final Identifier id) {
         if (id instanceof TransactionIdentifier) {
             return ((TransactionIdentifier) id).getHistoryId().getCookie();
         } else if (id instanceof LocalHistoryIdentifier) {
@@ -245,6 +249,8 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
         } finally {
             connectionsLock.unlockWrite(stamp);
         }
+
+        context().messageSlicer().close();
     }
 
     /**
@@ -367,6 +373,7 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
                 }
             } else {
                 LOG.info("{}: removed connection {}", persistenceId(), conn);
+                cancelSlicing(conn.cookie());
             }
         } finally {
             connectionsLock.unlockWrite(stamp);
@@ -390,6 +397,8 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
                 } else {
                     LOG.warn("{}: failed to replace connection {}, as it was not tracked", persistenceId(), conn);
                 }
+            } else {
+                cancelSlicing(oldConn.cookie());
             }
         } finally {
             connectionsLock.unlockWrite(stamp);
@@ -404,6 +413,17 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
             }));
     }
 
+    private void cancelSlicing(final Long cookie) {
+        context().messageSlicer().cancelSlicing(id -> {
+            try {
+                return cookie.equals(extractCookie(id));
+            } catch (IllegalArgumentException e) {
+                LOG.debug("extractCookie failed while cancelling slicing for cookie {}: {}", cookie, e);
+                return false;
+            }
+        });
+    }
+
     private ConnectingClientConnection<T> createConnection(final Long shard) {
         final ConnectingClientConnection<T> conn = new ConnectingClientConnection<>(context(), shard);
         resolveConnection(shard, conn);
index 3ed207ef6f393c5b84b8c4641b39099758d7df57..7e392af37570b4c735339276fb8586ff78d878c9 100644 (file)
@@ -14,10 +14,13 @@ import akka.actor.Scheduler;
 import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
@@ -40,6 +43,7 @@ public class ClientActorContext extends AbstractClientActorContext implements Id
     private final Scheduler scheduler;
     private final Dispatchers dispatchers;
     private final ClientActorConfig config;
+    private final MessageSlicer messageSlicer;
 
     // Hidden to avoid subclassing
     ClientActorContext(final ActorRef self, final String persistenceId, final ActorSystem system,
@@ -50,6 +54,11 @@ public class ClientActorContext extends AbstractClientActorContext implements Id
         this.executionContext = system.dispatcher();
         this.dispatchers = new Dispatchers(system.dispatchers());
         this.config = Preconditions.checkNotNull(config);
+
+        messageSlicer = MessageSlicer.builder().messageSliceSize(config.getMaximumMessageSliceSize())
+            .logContext(persistenceId).expireStateAfterInactivity(config.getRequestTimeout(), TimeUnit.NANOSECONDS)
+                .fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(),
+                    config.getTempFileDirectory())).build();
     }
 
     @Override
@@ -68,6 +77,11 @@ public class ClientActorContext extends AbstractClientActorContext implements Id
         return dispatchers;
     }
 
+    @Nonnull
+    public MessageSlicer messageSlicer() {
+        return messageSlicer;
+    }
+
     /**
      * Return the time ticker for this {@link ClientActorContext}. This should be used for in all time-tracking
      * done within a client actor. Subclasses of {@link ClientActorBehavior} are encouraged to use
index 705bb0d9cd6e7a32da5f45884bc3d07b6f2592ed..0313a72a8319fc107a967a2bfbb7c188b4a08ac4 100644 (file)
@@ -24,6 +24,10 @@ import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,19 +74,29 @@ abstract class TransmitQueue {
         }
 
         @Override
-        TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
+        Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
             throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
         }
+
+        @Override
+        void preComplete(ResponseEnvelope<?> envelope) {
+        }
     }
 
     static final class Transmitting extends TransmitQueue {
+        private static final long NOT_SLICING = -1;
+
         private final BackendInfo backend;
+        private final MessageSlicer messageSlicer;
         private long nextTxSequence;
+        private long currentSlicedEnvSequenceId = NOT_SLICING;
 
         // For ConnectedClientConnection.
-        Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now) {
+        Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now,
+                final MessageSlicer messageSlicer) {
             super(oldQueue, targetDepth, now);
             this.backend = Preconditions.checkNotNull(backend);
+            this.messageSlicer = Preconditions.checkNotNull(messageSlicer);
         }
 
         @Override
@@ -91,14 +105,42 @@ abstract class TransmitQueue {
         }
 
         @Override
-        TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
-            final RequestEnvelope env = new RequestEnvelope(entry.getRequest().toVersion(backend.getVersion()),
+        Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
+            // If we're currently slicing a message we can't send any subsequent requests until slicing completes to
+            // avoid an out-of-sequence request envelope failure on the backend. In this case we return an empty
+            // Optional to indicate the request was not transmitted.
+            if (currentSlicedEnvSequenceId >= 0) {
+                return Optional.empty();
+            }
+
+            final Request<?, ?> request = entry.getRequest();
+            final RequestEnvelope env = new RequestEnvelope(request.toVersion(backend.getVersion()),
                 backend.getSessionId(), nextTxSequence++);
 
-            final TransmittedConnectionEntry ret = new TransmittedConnectionEntry(entry, env.getSessionId(),
-                env.getTxSequence(), now);
-            backend.getActor().tell(env, ActorRef.noSender());
-            return ret;
+            if (request instanceof SliceableMessage) {
+                if (messageSlicer.slice(SliceOptions.builder().identifier(request.getTarget())
+                        .message(env).replyTo(request.getReplyTo()).sendTo(backend.getActor())
+                        .onFailureCallback(t -> env.sendFailure(new RuntimeRequestException(
+                                "Failed to slice request " + request, t), 0L)).build())) {
+                    // The request was sliced so record the envelope sequence id to prevent transmitting
+                    // subsequent requests until slicing completes.
+                    currentSlicedEnvSequenceId = env.getTxSequence();
+                }
+            } else {
+                backend.getActor().tell(env, ActorRef.noSender());
+            }
+
+            return Optional.of(new TransmittedConnectionEntry(entry, env.getSessionId(),
+                    env.getTxSequence(), now));
+        }
+
+        @Override
+        void preComplete(ResponseEnvelope<?> envelope) {
+            if (envelope.getTxSequence() == currentSlicedEnvSequenceId) {
+                // Slicing completed for the prior request - clear the cached sequence id field to enable subsequent
+                // requests to be transmitted.
+                currentSlicedEnvSequenceId = NOT_SLICING;
+            }
         }
     }
 
@@ -163,6 +205,8 @@ abstract class TransmitQueue {
 
     // If a matching request was found, this will track a task was closed.
     final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
+        preComplete(envelope);
+
         Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
         if (maybeEntry == null) {
             LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
@@ -193,23 +237,27 @@ abstract class TransmitQueue {
     private void transmitEntries(final int maxTransmit, final long now) {
         for (int i = 0; i < maxTransmit; ++i) {
             final ConnectionEntry e = pending.poll();
-            if (e == null) {
+            if (e == null || !transmitEntry(e, now)) {
                 LOG.debug("Queue {} transmitted {} requests", this, i);
                 return;
             }
-
-            transmitEntry(e, now);
         }
 
         LOG.debug("Queue {} transmitted {} requests", this, maxTransmit);
     }
 
-    private void transmitEntry(final ConnectionEntry entry, final long now) {
+    private boolean transmitEntry(final ConnectionEntry entry, final long now) {
         LOG.debug("Queue {} transmitting entry {}", this, entry);
         // We are not thread-safe and are supposed to be externally-guarded,
         // hence send-before-record should be fine.
         // This needs to be revisited if the external guards are lowered.
-        inflight.addLast(transmit(entry, now));
+        final Optional<TransmittedConnectionEntry> maybeTransmitted = transmit(entry, now);
+        if (!maybeTransmitted.isPresent()) {
+            return false;
+        }
+
+        inflight.addLast(maybeTransmitted.get());
+        return true;
     }
 
     final long enqueueOrForward(final ConnectionEntry entry, final long now) {
@@ -255,7 +303,11 @@ abstract class TransmitQueue {
         }
 
         if (pending.isEmpty()) {
-            transmitEntry(entry, now);
+            if (!transmitEntry(entry, now)) {
+                LOG.debug("Queue {} cannot transmit request {} - delaying it", this, entry.getRequest());
+                pending.addLast(entry);
+            }
+
             return delay;
         }
 
@@ -269,7 +321,9 @@ abstract class TransmitQueue {
      */
     abstract int canTransmitCount(int inflightSize);
 
-    abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now);
+    abstract Optional<TransmittedConnectionEntry> transmit(ConnectionEntry entry, long now);
+
+    abstract void preComplete(ResponseEnvelope<?> envelope);
 
     final boolean isEmpty() {
         return inflight.isEmpty() && pending.isEmpty();
index e6a09c17571a901c303cdde786d203f1678109ff..61d93e4ebe2611c28a04795203d8f6c449e844d9 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.access.client;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
@@ -17,8 +18,17 @@ import java.util.function.Consumer;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
+import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.messaging.MessageSlice;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 
 public class ConnectedClientConnectionTest
         extends AbstractClientConnectionTest<ConnectedClientConnection<BackendInfo>, BackendInfo> {
@@ -47,4 +57,27 @@ public class ConnectedClientConnectionTest
         verify(behavior).reconnectConnection(same(connection), any(ReconnectingClientConnection.class));
     }
 
-}
\ No newline at end of file
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testSendSliceableMessageRequest() {
+        final ClientActorConfig config = AccessClientUtil.newMockClientActorConfig();
+        doReturn(5).when(config).getMaximumMessageSliceSize();
+        context = new ClientActorContext(contextProbe.ref(), PERSISTENCE_ID, system, CLIENT_ID, config);
+        connection = createConnection();
+
+        final Consumer<Response<?, ?>> callback = mock(Consumer.class);
+
+        final TransactionIdentifier identifier =
+                new TransactionIdentifier(new LocalHistoryIdentifier(CLIENT_ID, 0L), 0L);
+        ModifyTransactionRequestBuilder reqBuilder =
+                new ModifyTransactionRequestBuilder(identifier, replyToProbe.ref());
+        reqBuilder.addModification(new TransactionWrite(YangInstanceIdentifier.EMPTY, Builders.containerBuilder()
+                .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(
+                        QName.create("namespace", "localName"))).build()));
+        reqBuilder.setSequence(0L);
+        final Request<?, ?> request = reqBuilder.build();
+        connection.sendRequest(request, callback);
+
+        backendProbe.expectMsgClass(MessageSlice.class);
+    }
+}
index b83c810aadadb2745ca05f4e24d0c69c717cd197..88e50e2f48f526f003f00bf2a48ea010bc22750d 100644 (file)
@@ -16,6 +16,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
@@ -44,6 +45,7 @@ import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
 import org.opendaylight.yangtools.concepts.WritableIdentifier;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -145,6 +147,8 @@ public class ConnectingClientConnectionTest {
         final ClientActorConfig mockConfig = AccessClientUtil.newMockClientActorConfig();
         doReturn(mockConfig).when(mockContext).config();
 
+        doReturn(mock(MessageSlicer.class)).when(mockContext).messageSlicer();
+
         mockActor = TestProbe.apply(actorSystem);
         mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5);
         mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
index f7ea931365e819ab0529f6ada509be54c428ee32..b6636553ffa661b9b4dd48411d61f75f7090a76c 100644 (file)
@@ -12,8 +12,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
 
 import com.google.common.base.Ticker;
@@ -27,7 +31,9 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
@@ -38,10 +44,13 @@ import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
 
 public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<TransmitQueue.Transmitting> {
 
     private BackendInfo backendInfo;
+    private final MessageSlicer mockMessageSlicer = mock(MessageSlicer.class);
 
     private static long now() {
         return Ticker.systemTicker().read();
@@ -54,8 +63,9 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
 
     @Override
     protected TransmitQueue.Transmitting createQueue() {
+        doReturn(false).when(mockMessageSlicer).slice(any());
         backendInfo = new BackendInfo(probe.ref(), 0L, ABIVersion.BORON, 3);
-        return new TransmitQueue.Transmitting(new TransmitQueue.Halted(0), 0, backendInfo, now());
+        return new TransmitQueue.Transmitting(new TransmitQueue.Halted(0), 0, backendInfo, now(), mockMessageSlicer);
     }
 
     @Test
@@ -133,9 +143,18 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = now();
         final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
-        queue.transmit(entry, now);
+
+        Optional<TransmittedConnectionEntry> transmitted = queue.transmit(entry, now);
+        assertTrue(transmitted.isPresent());
+        assertEquals(request, transmitted.get().getRequest());
+        assertEquals(callback, transmitted.get().getCallback());
+
         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
         assertEquals(request, requestEnvelope.getMessage());
+
+        transmitted = queue.transmit(new ConnectionEntry(new TransactionPurgeRequest(
+                TRANSACTION_IDENTIFIER, 1L, probe.ref()), callback, now), now);
+        assertTrue(transmitted.isPresent());
     }
 
     @Test
@@ -202,6 +221,89 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         assertEqualRequests(queue.getPending(), req6);
     }
 
+    @Test
+    public void testRequestSlicingOnTransmit() throws Exception {
+        doReturn(true).when(mockMessageSlicer).slice(any());
+
+        ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
+                TRANSACTION_IDENTIFIER, probe.ref());
+        reqBuilder.setSequence(0L);
+        final Request<?, ?> request = reqBuilder.build();
+
+        final long now = now();
+        final Consumer<Response<?, ?>> mockConsumer = createConsumerMock();
+        Optional<TransmittedConnectionEntry> transmitted =
+                queue.transmit(new ConnectionEntry(request, mockConsumer, now), now);
+        assertTrue(transmitted.isPresent());
+
+        ArgumentCaptor<SliceOptions> sliceOptions = ArgumentCaptor.forClass(SliceOptions.class);
+        verify(mockMessageSlicer).slice(sliceOptions.capture());
+        assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope);
+        RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage();
+        assertEquals(request, requestEnvelope.getMessage());
+
+        final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+        transmitted = queue.transmit(new ConnectionEntry(request2, mockConsumer, now), now);
+        assertFalse(transmitted.isPresent());
+    }
+
+    @Test
+    public void testSlicingFailureOnTransmit() throws Exception {
+        doAnswer(invocation -> {
+            invocation.getArgumentAt(0, SliceOptions.class).getOnFailureCallback().accept(new Exception("mock"));
+            return Boolean.FALSE;
+        }).when(mockMessageSlicer).slice(any());
+
+        ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
+                TRANSACTION_IDENTIFIER, probe.ref());
+        reqBuilder.setSequence(0L);
+
+        final long now = now();
+        Optional<TransmittedConnectionEntry> transmitted =
+                queue.transmit(new ConnectionEntry(reqBuilder.build(), createConsumerMock(), now), now);
+        assertTrue(transmitted.isPresent());
+
+        verify(mockMessageSlicer).slice(any());
+
+        probe.expectMsgClass(FailureEnvelope.class);
+    }
+
+    @Test
+    public void testSlicedRequestOnComplete() throws Exception {
+        doReturn(true).when(mockMessageSlicer).slice(any());
+
+        ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
+                TRANSACTION_IDENTIFIER, probe.ref());
+        reqBuilder.setSequence(0L);
+        final Request<?, ?> request = reqBuilder.build();
+
+        final long now = now();
+        final Consumer<Response<?, ?>> mockConsumer = createConsumerMock();
+        queue.enqueueOrForward(new ConnectionEntry(request, mockConsumer, now), now);
+
+        ArgumentCaptor<SliceOptions> sliceOptions = ArgumentCaptor.forClass(SliceOptions.class);
+        verify(mockMessageSlicer).slice(sliceOptions.capture());
+        assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope);
+
+        final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+        queue.enqueueOrForward(new ConnectionEntry(request2, mockConsumer, now), now);
+        verifyNoMoreInteractions(mockMessageSlicer);
+        probe.expectNoMsg();
+
+        RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage();
+        queue.complete(new FailureEnvelope(request.toRequestFailure(mock(RequestException.class)),
+                requestEnvelope.getSessionId(), requestEnvelope.getTxSequence(), 0), 0);
+
+        requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
+        assertEquals(request2, requestEnvelope.getMessage());
+
+        final Request<?, ?> request3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref());
+        queue.enqueueOrForward(new ConnectionEntry(request3, mockConsumer, now), now);
+
+        requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
+        assertEquals(request3, requestEnvelope.getMessage());
+    }
+
     private static void assertEqualRequests(final Collection<? extends ConnectionEntry> queue,
             final Request<?, ?>... requests) {
         final List<Request<?, ?>> queued = ImmutableList.copyOf(Collections2.transform(queue,
index ec7dcca06a3106448c91eb0b7091fc24e28e3aa1..4c3d67bdaaca759d0e81acee561dff09b8bab27b 100644 (file)
@@ -17,9 +17,12 @@ import com.google.common.cache.RemovalNotification;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.util.Iterator;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -37,7 +40,7 @@ public class MessageSlicer implements AutoCloseable {
     private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
     public static final int DEFAULT_MAX_SLICING_TRIES = 3;
 
-    private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
+    private final Cache<MessageSliceIdentifier, SlicedMessageState<ActorRef>> stateCache;
     private final FileBackedOutputStreamFactory fileBackedStreamFactory;
     private final int messageSliceSize;
     private final int maxSlicingTries;
@@ -92,8 +95,9 @@ public class MessageSlicer implements AutoCloseable {
      * options.
      *
      * @param options the SliceOptions
+     * @return true if the message was sliced, false otherwise
      */
-    public void slice(final SliceOptions options) {
+    public boolean slice(final SliceOptions options) {
         final Identifier identifier = options.getIdentifier();
         final Serializable message = options.getMessage();
         final FileBackedOutputStream fileBackedStream;
@@ -111,16 +115,16 @@ public class MessageSlicer implements AutoCloseable {
                 LOG.debug("{}: Error serializing message for {}", logContext, identifier, e);
                 fileBackedStream.cleanup();
                 options.getOnFailureCallback().accept(e);
-                return;
+                return false;
             }
         } else {
             fileBackedStream = options.getFileBackedStream();
         }
 
-        initializeSlicing(options, fileBackedStream);
+        return initializeSlicing(options, fileBackedStream);
     }
 
-    private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
+    private boolean initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
         final Identifier identifier = options.getIdentifier();
         MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
         SlicedMessageState<ActorRef> state = null;
@@ -133,7 +137,7 @@ public class MessageSlicer implements AutoCloseable {
                 LOG.debug("{}: Message does not need to be sliced - sending original message", logContext);
                 state.close();
                 sendTo(options, message, options.getReplyTo());
-                return;
+                return false;
             }
 
             final MessageSlice firstSlice = getNextSliceMessage(state);
@@ -142,6 +146,7 @@ public class MessageSlicer implements AutoCloseable {
 
             stateCache.put(messageSliceId, state);
             sendTo(options, firstSlice, ActorRef.noSender());
+            return true;
         } catch (IOException e) {
             LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e);
             if (state != null) {
@@ -151,6 +156,7 @@ public class MessageSlicer implements AutoCloseable {
             }
 
             options.getOnFailureCallback().accept(e);
+            return false;
         }
     }
 
@@ -196,6 +202,20 @@ public class MessageSlicer implements AutoCloseable {
         stateCache.invalidateAll();
     }
 
+    /**
+     * Cancels all in-progress sliced message state that matches the given filter.
+     *
+     * @param filter filters by Identifier
+     */
+    public void cancelSlicing(@Nonnull final Predicate<Identifier> filter) {
+        final Iterator<MessageSliceIdentifier> iter = stateCache.asMap().keySet().iterator();
+        while (iter.hasNext()) {
+            if (filter.test(iter.next().getClientIdentifier())) {
+                iter.remove();
+            }
+        }
+    }
+
     private static MessageSlice getNextSliceMessage(final SlicedMessageState<ActorRef> state) throws IOException {
         final byte[] firstSliceBytes = state.getNextSlice();
         return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),
index 3d93755ee1a67ae2d7edd3e3955c483a7c0ad7ec..5ff1f38e838d297488b4f42b88205bd45fd6a220 100644 (file)
@@ -59,12 +59,7 @@ public class AbstractMessagingTest {
         MockitoAnnotations.initMocks(this);
 
         doReturn(mockFiledBackedStream).when(mockFiledBackedStreamFactory).newInstance();
-        doNothing().when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt());
-        doNothing().when(mockFiledBackedStream).write(any(byte[].class));
-        doNothing().when(mockFiledBackedStream).write(anyInt());
-        doNothing().when(mockFiledBackedStream).close();
-        doNothing().when(mockFiledBackedStream).cleanup();
-        doNothing().when(mockFiledBackedStream).flush();
+        setupMockFiledBackedStream(mockFiledBackedStream);
         doReturn(mockByteSource).when(mockFiledBackedStream).asByteSource();
 
         doReturn(mockInputStream).when(mockByteSource).openStream();
@@ -78,4 +73,14 @@ public class AbstractMessagingTest {
     public static void tearDownClass() {
         JavaTestKit.shutdownActorSystem(ACTOR_SYSTEM, Boolean.TRUE);
     }
+
+    void setupMockFiledBackedStream(final FileBackedOutputStream mockFiledBackedStream) throws IOException {
+        doNothing().when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt());
+        doNothing().when(mockFiledBackedStream).write(any(byte[].class));
+        doNothing().when(mockFiledBackedStream).write(anyInt());
+        doNothing().when(mockFiledBackedStream).close();
+        doNothing().when(mockFiledBackedStream).cleanup();
+        doNothing().when(mockFiledBackedStream).flush();
+        doReturn(mockByteSource).when(mockFiledBackedStream).asByteSource();
+    }
 }
index ac6ab2e447b52e61b49e00c348f4af245886795e..e71b7154e4a5ff4fb9dcea0e31634d835c85c745 100644 (file)
@@ -8,11 +8,14 @@
 package org.opendaylight.controller.cluster.messaging;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
@@ -26,6 +29,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
 import org.opendaylight.yangtools.concepts.Identifier;
 
 /**
@@ -71,8 +75,9 @@ public class MessageSlicerTest extends AbstractMessagingTest {
         doThrow(mockFailure).when(mockFiledBackedStream).flush();
 
         try (MessageSlicer slicer = newMessageSlicer("testSliceWithFailedSerialization", 100)) {
-            slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
-                    mockOnFailureCallback);
+            final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(),
+                    testProbe.ref(), mockOnFailureCallback);
+            assertFalse(wasSliced);
 
             assertFailureCallback(IOException.class);
             verify(mockFiledBackedStream).cleanup();
@@ -86,8 +91,9 @@ public class MessageSlicerTest extends AbstractMessagingTest {
         doThrow(mockFailure).when(mockByteSource).openBufferedStream();
 
         try (MessageSlicer slicer = newMessageSlicer("testSliceWithByteSourceFailure", 100)) {
-            slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
-                    mockOnFailureCallback);
+            final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(),
+                    testProbe.ref(), mockOnFailureCallback);
+            assertFalse(wasSliced);
 
             assertFailureCallback(IOException.class);
             verify(mockFiledBackedStream).cleanup();
@@ -99,8 +105,9 @@ public class MessageSlicerTest extends AbstractMessagingTest {
         doReturn(0).when(mockInputStream).read(any(byte[].class));
 
         try (MessageSlicer slicer = newMessageSlicer("testSliceWithInputStreamFailure", 2)) {
-            slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
-                    mockOnFailureCallback);
+            final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(),
+                    testProbe.ref(), mockOnFailureCallback);
+            assertFalse(wasSliced);
 
             assertFailureCallback(IOException.class);
             verify(mockFiledBackedStream).cleanup();
@@ -131,6 +138,27 @@ public class MessageSlicerTest extends AbstractMessagingTest {
         verifyNoMoreInteractions(mockOnFailureCallback);
     }
 
+    @Test
+    public void testCancelSlicing() throws IOException {
+        doReturn(1).when(mockInputStream).read(any(byte[].class));
+
+        final MessageSlicer slicer = newMessageSlicer("testCloseAllSlicedMessageState", 1);
+        slicer.slice(SliceOptions.builder().identifier(IDENTIFIER).fileBackedOutputStream(mockFiledBackedStream)
+                .sendTo(testProbe.ref()).replyTo(testProbe.ref()).onFailureCallback(mockOnFailureCallback).build());
+
+        final FileBackedOutputStream mockFiledBackedStream2 = mock(FileBackedOutputStream.class);
+        setupMockFiledBackedStream(mockFiledBackedStream2);
+        slicer.slice(SliceOptions.builder().identifier(new StringIdentifier("test2"))
+                .fileBackedOutputStream(mockFiledBackedStream2).sendTo(testProbe.ref()).replyTo(testProbe.ref())
+                .onFailureCallback(mockOnFailureCallback).build());
+
+        slicer.cancelSlicing(id -> id.equals(IDENTIFIER));
+
+        verify(mockFiledBackedStream).cleanup();
+        verify(mockFiledBackedStream2, never()).cleanup();
+        verifyNoMoreInteractions(mockOnFailureCallback);
+    }
+
     @Test
     public void testCheckExpiredSlicedMessageState() throws IOException {
         doReturn(1).when(mockInputStream).read(any(byte[].class));
@@ -162,9 +190,9 @@ public class MessageSlicerTest extends AbstractMessagingTest {
                 .fileBackedStreamFactory(mockFiledBackedStreamFactory).build();
     }
 
-    static void slice(MessageSlicer slicer, Identifier identifier, Serializable message, ActorRef sendTo,
+    static boolean slice(MessageSlicer slicer, Identifier identifier, Serializable message, ActorRef sendTo,
             ActorRef replyTo, Consumer<Throwable> onFailureCallback) {
-        slicer.slice(SliceOptions.builder().identifier(identifier).message(message).sendTo(sendTo).replyTo(replyTo)
-                .onFailureCallback(onFailureCallback).build());
+        return slicer.slice(SliceOptions.builder().identifier(identifier).message(message).sendTo(sendTo)
+                .replyTo(replyTo).onFailureCallback(onFailureCallback).build());
     }
 }
index a1a1e24c6938a0f3a97db17e581544152119f2f6..7318960de68d1ba09898b9af0159aa778c10632a 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.messaging;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
@@ -119,7 +120,9 @@ public class MessageSlicingIntegrationTest {
 
         final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
         try (MessageSlicer slicer = newMessageSlicer("testSingleSlice", SerializationUtils.serialize(message).length)) {
-            slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
+            final boolean wasSliced = slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(),
+                    mockOnFailureCallback);
+            assertFalse(wasSliced);
 
             final BytesMessage sentMessage = sendToProbe.expectMsgClass(BytesMessage.class);
             assertEquals("Sent message", message, sentMessage);
@@ -214,7 +217,9 @@ public class MessageSlicingIntegrationTest {
         final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
         final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
         try (MessageSlicer slicer = newMessageSlicer("testSlicingWithFailure", messageSliceSize)) {
-            slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
+            final boolean wasSliced = slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(),
+                    mockOnFailureCallback);
+            assertTrue(wasSliced);
 
             MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
 
@@ -263,7 +268,9 @@ public class MessageSlicingIntegrationTest {
         final BytesMessage message = new BytesMessage(messageData);
 
         try (MessageSlicer slicer = newMessageSlicer(logContext, messageSliceSize)) {
-            slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
+            final boolean wasSliced = slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(),
+                    mockOnFailureCallback);
+            assertTrue(wasSliced);
 
             Identifier slicingId = null;
             int expLastSliceHashCode = SlicedMessageState.INITIAL_SLICE_HASH_CODE;
index c003d901af800c6df9505e25a52b3501c7a0e4ee..ebe5498481d9fe3befdabafbb4976f635a7b15b3 100644 (file)
@@ -11,9 +11,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
+import akka.actor.ExtendedActorSystem;
 import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.Status.Failure;
+import akka.serialization.JavaSerializer;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
@@ -82,6 +84,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
 import org.opendaylight.controller.cluster.messaging.SliceOptions;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
@@ -191,6 +194,8 @@ public class Shard extends RaftActor {
     private final MessageSlicer responseMessageSlicer;
     private final Dispatchers dispatchers;
 
+    private final MessageAssembler requestMessageAssembler;
+
     protected Shard(final AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
@@ -247,6 +252,11 @@ public class Shard extends RaftActor {
                 .messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
                 .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
                 .expireStateAfterInactivity(2, TimeUnit.MINUTES).build();
+
+        requestMessageAssembler = MessageAssembler.builder().logContext(this.name)
+                .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
+                .assembledMessageCallback((message, sender) -> self().tell(message, sender))
+                .expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build();
     }
 
     private void setTransactionCommitTimeout() {
@@ -301,6 +311,8 @@ public class Shard extends RaftActor {
 
             if (message instanceof RequestEnvelope) {
                 handleRequestEnvelope((RequestEnvelope)message);
+            } else if (requestMessageAssembler.isHandledMessage(message)) {
+                handleRequestAssemblerMessage(message);
             } else if (message instanceof ConnectClientRequest) {
                 handleConnectClient((ConnectClientRequest)message);
             } else if (CreateTransaction.isSerializedType(message)) {
@@ -361,6 +373,13 @@ public class Shard extends RaftActor {
         }
     }
 
+    private void handleRequestAssemblerMessage(Object message) {
+        dispatchers.getDispatcher(DispatcherType.Serialization).execute(() -> {
+            JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) context().system());
+            requestMessageAssembler.handleMessage(message, self());
+        });
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void handleRequestEnvelope(final RequestEnvelope envelope) {
         final long now = ticker().read();
@@ -393,6 +412,7 @@ public class Shard extends RaftActor {
     private void commitTimeoutCheck() {
         store.checkForExpiredTransactions(transactionCommitTimeout, this::updateAccess);
         commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
+        requestMessageAssembler.checkExpiredAssembledMessageState();
     }
 
     private Optional<Long> updateAccess(final SimpleShardDataTreeCohort cohort) {
@@ -881,6 +901,8 @@ public class Shard extends RaftActor {
                 knownFrontends = ImmutableMap.of();
             }
 
+            requestMessageAssembler.close();
+
             if (!hasLeader()) {
                 // No leader anywhere, nothing else to do
                 return;
index f6a026aaa9464d4e39442168960d08946237ebf4..04370fe0876a0be664d26da7bd4c00ab6f9ae382 100644 (file)
@@ -1153,11 +1153,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     }
 
     @Test
-    public void testLargeReadReplySlicing() throws Exception {
+    public void testReadWriteMessageSlicing() throws Exception {
         // The slicing is only implemented for tell-based protocol
         Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class));
 
-        leaderDatastoreContextBuilder.maximumMessageSliceSize(50);
+        leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
+        followerDatastoreContextBuilder.maximumMessageSliceSize(100);
         initDatastoresWithCars("testLargeReadReplySlicing");
 
         final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();