import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.actor.Props;
+import akka.actor.Status;
import akka.actor.Status.Failure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherType;
import org.opendaylight.controller.cluster.common.actor.MessageTracker;
import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
+import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
-import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
+import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
}
};
    // Internal self-message used to trigger processing of the next pending transaction in the
    // ShardDataTree queue (see scheduleNextPendingTransaction()); handleNonRaftCommand reacts to it
    // by calling store.resumeNextPendingTransaction(). The anonymous subclass only customizes
    // toString() for log readability.
    static final Object RESUME_NEXT_PENDING_TRANSACTION = new Object() {
        @Override
        public String toString() {
            return "resumeNextPendingTransaction";
        }
    };
+
// FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant.
public static final String DEFAULT_NAME = "default";
    // Recorded per-frontend metadata; used to rebuild leader frontend state via toLeaderState()
    // when this shard (re-)assumes active leadership -- see unpauseLeader().
    private final FrontendMetadata frontendMetadata;
    // Live per-frontend leader state; cleared whenever this shard is not the active leader.
    private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
    // True while leadership is being transferred away (set in pauseLeader, cleared in
    // unpauseLeader and on leader change); handleRequest rejects requests while paused.
    private boolean paused;

    // Slices large (SliceableMessage) responses into chunks for the tell-based protocol;
    // configured in the constructor with a 2-minute inactivity expiration.
    private final MessageSlicer responseMessageSlicer;
    // Cached dispatcher lookup, shared by the transaction actor factory and response serialization.
    private final Dispatchers dispatchers;
protected Shard(final AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher =
- new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher");
+ new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name);
ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
- new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher");
+ new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name);
if (builder.getDataTree() != null) {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata);
appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
+ dispatchers = new Dispatchers(context().system().dispatchers());
transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext,
- new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Transaction),
+ dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Transaction),
self(), getContext(), shardMBean, builder.getId().getShardName());
snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
this.name);
messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
+
+ responseMessageSlicer = MessageSlicer.builder().logContext(this.name)
+ .messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
+ .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
+ .expireStateAfterInactivity(2, TimeUnit.MINUTES).build();
}
private void setTransactionCommitTimeout() {
}
}
- @SuppressWarnings("checkstyle:IllegalCatch")
@Override
protected void handleNonRaftCommand(final Object message) {
try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
maybeError.get());
}
- if (message instanceof RequestEnvelope) {
- final long now = ticker().read();
- final RequestEnvelope envelope = (RequestEnvelope)message;
+ store.resetTransactionBatch();
- try {
- final RequestSuccess<?, ?> success = handleRequest(envelope, now);
- if (success != null) {
- envelope.sendSuccess(success, ticker().read() - now);
- }
- } catch (RequestException e) {
- LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
- envelope.sendFailure(e, ticker().read() - now);
- } catch (Exception e) {
- LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
- envelope.sendFailure(new RuntimeRequestException("Request failed to process", e),
- ticker().read() - now);
- }
+ if (message instanceof RequestEnvelope) {
+ handleRequestEnvelope((RequestEnvelope)message);
} else if (message instanceof ConnectClientRequest) {
handleConnectClient((ConnectClientRequest)message);
} else if (CreateTransaction.isSerializedType(message)) {
} else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
store.processCohortRegistryCommand(getSender(),
(DataTreeCohortActorRegistry.CohortRegistryCommand) message);
- } else {
+ } else if (message instanceof PersistAbortTransactionPayload) {
+ final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
+ persistPayload(txId, AbortTransactionPayload.create(txId), true);
+ } else if (message instanceof MakeLeaderLocal) {
+ onMakeLeaderLocal();
+ } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
+ store.resumeNextPendingTransaction();
+ } else if (!responseMessageSlicer.handleMessage(message)) {
super.handleNonRaftCommand(message);
}
}
}
    /**
     * Processes a single {@link RequestEnvelope}: dispatches it via {@code handleRequest} and sends
     * the outcome back through the envelope. Successful responses implementing
     * {@link SliceableMessage} are handed to the response message slicer on the serialization
     * dispatcher; all other successes are sent directly. Failures are reported back as either the
     * thrown {@link RequestException} or a {@link RuntimeRequestException} wrapper.
     *
     * @param envelope the incoming request envelope
     */
    @SuppressWarnings("checkstyle:IllegalCatch")
    private void handleRequestEnvelope(final RequestEnvelope envelope) {
        // Timestamp the start so both success and failure paths can report execution time.
        final long now = ticker().read();
        try {
            final RequestSuccess<?, ?> success = handleRequest(envelope, now);
            if (success != null) {
                final long executionTimeNanos = ticker().read() - now;
                if (success instanceof SliceableMessage) {
                    // Potentially-large response: off-load serialization and slicing to the
                    // dedicated serialization dispatcher so the shard actor stays responsive.
                    dispatchers.getDispatcher(DispatcherType.Serialization).execute(() ->
                        responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget())
                            .message(envelope.newSuccessEnvelope(success, executionTimeNanos))
                            .sendTo(envelope.getMessage().getReplyTo()).replyTo(self())
                            .onFailureCallback(t -> {
                                // Slicing failed; log it -- the requester will not receive a reply.
                                LOG.warn("Error slicing response {}", success, t);
                            }).build()));
                } else {
                    envelope.sendSuccess(success, executionTimeNanos);
                }
            }
        } catch (RequestException e) {
            // Expected protocol-level failure: propagate as-is.
            LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
            envelope.sendFailure(e, ticker().read() - now);
        } catch (Exception e) {
            // Unexpected failure: wrap so the client still gets a well-formed response.
            LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
            envelope.sendFailure(new RuntimeRequestException("Request failed to process", e),
                ticker().read() - now);
        }
    }
+
+ private void onMakeLeaderLocal() {
+ LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
+ if (isLeader()) {
+ getSender().tell(new Status.Success(null), getSelf());
+ return;
+ }
+
+ final ActorSelection leader = getLeader();
+
+ if (leader == null) {
+ // Leader is not present. The cluster is most likely trying to
+ // elect a leader and we should let that run its normal course
+
+ // TODO we can wait for the election to complete and retry the
+ // request. We can also let the caller retry by sending a flag
+ // in the response indicating the request is "reTryable".
+ getSender().tell(new Failure(
+ new LeadershipTransferFailedException("We cannot initiate leadership transfer to local node. "
+ + "Currently there is no leader for " + persistenceId())),
+ getSelf());
+ return;
+ }
+
+ leader.tell(new RequestLeadership(getId(), getSender()), getSelf());
+ }
+
// Acquire our frontend tracking handle and verify generation matches
private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException {
final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId());
private void handleConnectClient(final ConnectClientRequest message) {
try {
if (!isLeader() || !isLeaderActive()) {
- LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), message);
+ LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+ + "isLeadershipTransferInProgress: {}.",
+ persistenceId(), message, isLeader(), isLeaderActive(), isLeadershipTransferInProgress());
throw new NotLeaderException(getSelf());
}
private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
throws RequestException {
// We are not the leader, hence we want to fail-fast.
- if (!isLeader() || !isLeaderActive()) {
- LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope);
+ if (!isLeader() || paused || !isLeaderActive()) {
+ LOG.debug("{}: not currently active leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+ + "isLeadershipTransferInProgress: {}, paused: {}",
+ persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress(), paused);
throw new NotLeaderException(getSelf());
}
final ClientIdentifier clientId = lhReq.getTarget().getClientId();
return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now);
} else {
- LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request);
+ LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request);
throw new UnsupportedRequestException(request);
}
}
persistenceId(), getId());
}
+ paused = false;
store.purgeLeaderState();
}
@Override
protected void onLeaderChanged(final String oldLeader, final String newLeader) {
shardMBean.incrementLeadershipChangeCount();
+ paused = false;
- final boolean hasLeader = hasLeader();
- if (!hasLeader) {
- // No leader implies we are not the leader, lose frontend state if we have any. This also places
- // an explicit guard so the map will not get modified accidentally.
+ if (!isLeader()) {
if (!knownFrontends.isEmpty()) {
LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet());
knownFrontends = ImmutableMap.of();
}
- return;
- }
- if (!isLeader()) {
+ if (!hasLeader()) {
+ // No leader anywhere, nothing else to do
+ return;
+ }
+
// Another leader was elected. If we were the previous leader and had pending transactions, convert
// them to transaction messages and send to the new leader.
ActorSelection leader = getLeader();
    /**
     * Invoked by the RAFT layer when leadership is about to be relinquished. Marks this shard as
     * paused (causing {@code handleRequest} to reject new requests with NotLeaderException),
     * retires all known frontend state, and schedules {@code operation} to run once all pending
     * transactions complete.
     *
     * @param operation callback to execute after pending transactions finish
     */
    @Override
    protected void pauseLeader(final Runnable operation) {
        LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
        paused = true;

        // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused.
        knownFrontends.values().forEach(LeaderFrontendState::retire);
        knownFrontends = ImmutableMap.of();

        store.setRunOnPendingTransactionsComplete(operation);
    }
+ @Override
+ protected void unpauseLeader() {
+ LOG.debug("{}: In unpauseLeader", persistenceId());
+ paused = false;
+
+ store.setRunOnPendingTransactionsComplete(null);
+
+ // Restore tell-based protocol state as if we were becoming the leader
+ knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+ }
+
@Override
protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
private ShardIdentifier id;
private Map<String, String> peerAddresses = Collections.emptyMap();
private DatastoreContext datastoreContext;
- private SchemaContext schemaContext;
+ private SchemaContextProvider schemaContextProvider;
private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
private TipProducingDataTree dataTree;
private volatile boolean sealed;
return self();
}
    /**
     * Sets the provider from which the shard will obtain its {@code SchemaContext}.
     *
     * @param schemaContextProvider the provider to use, must not be null
     * @return this builder
     * @throws NullPointerException if {@code schemaContextProvider} is null
     */
    public T schemaContextProvider(final SchemaContextProvider schemaContextProvider) {
        checkSealed();
        this.schemaContextProvider = Preconditions.checkNotNull(schemaContextProvider);
        return self();
    }
}
public SchemaContext getSchemaContext() {
- return schemaContext;
+ return Verify.verifyNotNull(schemaContextProvider.getSchemaContext());
}
public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
Preconditions.checkNotNull(id, "id should not be null");
Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
- Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+ Preconditions.checkNotNull(schemaContextProvider, "schemaContextProvider should not be null");
}
public Props props() {
    // Time source used for measuring request execution times; package-private and overridable
    // so tests can substitute a controllable ticker.
    Ticker ticker() {
        return Ticker.systemTicker();
    }
    // Asynchronously asks this shard to resume processing of the next pending transaction by
    // enqueueing the RESUME_NEXT_PENDING_TRANSACTION marker message to self.
    void scheduleNextPendingTransaction() {
        self().tell(RESUME_NEXT_PENDING_TRANSACTION, ActorRef.noSender());
    }
}