Switch to use PayloadVersion.CHLORINE_SR2
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 357df4ab8d819fbb7c5d46a0ea80ec025aa71200..bcf09b3331d3525bdc76aae523a75d1f8adc2065 100644 (file)
@@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Range;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -107,14 +108,14 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.Payload;
 import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
 import org.opendaylight.yangtools.concepts.Identifier;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
+import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.tree.api.TreeType;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContextProvider;
 import scala.concurrent.duration.FiniteDuration;
@@ -217,27 +218,23 @@ public class Shard extends RaftActor {
 
     private final ActorRef exportActor;
 
+    @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design")
     Shard(final AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
 
-        this.name = builder.getId().toString();
-        this.shardName = builder.getId().getShardName();
-        this.datastoreContext = builder.getDatastoreContext();
-        this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
-        this.frontendMetadata = new FrontendMetadata(name);
-        this.exportOnRecovery = datastoreContext.getExportOnRecovery();
+        name = builder.getId().toString();
+        shardName = builder.getId().getShardName();
+        datastoreContext = builder.getDatastoreContext();
+        restoreFromSnapshot = builder.getRestoreFromSnapshot();
+        frontendMetadata = new FrontendMetadata(name);
+        exportOnRecovery = datastoreContext.getExportOnRecovery();
 
-        switch (exportOnRecovery) {
-            case Json:
-                exportActor = getContext().actorOf(JsonExportActor.props(builder.getSchemaContext(),
-                        datastoreContext.getRecoveryExportBaseDir()));
-                break;
-            case Off:
-            default:
-                exportActor = null;
-                break;
-        }
+        exportActor = switch (exportOnRecovery) {
+            case Json -> getContext().actorOf(JsonExportActor.props(builder.getSchemaContext(),
+                datastoreContext.getRecoveryExportBaseDir()));
+            case Off -> null;
+        };
 
         setPersistence(datastoreContext.isPersistent());
 
@@ -261,7 +258,7 @@ public class Shard extends RaftActor {
             getContext().become(new MeteringBehavior(this));
         }
 
-        commitCoordinator = new ShardCommitCoordinator(store, LOG, this.name);
+        commitCoordinator = new ShardCommitCoordinator(store, LOG, name);
 
         setTransactionCommitTimeout();
 
@@ -277,16 +274,16 @@ public class Shard extends RaftActor {
                 self(), getContext(), shardMBean, builder.getId().getShardName());
 
         snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
-            this.name, datastoreContext);
+            name, datastoreContext);
 
         messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
 
-        responseMessageSlicer = MessageSlicer.builder().logContext(this.name)
+        responseMessageSlicer = MessageSlicer.builder().logContext(name)
                 .messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
                 .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
                 .expireStateAfterInactivity(2, TimeUnit.MINUTES).build();
 
-        requestMessageAssembler = MessageAssembler.builder().logContext(this.name)
+        requestMessageAssembler = MessageAssembler.builder().logContext(name)
                 .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
                 .assembledMessageCallback((message, sender) -> self().tell(message, sender))
                 .expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build();
@@ -302,7 +299,7 @@ public class Shard extends RaftActor {
     }
 
     private Optional<ActorRef> createRoleChangeNotifier(final String shardId) {
-        ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
+        ActorRef shardRoleChangeNotifier = getContext().actorOf(
             RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
         return Optional.of(shardRoleChangeNotifier);
     }
@@ -395,8 +392,7 @@ public class Shard extends RaftActor {
                 treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader());
             } else if (message instanceof UpdateSchemaContext) {
                 updateSchemaContext((UpdateSchemaContext) message);
-            } else if (message instanceof PeerAddressResolved) {
-                PeerAddressResolved resolved = (PeerAddressResolved) message;
+            } else if (message instanceof PeerAddressResolved resolved) {
                 setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
             } else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
                 commitTimeoutCheck();
@@ -818,22 +814,23 @@ public class Shard extends RaftActor {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
-        LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionId());
+        final TransactionIdentifier txId = message.getTransactionId();
+        LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), txId);
 
         boolean isLeaderActive = isLeaderActive();
         if (isLeader() && isLeaderActive) {
+            askProtocolEncountered(txId);
             try {
                 commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
             } catch (Exception e) {
-                LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
-                        message.getTransactionId(), e);
+                LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(), txId, e);
                 getSender().tell(new Failure(e), getSelf());
             }
         } else {
             ActorSelection leader = getLeader();
             if (!isLeaderActive || leader == null) {
                 messageRetrySupport.addMessageToRetry(message, getSender(),
-                        "Could not process ready local transaction " + message.getTransactionId());
+                        "Could not process ready local transaction " + txId);
             } else {
                 LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
                 message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
@@ -1119,7 +1116,7 @@ public class Shard extends RaftActor {
 
     @Override
     public final String persistenceId() {
-        return this.name;
+        return name;
     }
 
     @Override
@@ -1242,15 +1239,10 @@ public class Shard extends RaftActor {
         }
 
         public TreeType getTreeType() {
-            switch (datastoreContext.getLogicalStoreType()) {
-                case CONFIGURATION:
-                    return TreeType.CONFIGURATION;
-                case OPERATIONAL:
-                    return TreeType.OPERATIONAL;
-                default:
-                    throw new IllegalStateException("Unhandled logical store type "
-                            + datastoreContext.getLogicalStoreType());
-            }
+            return switch (datastoreContext.getLogicalStoreType()) {
+                case CONFIGURATION -> TreeType.CONFIGURATION;
+                case OPERATIONAL -> TreeType.OPERATIONAL;
+            };
         }
 
         protected void verify() {