import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
* @author Robert Varga
*/
@Beta
-public final class ReadTransactionSuccess extends TransactionSuccess<ReadTransactionSuccess> {
+public final class ReadTransactionSuccess extends TransactionSuccess<ReadTransactionSuccess>
+ implements SliceableMessage {
private static final long serialVersionUID = 1L;
private final Optional<NormalizedNode<?, ?>> data;
* @throws NullPointerException if success is null
*/
public void sendSuccess(final RequestSuccess<?, ?> success, final long executionTimeNanos) {
- sendResponse(new SuccessEnvelope(success, getSessionId(), getTxSequence(), executionTimeNanos));
+ sendResponse(newSuccessEnvelope(success, executionTimeNanos));
+ }
+
+ /**
+ * Creates a successful ResponseEnvelope that wraps the given successful Request response message.
+ *
+ * @param success the successful Request response message
+ * @param executionTimeNanos the execution time of the request
+ * @return a {@link ResponseEnvelope} instance
+ */
+ public ResponseEnvelope<?> newSuccessEnvelope(final RequestSuccess<?, ?> success, final long executionTimeNanos) {
+ return new SuccessEnvelope(success, getSessionId(), getTxSequence(), executionTimeNanos);
}
private void sendResponse(final ResponseEnvelope<?> envelope) {
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * A tagging interface that specifies a message whose serialized size can be large and thus should be sliced into
+ * smaller chunks when transporting over the wire.
+ *
+ * @author Thomas Pantelis
+ */
+@Beta
+public interface SliceableMessage {
+}
return currentBehavior.persistenceId();
}
+ @Override
+ public void postStop() {
+ if (currentBehavior != null) {
+ currentBehavior.close();
+ }
+
+ super.postStop();
+ }
+
private void switchBehavior(final AbstractClientActorBehavior<?> nextBehavior) {
if (!currentBehavior.equals(nextBehavior)) {
if (nextBehavior == null) {
LOG.debug("{}: switched from {} to {}", persistenceId(), currentBehavior, nextBehavior);
}
+ currentBehavior.close();
currentBehavior = nextBehavior;
}
}
* @author Robert Varga
*/
@Beta
-public abstract class AbstractClientActorBehavior<C extends AbstractClientActorContext> {
+public abstract class AbstractClientActorBehavior<C extends AbstractClientActorContext> implements AutoCloseable {
private final C context;
AbstractClientActorBehavior(@Nonnull final C context) {
return context.self();
}
+ @Override
+ public void close() {
+ }
+
/**
* Implementation-internal method for handling an incoming command message.
*
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherType;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.concepts.WritableIdentifier;
import org.slf4j.Logger;
private final Map<Long, AbstractClientConnection<T>> connections = new ConcurrentHashMap<>();
private final InversibleLock connectionsLock = new InversibleLock();
private final BackendInfoResolver<T> resolver;
+ private final MessageAssembler responseMessageAssembler;
protected ClientActorBehavior(@Nonnull final ClientActorContext context,
@Nonnull final BackendInfoResolver<T> resolver) {
super(context);
this.resolver = Preconditions.checkNotNull(resolver);
+
+ final ClientActorConfig config = context.config();
+ responseMessageAssembler = MessageAssembler.builder().logContext(persistenceId())
+ .fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(),
+ config.getTempFileDirectory()))
+ .assembledMessageCallback((message, sender) -> context.self().tell(message, sender)).build();
}
@Override
return context().getIdentifier();
}
+ @Override
+ public void close() {
+ responseMessageAssembler.close();
+ }
+
/**
* Get a connection to a shard.
*
if (command instanceof InternalCommand) {
return ((InternalCommand<T>) command).execute(this);
}
+
if (command instanceof SuccessEnvelope) {
return onRequestSuccess((SuccessEnvelope) command);
}
+
if (command instanceof FailureEnvelope) {
return internalOnRequestFailure((FailureEnvelope) command);
}
+ if (MessageAssembler.isHandledMessage(command)) {
+ context().dispatchers().getDispatcher(DispatcherType.Serialization).execute(
+ () -> responseMessageAssembler.handleMessage(command, context().self()));
+ return this;
+ }
+
return onCommand(command);
}
package org.opendaylight.controller.cluster.access.client;
import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import akka.actor.Scheduler;
import com.google.common.annotations.Beta;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers;
import org.opendaylight.yangtools.concepts.Identifiable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;
private final ExecutionContext executionContext;
private final ClientIdentifier identifier;
private final Scheduler scheduler;
+ private final Dispatchers dispatchers;
private final ClientActorConfig config;
// Hidden to avoid subclassing
- ClientActorContext(final ActorRef self, final Scheduler scheduler, final ExecutionContext executionContext,
- final String persistenceId, final ClientIdentifier identifier, final ClientActorConfig config) {
+ ClientActorContext(final ActorRef self, final String persistenceId, final ActorSystem system,
+ final ClientIdentifier identifier, final ClientActorConfig config) {
super(self, persistenceId);
this.identifier = Preconditions.checkNotNull(identifier);
- this.scheduler = Preconditions.checkNotNull(scheduler);
- this.executionContext = Preconditions.checkNotNull(executionContext);
+ this.scheduler = Preconditions.checkNotNull(system).scheduler();
+ this.executionContext = system.dispatcher();
+ this.dispatchers = new Dispatchers(system.dispatchers());
this.config = Preconditions.checkNotNull(config);
}
return config;
}
+ @Nonnull
+ public Dispatchers dispatchers() {
+ return dispatchers;
+ }
+
/**
* Return the time ticker for this {@link ClientActorContext}. This should be used for in all time-tracking
* done within a client actor. Subclasses of {@link ClientActorBehavior} are encouraged to use
ClientActorBehavior<?> createBehavior(final ClientIdentifier clientId) {
final ActorSystem system = actor.getContext().system();
- final ClientActorContext context = new ClientActorContext(self(), system.scheduler(), system.dispatcher(),
- persistenceId(), clientId, actor.getClientActorConfig());
+ final ClientActorContext context = new ClientActorContext(self(), persistenceId(), system,
+ clientId, actor.getClientActorConfig());
return actor.initialBehavior(context);
}
system = ActorSystem.apply();
backendProbe = new TestProbe(system);
contextProbe = new TestProbe(system);
- context = new ClientActorContext(contextProbe.ref(), system.scheduler(), system.dispatcher(),
- PERSISTENCE_ID, CLIENT_ID, AccessClientUtil.newMockClientActorConfig());
+ context = new ClientActorContext(contextProbe.ref(), PERSISTENCE_ID, system,
+ CLIENT_ID, AccessClientUtil.newMockClientActorConfig());
replyToProbe = new TestProbe(system);
connection = createConnection();
}
public static ClientActorContext createClientActorContext(final ActorSystem system, final ActorRef actor,
final ClientIdentifier id, final String persistenceId) {
- return spy(new ClientActorContext(actor, system.scheduler(), system.dispatcher(), persistenceId, id,
- newMockClientActorConfig()));
+ return spy(new ClientActorContext(actor, persistenceId, system, id, newMockClientActorConfig()));
}
public static ClientActorConfig newMockClientActorConfig() {
MockitoAnnotations.initMocks(this);
system = ActorSystem.apply();
probe = new TestProbe(system);
- ctx = new ClientActorContext(probe.ref(), system.scheduler(), system.dispatcher(),
- PERSISTENCE_ID, CLIENT_ID, AccessClientUtil.newMockClientActorConfig());
+ ctx = new ClientActorContext(probe.ref(), PERSISTENCE_ID, system,
+ CLIENT_ID, AccessClientUtil.newMockClientActorConfig());
}
@Test
import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.Dispatchers;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherType;
import org.opendaylight.controller.cluster.common.actor.MessageTracker;
import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
private final FrontendMetadata frontendMetadata;
private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
+ private final MessageSlicer responseMessageSlicer;
+ private final Dispatchers dispatchers;
+
protected Shard(final AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
+ dispatchers = new Dispatchers(context().system().dispatchers());
transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext,
- new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Transaction),
+ dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Transaction),
self(), getContext(), shardMBean, builder.getId().getShardName());
snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
this.name);
messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
+
+ responseMessageSlicer = MessageSlicer.builder().logContext(this.name)
+ .messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
+ .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
+ .expireStateAfterInactivity(2, TimeUnit.MINUTES).build();
}
private void setTransactionCommitTimeout() {
}
}
- @SuppressWarnings("checkstyle:IllegalCatch")
@Override
protected void handleNonRaftCommand(final Object message) {
try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
store.resetTransactionBatch();
if (message instanceof RequestEnvelope) {
- final long now = ticker().read();
- final RequestEnvelope envelope = (RequestEnvelope)message;
-
- try {
- final RequestSuccess<?, ?> success = handleRequest(envelope, now);
- if (success != null) {
- envelope.sendSuccess(success, ticker().read() - now);
- }
- } catch (RequestException e) {
- LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
- envelope.sendFailure(e, ticker().read() - now);
- } catch (Exception e) {
- LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
- envelope.sendFailure(new RuntimeRequestException("Request failed to process", e),
- ticker().read() - now);
- }
+ handleRequestEnvelope((RequestEnvelope)message);
} else if (message instanceof ConnectClientRequest) {
handleConnectClient((ConnectClientRequest)message);
} else if (CreateTransaction.isSerializedType(message)) {
onMakeLeaderLocal();
} else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
store.resumeNextPendingTransaction();
- } else {
+ } else if (!responseMessageSlicer.handleMessage(message)) {
super.handleNonRaftCommand(message);
}
}
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void handleRequestEnvelope(final RequestEnvelope envelope) {
+ final long now = ticker().read();
+ try {
+ final RequestSuccess<?, ?> success = handleRequest(envelope, now);
+ if (success != null) {
+ final long executionTimeNanos = ticker().read() - now;
+ if (success instanceof SliceableMessage) {
+ dispatchers.getDispatcher(DispatcherType.Serialization).execute(() ->
+ responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget())
+ .message(envelope.newSuccessEnvelope(success, executionTimeNanos))
+ .sendTo(envelope.getMessage().getReplyTo()).replyTo(self())
+ .onFailureCallback(t -> {
+ LOG.warn("Error slicing response {}", success, t);
+ }).build()));
+ } else {
+ envelope.sendSuccess(success, executionTimeNanos);
+ }
+ }
+ } catch (RequestException e) {
+ LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
+ envelope.sendFailure(e, ticker().read() - now);
+ } catch (Exception e) {
+ LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
+ envelope.sendFailure(new RuntimeRequestException("Request failed to process", e),
+ ticker().read() - now);
+ }
+ }
+
private void onMakeLeaderLocal() {
LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
if (isLeader()) {
initialSnapshot, snapshotRoot);
}
+ @Test
+ public void testLargeReadReplySlicing() throws Exception {
+ // The slicing is only implemented for tell-based protocol
+ Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class));
+
+ leaderDatastoreContextBuilder.maximumMessageSliceSize(50);
+ initDatastoresWithCars("testLargeReadReplySlicing");
+
+ final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
+
+ final NormalizedNode<?, ?> carsNode = CarsModel.create();
+ rwTx.write(CarsModel.BASE_PATH, carsNode);
+
+ verifyNode(rwTx, CarsModel.BASE_PATH, carsNode);
+ }
+
private static void verifySnapshot(final Snapshot actual, final Snapshot expected,
final NormalizedNode<?, ?> expRoot) {
assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());
org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off
org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.sharding=debug
org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.access.client=debug
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.messaging=debug