import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
+import akka.actor.ExtendedActorSystem;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Failure;
+import akka.serialization.JavaSerializer;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardDataTreeListenerInfoMXBeanImpl;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.controller.cluster.messaging.MessageSlicer;
import org.opendaylight.controller.cluster.messaging.SliceOptions;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
/// The name of this shard
private final String name;
+ private final String shardName;
+
private final ShardStats shardMBean;
+ private final ShardDataTreeListenerInfoMXBeanImpl listenerInfoMXBean;
+
private DatastoreContext datastoreContext;
private final ShardCommitCoordinator commitCoordinator;
private final ShardSnapshotCohort snapshotCohort;
private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
- private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
-
private ShardSnapshot restoreFromSnapshot;
private final MessageSlicer responseMessageSlicer;
private final Dispatchers dispatchers;
+ private final MessageAssembler requestMessageAssembler;
+
protected Shard(final AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
this.name = builder.getId().toString();
+ this.shardName = builder.getId().getShardName();
this.datastoreContext = builder.getDatastoreContext();
this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
this.frontendMetadata = new FrontendMetadata(name);
ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher =
new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name);
- ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
- new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name);
if (builder.getDataTree() != null) {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
- treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata);
+ treeChangeListenerPublisher, name, frontendMetadata);
} else {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(),
- builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher,
- dataChangeListenerPublisher, name, frontendMetadata);
+ builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name, frontendMetadata);
}
shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this);
.messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
.fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
.expireStateAfterInactivity(2, TimeUnit.MINUTES).build();
+
+ requestMessageAssembler = MessageAssembler.builder().logContext(this.name)
+ .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
+ .assembledMessageCallback((message, sender) -> self().tell(message, sender))
+ .expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build();
+
+ listenerInfoMXBean = new ShardDataTreeListenerInfoMXBeanImpl(name, datastoreContext.getDataStoreMXBeanType(),
+ self());
+ listenerInfoMXBean.register();
}
private void setTransactionCommitTimeout() {
commitCoordinator.abortPendingTransactions("Transaction aborted due to shutdown.", this);
shardMBean.unregisterMBean();
+ listenerInfoMXBean.unregister();
}
@Override
if (message instanceof RequestEnvelope) {
handleRequestEnvelope((RequestEnvelope)message);
+ } else if (MessageAssembler.isHandledMessage(message)) {
+ handleRequestAssemblerMessage(message);
} else if (message instanceof ConnectClientRequest) {
handleConnectClient((ConnectClientRequest)message);
} else if (CreateTransaction.isSerializedType(message)) {
handleAbortTransaction(AbortTransaction.fromSerializable(message));
} else if (CloseTransactionChain.isSerializedType(message)) {
closeTransactionChain(CloseTransactionChain.fromSerializable(message));
- } else if (message instanceof RegisterChangeListener) {
- changeSupport.onMessage((RegisterChangeListener) message, isLeader(), hasLeader());
} else if (message instanceof RegisterDataTreeChangeListener) {
treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader());
} else if (message instanceof UpdateSchemaContext) {
}
}
+ private void handleRequestAssemblerMessage(final Object message) {
+ dispatchers.getDispatcher(DispatcherType.Serialization).execute(() -> {
+ JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) context().system());
+ requestMessageAssembler.handleMessage(message, self());
+ });
+ }
+
@SuppressWarnings("checkstyle:IllegalCatch")
private void handleRequestEnvelope(final RequestEnvelope envelope) {
final long now = ticker().read();
private void commitTimeoutCheck() {
store.checkForExpiredTransactions(transactionCommitTimeout, this::updateAccess);
commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
+ requestMessageAssembler.checkExpiredAssembledMessageState();
}
private Optional<Long> updateAccess(final SimpleShardDataTreeCohort cohort) {
final FrontendIdentifier frontend = cohort.getIdentifier().getHistoryId().getClientId().getFrontendId();
- // If this frontend has freshly connected, give it some time to catch up before killing its transactions.
final LeaderFrontendState state = knownFrontends.get(frontend);
- return state == null ? Optional.absent() : Optional.of(state.getLastConnectTicks());
+ if (state == null) {
+ // Not tell-based protocol, do nothing
+ return Optional.absent();
+ }
+
+ if (isIsolatedLeader()) {
+ // We are isolated and no new request can come through until we emerge from it. We are still updating
+ // liveness of frontend when we see it attempting to communicate. Use the last access timer.
+ return Optional.of(state.getLastSeenTicks());
+ }
+
+ // If this frontend has freshly connected, give it some time to catch up before killing its transactions.
+ return Optional.of(state.getLastConnectTicks());
}
private void onMakeLeaderLocal() {
}
if (cmp > 0) {
LOG.debug("{}: rejecting request from outdated client {}", persistenceId(), clientId);
- throw new RetiredGenerationException(existing.getIdentifier().getGeneration());
+ throw new RetiredGenerationException(clientId.getGeneration(),
+ existing.getIdentifier().getGeneration());
}
LOG.info("{}: retiring state {}, outdated by request from client {}", persistenceId(), existing, clientId);
throw new OutOfSequenceEnvelopeException(0);
}
- private static @Nonnull ABIVersion selectVersion(final ConnectClientRequest message) {
+ @Nonnull
+ private static ABIVersion selectVersion(final ConnectClientRequest message) {
final Range<ABIVersion> clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion());
for (ABIVersion v : SUPPORTED_ABIVERSIONS) {
if (clientRange.contains(v)) {
}
}
- private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
+ @Nullable
+ private RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
throws RequestException {
// We are not the leader, hence we want to fail-fast.
if (!isLeader() || paused || !isLeaderActive()) {
return roleChangeNotifier;
}
+ String getShardName() {
+ return shardName;
+ }
+
@Override
protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
final short leaderPayloadVersion) {
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {
messageRetrySupport.addMessageToRetry(batched, getSender(),
- "Could not commit transaction " + batched.getTransactionId());
+ "Could not process BatchedModifications " + batched.getTransactionId());
} else {
// If this is not the first batch and leadership changed in between batched messages,
// we need to reconstruct previous BatchedModifications from the transaction
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {
messageRetrySupport.addMessageToRetry(message, getSender(),
- "Could not commit transaction " + message.getTransactionId());
+ "Could not process ready local transaction " + message.getTransactionId());
} else {
LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {
messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
- "Could not commit transaction " + forwardedReady.getTransactionId());
+ "Could not process forwarded ready transaction " + forwardedReady.getTransactionId());
} else {
LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionId(),
- forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit());
+ forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit(),
+ forwardedReady.getParticipatingShardNames());
readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
leader.forward(readyLocal, getContext());
}
@Override
@Nonnull
protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
- return new ShardRecoveryCoordinator(store,
- restoreFromSnapshot != null ? restoreFromSnapshot.getSnapshot() : null, persistenceId(), LOG);
+ if (restoreFromSnapshot == null) {
+ return ShardRecoveryCoordinator.create(store, persistenceId(), LOG);
+ }
+
+ return ShardRecoveryCoordinator.forSnapshot(store, persistenceId(), LOG, restoreFromSnapshot.getSnapshot());
}
@Override
protected void onStateChanged() {
boolean isLeader = isLeader();
boolean hasLeader = hasLeader();
- changeSupport.onLeadershipChange(isLeader, hasLeader);
treeChangeSupport.onLeadershipChange(isLeader, hasLeader);
// If this actor is no longer the leader close all the transaction chains
knownFrontends = ImmutableMap.of();
}
+ requestMessageAssembler.close();
+
if (!hasLeader()) {
// No leader anywhere, nothing else to do
return;
messagesToForward.size(), leader);
for (Object message : messagesToForward) {
+ LOG.debug("{}: Forwarding pending transaction message {}", persistenceId(), message);
+
leader.tell(message, self());
}
}
}
@Override
- protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+ protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
- .dataChangeListenerActors(changeSupport.getListenerActors())
.commitCohortActors(store.getCohortActors());
}
private DatastoreContext datastoreContext;
private SchemaContextProvider schemaContextProvider;
private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
- private TipProducingDataTree dataTree;
+ private DataTree dataTree;
private volatile boolean sealed;
protected AbstractBuilder(final Class<S> shardClass) {
return self();
}
- public T schemaContextProvider(final SchemaContextProvider schemaContextProvider) {
+ public T schemaContextProvider(final SchemaContextProvider newSchemaContextProvider) {
checkSealed();
- this.schemaContextProvider = Preconditions.checkNotNull(schemaContextProvider);
+ this.schemaContextProvider = Preconditions.checkNotNull(newSchemaContextProvider);
return self();
}
return self();
}
- public T dataTree(final TipProducingDataTree newDataTree) {
+ public T dataTree(final DataTree newDataTree) {
checkSealed();
this.dataTree = newDataTree;
return self();
return restoreFromSnapshot;
}
- public TipProducingDataTree getDataTree() {
+ public DataTree getDataTree() {
return dataTree;
}
}
public static class Builder extends AbstractBuilder<Builder, Shard> {
- private Builder() {
+ Builder() {
super(Shard.class);
}
}