X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=9284224341aeb2a49d0c867ffc62938e3e6723a1;hb=534bf6f83465cc8a575b097c1e28fbb1f34d110a;hp=357df4ab8d819fbb7c5d46a0ea80ec025aa71200;hpb=d6bc529eaf624afe74d9bf9ee9ab0cb926bba576;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 357df4ab8d..9284224341 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Range; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -107,14 +108,14 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.Payload; import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery; import org.opendaylight.yangtools.concepts.Identifier; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; +import org.opendaylight.yangtools.yang.data.tree.api.DataTree; +import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.tree.api.TreeType; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContextProvider; import scala.concurrent.duration.FiniteDuration; @@ -217,27 +218,23 @@ public class Shard extends RaftActor { private final ActorRef exportActor; + @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design") Shard(final AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION); - this.name = builder.getId().toString(); - this.shardName = builder.getId().getShardName(); - this.datastoreContext = builder.getDatastoreContext(); - this.restoreFromSnapshot = builder.getRestoreFromSnapshot(); - this.frontendMetadata = new FrontendMetadata(name); - this.exportOnRecovery = datastoreContext.getExportOnRecovery(); + name = builder.getId().toString(); + shardName = builder.getId().getShardName(); + datastoreContext = builder.getDatastoreContext(); + restoreFromSnapshot = builder.getRestoreFromSnapshot(); + frontendMetadata = new FrontendMetadata(name); + exportOnRecovery = datastoreContext.getExportOnRecovery(); - switch (exportOnRecovery) { - case Json: - exportActor = getContext().actorOf(JsonExportActor.props(builder.getSchemaContext(), - datastoreContext.getRecoveryExportBaseDir())); - break; - case Off: - default: - exportActor = null; - break; - } + exportActor = switch (exportOnRecovery) { + case Json -> getContext().actorOf(JsonExportActor.props(builder.getSchemaContext(), + datastoreContext.getRecoveryExportBaseDir())); + case Off -> null; + }; setPersistence(datastoreContext.isPersistent()); @@ -261,7 +258,7 @@ public class Shard extends RaftActor { getContext().become(new MeteringBehavior(this)); } - commitCoordinator = new ShardCommitCoordinator(store, LOG, this.name); + commitCoordinator = new ShardCommitCoordinator(store, LOG, name); setTransactionCommitTimeout(); @@ -277,16 +274,16 @@ public class Shard extends RaftActor { self(), getContext(), shardMBean, builder.getId().getShardName()); snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG, - this.name, datastoreContext); + name, datastoreContext); messageRetrySupport = new ShardTransactionMessageRetrySupport(this); - responseMessageSlicer = MessageSlicer.builder().logContext(this.name) + responseMessageSlicer = MessageSlicer.builder().logContext(name) .messageSliceSize(datastoreContext.getMaximumMessageSliceSize()) .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory()) .expireStateAfterInactivity(2, TimeUnit.MINUTES).build(); - requestMessageAssembler = MessageAssembler.builder().logContext(this.name) + requestMessageAssembler = MessageAssembler.builder().logContext(name) .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory()) .assembledMessageCallback((message, sender) -> self().tell(message, sender)) .expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build(); @@ -302,7 +299,7 @@ public class Shard extends RaftActor { } private Optional createRoleChangeNotifier(final String shardId) { - ActorRef shardRoleChangeNotifier = this.getContext().actorOf( + ActorRef shardRoleChangeNotifier = getContext().actorOf( RoleChangeNotifier.getProps(shardId), shardId + "-notifier"); return Optional.of(shardRoleChangeNotifier); } @@ -395,8 +392,7 @@ public class Shard extends RaftActor { treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader()); } else if (message instanceof UpdateSchemaContext) { updateSchemaContext((UpdateSchemaContext) message); - } else if (message instanceof PeerAddressResolved) { - PeerAddressResolved resolved = (PeerAddressResolved) message; + } else if (message instanceof PeerAddressResolved resolved) { setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress()); } else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) { commitTimeoutCheck(); @@ -818,22 +814,23 @@ public class Shard extends RaftActor { @SuppressWarnings("checkstyle:IllegalCatch") private void handleReadyLocalTransaction(final ReadyLocalTransaction message) { - LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionId()); + final TransactionIdentifier txId = message.getTransactionId(); + LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), txId); boolean isLeaderActive = isLeaderActive(); if (isLeader() && isLeaderActive) { + askProtocolEncountered(txId); try { commitCoordinator.handleReadyLocalTransaction(message, getSender(), this); } catch (Exception e) { - LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(), - message.getTransactionId(), e); + LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(), txId, e); getSender().tell(new Failure(e), getSelf()); } } else { ActorSelection leader = getLeader(); if (!isLeaderActive || leader == null) { messageRetrySupport.addMessageToRetry(message, getSender(), - "Could not process ready local transaction " + message.getTransactionId()); + "Could not process ready local transaction " + txId); } else { LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader); message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); @@ -1119,7 +1116,7 @@ public class Shard extends RaftActor { @Override public final String persistenceId() { - return this.name; + return name; } @Override