Bug 7814: Fix InvalidActorNameException
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 90c47256b4c6b33bdf8e3b4f71eef7d88bb7fe90..7ca79fc2349add0e9c19236a945aaed53676b45d 100644 (file)
@@ -37,6 +37,7 @@ import org.opendaylight.controller.cluster.access.commands.NotLeaderException;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
@@ -61,16 +62,17 @@ import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionCh
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
+import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
@@ -80,6 +82,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
@@ -206,8 +209,8 @@ public class Shard extends RaftActor {
                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
 
         transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext,
-                new Dispatchers(context().system().dispatchers()).getDispatcherPath(
-                        Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
+            new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Transaction),
+                self(), getContext(), shardMBean, builder.getId().getShardName());
 
         snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
             this.name);
@@ -257,7 +260,7 @@ public class Shard extends RaftActor {
     @SuppressWarnings("checkstyle:IllegalCatch")
     @Override
     protected void handleNonRaftCommand(final Object message) {
-        try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
+        try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
             final Optional<Error> maybeError = context.error();
             if (maybeError.isPresent()) {
                 LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
@@ -450,16 +453,15 @@ public class Shard extends RaftActor {
         updateConfigParams(datastoreContext.getShardRaftConfig());
     }
 
-    boolean canSkipPayload() {
-        // If we do not have any followers and we are not using persistence we can apply modification to the state
-        // immediately
-        return !hasFollowers() && !persistence().isRecoveryApplicable();
-    }
-
     // applyState() will be invoked once consensus is reached on the payload
-    void persistPayload(final TransactionIdentifier transactionId, final Payload payload, boolean batchHint) {
-        // We are faking the sender
-        persistData(self(), transactionId, payload, batchHint);
+    void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) {
+        boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable();
+        if (canSkipPayload) {
+            applyState(self(), id, payload);
+        } else {
+            // We are faking the sender
+            persistData(self(), id, payload, batchHint);
+        }
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {
@@ -613,7 +615,7 @@ public class Shard extends RaftActor {
         doAbortTransaction(abort.getTransactionId(), getSender());
     }
 
-    void doAbortTransaction(final TransactionIdentifier transactionID, final ActorRef sender) {
+    void doAbortTransaction(final Identifier transactionID, final ActorRef sender) {
         commitCoordinator.handleAbort(transactionID, sender, this);
     }
 
@@ -629,7 +631,9 @@ public class Shard extends RaftActor {
     }
 
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
-        store.closeTransactionChain(closeTransactionChain.getIdentifier());
+        final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
+        store.closeTransactionChain(id, null);
+        store.purgeTransactionChain(id, null);
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
@@ -784,6 +788,13 @@ public class Shard extends RaftActor {
         store.setRunOnPendingTransactionsComplete(operation);
     }
 
+    @Override
+    protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+        return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
+                .dataChangeListenerActors(changeSupport.getListenerActors())
+                .commitCohortActors(store.getCohortActors());
+    }
+
     @Override
     public String persistenceId() {
         return this.name;