BUG-8056: make doCommit/finishCommit package-private
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index cb072b5c6a031a9ee5e5c3ba1753939cb66509ab..19a2151d2fc925284a39d65960e80819d425e154 100644 (file)
@@ -18,13 +18,14 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
+import com.google.common.base.Verify;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Range;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
@@ -66,11 +67,13 @@ import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTran
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
 import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
@@ -164,8 +167,8 @@ public class Shard extends RaftActor {
 
     private final ShardTransactionMessageRetrySupport messageRetrySupport;
 
-    private final FrontendMetadata frontendMetadata = new FrontendMetadata();
-    private final Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = new HashMap<>();
+    private final FrontendMetadata frontendMetadata;
+    private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
 
     protected Shard(final AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
@@ -174,6 +177,7 @@ public class Shard extends RaftActor {
         this.name = builder.getId().toString();
         this.datastoreContext = builder.getDatastoreContext();
         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
+        this.frontendMetadata = new FrontendMetadata(name);
 
         setPersistence(datastoreContext.isPersistent());
 
@@ -185,11 +189,11 @@ public class Shard extends RaftActor {
                 new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher");
         if (builder.getDataTree() != null) {
             store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
-                    treeChangeListenerPublisher, dataChangeListenerPublisher, name);
+                    treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata);
         } else {
             store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(),
                     builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher,
-                    dataChangeListenerPublisher, name);
+                    dataChangeListenerPublisher, name, frontendMetadata);
         }
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this);
@@ -209,8 +213,8 @@ public class Shard extends RaftActor {
                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
 
         transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext,
-                new Dispatchers(context().system().dispatchers()).getDispatcherPath(
-                        Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
+            new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Transaction),
+                self(), getContext(), shardMBean, builder.getId().getShardName());
 
         snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
             this.name);
@@ -332,6 +336,9 @@ public class Shard extends RaftActor {
             } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
                 store.processCohortRegistryCommand(getSender(),
                         (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
+            } else if (message instanceof PersistAbortTransactionPayload) {
+                final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
+                persistPayload(txId, AbortTransactionPayload.create(txId), true);
             } else {
                 super.handleNonRaftCommand(message);
             }
@@ -632,7 +639,8 @@ public class Shard extends RaftActor {
 
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
         final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
-        store.closeTransactionChain(id, () -> store.purgeTransactionChain(id, null));
+        store.closeTransactionChain(id, null);
+        store.purgeTransactionChain(id, null);
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
@@ -732,7 +740,7 @@ public class Shard extends RaftActor {
                     persistenceId(), getId());
             }
 
-            store.closeAllTransactionChains();
+            store.purgeLeaderState();
         }
 
         if (hasLeader && !isIsolatedLeader()) {
@@ -744,8 +752,18 @@ public class Shard extends RaftActor {
     protected void onLeaderChanged(final String oldLeader, final String newLeader) {
         shardMBean.incrementLeadershipChangeCount();
 
-        boolean hasLeader = hasLeader();
-        if (hasLeader && !isLeader()) {
+        final boolean hasLeader = hasLeader();
+        if (!hasLeader) {
+            // No leader implies we are not the leader, lose frontend state if we have any. This also places
+            // an explicit guard so the map will not get modified accidentally.
+            if (!knownFrontends.isEmpty()) {
+                LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet());
+                knownFrontends = ImmutableMap.of();
+            }
+            return;
+        }
+
+        if (!isLeader()) {
             // Another leader was elected. If we were the previous leader and had pending transactions, convert
             // them to transaction messages and send to the new leader.
             ActorSelection leader = getLeader();
@@ -764,9 +782,13 @@ public class Shard extends RaftActor {
                 commitCoordinator.abortPendingTransactions("The transacton was aborted due to inflight leadership "
                         + "change and the leader address isn't available.", this);
             }
+        } else {
+            // We have become the leader, we need to reconstruct frontend state
+            knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+            LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet());
         }
 
-        if (hasLeader && !isIsolatedLeader()) {
+        if (!isIsolatedLeader()) {
             messageRetrySupport.retryMessages();
         }
     }