import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.NotLeaderException;
+import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
}
};
+ static final Object RESUME_NEXT_PENDING_TRANSACTION = new Object() {
+ @Override
+ public String toString() {
+ return "resumeNextPendingTransaction";
+ }
+ };
+
// FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant.
public static final String DEFAULT_NAME = "default";
private final FrontendMetadata frontendMetadata;
private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
+ private boolean paused;
protected Shard(final AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher =
- new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher");
+ new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name);
ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
- new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher");
+ new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name);
if (builder.getDataTree() != null) {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata);
maybeError.get());
}
+ store.resetTransactionBatch();
+
if (message instanceof RequestEnvelope) {
final long now = ticker().read();
final RequestEnvelope envelope = (RequestEnvelope)message;
PeerAddressResolved resolved = (PeerAddressResolved) message;
setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
} else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
- store.checkForExpiredTransactions(transactionCommitTimeout);
- commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
+ commitTimeoutCheck();
} else if (message instanceof DatastoreContext) {
onDatastoreContext((DatastoreContext)message);
} else if (message instanceof RegisterRoleChangeListener) {
persistPayload(txId, AbortTransactionPayload.create(txId), true);
} else if (message instanceof MakeLeaderLocal) {
onMakeLeaderLocal();
+ } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
+ store.resumeNextPendingTransaction();
} else {
super.handleNonRaftCommand(message);
}
}
}
+ private void commitTimeoutCheck() {
+ store.checkForExpiredTransactions(transactionCommitTimeout, this::updateAccess);
+ commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
+ }
+
+ private Optional<Long> updateAccess(final SimpleShardDataTreeCohort cohort) {
+ final FrontendIdentifier frontend = cohort.getIdentifier().getHistoryId().getClientId().getFrontendId();
+ // If this frontend has freshly connected, give it some time to catch up before killing its transactions.
+ final LeaderFrontendState state = knownFrontends.get(frontend);
+ return state == null ? Optional.absent() : Optional.of(state.getLastConnectTicks());
+ }
+
private void onMakeLeaderLocal() {
LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
if (isLeader()) {
}
// Acquire our frontend tracking handle and verify generation matches
- private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException {
+ @Nullable
+ private LeaderFrontendState findFrontend(final ClientIdentifier clientId) throws RequestException {
final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId());
if (existing != null) {
final int cmp = Long.compareUnsigned(existing.getIdentifier().getGeneration(), clientId.getGeneration());
if (cmp == 0) {
+ existing.touch();
return existing;
}
if (cmp > 0) {
LOG.debug("{}: client {} is not yet known", persistenceId(), clientId);
}
- final LeaderFrontendState ret = new LeaderFrontendState(persistenceId(), clientId, store);
- knownFrontends.put(clientId.getFrontendId(), ret);
- LOG.debug("{}: created state {} for client {}", persistenceId(), ret, clientId);
- return ret;
+ return null;
+ }
+
+ private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException {
+ final LeaderFrontendState ret = findFrontend(clientId);
+ if (ret != null) {
+ return ret;
+ }
+
+ // TODO: a dedicated exception would be better, but this is technically true, too
+ throw new OutOfSequenceEnvelopeException(0);
}
private static @Nonnull ABIVersion selectVersion(final ConnectClientRequest message) {
@SuppressWarnings("checkstyle:IllegalCatch")
private void handleConnectClient(final ConnectClientRequest message) {
try {
+ final ClientIdentifier clientId = message.getTarget();
+ final LeaderFrontendState existing = findFrontend(clientId);
+ if (existing != null) {
+ existing.touch();
+ }
+
if (!isLeader() || !isLeaderActive()) {
- LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), message);
+ LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+ + "isLeadershipTransferInProgress: {}.",
+ persistenceId(), message, isLeader(), isLeaderActive(), isLeadershipTransferInProgress());
throw new NotLeaderException(getSelf());
}
final ABIVersion selectedVersion = selectVersion(message);
- final LeaderFrontendState frontend = getFrontend(message.getTarget());
+ final LeaderFrontendState frontend;
+ if (existing == null) {
+ frontend = new LeaderFrontendState(persistenceId(), clientId, store);
+ knownFrontends.put(clientId.getFrontendId(), frontend);
+ LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId);
+ } else {
+ frontend = existing;
+ }
+
frontend.reconnect();
message.getReplyTo().tell(new ConnectClientSuccess(message.getTarget(), message.getSequence(), getSelf(),
ImmutableList.of(), store.getDataTree(), CLIENT_MAX_MESSAGES).toVersion(selectedVersion),
private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
throws RequestException {
// We are not the leader, hence we want to fail-fast.
- if (!isLeader() || !isLeaderActive()) {
- LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope);
+ if (!isLeader() || paused || !isLeaderActive()) {
+ LOG.debug("{}: not currently active leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+ + "isLeadershipTransferInProgress: {}, paused: {}",
+ persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress(), paused);
throw new NotLeaderException(getSelf());
}
final ClientIdentifier clientId = lhReq.getTarget().getClientId();
return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now);
} else {
- LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request);
+ LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request);
throw new UnsupportedRequestException(request);
}
}
persistenceId(), getId());
}
+ paused = false;
store.purgeLeaderState();
}
@Override
protected void onLeaderChanged(final String oldLeader, final String newLeader) {
shardMBean.incrementLeadershipChangeCount();
+ paused = false;
- final boolean hasLeader = hasLeader();
- if (!hasLeader) {
- // No leader implies we are not the leader, lose frontend state if we have any. This also places
- // an explicit guard so the map will not get modified accidentally.
+ if (!isLeader()) {
if (!knownFrontends.isEmpty()) {
LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet());
knownFrontends = ImmutableMap.of();
}
- return;
- }
- if (!isLeader()) {
+ if (!hasLeader()) {
+ // No leader anywhere, nothing else to do
+ return;
+ }
+
// Another leader was elected. If we were the previous leader and had pending transactions, convert
// them to transaction messages and send to the new leader.
ActorSelection leader = getLeader();
@Override
protected void pauseLeader(final Runnable operation) {
LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
+ paused = true;
+
+ // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused.
+ knownFrontends.values().forEach(LeaderFrontendState::retire);
+ knownFrontends = ImmutableMap.of();
+
store.setRunOnPendingTransactionsComplete(operation);
}
+ @Override
+ protected void unpauseLeader() {
+ LOG.debug("{}: In unpauseLeader", persistenceId());
+ paused = false;
+
+ store.setRunOnPendingTransactionsComplete(null);
+
+ // Restore tell-based protocol state as if we were becoming the leader
+ knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+ }
+
@Override
protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
private ShardIdentifier id;
private Map<String, String> peerAddresses = Collections.emptyMap();
private DatastoreContext datastoreContext;
- private SchemaContext schemaContext;
+ private SchemaContextProvider schemaContextProvider;
private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
private TipProducingDataTree dataTree;
private volatile boolean sealed;
return self();
}
- public T schemaContext(final SchemaContext newSchemaContext) {
+ public T schemaContextProvider(final SchemaContextProvider schemaContextProvider) {
checkSealed();
- this.schemaContext = newSchemaContext;
+ this.schemaContextProvider = Preconditions.checkNotNull(schemaContextProvider);
return self();
}
}
public SchemaContext getSchemaContext() {
- return schemaContext;
+ return Verify.verifyNotNull(schemaContextProvider.getSchemaContext());
}
public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
Preconditions.checkNotNull(id, "id should not be null");
Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
- Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+ Preconditions.checkNotNull(schemaContextProvider, "schemaContextProvider should not be null");
}
public Props props() {
Ticker ticker() {
return Ticker.systemTicker();
}
+
+ void scheduleNextPendingTransaction() {
+ self().tell(RESUME_NEXT_PENDING_TRANSACTION, ActorRef.noSender());
+ }
}