From d7c9a8ccfcb57f005490a226803d094289997ef9 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 19 Jul 2017 16:47:28 -0400 Subject: [PATCH] Slice front-end request messages Added infrastructure to use the MessageSlicer to slice SliceableMessages in the TransmitQueue.Transmitting class on the front-end. A MessageAssembler is used on the Shard side to re-assemble. Currently only the front-end ModifyTransactionRequest is a SliceableMessage as it contains a NormalizedNode which can be arbitrarily large - the others are small and don't require slicing. Change-Id: I7b09e4864e19d3fdb215c2b9dbcb64c14b6a143c Signed-off-by: Tom Pantelis --- .../commands/ModifyTransactionRequest.java | 4 +- .../client/AbstractClientConnection.java | 3 +- .../access/client/ClientActorBehavior.java | 24 +++- .../access/client/ClientActorContext.java | 14 +++ .../cluster/access/client/TransmitQueue.java | 84 +++++++++++--- .../client/ConnectedClientConnectionTest.java | 35 +++++- .../ConnectingClientConnectionTest.java | 4 + .../client/TransmittingTransmitQueueTest.java | 106 +++++++++++++++++- .../cluster/messaging/MessageSlicer.java | 32 +++++- .../messaging/AbstractMessagingTest.java | 17 ++- .../cluster/messaging/MessageSlicerTest.java | 46 ++++++-- .../MessageSlicingIntegrationTest.java | 13 ++- .../controller/cluster/datastore/Shard.java | 22 ++++ ...butedDataStoreRemotingIntegrationTest.java | 5 +- 14 files changed, 361 insertions(+), 48 deletions(-) diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequest.java index 30dac62ace..39b577cef2 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequest.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequest.java @@ -15,6 +15,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.List; import java.util.Optional; import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.SliceableMessage; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; /** @@ -24,7 +25,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier * @author Robert Varga */ @Beta -public final class ModifyTransactionRequest extends TransactionRequest { +public final class ModifyTransactionRequest extends TransactionRequest + implements SliceableMessage { private static final long serialVersionUID = 1L; @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but this class " diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java index 3cc8d7073a..afee418fd6 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java @@ -116,7 +116,8 @@ public abstract class AbstractClientConnection { // This constructor is only to be called (indirectly) by ConnectedClientConnection constructor. // Do not allow subclassing outside of this package AbstractClientConnection(final AbstractClientConnection oldConn, final T newBackend, final int queueDepth) { - this(oldConn, new TransmitQueue.Transmitting(oldConn.queue, queueDepth, newBackend, oldConn.currentTime())); + this(oldConn, new TransmitQueue.Transmitting(oldConn.queue, queueDepth, newBackend, oldConn.currentTime(), + Preconditions.checkNotNull(oldConn.context).messageSlicer())); } public final ClientActorContext context() { diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java index 79d7eceb14..3f6515cbb8 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java @@ -36,7 +36,7 @@ import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherTy import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; import org.opendaylight.controller.cluster.messaging.MessageAssembler; import org.opendaylight.yangtools.concepts.Identifiable; -import org.opendaylight.yangtools.concepts.WritableIdentifier; +import org.opendaylight.yangtools.concepts.Identifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; @@ -151,10 +151,14 @@ public abstract class ClientActorBehavior extends return this; } + if (context().messageSlicer().handleMessage(command)) { + return this; + } + return onCommand(command); } - private static long extractCookie(final WritableIdentifier id) { + private static long extractCookie(final Identifier id) { if (id instanceof TransactionIdentifier) { return ((TransactionIdentifier) id).getHistoryId().getCookie(); } else if (id instanceof LocalHistoryIdentifier) { @@ -245,6 +249,8 @@ public abstract class ClientActorBehavior extends } finally { connectionsLock.unlockWrite(stamp); } + + context().messageSlicer().close(); } /** @@ -367,6 +373,7 @@ public abstract class ClientActorBehavior extends } } else { LOG.info("{}: removed connection {}", persistenceId(), conn); + cancelSlicing(conn.cookie()); } } finally { connectionsLock.unlockWrite(stamp); @@ -390,6 +397,8 @@ public abstract class ClientActorBehavior extends } else { LOG.warn("{}: failed to replace connection {}, as it was not tracked", persistenceId(), conn); } + } else { + cancelSlicing(oldConn.cookie()); } } finally { connectionsLock.unlockWrite(stamp); @@ -404,6 +413,17 @@ public abstract class ClientActorBehavior extends })); } + private void cancelSlicing(final Long cookie) { + context().messageSlicer().cancelSlicing(id -> { + try { + return cookie.equals(extractCookie(id)); + } catch (IllegalArgumentException e) { + LOG.debug("extractCookie failed while cancelling slicing for cookie {}: {}", cookie, e); + return false; + } + }); + } + private ConnectingClientConnection createConnection(final Long shard) { final ConnectingClientConnection conn = new ConnectingClientConnection<>(context(), shard); resolveConnection(shard, conn); diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorContext.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorContext.java index 3ed207ef6f..7e392af375 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorContext.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorContext.java @@ -14,10 +14,13 @@ import akka.actor.Scheduler; import com.google.common.annotations.Beta; import com.google.common.base.Preconditions; import com.google.common.base.Ticker; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.concurrent.ThreadSafe; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.common.actor.Dispatchers; +import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; +import org.opendaylight.controller.cluster.messaging.MessageSlicer; import org.opendaylight.yangtools.concepts.Identifiable; import scala.concurrent.ExecutionContext; import scala.concurrent.duration.FiniteDuration; @@ -40,6 +43,7 @@ public class ClientActorContext extends AbstractClientActorContext implements Id private final Scheduler scheduler; private final Dispatchers dispatchers; private final ClientActorConfig config; + private final MessageSlicer messageSlicer; // Hidden to avoid subclassing ClientActorContext(final ActorRef self, final String persistenceId, final ActorSystem system, @@ -50,6 +54,11 @@ public class ClientActorContext extends AbstractClientActorContext implements Id this.executionContext = system.dispatcher(); this.dispatchers = new Dispatchers(system.dispatchers()); this.config = Preconditions.checkNotNull(config); + + messageSlicer = MessageSlicer.builder().messageSliceSize(config.getMaximumMessageSliceSize()) + .logContext(persistenceId).expireStateAfterInactivity(config.getRequestTimeout(), TimeUnit.NANOSECONDS) + .fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(), + config.getTempFileDirectory())).build(); } @Override @@ -68,6 +77,11 @@ public class ClientActorContext extends AbstractClientActorContext implements Id return dispatchers; } + @Nonnull + public MessageSlicer messageSlicer() { + return messageSlicer; + } + /** * Return the time ticker for this {@link ClientActorContext}. This should be used for in all time-tracking * done within a client actor. Subclasses of {@link ClientActorBehavior} are encouraged to use diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java index 705bb0d9cd..0313a72a83 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java @@ -24,6 +24,10 @@ import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; +import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; +import org.opendaylight.controller.cluster.access.concepts.SliceableMessage; +import org.opendaylight.controller.cluster.messaging.MessageSlicer; +import org.opendaylight.controller.cluster.messaging.SliceOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,19 +74,29 @@ abstract class TransmitQueue { } @Override - TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) { + Optional transmit(final ConnectionEntry entry, final long now) { throw new UnsupportedOperationException("Attempted to transmit on a halted queue"); } + + @Override + void preComplete(ResponseEnvelope envelope) { + } } static final class Transmitting extends TransmitQueue { + private static final long NOT_SLICING = -1; + private final BackendInfo backend; + private final MessageSlicer messageSlicer; private long nextTxSequence; + private long currentSlicedEnvSequenceId = NOT_SLICING; // For ConnectedClientConnection. - Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now) { + Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now, + final MessageSlicer messageSlicer) { super(oldQueue, targetDepth, now); this.backend = Preconditions.checkNotNull(backend); + this.messageSlicer = Preconditions.checkNotNull(messageSlicer); } @Override @@ -91,14 +105,42 @@ abstract class TransmitQueue { } @Override - TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) { - final RequestEnvelope env = new RequestEnvelope(entry.getRequest().toVersion(backend.getVersion()), + Optional transmit(final ConnectionEntry entry, final long now) { + // If we're currently slicing a message we can't send any subsequent requests until slicing completes to + // avoid an out-of-sequence request envelope failure on the backend. In this case we return an empty + // Optional to indicate the request was not transmitted. + if (currentSlicedEnvSequenceId >= 0) { + return Optional.empty(); + } + + final Request request = entry.getRequest(); + final RequestEnvelope env = new RequestEnvelope(request.toVersion(backend.getVersion()), backend.getSessionId(), nextTxSequence++); - final TransmittedConnectionEntry ret = new TransmittedConnectionEntry(entry, env.getSessionId(), - env.getTxSequence(), now); - backend.getActor().tell(env, ActorRef.noSender()); - return ret; + if (request instanceof SliceableMessage) { + if (messageSlicer.slice(SliceOptions.builder().identifier(request.getTarget()) + .message(env).replyTo(request.getReplyTo()).sendTo(backend.getActor()) + .onFailureCallback(t -> env.sendFailure(new RuntimeRequestException( + "Failed to slice request " + request, t), 0L)).build())) { + // The request was sliced so record the envelope sequence id to prevent transmitting + // subsequent requests until slicing completes. + currentSlicedEnvSequenceId = env.getTxSequence(); + } + } else { + backend.getActor().tell(env, ActorRef.noSender()); + } + + return Optional.of(new TransmittedConnectionEntry(entry, env.getSessionId(), + env.getTxSequence(), now)); + } + + @Override + void preComplete(ResponseEnvelope envelope) { + if (envelope.getTxSequence() == currentSlicedEnvSequenceId) { + // Slicing completed for the prior request - clear the cached sequence id field to enable subsequent + // requests to be transmitted. + currentSlicedEnvSequenceId = NOT_SLICING; + } } } @@ -163,6 +205,8 @@ abstract class TransmitQueue { // If a matching request was found, this will track a task was closed. final Optional complete(final ResponseEnvelope envelope, final long now) { + preComplete(envelope); + Optional maybeEntry = findMatchingEntry(inflight, envelope); if (maybeEntry == null) { LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope); @@ -193,23 +237,27 @@ abstract class TransmitQueue { private void transmitEntries(final int maxTransmit, final long now) { for (int i = 0; i < maxTransmit; ++i) { final ConnectionEntry e = pending.poll(); - if (e == null) { + if (e == null || !transmitEntry(e, now)) { LOG.debug("Queue {} transmitted {} requests", this, i); return; } - - transmitEntry(e, now); } LOG.debug("Queue {} transmitted {} requests", this, maxTransmit); } - private void transmitEntry(final ConnectionEntry entry, final long now) { + private boolean transmitEntry(final ConnectionEntry entry, final long now) { LOG.debug("Queue {} transmitting entry {}", this, entry); // We are not thread-safe and are supposed to be externally-guarded, // hence send-before-record should be fine. // This needs to be revisited if the external guards are lowered. - inflight.addLast(transmit(entry, now)); + final Optional maybeTransmitted = transmit(entry, now); + if (!maybeTransmitted.isPresent()) { + return false; + } + + inflight.addLast(maybeTransmitted.get()); + return true; } final long enqueueOrForward(final ConnectionEntry entry, final long now) { @@ -255,7 +303,11 @@ abstract class TransmitQueue { } if (pending.isEmpty()) { - transmitEntry(entry, now); + if (!transmitEntry(entry, now)) { + LOG.debug("Queue {} cannot transmit request {} - delaying it", this, entry.getRequest()); + pending.addLast(entry); + } + return delay; } @@ -269,7 +321,9 @@ abstract class TransmitQueue { */ abstract int canTransmitCount(int inflightSize); - abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now); + abstract Optional transmit(ConnectionEntry entry, long now); + + abstract void preComplete(ResponseEnvelope envelope); final boolean isEmpty() { return inflight.isEmpty() && pending.isEmpty(); diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java index e6a09c1757..61d93e4ebe 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.access.client; import static org.mockito.Matchers.any; import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -17,8 +18,17 @@ import java.util.function.Consumer; import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder; +import org.opendaylight.controller.cluster.access.commands.TransactionWrite; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.messaging.MessageSlice; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; public class ConnectedClientConnectionTest extends AbstractClientConnectionTest, BackendInfo> { @@ -47,4 +57,27 @@ public class ConnectedClientConnectionTest verify(behavior).reconnectConnection(same(connection), any(ReconnectingClientConnection.class)); } -} \ No newline at end of file + @SuppressWarnings("unchecked") + @Test + public void testSendSliceableMessageRequest() { + final ClientActorConfig config = AccessClientUtil.newMockClientActorConfig(); + doReturn(5).when(config).getMaximumMessageSliceSize(); + context = new ClientActorContext(contextProbe.ref(), PERSISTENCE_ID, system, CLIENT_ID, config); + connection = createConnection(); + + final Consumer> callback = mock(Consumer.class); + + final TransactionIdentifier identifier = + new TransactionIdentifier(new LocalHistoryIdentifier(CLIENT_ID, 0L), 0L); + ModifyTransactionRequestBuilder reqBuilder = + new ModifyTransactionRequestBuilder(identifier, replyToProbe.ref()); + reqBuilder.addModification(new TransactionWrite(YangInstanceIdentifier.EMPTY, Builders.containerBuilder() + .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create( + QName.create("namespace", "localName"))).build())); + reqBuilder.setSequence(0L); + final Request request = reqBuilder.build(); + connection.sendRequest(request, callback); + + backendProbe.expectMsgClass(MessageSlice.class); + } +} diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java index b83c810aad..88e50e2f48 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java @@ -16,6 +16,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -44,6 +45,7 @@ import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.messaging.MessageSlicer; import org.opendaylight.yangtools.concepts.WritableIdentifier; import scala.concurrent.duration.FiniteDuration; @@ -145,6 +147,8 @@ public class ConnectingClientConnectionTest { final ClientActorConfig mockConfig = AccessClientUtil.newMockClientActorConfig(); doReturn(mockConfig).when(mockContext).config(); + doReturn(mock(MessageSlicer.class)).when(mockContext).messageSlicer(); + mockActor = TestProbe.apply(actorSystem); mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5); mockRequest = new MockRequest(mockIdentifier, mockReplyTo); diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java index f7ea931365..b6636553ff 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java @@ -12,8 +12,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest; import com.google.common.base.Ticker; @@ -27,7 +31,9 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse; import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope; @@ -38,10 +44,13 @@ import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.messaging.MessageSlicer; +import org.opendaylight.controller.cluster.messaging.SliceOptions; public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest { private BackendInfo backendInfo; + private final MessageSlicer mockMessageSlicer = mock(MessageSlicer.class); private static long now() { return Ticker.systemTicker().read(); @@ -54,8 +63,9 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest> callback = createConsumerMock(); final long now = now(); final ConnectionEntry entry = new ConnectionEntry(request, callback, now); - queue.transmit(entry, now); + + Optional transmitted = queue.transmit(entry, now); + assertTrue(transmitted.isPresent()); + assertEquals(request, transmitted.get().getRequest()); + assertEquals(callback, transmitted.get().getCallback()); + final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); assertEquals(request, requestEnvelope.getMessage()); + + transmitted = queue.transmit(new ConnectionEntry(new TransactionPurgeRequest( + TRANSACTION_IDENTIFIER, 1L, probe.ref()), callback, now), now); + assertTrue(transmitted.isPresent()); } @Test @@ -202,6 +221,89 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest request = reqBuilder.build(); + + final long now = now(); + final Consumer> mockConsumer = createConsumerMock(); + Optional transmitted = + queue.transmit(new ConnectionEntry(request, mockConsumer, now), now); + assertTrue(transmitted.isPresent()); + + ArgumentCaptor sliceOptions = ArgumentCaptor.forClass(SliceOptions.class); + verify(mockMessageSlicer).slice(sliceOptions.capture()); + assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope); + RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage(); + assertEquals(request, requestEnvelope.getMessage()); + + final Request request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref()); + transmitted = queue.transmit(new ConnectionEntry(request2, mockConsumer, now), now); + assertFalse(transmitted.isPresent()); + } + + @Test + public void testSlicingFailureOnTransmit() throws Exception { + doAnswer(invocation -> { + invocation.getArgumentAt(0, SliceOptions.class).getOnFailureCallback().accept(new Exception("mock")); + return Boolean.FALSE; + }).when(mockMessageSlicer).slice(any()); + + ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder( + TRANSACTION_IDENTIFIER, probe.ref()); + reqBuilder.setSequence(0L); + + final long now = now(); + Optional transmitted = + queue.transmit(new ConnectionEntry(reqBuilder.build(), createConsumerMock(), now), now); + assertTrue(transmitted.isPresent()); + + verify(mockMessageSlicer).slice(any()); + + probe.expectMsgClass(FailureEnvelope.class); + } + + @Test + public void testSlicedRequestOnComplete() throws Exception { + doReturn(true).when(mockMessageSlicer).slice(any()); + + ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder( + TRANSACTION_IDENTIFIER, probe.ref()); + reqBuilder.setSequence(0L); + final Request request = reqBuilder.build(); + + final long now = now(); + final Consumer> mockConsumer = createConsumerMock(); + queue.enqueueOrForward(new ConnectionEntry(request, mockConsumer, now), now); + + ArgumentCaptor sliceOptions = ArgumentCaptor.forClass(SliceOptions.class); + verify(mockMessageSlicer).slice(sliceOptions.capture()); + assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope); + + final Request request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref()); + queue.enqueueOrForward(new ConnectionEntry(request2, mockConsumer, now), now); + verifyNoMoreInteractions(mockMessageSlicer); + probe.expectNoMsg(); + + RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage(); + queue.complete(new FailureEnvelope(request.toRequestFailure(mock(RequestException.class)), + requestEnvelope.getSessionId(), requestEnvelope.getTxSequence(), 0), 0); + + requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); + assertEquals(request2, requestEnvelope.getMessage()); + + final Request request3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref()); + queue.enqueueOrForward(new ConnectionEntry(request3, mockConsumer, now), now); + + requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); + assertEquals(request3, requestEnvelope.getMessage()); + } + private static void assertEqualRequests(final Collection queue, final Request... requests) { final List> queued = ImmutableList.copyOf(Collections2.transform(queue, diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java index ec7dcca06a..4c3d67bdaa 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java @@ -17,9 +17,12 @@ import com.google.common.cache.RemovalNotification; import java.io.IOException; import java.io.ObjectOutputStream; import java.io.Serializable; +import java.util.Iterator; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; +import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.io.FileBackedOutputStream; import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; import org.opendaylight.yangtools.concepts.Identifier; @@ -37,7 +40,7 @@ public class MessageSlicer implements AutoCloseable { private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1); public static final int DEFAULT_MAX_SLICING_TRIES = 3; - private final Cache> stateCache; + private final Cache> stateCache; private final FileBackedOutputStreamFactory fileBackedStreamFactory; private final int messageSliceSize; private final int maxSlicingTries; @@ -92,8 +95,9 @@ public class MessageSlicer implements AutoCloseable { * options. * * @param options the SliceOptions + * @return true if the message was sliced, false otherwise */ - public void slice(final SliceOptions options) { + public boolean slice(final SliceOptions options) { final Identifier identifier = options.getIdentifier(); final Serializable message = options.getMessage(); final FileBackedOutputStream fileBackedStream; @@ -111,16 +115,16 @@ public class MessageSlicer implements AutoCloseable { LOG.debug("{}: Error serializing message for {}", logContext, identifier, e); fileBackedStream.cleanup(); options.getOnFailureCallback().accept(e); - return; + return false; } } else { fileBackedStream = options.getFileBackedStream(); } - initializeSlicing(options, fileBackedStream); + return initializeSlicing(options, fileBackedStream); } - private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) { + private boolean initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) { final Identifier identifier = options.getIdentifier(); MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id); SlicedMessageState state = null; @@ -133,7 +137,7 @@ public class MessageSlicer implements AutoCloseable { LOG.debug("{}: Message does not need to be sliced - sending original message", logContext); state.close(); sendTo(options, message, options.getReplyTo()); - return; + return false; } final MessageSlice firstSlice = getNextSliceMessage(state); @@ -142,6 +146,7 @@ public class MessageSlicer implements AutoCloseable { stateCache.put(messageSliceId, state); sendTo(options, firstSlice, ActorRef.noSender()); + return true; } catch (IOException e) { LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e); if (state != null) { @@ -151,6 +156,7 @@ public class MessageSlicer implements AutoCloseable { } options.getOnFailureCallback().accept(e); + return false; } } @@ -196,6 +202,20 @@ public class MessageSlicer implements AutoCloseable { stateCache.invalidateAll(); } + /** + * Cancels all in-progress sliced message state that matches the given filter. + * + * @param filter filters by Identifier + */ + public void cancelSlicing(@Nonnull final Predicate filter) { + final Iterator iter = stateCache.asMap().keySet().iterator(); + while (iter.hasNext()) { + if (filter.test(iter.next().getClientIdentifier())) { + iter.remove(); + } + } + } + private static MessageSlice getNextSliceMessage(final SlicedMessageState state) throws IOException { final byte[] firstSliceBytes = state.getNextSlice(); return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(), diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/AbstractMessagingTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/AbstractMessagingTest.java index 3d93755ee1..5ff1f38e83 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/AbstractMessagingTest.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/AbstractMessagingTest.java @@ -59,12 +59,7 @@ public class AbstractMessagingTest { MockitoAnnotations.initMocks(this); doReturn(mockFiledBackedStream).when(mockFiledBackedStreamFactory).newInstance(); - doNothing().when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt()); - doNothing().when(mockFiledBackedStream).write(any(byte[].class)); - doNothing().when(mockFiledBackedStream).write(anyInt()); - doNothing().when(mockFiledBackedStream).close(); - doNothing().when(mockFiledBackedStream).cleanup(); - doNothing().when(mockFiledBackedStream).flush(); + setupMockFiledBackedStream(mockFiledBackedStream); doReturn(mockByteSource).when(mockFiledBackedStream).asByteSource(); doReturn(mockInputStream).when(mockByteSource).openStream(); @@ -78,4 +73,14 @@ public class AbstractMessagingTest { public static void tearDownClass() { JavaTestKit.shutdownActorSystem(ACTOR_SYSTEM, Boolean.TRUE); } + + void setupMockFiledBackedStream(final FileBackedOutputStream mockFiledBackedStream) throws IOException { + doNothing().when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt()); + doNothing().when(mockFiledBackedStream).write(any(byte[].class)); + doNothing().when(mockFiledBackedStream).write(anyInt()); + doNothing().when(mockFiledBackedStream).close(); + doNothing().when(mockFiledBackedStream).cleanup(); + doNothing().when(mockFiledBackedStream).flush(); + doReturn(mockByteSource).when(mockFiledBackedStream).asByteSource(); + } } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicerTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicerTest.java index ac6ab2e447..e71b7154e4 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicerTest.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicerTest.java @@ -8,11 +8,14 @@ package org.opendaylight.controller.cluster.messaging; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -26,6 +29,7 @@ import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.opendaylight.controller.cluster.io.FileBackedOutputStream; import org.opendaylight.yangtools.concepts.Identifier; /** @@ -71,8 +75,9 @@ public class MessageSlicerTest extends AbstractMessagingTest { doThrow(mockFailure).when(mockFiledBackedStream).flush(); try (MessageSlicer slicer = newMessageSlicer("testSliceWithFailedSerialization", 100)) { - slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(), - mockOnFailureCallback); + final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), + testProbe.ref(), mockOnFailureCallback); + assertFalse(wasSliced); assertFailureCallback(IOException.class); verify(mockFiledBackedStream).cleanup(); @@ -86,8 +91,9 @@ public class MessageSlicerTest extends AbstractMessagingTest { doThrow(mockFailure).when(mockByteSource).openBufferedStream(); try (MessageSlicer slicer = newMessageSlicer("testSliceWithByteSourceFailure", 100)) { - slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(), - mockOnFailureCallback); + final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), + testProbe.ref(), mockOnFailureCallback); + assertFalse(wasSliced); assertFailureCallback(IOException.class); verify(mockFiledBackedStream).cleanup(); @@ -99,8 +105,9 @@ public class MessageSlicerTest extends AbstractMessagingTest { doReturn(0).when(mockInputStream).read(any(byte[].class)); try (MessageSlicer slicer = newMessageSlicer("testSliceWithInputStreamFailure", 2)) { - slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(), - mockOnFailureCallback); + final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), + testProbe.ref(), mockOnFailureCallback); + assertFalse(wasSliced); assertFailureCallback(IOException.class); verify(mockFiledBackedStream).cleanup(); @@ -131,6 +138,27 @@ public class MessageSlicerTest extends AbstractMessagingTest { verifyNoMoreInteractions(mockOnFailureCallback); } + @Test + public void testCancelSlicing() throws IOException { + doReturn(1).when(mockInputStream).read(any(byte[].class)); + + final MessageSlicer slicer = newMessageSlicer("testCloseAllSlicedMessageState", 1); + slicer.slice(SliceOptions.builder().identifier(IDENTIFIER).fileBackedOutputStream(mockFiledBackedStream) + .sendTo(testProbe.ref()).replyTo(testProbe.ref()).onFailureCallback(mockOnFailureCallback).build()); + + final FileBackedOutputStream mockFiledBackedStream2 = mock(FileBackedOutputStream.class); + setupMockFiledBackedStream(mockFiledBackedStream2); + slicer.slice(SliceOptions.builder().identifier(new StringIdentifier("test2")) + .fileBackedOutputStream(mockFiledBackedStream2).sendTo(testProbe.ref()).replyTo(testProbe.ref()) + .onFailureCallback(mockOnFailureCallback).build()); + + slicer.cancelSlicing(id -> id.equals(IDENTIFIER)); + + verify(mockFiledBackedStream).cleanup(); + verify(mockFiledBackedStream2, never()).cleanup(); + verifyNoMoreInteractions(mockOnFailureCallback); + } + @Test public void testCheckExpiredSlicedMessageState() throws IOException { doReturn(1).when(mockInputStream).read(any(byte[].class)); @@ -162,9 +190,9 @@ public class MessageSlicerTest extends AbstractMessagingTest { .fileBackedStreamFactory(mockFiledBackedStreamFactory).build(); } - static void slice(MessageSlicer slicer, Identifier identifier, Serializable message, ActorRef sendTo, + static boolean slice(MessageSlicer slicer, Identifier identifier, Serializable message, ActorRef sendTo, ActorRef replyTo, Consumer onFailureCallback) { - slicer.slice(SliceOptions.builder().identifier(identifier).message(message).sendTo(sendTo).replyTo(replyTo) - .onFailureCallback(onFailureCallback).build()); + return slicer.slice(SliceOptions.builder().identifier(identifier).message(message).sendTo(sendTo) + .replyTo(replyTo).onFailureCallback(onFailureCallback).build()); } } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicingIntegrationTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicingIntegrationTest.java index a1a1e24c69..7318960de6 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicingIntegrationTest.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicingIntegrationTest.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.messaging; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -119,7 +120,9 @@ public class MessageSlicingIntegrationTest { final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3}); try (MessageSlicer slicer = newMessageSlicer("testSingleSlice", SerializationUtils.serialize(message).length)) { - slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback); + final boolean wasSliced = slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), + mockOnFailureCallback); + assertFalse(wasSliced); final BytesMessage sentMessage = sendToProbe.expectMsgClass(BytesMessage.class); assertEquals("Sent message", message, sentMessage); @@ -214,7 +217,9 @@ public class MessageSlicingIntegrationTest { final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3}); final int messageSliceSize = SerializationUtils.serialize(message).length / 2; try (MessageSlicer slicer = newMessageSlicer("testSlicingWithFailure", messageSliceSize)) { - slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback); + final boolean wasSliced = slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), + mockOnFailureCallback); + assertTrue(wasSliced); MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class); @@ -263,7 +268,9 @@ public class MessageSlicingIntegrationTest { final BytesMessage message = new BytesMessage(messageData); try (MessageSlicer slicer = newMessageSlicer(logContext, messageSliceSize)) { - slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback); + final boolean wasSliced = slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), + mockOnFailureCallback); + assertTrue(wasSliced); Identifier slicingId = null; int expLastSliceHashCode = SlicedMessageState.INITIAL_SLICE_HASH_CODE; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index c003d901af..ebe5498481 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -11,9 +11,11 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; +import akka.actor.ExtendedActorSystem; import akka.actor.Props; import akka.actor.Status; import akka.actor.Status.Failure; +import akka.serialization.JavaSerializer; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; @@ -82,6 +84,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot; +import org.opendaylight.controller.cluster.messaging.MessageAssembler; import org.opendaylight.controller.cluster.messaging.MessageSlicer; import org.opendaylight.controller.cluster.messaging.SliceOptions; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; @@ -191,6 +194,8 @@ public class Shard extends RaftActor { private final MessageSlicer responseMessageSlicer; private final Dispatchers dispatchers; + private final MessageAssembler requestMessageAssembler; + protected Shard(final AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION); @@ -247,6 +252,11 @@ public class Shard extends RaftActor { .messageSliceSize(datastoreContext.getMaximumMessageSliceSize()) .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory()) .expireStateAfterInactivity(2, TimeUnit.MINUTES).build(); + + requestMessageAssembler = MessageAssembler.builder().logContext(this.name) + .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory()) + .assembledMessageCallback((message, sender) -> self().tell(message, sender)) + .expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build(); } private void setTransactionCommitTimeout() { @@ -301,6 +311,8 @@ public class Shard extends RaftActor { if (message instanceof RequestEnvelope) { handleRequestEnvelope((RequestEnvelope)message); + } else if (requestMessageAssembler.isHandledMessage(message)) { + handleRequestAssemblerMessage(message); } else if (message instanceof ConnectClientRequest) { handleConnectClient((ConnectClientRequest)message); } else if (CreateTransaction.isSerializedType(message)) { @@ -361,6 +373,13 @@ public class Shard extends RaftActor { } } + private void handleRequestAssemblerMessage(Object message) { + dispatchers.getDispatcher(DispatcherType.Serialization).execute(() -> { + JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) context().system()); + requestMessageAssembler.handleMessage(message, self()); + }); + } + @SuppressWarnings("checkstyle:IllegalCatch") private void handleRequestEnvelope(final RequestEnvelope envelope) { final long now = ticker().read(); @@ -393,6 +412,7 @@ public class Shard extends RaftActor { private void commitTimeoutCheck() { store.checkForExpiredTransactions(transactionCommitTimeout, this::updateAccess); commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this); + requestMessageAssembler.checkExpiredAssembledMessageState(); } private Optional updateAccess(final SimpleShardDataTreeCohort cohort) { @@ -881,6 +901,8 @@ public class Shard extends RaftActor { knownFrontends = ImmutableMap.of(); } + requestMessageAssembler.close(); + if (!hasLeader()) { // No leader anywhere, nothing else to do return; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index f6a026aaa9..04370fe087 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -1153,11 +1153,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } @Test - public void testLargeReadReplySlicing() throws Exception { + public void testReadWriteMessageSlicing() throws Exception { // The slicing is only implemented for tell-based protocol Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class)); - leaderDatastoreContextBuilder.maximumMessageSliceSize(50); + leaderDatastoreContextBuilder.maximumMessageSliceSize(100); + followerDatastoreContextBuilder.maximumMessageSliceSize(100); initDatastoresWithCars("testLargeReadReplySlicing"); final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); -- 2.36.6