import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
import org.opendaylight.controller.cluster.common.actor.Dispatchers;
import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherType;
import org.opendaylight.controller.cluster.common.actor.MessageTracker;
-import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.actors.JsonExportActor;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.raft.messages.Payload;
import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStoreProperties.ExportOnRecovery;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException;
switch (exportOnRecovery) {
case Json:
if (message instanceof SnapshotOffer) {
- exportActor.tell(new JsonExportActor.ExportSnapshot(store.readCurrentData().get(), name),
- ActorRef.noSender());
- } else if (message instanceof ReplicatedLogEntry) {
- exportActor.tell(new JsonExportActor.ExportJournal((ReplicatedLogEntry) message),
+ exportActor.tell(new JsonExportActor.ExportSnapshot(store.readCurrentData().orElseThrow(), name),
ActorRef.noSender());
+ } else if (message instanceof ReplicatedLogEntry replicatedLogEntry) {
+ exportActor.tell(new JsonExportActor.ExportJournal(replicatedLogEntry), ActorRef.noSender());
} else if (message instanceof RecoveryCompleted) {
exportActor.tell(new JsonExportActor.FinishExport(name), ActorRef.noSender());
exportActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
@Override
// non-final for TestShard
protected void handleNonRaftCommand(final Object message) {
- try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
- final Optional<Error> maybeError = context.error();
+ try (var context = appendEntriesReplyTracker.received(message)) {
+ final var maybeError = context.error();
if (maybeError.isPresent()) {
LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
- maybeError.get());
+ maybeError.orElseThrow());
}
store.resetTransactionBatch();
- if (message instanceof RequestEnvelope) {
- handleRequestEnvelope((RequestEnvelope)message);
+ if (message instanceof RequestEnvelope request) {
+ handleRequestEnvelope(request);
} else if (MessageAssembler.isHandledMessage(message)) {
handleRequestAssemblerMessage(message);
- } else if (message instanceof ConnectClientRequest) {
- handleConnectClient((ConnectClientRequest)message);
+ } else if (message instanceof ConnectClientRequest request) {
+ handleConnectClient(request);
} else if (CreateTransaction.isSerializedType(message)) {
handleCreateTransaction(message);
- } else if (message instanceof BatchedModifications) {
- handleBatchedModifications((BatchedModifications)message);
- } else if (message instanceof ForwardedReadyTransaction) {
- handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
- } else if (message instanceof ReadyLocalTransaction) {
- handleReadyLocalTransaction((ReadyLocalTransaction)message);
+ } else if (message instanceof BatchedModifications request) {
+ handleBatchedModifications(request);
+ } else if (message instanceof ForwardedReadyTransaction request) {
+ handleForwardedReadyTransaction(request);
+ } else if (message instanceof ReadyLocalTransaction request) {
+ handleReadyLocalTransaction(request);
} else if (CanCommitTransaction.isSerializedType(message)) {
handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
} else if (CommitTransaction.isSerializedType(message)) {
closeTransactionChain(CloseTransactionChain.fromSerializable(message));
} else if (message instanceof DataTreeChangedReply) {
// Ignore reply
- } else if (message instanceof RegisterDataTreeChangeListener) {
- treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader());
- } else if (message instanceof UpdateSchemaContext) {
- updateSchemaContext((UpdateSchemaContext) message);
+ } else if (message instanceof RegisterDataTreeChangeListener request) {
+ treeChangeSupport.onMessage(request, isLeader(), hasLeader());
+ } else if (message instanceof UpdateSchemaContext request) {
+ updateSchemaContext(request);
} else if (message instanceof PeerAddressResolved resolved) {
setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
} else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
commitTimeoutCheck();
- } else if (message instanceof DatastoreContext) {
- onDatastoreContext((DatastoreContext)message);
+ } else if (message instanceof DatastoreContext request) {
+ onDatastoreContext(request);
} else if (message instanceof RegisterRoleChangeListener) {
- roleChangeNotifier.get().forward(message, context());
- } else if (message instanceof FollowerInitialSyncUpStatus) {
- shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
+ roleChangeNotifier.orElseThrow().forward(message, context());
+ } else if (message instanceof FollowerInitialSyncUpStatus request) {
+ shardMBean.setFollowerInitialSyncStatus(request.isInitialSyncDone());
context().parent().tell(message, self());
} else if (GET_SHARD_MBEAN_MESSAGE.equals(message)) {
sender().tell(getShardMBean(), self());
context().parent().forward(message, context());
} else if (ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
messageRetrySupport.onTimerMessage(message);
- } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
- store.processCohortRegistryCommand(getSender(),
- (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
+ } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand request) {
+ store.processCohortRegistryCommand(getSender(), request);
} else if (message instanceof MakeLeaderLocal) {
onMakeLeaderLocal();
} else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
}
private OptionalLong updateAccess(final SimpleShardDataTreeCohort cohort) {
- final FrontendIdentifier frontend = cohort.getIdentifier().getHistoryId().getClientId().getFrontendId();
+ final FrontendIdentifier frontend = cohort.transactionId().getHistoryId().getClientId().getFrontendId();
final LeaderFrontendState state = knownFrontends.get(frontend);
if (state == null) {
// Not tell-based protocol, do nothing
throw new NotLeaderException(getSelf());
}
- final Request<?, ?> request = envelope.getMessage();
- if (request instanceof TransactionRequest) {
- final TransactionRequest<?> txReq = (TransactionRequest<?>)request;
- final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId();
+ final var request = envelope.getMessage();
+ if (request instanceof TransactionRequest<?> txReq) {
+ final var clientId = txReq.getTarget().getHistoryId().getClientId();
return getFrontend(clientId).handleTransactionRequest(txReq, envelope, now);
- } else if (request instanceof LocalHistoryRequest) {
- final LocalHistoryRequest<?> lhReq = (LocalHistoryRequest<?>)request;
- final ClientIdentifier clientId = lhReq.getTarget().getClientId();
+ } else if (request instanceof LocalHistoryRequest<?> lhReq) {
+ final var clientId = lhReq.getTarget().getClientId();
return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now);
} else {
LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request);
restoreFromSnapshot = null;
//notify shard manager
- getContext().parent().tell(new ActorInitialized(), getSelf());
+ getContext().parent().tell(new ActorInitialized(getSelf()), ActorRef.noSender());
// Being paranoid here - this method should only be called once but just in case...
if (txCommitTimeoutCheckSchedule == null) {
// Schedule a message to be periodically sent to check if the current in-progress
// transaction should be expired and aborted.
- FiniteDuration period = FiniteDuration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
+ final var period = FiniteDuration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
period, period, getSelf(),
TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
@Override
protected final void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
- if (data instanceof Payload) {
- if (data instanceof DisableTrackingPayload) {
- disableTracking((DisableTrackingPayload) data);
+ if (data instanceof Payload payload) {
+ if (payload instanceof DisableTrackingPayload disableTracking) {
+ disableTracking(disableTracking);
return;
}
try {
- store.applyReplicatedPayload(identifier, (Payload)data);
+ store.applyReplicatedPayload(identifier, payload);
} catch (DataValidationFailedException | IOException e) {
LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
}
paused = true;
// Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused.
- if (datastoreContext.isUseTellBasedProtocol()) {
- knownFrontends.values().forEach(LeaderFrontendState::retire);
- knownFrontends = ImmutableMap.of();
- }
+ knownFrontends.values().forEach(LeaderFrontendState::retire);
+ knownFrontends = ImmutableMap.of();
store.setRunOnPendingTransactionsComplete(operation);
}
public T id(final ShardIdentifier newId) {
checkSealed();
- this.id = newId;
+ id = newId;
return self();
}
public T peerAddresses(final Map<String, String> newPeerAddresses) {
checkSealed();
- this.peerAddresses = newPeerAddresses;
+ peerAddresses = newPeerAddresses;
return self();
}
public T datastoreContext(final DatastoreContext newDatastoreContext) {
checkSealed();
- this.datastoreContext = newDatastoreContext;
+ datastoreContext = newDatastoreContext;
return self();
}
public T schemaContextProvider(final EffectiveModelContextProvider newSchemaContextProvider) {
checkSealed();
- this.schemaContextProvider = requireNonNull(newSchemaContextProvider);
+ schemaContextProvider = requireNonNull(newSchemaContextProvider);
return self();
}
public T restoreFromSnapshot(final DatastoreSnapshot.ShardSnapshot newRestoreFromSnapshot) {
checkSealed();
- this.restoreFromSnapshot = newRestoreFromSnapshot;
+ restoreFromSnapshot = newRestoreFromSnapshot;
return self();
}
public T dataTree(final DataTree newDataTree) {
checkSealed();
- this.dataTree = newDataTree;
+ dataTree = newDataTree;
return self();
}
}
public TreeType getTreeType() {
- switch (datastoreContext.getLogicalStoreType()) {
- case CONFIGURATION:
- return TreeType.CONFIGURATION;
- case OPERATIONAL:
- return TreeType.OPERATIONAL;
- default:
- throw new IllegalStateException("Unhandled logical store type "
- + datastoreContext.getLogicalStoreType());
- }
+ return switch (datastoreContext.getLogicalStoreType()) {
+ case CONFIGURATION -> TreeType.CONFIGURATION;
+ case OPERATIONAL -> TreeType.OPERATIONAL;
+ };
}
protected void verify() {