import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardDataTreeListenerInfoMXBeanImpl;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
private final ShardStats shardMBean;
+ private final ShardDataTreeListenerInfoMXBeanImpl listenerInfoMXBean;
+
private DatastoreContext datastoreContext;
private final ShardCommitCoordinator commitCoordinator;
.fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
.assembledMessageCallback((message, sender) -> self().tell(message, sender))
.expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build();
+
+ listenerInfoMXBean = new ShardDataTreeListenerInfoMXBeanImpl(name, datastoreContext.getDataStoreMXBeanType(),
+ self());
+ listenerInfoMXBean.register();
}
private void setTransactionCommitTimeout() {
commitCoordinator.abortPendingTransactions("Transaction aborted due to shutdown.", this);
shardMBean.unregisterMBean();
+ listenerInfoMXBean.unregister();
}
@Override
if (message instanceof RequestEnvelope) {
handleRequestEnvelope((RequestEnvelope)message);
- } else if (requestMessageAssembler.isHandledMessage(message)) {
+ } else if (MessageAssembler.isHandledMessage(message)) {
handleRequestAssemblerMessage(message);
} else if (message instanceof ConnectClientRequest) {
handleConnectClient((ConnectClientRequest)message);
}
}
- private void handleRequestAssemblerMessage(Object message) {
+ private void handleRequestAssemblerMessage(final Object message) {
dispatchers.getDispatcher(DispatcherType.Serialization).execute(() -> {
JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) context().system());
requestMessageAssembler.handleMessage(message, self());
throw new OutOfSequenceEnvelopeException(0);
}
- private static @Nonnull ABIVersion selectVersion(final ConnectClientRequest message) {
+ @Nonnull
+ private static ABIVersion selectVersion(final ConnectClientRequest message) {
final Range<ABIVersion> clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion());
for (ABIVersion v : SUPPORTED_ABIVERSIONS) {
if (clientRange.contains(v)) {
}
}
- private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
+ @Nullable
+ private RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
throws RequestException {
// We are not the leader, hence we want to fail-fast.
if (!isLeader() || paused || !isLeaderActive()) {
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {
messageRetrySupport.addMessageToRetry(batched, getSender(),
- "Could not commit transaction " + batched.getTransactionId());
+ "Could not process BatchedModifications " + batched.getTransactionId());
} else {
// If this is not the first batch and leadership changed in between batched messages,
// we need to reconstruct previous BatchedModifications from the transaction
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {
messageRetrySupport.addMessageToRetry(message, getSender(),
- "Could not commit transaction " + message.getTransactionId());
+ "Could not process ready local transaction " + message.getTransactionId());
} else {
LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {
messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
- "Could not commit transaction " + forwardedReady.getTransactionId());
+ "Could not process forwarded ready transaction " + forwardedReady.getTransactionId());
} else {
LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
@Override
@Nonnull
protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
- return new ShardRecoveryCoordinator(store,
- restoreFromSnapshot != null ? restoreFromSnapshot.getSnapshot() : null, persistenceId(), LOG);
+ if (restoreFromSnapshot == null) {
+ return ShardRecoveryCoordinator.create(store, persistenceId(), LOG);
+ }
+
+ return ShardRecoveryCoordinator.forSnapshot(store, persistenceId(), LOG, restoreFromSnapshot.getSnapshot());
}
@Override
}
@Override
- protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+ protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
.dataChangeListenerActors(changeSupport.getListenerActors())
.commitCohortActors(store.getCohortActors());
return self();
}
- public T schemaContextProvider(final SchemaContextProvider schemaContextProvider) {
+ public T schemaContextProvider(final SchemaContextProvider newSchemaContextProvider) {
checkSealed();
- this.schemaContextProvider = Preconditions.checkNotNull(schemaContextProvider);
+ this.schemaContextProvider = Preconditions.checkNotNull(newSchemaContextProvider);
return self();
}
}
public static class Builder extends AbstractBuilder<Builder, Shard> {
- private Builder() {
+ Builder() {
super(Shard.class);
}
}