import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
import org.opendaylight.controller.cluster.common.actor.Dispatchers;
import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherType;
import org.opendaylight.controller.cluster.common.actor.MessageTracker;
-import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.actors.JsonExportActor;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
if (message instanceof SnapshotOffer) {
exportActor.tell(new JsonExportActor.ExportSnapshot(store.readCurrentData().orElseThrow(), name),
ActorRef.noSender());
- } else if (message instanceof ReplicatedLogEntry) {
- exportActor.tell(new JsonExportActor.ExportJournal((ReplicatedLogEntry) message),
- ActorRef.noSender());
+ } else if (message instanceof ReplicatedLogEntry replicatedLogEntry) {
+ exportActor.tell(new JsonExportActor.ExportJournal(replicatedLogEntry), ActorRef.noSender());
} else if (message instanceof RecoveryCompleted) {
exportActor.tell(new JsonExportActor.FinishExport(name), ActorRef.noSender());
exportActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
@Override
// non-final for TestShard
protected void handleNonRaftCommand(final Object message) {
- try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
- final Optional<Error> maybeError = context.error();
+ try (var context = appendEntriesReplyTracker.received(message)) {
+ final var maybeError = context.error();
if (maybeError.isPresent()) {
LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
maybeError.orElseThrow());
store.resetTransactionBatch();
- if (message instanceof RequestEnvelope) {
- handleRequestEnvelope((RequestEnvelope)message);
+ if (message instanceof RequestEnvelope request) {
+ handleRequestEnvelope(request);
} else if (MessageAssembler.isHandledMessage(message)) {
handleRequestAssemblerMessage(message);
- } else if (message instanceof ConnectClientRequest) {
- handleConnectClient((ConnectClientRequest)message);
+ } else if (message instanceof ConnectClientRequest request) {
+ handleConnectClient(request);
} else if (CreateTransaction.isSerializedType(message)) {
handleCreateTransaction(message);
- } else if (message instanceof BatchedModifications) {
- handleBatchedModifications((BatchedModifications)message);
- } else if (message instanceof ForwardedReadyTransaction) {
- handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
- } else if (message instanceof ReadyLocalTransaction) {
- handleReadyLocalTransaction((ReadyLocalTransaction)message);
+ } else if (message instanceof BatchedModifications request) {
+ handleBatchedModifications(request);
+ } else if (message instanceof ForwardedReadyTransaction request) {
+ handleForwardedReadyTransaction(request);
+ } else if (message instanceof ReadyLocalTransaction request) {
+ handleReadyLocalTransaction(request);
} else if (CanCommitTransaction.isSerializedType(message)) {
handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
} else if (CommitTransaction.isSerializedType(message)) {
closeTransactionChain(CloseTransactionChain.fromSerializable(message));
} else if (message instanceof DataTreeChangedReply) {
// Ignore reply
- } else if (message instanceof RegisterDataTreeChangeListener) {
- treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader());
- } else if (message instanceof UpdateSchemaContext) {
- updateSchemaContext((UpdateSchemaContext) message);
+ } else if (message instanceof RegisterDataTreeChangeListener request) {
+ treeChangeSupport.onMessage(request, isLeader(), hasLeader());
+ } else if (message instanceof UpdateSchemaContext request) {
+ updateSchemaContext(request);
} else if (message instanceof PeerAddressResolved resolved) {
setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
} else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
commitTimeoutCheck();
- } else if (message instanceof DatastoreContext) {
- onDatastoreContext((DatastoreContext)message);
+ } else if (message instanceof DatastoreContext request) {
+ onDatastoreContext(request);
} else if (message instanceof RegisterRoleChangeListener) {
roleChangeNotifier.orElseThrow().forward(message, context());
- } else if (message instanceof FollowerInitialSyncUpStatus) {
- shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
+ } else if (message instanceof FollowerInitialSyncUpStatus request) {
+ shardMBean.setFollowerInitialSyncStatus(request.isInitialSyncDone());
context().parent().tell(message, self());
} else if (GET_SHARD_MBEAN_MESSAGE.equals(message)) {
sender().tell(getShardMBean(), self());
context().parent().forward(message, context());
} else if (ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
messageRetrySupport.onTimerMessage(message);
- } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
- store.processCohortRegistryCommand(getSender(),
- (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
+ } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand request) {
+ store.processCohortRegistryCommand(getSender(), request);
} else if (message instanceof MakeLeaderLocal) {
onMakeLeaderLocal();
} else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
throw new NotLeaderException(getSelf());
}
- final Request<?, ?> request = envelope.getMessage();
- if (request instanceof TransactionRequest) {
- final TransactionRequest<?> txReq = (TransactionRequest<?>)request;
- final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId();
+ final var request = envelope.getMessage();
+ if (request instanceof TransactionRequest<?> txReq) {
+ final var clientId = txReq.getTarget().getHistoryId().getClientId();
return getFrontend(clientId).handleTransactionRequest(txReq, envelope, now);
- } else if (request instanceof LocalHistoryRequest) {
- final LocalHistoryRequest<?> lhReq = (LocalHistoryRequest<?>)request;
- final ClientIdentifier clientId = lhReq.getTarget().getClientId();
+ } else if (request instanceof LocalHistoryRequest<?> lhReq) {
+ final var clientId = lhReq.getTarget().getClientId();
return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now);
} else {
LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request);
@Override
protected final void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
- if (data instanceof Payload) {
- if (data instanceof DisableTrackingPayload) {
- disableTracking((DisableTrackingPayload) data);
+ if (data instanceof Payload payload) {
+ if (payload instanceof DisableTrackingPayload disableTracking) {
+ disableTracking(disableTracking);
return;
}
try {
- store.applyReplicatedPayload(identifier, (Payload)data);
+ store.applyReplicatedPayload(identifier, payload);
} catch (DataValidationFailedException | IOException e) {
LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
}