import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherType;
import org.opendaylight.controller.cluster.common.actor.MessageTracker;
import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
-import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
private final FrontendMetadata frontendMetadata;
private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
+ private boolean paused;
+
+ private final MessageSlicer responseMessageSlicer;
+ private final Dispatchers dispatchers;
protected Shard(final AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
+ dispatchers = new Dispatchers(context().system().dispatchers());
transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext,
- new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Transaction),
+ dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Transaction),
self(), getContext(), shardMBean, builder.getId().getShardName());
snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
this.name);
messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
+
+ responseMessageSlicer = MessageSlicer.builder().logContext(this.name)
+ .messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
+ .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
+ .expireStateAfterInactivity(2, TimeUnit.MINUTES).build();
}
private void setTransactionCommitTimeout() {
}
}
- @SuppressWarnings("checkstyle:IllegalCatch")
@Override
protected void handleNonRaftCommand(final Object message) {
try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
store.resetTransactionBatch();
if (message instanceof RequestEnvelope) {
- final long now = ticker().read();
- final RequestEnvelope envelope = (RequestEnvelope)message;
-
- try {
- final RequestSuccess<?, ?> success = handleRequest(envelope, now);
- if (success != null) {
- envelope.sendSuccess(success, ticker().read() - now);
- }
- } catch (RequestException e) {
- LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
- envelope.sendFailure(e, ticker().read() - now);
- } catch (Exception e) {
- LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
- envelope.sendFailure(new RuntimeRequestException("Request failed to process", e),
- ticker().read() - now);
- }
+ handleRequestEnvelope((RequestEnvelope)message);
} else if (message instanceof ConnectClientRequest) {
handleConnectClient((ConnectClientRequest)message);
} else if (CreateTransaction.isSerializedType(message)) {
onMakeLeaderLocal();
} else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
store.resumeNextPendingTransaction();
- } else {
+ } else if (!responseMessageSlicer.handleMessage(message)) {
super.handleNonRaftCommand(message);
}
}
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void handleRequestEnvelope(final RequestEnvelope envelope) {
+ final long now = ticker().read();
+ try {
+ final RequestSuccess<?, ?> success = handleRequest(envelope, now);
+ if (success != null) {
+ final long executionTimeNanos = ticker().read() - now;
+ if (success instanceof SliceableMessage) {
+ dispatchers.getDispatcher(DispatcherType.Serialization).execute(() ->
+ responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget())
+ .message(envelope.newSuccessEnvelope(success, executionTimeNanos))
+ .sendTo(envelope.getMessage().getReplyTo()).replyTo(self())
+ .onFailureCallback(t -> {
+ LOG.warn("Error slicing response {}", success, t);
+ }).build()));
+ } else {
+ envelope.sendSuccess(success, executionTimeNanos);
+ }
+ }
+ } catch (RequestException e) {
+ LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
+ envelope.sendFailure(e, ticker().read() - now);
+ } catch (Exception e) {
+ LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
+ envelope.sendFailure(new RuntimeRequestException("Request failed to process", e),
+ ticker().read() - now);
+ }
+ }
+
private void onMakeLeaderLocal() {
LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
if (isLeader()) {
private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
throws RequestException {
// We are not the leader, hence we want to fail-fast.
- if (!isLeader() || !isLeaderActive()) {
- LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
- + "isLeadershipTransferInProgress: {}.",
- persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress());
+ if (!isLeader() || paused || !isLeaderActive()) {
+ LOG.debug("{}: not currently active leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+ + "isLeadershipTransferInProgress: {}, paused: {}",
+ persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress(), paused);
throw new NotLeaderException(getSelf());
}
persistenceId(), getId());
}
+ paused = false;
store.purgeLeaderState();
}
@Override
protected void onLeaderChanged(final String oldLeader, final String newLeader) {
shardMBean.incrementLeadershipChangeCount();
+ paused = false;
- final boolean hasLeader = hasLeader();
- if (!hasLeader) {
- // No leader implies we are not the leader, lose frontend state if we have any. This also places
- // an explicit guard so the map will not get modified accidentally.
+ if (!isLeader()) {
if (!knownFrontends.isEmpty()) {
LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet());
knownFrontends = ImmutableMap.of();
}
- return;
- }
- if (!isLeader()) {
+ if (!hasLeader()) {
+ // No leader anywhere, nothing else to do
+ return;
+ }
+
// Another leader was elected. If we were the previous leader and had pending transactions, convert
// them to transaction messages and send to the new leader.
ActorSelection leader = getLeader();
@Override
protected void pauseLeader(final Runnable operation) {
LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
+ paused = true;
+
+ // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused.
+ knownFrontends.values().forEach(LeaderFrontendState::retire);
+ knownFrontends = ImmutableMap.of();
+
store.setRunOnPendingTransactionsComplete(operation);
}
+ @Override
+ protected void unpauseLeader() {
+ LOG.debug("{}: In unpauseLeader", persistenceId());
+ paused = false;
+
+ store.setRunOnPendingTransactionsComplete(null);
+
+ // Restore tell-based protocol state as if we were becoming the leader
+ knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+ }
+
@Override
protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())