import java.util.List;
import java.util.Optional;
import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
/**
* @author Robert Varga
*/
@Beta
-public final class ModifyTransactionRequest extends TransactionRequest<ModifyTransactionRequest> {
+public final class ModifyTransactionRequest extends TransactionRequest<ModifyTransactionRequest>
+ implements SliceableMessage {
private static final long serialVersionUID = 1L;
@SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but this class "
// This constructor is only to be called (indirectly) by ConnectedClientConnection constructor.
// Do not allow subclassing outside of this package
AbstractClientConnection(final AbstractClientConnection<T> oldConn, final T newBackend, final int queueDepth) {
- this(oldConn, new TransmitQueue.Transmitting(oldConn.queue, queueDepth, newBackend, oldConn.currentTime()));
+ this(oldConn, new TransmitQueue.Transmitting(oldConn.queue, queueDepth, newBackend, oldConn.currentTime(),
+ Preconditions.checkNotNull(oldConn.context).messageSlicer()));
}
public final ClientActorContext context() {
import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.concepts.WritableIdentifier;
+import org.opendaylight.yangtools.concepts.Identifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
return this;
}
+ if (context().messageSlicer().handleMessage(command)) {
+ return this;
+ }
+
return onCommand(command);
}
- private static long extractCookie(final WritableIdentifier id) {
+ private static long extractCookie(final Identifier id) {
if (id instanceof TransactionIdentifier) {
return ((TransactionIdentifier) id).getHistoryId().getCookie();
} else if (id instanceof LocalHistoryIdentifier) {
} finally {
connectionsLock.unlockWrite(stamp);
}
+
+ context().messageSlicer().close();
}
/**
}
} else {
LOG.info("{}: removed connection {}", persistenceId(), conn);
+ cancelSlicing(conn.cookie());
}
} finally {
connectionsLock.unlockWrite(stamp);
} else {
LOG.warn("{}: failed to replace connection {}, as it was not tracked", persistenceId(), conn);
}
+ } else {
+ cancelSlicing(oldConn.cookie());
}
} finally {
connectionsLock.unlockWrite(stamp);
}));
}
+ private void cancelSlicing(final Long cookie) {
+ context().messageSlicer().cancelSlicing(id -> {
+ try {
+ return cookie.equals(extractCookie(id));
+ } catch (IllegalArgumentException e) {
+ LOG.debug("extractCookie failed while cancelling slicing for cookie {}: {}", cookie, e);
+ return false;
+ }
+ });
+ }
+
private ConnectingClientConnection<T> createConnection(final Long shard) {
final ConnectingClientConnection<T> conn = new ConnectingClientConnection<>(context(), shard);
resolveConnection(shard, conn);
import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.common.actor.Dispatchers;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
import org.opendaylight.yangtools.concepts.Identifiable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;
private final Scheduler scheduler;
private final Dispatchers dispatchers;
private final ClientActorConfig config;
+ private final MessageSlicer messageSlicer;
// Hidden to avoid subclassing
ClientActorContext(final ActorRef self, final String persistenceId, final ActorSystem system,
this.executionContext = system.dispatcher();
this.dispatchers = new Dispatchers(system.dispatchers());
this.config = Preconditions.checkNotNull(config);
+
+ messageSlicer = MessageSlicer.builder().messageSliceSize(config.getMaximumMessageSliceSize())
+ .logContext(persistenceId).expireStateAfterInactivity(config.getRequestTimeout(), TimeUnit.NANOSECONDS)
+ .fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(),
+ config.getTempFileDirectory())).build();
}
@Override
return dispatchers;
}
+ @Nonnull
+ public MessageSlicer messageSlicer() {
+ return messageSlicer;
+ }
+
/**
* Return the time ticker for this {@link ClientActorContext}. This should be used for in all time-tracking
* done within a client actor. Subclasses of {@link ClientActorBehavior} are encouraged to use
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
@Override
- TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
+ Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
}
+
+ @Override
+ void preComplete(ResponseEnvelope<?> envelope) {
+ }
}
static final class Transmitting extends TransmitQueue {
+ private static final long NOT_SLICING = -1;
+
private final BackendInfo backend;
+ private final MessageSlicer messageSlicer;
private long nextTxSequence;
+ private long currentSlicedEnvSequenceId = NOT_SLICING;
// For ConnectedClientConnection.
- Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now) {
+ Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now,
+ final MessageSlicer messageSlicer) {
super(oldQueue, targetDepth, now);
this.backend = Preconditions.checkNotNull(backend);
+ this.messageSlicer = Preconditions.checkNotNull(messageSlicer);
}
@Override
}
@Override
- TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
- final RequestEnvelope env = new RequestEnvelope(entry.getRequest().toVersion(backend.getVersion()),
+ Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
+ // If we're currently slicing a message we can't send any subsequent requests until slicing completes to
+ // avoid an out-of-sequence request envelope failure on the backend. In this case we return an empty
+ // Optional to indicate the request was not transmitted.
+ if (currentSlicedEnvSequenceId >= 0) {
+ return Optional.empty();
+ }
+
+ final Request<?, ?> request = entry.getRequest();
+ final RequestEnvelope env = new RequestEnvelope(request.toVersion(backend.getVersion()),
backend.getSessionId(), nextTxSequence++);
- final TransmittedConnectionEntry ret = new TransmittedConnectionEntry(entry, env.getSessionId(),
- env.getTxSequence(), now);
- backend.getActor().tell(env, ActorRef.noSender());
- return ret;
+ if (request instanceof SliceableMessage) {
+ if (messageSlicer.slice(SliceOptions.builder().identifier(request.getTarget())
+ .message(env).replyTo(request.getReplyTo()).sendTo(backend.getActor())
+ .onFailureCallback(t -> env.sendFailure(new RuntimeRequestException(
+ "Failed to slice request " + request, t), 0L)).build())) {
+ // The request was sliced so record the envelope sequence id to prevent transmitting
+ // subsequent requests until slicing completes.
+ currentSlicedEnvSequenceId = env.getTxSequence();
+ }
+ } else {
+ backend.getActor().tell(env, ActorRef.noSender());
+ }
+
+ return Optional.of(new TransmittedConnectionEntry(entry, env.getSessionId(),
+ env.getTxSequence(), now));
+ }
+
+ @Override
+ void preComplete(ResponseEnvelope<?> envelope) {
+ if (envelope.getTxSequence() == currentSlicedEnvSequenceId) {
+ // Slicing completed for the prior request - clear the cached sequence id field to enable subsequent
+ // requests to be transmitted.
+ currentSlicedEnvSequenceId = NOT_SLICING;
+ }
}
}
// If a matching request was found, this will track a task was closed.
final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
+ preComplete(envelope);
+
Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
if (maybeEntry == null) {
LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
private void transmitEntries(final int maxTransmit, final long now) {
for (int i = 0; i < maxTransmit; ++i) {
final ConnectionEntry e = pending.poll();
- if (e == null) {
+ if (e == null || !transmitEntry(e, now)) {
LOG.debug("Queue {} transmitted {} requests", this, i);
return;
}
-
- transmitEntry(e, now);
}
LOG.debug("Queue {} transmitted {} requests", this, maxTransmit);
}
- private void transmitEntry(final ConnectionEntry entry, final long now) {
+ private boolean transmitEntry(final ConnectionEntry entry, final long now) {
LOG.debug("Queue {} transmitting entry {}", this, entry);
// We are not thread-safe and are supposed to be externally-guarded,
// hence send-before-record should be fine.
// This needs to be revisited if the external guards are lowered.
- inflight.addLast(transmit(entry, now));
+ final Optional<TransmittedConnectionEntry> maybeTransmitted = transmit(entry, now);
+ if (!maybeTransmitted.isPresent()) {
+ return false;
+ }
+
+ inflight.addLast(maybeTransmitted.get());
+ return true;
}
final long enqueueOrForward(final ConnectionEntry entry, final long now) {
}
if (pending.isEmpty()) {
- transmitEntry(entry, now);
+ if (!transmitEntry(entry, now)) {
+ LOG.debug("Queue {} cannot transmit request {} - delaying it", this, entry.getRequest());
+ pending.addLast(entry);
+ }
+
return delay;
}
*/
abstract int canTransmitCount(int inflightSize);
- abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now);
+ abstract Optional<TransmittedConnectionEntry> transmit(ConnectionEntry entry, long now);
+
+ abstract void preComplete(ResponseEnvelope<?> envelope);
final boolean isEmpty() {
return inflight.isEmpty() && pending.isEmpty();
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
+import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.messaging.MessageSlice;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
public class ConnectedClientConnectionTest
extends AbstractClientConnectionTest<ConnectedClientConnection<BackendInfo>, BackendInfo> {
verify(behavior).reconnectConnection(same(connection), any(ReconnectingClientConnection.class));
}
-}
\ No newline at end of file
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testSendSliceableMessageRequest() {
+ final ClientActorConfig config = AccessClientUtil.newMockClientActorConfig();
+ doReturn(5).when(config).getMaximumMessageSliceSize();
+ context = new ClientActorContext(contextProbe.ref(), PERSISTENCE_ID, system, CLIENT_ID, config);
+ connection = createConnection();
+
+ final Consumer<Response<?, ?>> callback = mock(Consumer.class);
+
+ final TransactionIdentifier identifier =
+ new TransactionIdentifier(new LocalHistoryIdentifier(CLIENT_ID, 0L), 0L);
+ ModifyTransactionRequestBuilder reqBuilder =
+ new ModifyTransactionRequestBuilder(identifier, replyToProbe.ref());
+ reqBuilder.addModification(new TransactionWrite(YangInstanceIdentifier.EMPTY, Builders.containerBuilder()
+ .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(
+ QName.create("namespace", "localName"))).build()));
+ reqBuilder.setSequence(0L);
+ final Request<?, ?> request = reqBuilder.build();
+ connection.sendRequest(request, callback);
+
+ backendProbe.expectMsgClass(MessageSlice.class);
+ }
+}
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
import org.opendaylight.yangtools.concepts.WritableIdentifier;
import scala.concurrent.duration.FiniteDuration;
final ClientActorConfig mockConfig = AccessClientUtil.newMockClientActorConfig();
doReturn(mockConfig).when(mockContext).config();
+ doReturn(mock(MessageSlicer.class)).when(mockContext).messageSlicer();
+
mockActor = TestProbe.apply(actorSystem);
mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5);
mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
import com.google.common.base.Ticker;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<TransmitQueue.Transmitting> {
private BackendInfo backendInfo;
+ private final MessageSlicer mockMessageSlicer = mock(MessageSlicer.class);
private static long now() {
return Ticker.systemTicker().read();
@Override
protected TransmitQueue.Transmitting createQueue() {
+ doReturn(false).when(mockMessageSlicer).slice(any());
backendInfo = new BackendInfo(probe.ref(), 0L, ABIVersion.BORON, 3);
- return new TransmitQueue.Transmitting(new TransmitQueue.Halted(0), 0, backendInfo, now());
+ return new TransmitQueue.Transmitting(new TransmitQueue.Halted(0), 0, backendInfo, now(), mockMessageSlicer);
}
@Test
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = now();
final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
- queue.transmit(entry, now);
+
+ Optional<TransmittedConnectionEntry> transmitted = queue.transmit(entry, now);
+ assertTrue(transmitted.isPresent());
+ assertEquals(request, transmitted.get().getRequest());
+ assertEquals(callback, transmitted.get().getCallback());
+
final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
assertEquals(request, requestEnvelope.getMessage());
+
+ transmitted = queue.transmit(new ConnectionEntry(new TransactionPurgeRequest(
+ TRANSACTION_IDENTIFIER, 1L, probe.ref()), callback, now), now);
+ assertTrue(transmitted.isPresent());
}
@Test
assertEqualRequests(queue.getPending(), req6);
}
+ @Test
+ public void testRequestSlicingOnTransmit() throws Exception {
+ doReturn(true).when(mockMessageSlicer).slice(any());
+
+ ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
+ TRANSACTION_IDENTIFIER, probe.ref());
+ reqBuilder.setSequence(0L);
+ final Request<?, ?> request = reqBuilder.build();
+
+ final long now = now();
+ final Consumer<Response<?, ?>> mockConsumer = createConsumerMock();
+ Optional<TransmittedConnectionEntry> transmitted =
+ queue.transmit(new ConnectionEntry(request, mockConsumer, now), now);
+ assertTrue(transmitted.isPresent());
+
+ ArgumentCaptor<SliceOptions> sliceOptions = ArgumentCaptor.forClass(SliceOptions.class);
+ verify(mockMessageSlicer).slice(sliceOptions.capture());
+ assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope);
+ RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage();
+ assertEquals(request, requestEnvelope.getMessage());
+
+ final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+ transmitted = queue.transmit(new ConnectionEntry(request2, mockConsumer, now), now);
+ assertFalse(transmitted.isPresent());
+ }
+
+ @Test
+ public void testSlicingFailureOnTransmit() throws Exception {
+ doAnswer(invocation -> {
+ invocation.getArgumentAt(0, SliceOptions.class).getOnFailureCallback().accept(new Exception("mock"));
+ return Boolean.FALSE;
+ }).when(mockMessageSlicer).slice(any());
+
+ ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
+ TRANSACTION_IDENTIFIER, probe.ref());
+ reqBuilder.setSequence(0L);
+
+ final long now = now();
+ Optional<TransmittedConnectionEntry> transmitted =
+ queue.transmit(new ConnectionEntry(reqBuilder.build(), createConsumerMock(), now), now);
+ assertTrue(transmitted.isPresent());
+
+ verify(mockMessageSlicer).slice(any());
+
+ probe.expectMsgClass(FailureEnvelope.class);
+ }
+
+ @Test
+ public void testSlicedRequestOnComplete() throws Exception {
+ doReturn(true).when(mockMessageSlicer).slice(any());
+
+ ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
+ TRANSACTION_IDENTIFIER, probe.ref());
+ reqBuilder.setSequence(0L);
+ final Request<?, ?> request = reqBuilder.build();
+
+ final long now = now();
+ final Consumer<Response<?, ?>> mockConsumer = createConsumerMock();
+ queue.enqueueOrForward(new ConnectionEntry(request, mockConsumer, now), now);
+
+ ArgumentCaptor<SliceOptions> sliceOptions = ArgumentCaptor.forClass(SliceOptions.class);
+ verify(mockMessageSlicer).slice(sliceOptions.capture());
+ assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope);
+
+ final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+ queue.enqueueOrForward(new ConnectionEntry(request2, mockConsumer, now), now);
+ verifyNoMoreInteractions(mockMessageSlicer);
+ probe.expectNoMsg();
+
+ RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage();
+ queue.complete(new FailureEnvelope(request.toRequestFailure(mock(RequestException.class)),
+ requestEnvelope.getSessionId(), requestEnvelope.getTxSequence(), 0), 0);
+
+ requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
+ assertEquals(request2, requestEnvelope.getMessage());
+
+ final Request<?, ?> request3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref());
+ queue.enqueueOrForward(new ConnectionEntry(request3, mockConsumer, now), now);
+
+ requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
+ assertEquals(request3, requestEnvelope.getMessage());
+ }
+
private static void assertEqualRequests(final Collection<? extends ConnectionEntry> queue,
final Request<?, ?>... requests) {
final List<Request<?, ?>> queued = ImmutableList.copyOf(Collections2.transform(queue,
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
import org.opendaylight.yangtools.concepts.Identifier;
private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
public static final int DEFAULT_MAX_SLICING_TRIES = 3;
- private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
+ private final Cache<MessageSliceIdentifier, SlicedMessageState<ActorRef>> stateCache;
private final FileBackedOutputStreamFactory fileBackedStreamFactory;
private final int messageSliceSize;
private final int maxSlicingTries;
* options.
*
* @param options the SliceOptions
+ * @return true if the message was sliced, false otherwise
*/
- public void slice(final SliceOptions options) {
+ public boolean slice(final SliceOptions options) {
final Identifier identifier = options.getIdentifier();
final Serializable message = options.getMessage();
final FileBackedOutputStream fileBackedStream;
LOG.debug("{}: Error serializing message for {}", logContext, identifier, e);
fileBackedStream.cleanup();
options.getOnFailureCallback().accept(e);
- return;
+ return false;
}
} else {
fileBackedStream = options.getFileBackedStream();
}
- initializeSlicing(options, fileBackedStream);
+ return initializeSlicing(options, fileBackedStream);
}
- private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
+ private boolean initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
final Identifier identifier = options.getIdentifier();
MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
SlicedMessageState<ActorRef> state = null;
LOG.debug("{}: Message does not need to be sliced - sending original message", logContext);
state.close();
sendTo(options, message, options.getReplyTo());
- return;
+ return false;
}
final MessageSlice firstSlice = getNextSliceMessage(state);
stateCache.put(messageSliceId, state);
sendTo(options, firstSlice, ActorRef.noSender());
+ return true;
} catch (IOException e) {
LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e);
if (state != null) {
}
options.getOnFailureCallback().accept(e);
+ return false;
}
}
stateCache.invalidateAll();
}
+ /**
+ * Cancels all in-progress sliced message state that matches the given filter.
+ *
+ * @param filter filters by Identifier
+ */
+ public void cancelSlicing(@Nonnull final Predicate<Identifier> filter) {
+ final Iterator<MessageSliceIdentifier> iter = stateCache.asMap().keySet().iterator();
+ while (iter.hasNext()) {
+ if (filter.test(iter.next().getClientIdentifier())) {
+ iter.remove();
+ }
+ }
+ }
+
private static MessageSlice getNextSliceMessage(final SlicedMessageState<ActorRef> state) throws IOException {
final byte[] firstSliceBytes = state.getNextSlice();
return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),
MockitoAnnotations.initMocks(this);
doReturn(mockFiledBackedStream).when(mockFiledBackedStreamFactory).newInstance();
- doNothing().when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt());
- doNothing().when(mockFiledBackedStream).write(any(byte[].class));
- doNothing().when(mockFiledBackedStream).write(anyInt());
- doNothing().when(mockFiledBackedStream).close();
- doNothing().when(mockFiledBackedStream).cleanup();
- doNothing().when(mockFiledBackedStream).flush();
+ setupMockFiledBackedStream(mockFiledBackedStream);
doReturn(mockByteSource).when(mockFiledBackedStream).asByteSource();
doReturn(mockInputStream).when(mockByteSource).openStream();
public static void tearDownClass() {
JavaTestKit.shutdownActorSystem(ACTOR_SYSTEM, Boolean.TRUE);
}
+
+ void setupMockFiledBackedStream(final FileBackedOutputStream mockFiledBackedStream) throws IOException {
+ doNothing().when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt());
+ doNothing().when(mockFiledBackedStream).write(any(byte[].class));
+ doNothing().when(mockFiledBackedStream).write(anyInt());
+ doNothing().when(mockFiledBackedStream).close();
+ doNothing().when(mockFiledBackedStream).cleanup();
+ doNothing().when(mockFiledBackedStream).flush();
+ doReturn(mockByteSource).when(mockFiledBackedStream).asByteSource();
+ }
}
package org.opendaylight.controller.cluster.messaging;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
import org.opendaylight.yangtools.concepts.Identifier;
/**
doThrow(mockFailure).when(mockFiledBackedStream).flush();
try (MessageSlicer slicer = newMessageSlicer("testSliceWithFailedSerialization", 100)) {
- slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
- mockOnFailureCallback);
+ final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(),
+ testProbe.ref(), mockOnFailureCallback);
+ assertFalse(wasSliced);
assertFailureCallback(IOException.class);
verify(mockFiledBackedStream).cleanup();
doThrow(mockFailure).when(mockByteSource).openBufferedStream();
try (MessageSlicer slicer = newMessageSlicer("testSliceWithByteSourceFailure", 100)) {
- slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
- mockOnFailureCallback);
+ final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(),
+ testProbe.ref(), mockOnFailureCallback);
+ assertFalse(wasSliced);
assertFailureCallback(IOException.class);
verify(mockFiledBackedStream).cleanup();
doReturn(0).when(mockInputStream).read(any(byte[].class));
try (MessageSlicer slicer = newMessageSlicer("testSliceWithInputStreamFailure", 2)) {
- slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
- mockOnFailureCallback);
+ final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(),
+ testProbe.ref(), mockOnFailureCallback);
+ assertFalse(wasSliced);
assertFailureCallback(IOException.class);
verify(mockFiledBackedStream).cleanup();
verifyNoMoreInteractions(mockOnFailureCallback);
}
+ @Test
+ public void testCancelSlicing() throws IOException {
+ doReturn(1).when(mockInputStream).read(any(byte[].class));
+
+ final MessageSlicer slicer = newMessageSlicer("testCloseAllSlicedMessageState", 1);
+ slicer.slice(SliceOptions.builder().identifier(IDENTIFIER).fileBackedOutputStream(mockFiledBackedStream)
+ .sendTo(testProbe.ref()).replyTo(testProbe.ref()).onFailureCallback(mockOnFailureCallback).build());
+
+ final FileBackedOutputStream mockFiledBackedStream2 = mock(FileBackedOutputStream.class);
+ setupMockFiledBackedStream(mockFiledBackedStream2);
+ slicer.slice(SliceOptions.builder().identifier(new StringIdentifier("test2"))
+ .fileBackedOutputStream(mockFiledBackedStream2).sendTo(testProbe.ref()).replyTo(testProbe.ref())
+ .onFailureCallback(mockOnFailureCallback).build());
+
+ slicer.cancelSlicing(id -> id.equals(IDENTIFIER));
+
+ verify(mockFiledBackedStream).cleanup();
+ verify(mockFiledBackedStream2, never()).cleanup();
+ verifyNoMoreInteractions(mockOnFailureCallback);
+ }
+
@Test
public void testCheckExpiredSlicedMessageState() throws IOException {
doReturn(1).when(mockInputStream).read(any(byte[].class));
.fileBackedStreamFactory(mockFiledBackedStreamFactory).build();
}
- static void slice(MessageSlicer slicer, Identifier identifier, Serializable message, ActorRef sendTo,
+ static boolean slice(MessageSlicer slicer, Identifier identifier, Serializable message, ActorRef sendTo,
ActorRef replyTo, Consumer<Throwable> onFailureCallback) {
- slicer.slice(SliceOptions.builder().identifier(identifier).message(message).sendTo(sendTo).replyTo(replyTo)
- .onFailureCallback(onFailureCallback).build());
+ return slicer.slice(SliceOptions.builder().identifier(identifier).message(message).sendTo(sendTo)
+ .replyTo(replyTo).onFailureCallback(onFailureCallback).build());
}
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
try (MessageSlicer slicer = newMessageSlicer("testSingleSlice", SerializationUtils.serialize(message).length)) {
- slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
+ final boolean wasSliced = slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(),
+ mockOnFailureCallback);
+ assertFalse(wasSliced);
final BytesMessage sentMessage = sendToProbe.expectMsgClass(BytesMessage.class);
assertEquals("Sent message", message, sentMessage);
final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
try (MessageSlicer slicer = newMessageSlicer("testSlicingWithFailure", messageSliceSize)) {
- slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
+ final boolean wasSliced = slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(),
+ mockOnFailureCallback);
+ assertTrue(wasSliced);
MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
final BytesMessage message = new BytesMessage(messageData);
try (MessageSlicer slicer = newMessageSlicer(logContext, messageSliceSize)) {
- slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
+ final boolean wasSliced = slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(),
+ mockOnFailureCallback);
+ assertTrue(wasSliced);
Identifier slicingId = null;
int expLastSliceHashCode = SlicedMessageState.INITIAL_SLICE_HASH_CODE;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
+import akka.actor.ExtendedActorSystem;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Failure;
+import akka.serialization.JavaSerializer;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.controller.cluster.messaging.MessageSlicer;
import org.opendaylight.controller.cluster.messaging.SliceOptions;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
private final MessageSlicer responseMessageSlicer;
private final Dispatchers dispatchers;
+ private final MessageAssembler requestMessageAssembler;
+
protected Shard(final AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
.messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
.fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
.expireStateAfterInactivity(2, TimeUnit.MINUTES).build();
+
+ requestMessageAssembler = MessageAssembler.builder().logContext(this.name)
+ .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
+ .assembledMessageCallback((message, sender) -> self().tell(message, sender))
+ .expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build();
}
private void setTransactionCommitTimeout() {
if (message instanceof RequestEnvelope) {
handleRequestEnvelope((RequestEnvelope)message);
+ } else if (requestMessageAssembler.isHandledMessage(message)) {
+ handleRequestAssemblerMessage(message);
} else if (message instanceof ConnectClientRequest) {
handleConnectClient((ConnectClientRequest)message);
} else if (CreateTransaction.isSerializedType(message)) {
}
}
+ private void handleRequestAssemblerMessage(Object message) {
+ dispatchers.getDispatcher(DispatcherType.Serialization).execute(() -> {
+ JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) context().system());
+ requestMessageAssembler.handleMessage(message, self());
+ });
+ }
+
@SuppressWarnings("checkstyle:IllegalCatch")
private void handleRequestEnvelope(final RequestEnvelope envelope) {
final long now = ticker().read();
private void commitTimeoutCheck() {
store.checkForExpiredTransactions(transactionCommitTimeout, this::updateAccess);
commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
+ requestMessageAssembler.checkExpiredAssembledMessageState();
}
private Optional<Long> updateAccess(final SimpleShardDataTreeCohort cohort) {
knownFrontends = ImmutableMap.of();
}
+ requestMessageAssembler.close();
+
if (!hasLeader()) {
// No leader anywhere, nothing else to do
return;
}
@Test
- public void testLargeReadReplySlicing() throws Exception {
+ public void testReadWriteMessageSlicing() throws Exception {
// The slicing is only implemented for tell-based protocol
Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class));
- leaderDatastoreContextBuilder.maximumMessageSliceSize(50);
+ leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
+ followerDatastoreContextBuilder.maximumMessageSliceSize(100);
initDatastoresWithCars("testLargeReadReplySlicing");
final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();