BUG-5626: use Identifier instead of String 32/38632/7
authorRobert Varga <rovarga@cisco.com>
Tue, 10 May 2016 12:16:27 +0000 (14:16 +0200)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 13 May 2016 11:24:59 +0000 (11:24 +0000)
Using a String for identifying replication entries is not flexible
enough. Yangtools has an Identifier concept and utility classes that
make this more flexible. Use these so we can evolve the identifiers
we use to track requests.

Change-Id: Ie5794d1e929300928c57cbec6d2fe22329fe6a5e
Signed-off-by: Robert Varga <rovarga@cisco.com>
19 files changed:
opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTrackerImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/Replicate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java

index 87dede1..ff57bfd 100644 (file)
@@ -33,6 +33,8 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
 
 /**
  * A sample actor showing how the RaftActor is to be extended
@@ -61,8 +63,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
     protected void handleNonRaftCommand(Object message) {
         if(message instanceof KeyValue){
             if(isLeader()) {
-                String persistId = Long.toString(persistIdentifier++);
-                persistData(getSender(), persistId, (Payload) message);
+                persistData(getSender(), new StringIdentifier(String.valueOf(persistIdentifier++)), (Payload) message);
             } else {
                 if(getLeader() != null) {
                     getLeader().forward(message, getContext());
@@ -111,8 +112,8 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
         return roleChangeNotifier;
     }
 
-    @Override protected void applyState(final ActorRef clientActor, final String identifier,
-        final Object data) {
+    @Override
+    protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
         if(data instanceof KeyValue){
             KeyValue kv = (KeyValue) data;
             state.put(kv.getKey(), kv.getValue());
index af6f1d4..afe680c 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.raft;
 
 import akka.actor.ActorRef;
+import org.opendaylight.yangtools.concepts.Identifier;
 
 public interface ClientRequestTracker {
     /**
@@ -22,7 +23,7 @@ public interface ClientRequestTracker {
      * @return the identifier of the object that is to be replicated. For example a transaction identifier in the case
      * of a transaction
      */
-    String getIdentifier();
+    Identifier getIdentifier();
 
     /**
      *
index 15de6d0..6ffb922 100644 (file)
@@ -9,15 +9,15 @@
 package org.opendaylight.controller.cluster.raft;
 
 import akka.actor.ActorRef;
+import org.opendaylight.yangtools.concepts.Identifier;
 
 public class ClientRequestTrackerImpl implements ClientRequestTracker {
 
     private final ActorRef clientActor;
-    private final String identifier;
+    private final Identifier identifier;
     private final long logIndex;
 
-    public ClientRequestTrackerImpl(ActorRef clientActor, String identifier,
-        long logIndex) {
+    public ClientRequestTrackerImpl(ActorRef clientActor, Identifier identifier, long logIndex) {
 
         this.clientActor = clientActor;
 
@@ -26,15 +26,18 @@ public class ClientRequestTrackerImpl implements ClientRequestTracker {
         this.logIndex = logIndex;
     }
 
-    @Override public ActorRef getClientActor() {
+    @Override
+    public ActorRef getClientActor() {
         return clientActor;
     }
 
-    @Override public long getIndex() {
+    @Override
+    public long getIndex() {
         return logIndex;
     }
 
-    public String getIdentifier() {
+    @Override
+    public Identifier getIdentifier() {
         return identifier;
     }
 }
index 47c8db6..350fa54 100644 (file)
@@ -12,7 +12,6 @@ package org.opendaylight.controller.cluster.raft;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
-import akka.japi.Procedure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -52,6 +51,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftS
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.Immutable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -512,8 +512,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * @param identifier
      * @param data
      */
-    protected void persistData(final ActorRef clientActor, final String identifier,
-        final Payload data) {
+    protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) {
 
         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
             context.getReplicatedLog().lastIndex() + 1,
@@ -525,28 +524,25 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         final RaftActorContext raftContext = getRaftActorContext();
 
-        replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
-            @Override
-            public void apply(ReplicatedLogEntry replicatedLogEntry) {
-                if (!hasFollowers()){
-                    // Increment the Commit Index and the Last Applied values
-                    raftContext.setCommitIndex(replicatedLogEntry.getIndex());
-                    raftContext.setLastApplied(replicatedLogEntry.getIndex());
+        replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> {
+            if (!hasFollowers()){
+                // Increment the Commit Index and the Last Applied values
+                raftContext.setCommitIndex(replicatedLogEntry1.getIndex());
+                raftContext.setLastApplied(replicatedLogEntry1.getIndex());
 
-                    // Apply the state immediately.
-                    self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self());
+                // Apply the state immediately.
+                self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry1), self());
 
-                    // Send a ApplyJournalEntries message so that we write the fact that we applied
-                    // the state to durable storage
-                    self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
+                // Send a ApplyJournalEntries message so that we write the fact that we applied
+                // the state to durable storage
+                self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self());
 
-                } else if (clientActor != null) {
-                    context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
+            } else if (clientActor != null) {
+                context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1);
 
-                    // Send message for replication
-                    getCurrentBehavior().handleMessage(getSelf(),
-                            new Replicate(clientActor, identifier, replicatedLogEntry));
-                }
+                // Send message for replication
+                getCurrentBehavior().handleMessage(getSelf(),
+                        new Replicate(clientActor, identifier, replicatedLogEntry1));
             }
         });
     }
@@ -724,8 +720,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * @param data        A piece of data that was persisted by the persistData call.
      *                    This should NEVER be null.
      */
-    protected abstract void applyState(ActorRef clientActor, String identifier,
-        Object data);
+    protected abstract void applyState(ActorRef clientActor, Identifier identifier, Object data);
 
     /**
      * Returns the RaftActorRecoveryCohort to participate in persistence recovery.
index 46fe626..4cad0d0 100644 (file)
@@ -35,6 +35,8 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.UUIDIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
@@ -530,15 +532,15 @@ class RaftActorServerConfigurationSupport {
     private static abstract class ServerOperationContext<T> {
         private final T operation;
         private final ActorRef clientRequestor;
-        private final String contextId;
+        private final Identifier contextId;
 
         ServerOperationContext(T operation, ActorRef clientRequestor){
             this.operation = operation;
             this.clientRequestor = clientRequestor;
-            contextId = UUID.randomUUID().toString();
+            contextId = new UUIDIdentifier(UUID.randomUUID());
         }
 
-        String getContextId() {
+        Identifier getContextId() {
             return contextId;
         }
 
index 9477eb2..7463a93 100644 (file)
@@ -11,15 +11,15 @@ package org.opendaylight.controller.cluster.raft.base.messages;
 import akka.actor.ActorRef;
 import java.io.Serializable;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.yangtools.concepts.Identifier;
 
 public class ApplyState implements Serializable {
     private static final long serialVersionUID = 1L;
     private final ActorRef clientActor;
-    private final String identifier;
+    private final Identifier identifier;
     private final ReplicatedLogEntry replicatedLogEntry;
 
-    public ApplyState(ActorRef clientActor, String identifier,
-        ReplicatedLogEntry replicatedLogEntry) {
+    public ApplyState(ActorRef clientActor, Identifier identifier, ReplicatedLogEntry replicatedLogEntry) {
         this.clientActor = clientActor;
         this.identifier = identifier;
         this.replicatedLogEntry = replicatedLogEntry;
@@ -29,7 +29,7 @@ public class ApplyState implements Serializable {
         return clientActor;
     }
 
-    public String getIdentifier() {
+    public Identifier getIdentifier() {
         return identifier;
     }
 
index 611c6ea..a5381d5 100644 (file)
@@ -9,18 +9,17 @@
 package org.opendaylight.controller.cluster.raft.base.messages;
 
 import akka.actor.ActorRef;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-
 import java.io.Serializable;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.yangtools.concepts.Identifier;
 
 public class Replicate implements Serializable {
     private static final long serialVersionUID = 1L;
     private final ActorRef clientActor;
-    private final String identifier;
+    private final Identifier identifier;
     private final ReplicatedLogEntry replicatedLogEntry;
 
-    public Replicate(ActorRef clientActor, String identifier,
-        ReplicatedLogEntry replicatedLogEntry) {
+    public Replicate(ActorRef clientActor, Identifier identifier, ReplicatedLogEntry replicatedLogEntry) {
 
         this.clientActor = clientActor;
         this.identifier = identifier;
@@ -31,7 +30,7 @@ public class Replicate implements Serializable {
         return clientActor;
     }
 
-    public String getIdentifier() {
+    public Identifier getIdentifier() {
         return identifier;
     }
 
index ab09ba3..106678e 100644 (file)
@@ -25,6 +25,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.yangtools.concepts.Identifier;
 import org.slf4j.Logger;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -363,30 +364,28 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     protected void applyLogToStateMachine(final long index) {
         long newLastApplied = context.getLastApplied();
         // Now maybe we apply to the state machine
-        for (long i = context.getLastApplied() + 1;
-             i < index + 1; i++) {
-            ActorRef clientActor = null;
-            String identifier = null;
-            ClientRequestTracker tracker = removeClientRequestTracker(i);
-
+        for (long i = context.getLastApplied() + 1; i < index + 1; i++) {
+            final ActorRef clientActor;
+            final Identifier identifier;
+            final ClientRequestTracker tracker = removeClientRequestTracker(i);
             if (tracker != null) {
                 clientActor = tracker.getClientActor();
                 identifier = tracker.getIdentifier();
+            } else {
+                clientActor = null;
+                identifier = null;
             }
-            ReplicatedLogEntry replicatedLogEntry =
-                context.getReplicatedLog().get(i);
 
+            ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(i);
             if (replicatedLogEntry != null) {
                 // Send a local message to the local RaftActor (it's derived class to be
                 // specific to apply the log to it's index)
-                actor().tell(new ApplyState(clientActor, identifier,
-                    replicatedLogEntry), actor());
+                actor().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), actor());
                 newLastApplied = i;
             } else {
                 //if one index is not present in the log, no point in looping
                 // around as the rest wont be present either
-                LOG.warn(
-                        "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
+                LOG.warn("{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
                         logName(), i, i, index);
                 break;
             }
index b30412f..ab44654 100644 (file)
@@ -16,7 +16,6 @@ import akka.actor.Terminated;
 import akka.dispatch.Dispatchers;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.ArrayList;
@@ -36,6 +35,8 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
@@ -91,7 +92,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         public void handleCommand(Object message) {
             if(message instanceof MockPayload) {
                 MockPayload payload = (MockPayload)message;
-                super.persistData(collectorActor, payload.toString(), payload);
+                super.persistData(collectorActor, new StringIdentifier(payload.toString()), payload);
                 return;
             }
 
@@ -246,12 +247,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     }
 
     protected void verifyApplyJournalEntries(ActorRef actor, final long expIndex) {
-        MessageCollectorActor.expectFirstMatching(actor, ApplyJournalEntries.class, new Predicate<ApplyJournalEntries>() {
-            @Override
-            public boolean apply(ApplyJournalEntries msg) {
-                return msg.getToIndex() == expIndex;
-            }
-        });
+        MessageCollectorActor.expectFirstMatching(actor, ApplyJournalEntries.class, msg -> msg.getToIndex() == expIndex);
     }
 
     @SuppressWarnings("unchecked")
@@ -300,7 +296,9 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     protected void verifyApplyState(ApplyState applyState, ActorRef expClientActor,
             String expId, long expTerm, long expIndex, MockPayload payload) {
         assertEquals("ApplyState getClientActor", expClientActor, applyState.getClientActor());
-        assertEquals("ApplyState getIdentifier", expId, applyState.getIdentifier());
+
+        final Identifier id = expId == null ? null : new StringIdentifier(expId);
+        assertEquals("ApplyState getIdentifier", id, applyState.getIdentifier());
         ReplicatedLogEntry replicatedLogEntry = applyState.getReplicatedLogEntry();
         verifyReplicatedLogEntry(replicatedLogEntry, expTerm, expIndex, payload);
     }
index c96ab9a..411b23d 100644 (file)
@@ -28,6 +28,7 @@ import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
 
 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
     public static final short PAYLOAD_VERSION = 5;
@@ -114,7 +115,8 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
     }
 
 
-    @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
+    @Override
+    protected void applyState(ActorRef clientActor, Identifier identifier, Object data) {
         actorDelegate.applyState(clientActor, identifier, data);
         LOG.info("{}: applyState called: {}", persistenceId(), data);
 
index 671d875..319970e 100644 (file)
@@ -86,6 +86,8 @@ import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolic
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
@@ -443,9 +445,10 @@ public class RaftActorTest extends AbstractActorTest {
                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
                         new MockRaftActorContext.MockPayload("F"));
 
-                mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
+                final Identifier id = new StringIdentifier("apply-state");
+                mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry));
 
-                verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
+                verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), anyObject());
 
             }
         };
@@ -949,7 +952,8 @@ public class RaftActorTest extends AbstractActorTest {
             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
             // Persist another entry (this will cause a CaptureSnapshot to be triggered
-            leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
+            leaderActor.persistData(mockActorRef, new StringIdentifier("x"),
+                new MockRaftActorContext.MockPayload("duh"));
 
             // Now send a CaptureSnapshotReply
             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
index 1f42af7..0937f65 100644 (file)
@@ -63,6 +63,8 @@ import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
 import scala.concurrent.duration.FiniteDuration;
 
 public class LeaderTest extends AbstractLeaderTest<Leader> {
@@ -514,8 +516,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         actorContext.getReplicatedLog().append(newEntry);
 
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
-                new Replicate(leaderActor, "state-id", newEntry));
+        final Identifier id = new StringIdentifier("state-id");
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
 
         // State should not change
         assertTrue(raftBehavior instanceof Leader);
@@ -536,7 +538,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         ApplyState last = applyStateList.get((int) newLogIndex - 1);
         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
-        assertEquals("getIdentifier", "state-id", last.getIdentifier());
+        assertEquals("getIdentifier", id, last.getIdentifier());
     }
 
     @Test
@@ -652,7 +654,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
         RaftActorBehavior raftBehavior = leader.handleMessage(
-                leaderActor, new Replicate(null, "state-id", entry));
+                leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry));
 
         assertTrue(raftBehavior instanceof Leader);
 
@@ -697,7 +699,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         //update follower timestamp
         leader.markFollowerActive(FOLLOWER_ID);
 
-        leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+        leader.handleMessage(leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry));
 
         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
 
@@ -710,7 +712,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals(2, cs.getLastTerm());
 
         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
-        leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+        leader.handleMessage(leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry));
 
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
@@ -771,8 +773,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals(4, cs.getLastIndex());
         assertEquals(2, cs.getLastTerm());
 
-        // if an initiate is started again when first is in progress, it shouldnt initiate Capture
-        leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+        // if an initiate is started again when first is in progress, it should not initiate Capture
+        leader.handleMessage(leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry));
 
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
index 0fa6788..d13ecd2 100644 (file)
@@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.yangtools.util.StringIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -34,7 +35,7 @@ final class CohortEntry {
 
     private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
     private final ReadWriteShardDataTreeTransaction transaction;
-    private final String transactionID;
+    private final StringIdentifier transactionID;
     private final CompositeDataTreeCohort userCohorts;
     private final short clientVersion;
 
@@ -49,14 +50,14 @@ final class CohortEntry {
     CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction,
             DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
         this.transaction = Preconditions.checkNotNull(transaction);
-        this.transactionID = transactionID;
+        this.transactionID = new StringIdentifier(transactionID);
         this.clientVersion = clientVersion;
         this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
     }
 
     CohortEntry(String transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
             SchemaContext schema, short clientVersion) {
-        this.transactionID = transactionID;
+        this.transactionID = new StringIdentifier(transactionID);
         this.cohort = cohort;
         this.transaction = null;
         this.clientVersion = clientVersion;
@@ -68,7 +69,7 @@ final class CohortEntry {
         lastAccessTimer.start();
     }
 
-    String getTransactionID() {
+    StringIdentifier getTransactionID() {
         return transactionID;
     }
 
index 7165581..94753e5 100644 (file)
@@ -66,6 +66,8 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
@@ -330,16 +332,16 @@ public class Shard extends RaftActor {
         if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
             applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
         } else {
-            Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
+            persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
                     DataTreeCandidatePayload.create(candidate));
         }
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {
         if (isLeader()) {
-        if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
-            shardMBean.incrementFailedTransactionsCount();
-        }
+            if(!commitCoordinator.handleCommit(new StringIdentifier(commit.getTransactionID()), getSender(), this)) {
+                shardMBean.incrementFailedTransactionsCount();
+            }
         } else {
             ActorSelection leader = getLeader();
             if (leader == null) {
@@ -352,7 +354,8 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID, @Nonnull final CohortEntry cohortEntry) {
+    private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final Identifier transactionID,
+            @Nonnull final CohortEntry cohortEntry) {
         LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
 
         try {
@@ -394,7 +397,7 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
+    private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull Identifier transactionID) {
         // With persistence enabled, this method is called via applyState by the leader strategy
         // after the commit has been replicated to a majority of the followers.
 
@@ -435,7 +438,7 @@ public class Shard extends RaftActor {
         LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
 
         if (isLeader()) {
-        commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+        commitCoordinator.handleCanCommit(new StringIdentifier(canCommit.getTransactionID()), getSender(), this);
         } else {
             ActorSelection leader = getLeader();
             if (leader == null) {
@@ -566,7 +569,7 @@ public class Shard extends RaftActor {
     }
 
     void doAbortTransaction(final String transactionID, final ActorRef sender) {
-        commitCoordinator.handleAbort(transactionID, sender, this);
+        commitCoordinator.handleAbort(new StringIdentifier(transactionID), sender, this);
     }
 
     private void handleCreateTransaction(final Object message) {
@@ -683,7 +686,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
+    protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
         if (data instanceof DataTreeCandidatePayload) {
             if (clientActor == null) {
                 // No clientActor indicates a replica coming from the leader
@@ -711,7 +714,7 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
+    private void applyModificationToState(ActorRef clientActor, Identifier identifier, Object modification) {
         if(modification == null) {
             LOG.error(
                     "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
index f313329..3451934 100644 (file)
@@ -34,6 +34,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 
@@ -46,10 +48,10 @@ final class ShardCommitCoordinator {
 
     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
     public interface CohortDecorator {
-        ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
+        ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual);
     }
 
-    private final Map<String, CohortEntry> cohortCache = new HashMap<>();
+    private final Map<Identifier, CohortEntry> cohortCache = new HashMap<>();
 
     private CohortEntry currentCohortEntry;
 
@@ -143,7 +145,7 @@ final class ShardCommitCoordinator {
 
         final ShardDataTreeCohort cohort = ready.getTransaction().ready();
         final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion());
-        cohortCache.put(ready.getTransactionID(), cohortEntry);
+        cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
 
         if(!queueCohortEntry(cohortEntry, sender, shard)) {
             return;
@@ -172,12 +174,12 @@ final class ShardCommitCoordinator {
      * @param shard the transaction's shard actor
      */
     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard, SchemaContext schema) {
-        CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
+        CohortEntry cohortEntry = cohortCache.get(new StringIdentifier(batched.getTransactionID()));
         if(cohortEntry == null) {
             cohortEntry = new CohortEntry(batched.getTransactionID(),
                     dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()),
                     cohortRegistry, schema,  batched.getVersion());
-            cohortCache.put(batched.getTransactionID(), cohortEntry);
+            cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
         }
 
         if(log.isDebugEnabled()) {
@@ -238,7 +240,7 @@ final class ShardCommitCoordinator {
                 message.getTransactionID());
         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema,
                 DataStoreVersions.CURRENT_VERSION);
-        cohortCache.put(message.getTransactionID(), cohortEntry);
+        cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
 
         if(!queueCohortEntry(cohortEntry, sender, shard)) {
@@ -258,7 +260,7 @@ final class ShardCommitCoordinator {
 
     Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
             final int maxModificationsPerBatch) {
-        CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID());
+        CohortEntry cohortEntry = getAndRemoveCohortEntry(new StringIdentifier(from.getTransactionID()));
         if(cohortEntry == null || cohortEntry.getTransaction() == null) {
             return Collections.singletonList(from);
         }
@@ -287,8 +289,6 @@ final class ShardCommitCoordinator {
     }
 
     private void handleCanCommit(CohortEntry cohortEntry) {
-        String transactionID = cohortEntry.getTransactionID();
-
         cohortEntry.updateLastAccessTime();
 
         if(currentCohortEntry != null) {
@@ -297,7 +297,7 @@ final class ShardCommitCoordinator {
 
             if(log.isDebugEnabled()) {
                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
-                        name, currentCohortEntry.getTransactionID(), transactionID);
+                        name, currentCohortEntry.getTransactionID(), cohortEntry.getTransactionID());
             }
 
             return;
@@ -313,7 +313,7 @@ final class ShardCommitCoordinator {
             if(log.isDebugEnabled()) {
                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
                         queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
-                                transactionID);
+                                cohortEntry.getTransactionID());
             }
         }
     }
@@ -325,7 +325,7 @@ final class ShardCommitCoordinator {
      * @param sender the actor to which to send the response
      * @param shard the transaction's shard actor
      */
-    void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
+    void handleCanCommit(Identifier transactionID, final ActorRef sender, final Shard shard) {
         // Lookup the cohort entry that was cached previously (or should have been) by
         // transactionReady (via the ForwardedReadyTransaction message).
         final CohortEntry cohortEntry = cohortCache.get(transactionID);
@@ -419,7 +419,7 @@ final class ShardCommitCoordinator {
      * @param shard the transaction's shard actor
      * @return true if the transaction was successfully prepared, false otherwise.
      */
-    boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
+    boolean handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
         // this transaction.
         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
@@ -438,7 +438,7 @@ final class ShardCommitCoordinator {
         return doCommit(cohortEntry);
     }
 
-    void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
+    void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) {
         CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
         if(cohortEntry != null) {
             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
@@ -539,7 +539,7 @@ final class ShardCommitCoordinator {
                 protected BatchedModifications getModifications() {
                     if(newModifications.isEmpty() ||
                             newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
-                        newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
+                        newModifications.add(new BatchedModifications(cohortEntry.getTransactionID().getString(),
                                 cohortEntry.getClientVersion(), ""));
         }
 
@@ -555,12 +555,12 @@ final class ShardCommitCoordinator {
                 messages.addAll(newModifications);
 
                 if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.CAN_COMMITTED) {
-                    messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
+                    messages.add(new CanCommitTransaction(cohortEntry.getTransactionID().getString(),
                             cohortEntry.getClientVersion()));
                 }
 
                 if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.PRE_COMMITTED) {
-                    messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
+                    messages.add(new CommitTransaction(cohortEntry.getTransactionID().getString(),
                             cohortEntry.getClientVersion()));
                 }
             }
@@ -577,7 +577,7 @@ final class ShardCommitCoordinator {
      * @return the current CohortEntry or null if the given transaction ID does not match the
      *         current entry.
      */
-    CohortEntry getCohortEntryIfCurrent(String transactionID) {
+    CohortEntry getCohortEntryIfCurrent(Identifier transactionID) {
         if(isCurrentTransaction(transactionID)) {
             return currentCohortEntry;
         }
@@ -589,11 +589,11 @@ final class ShardCommitCoordinator {
         return currentCohortEntry;
     }
 
-    CohortEntry getAndRemoveCohortEntry(String transactionID) {
+    CohortEntry getAndRemoveCohortEntry(Identifier transactionID) {
         return cohortCache.remove(transactionID);
     }
 
-    boolean isCurrentTransaction(String transactionID) {
+    boolean isCurrentTransaction(Identifier transactionID) {
         return currentCohortEntry != null &&
                 currentCohortEntry.getTransactionID().equals(transactionID);
     }
@@ -607,7 +607,7 @@ final class ShardCommitCoordinator {
      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
      *        the cache.
      */
-    void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
+    void currentTransactionComplete(Identifier transactionID, boolean removeCohortEntry) {
         if(removeCohortEntry) {
             cohortCache.remove(transactionID);
         }
index 618ea90..210d7f4 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataCh
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -176,7 +177,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return new SimpleEntry<>(reg, readCurrentData());
     }
 
-    void applyForeignCandidate(final String identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
+    void applyForeignCandidate(final Identifier identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
         LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
 
         final DataTreeModification mod = dataTree.takeSnapshot().newModification();
index 163c15b..76aa1cb 100644 (file)
@@ -56,6 +56,7 @@ import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
@@ -313,7 +314,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{
     protected void setupCohortDecorator(Shard shard, final ShardDataTreeCohort cohort) {
         shard.getCommitCoordinator().setCohortDecorator(new ShardCommitCoordinator.CohortDecorator() {
             @Override
-            public ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual) {
+            public ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual) {
                 return cohort;
             }
         });
index ade5b31..e056e60 100644 (file)
@@ -20,7 +20,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
-
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
@@ -102,6 +101,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.yangtools.util.StringIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
@@ -446,8 +446,8 @@ public class ShardTest extends AbstractShardTest {
         writeMod.write(TestModel.TEST_PATH, node);
         writeMod.ready();
 
-        final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
-                payloadForModification(source, writeMod)));
+        final ApplyState applyState = new ApplyState(null, new StringIdentifier("test"),
+            new ReplicatedLogImplEntry(1, 2, payloadForModification(source, writeMod)));
 
         shard.tell(applyState, shard);
 
@@ -679,15 +679,12 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
 
             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
-            final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
-                    if(mockCohort.get() == null) {
-                        mockCohort.set(createDelegatingMockCohort("cohort", actual));
-                    }
-
-                    return mockCohort.get();
+            final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> {
+                if(mockCohort.get() == null) {
+                    mockCohort.set(createDelegatingMockCohort("cohort", actual));
                 }
+
+                return mockCohort.get();
             };
 
             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
@@ -745,15 +742,12 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
 
             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
-            final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
-                    if(mockCohort.get() == null) {
-                        mockCohort.set(createDelegatingMockCohort("cohort", actual));
-                    }
-
-                    return mockCohort.get();
+            final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> {
+                if(mockCohort.get() == null) {
+                    mockCohort.set(createDelegatingMockCohort("cohort", actual));
                 }
+
+                return mockCohort.get();
             };
 
             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
@@ -1614,23 +1608,20 @@ public class ShardTest extends AbstractShardTest {
 
             final String transactionID = "tx1";
             final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
-                          new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
-                @Override
-                public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
-                    final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
-
-                    // Simulate an AbortTransaction message occurring during replication, after
-                    // persisting and before finishing the commit to the in-memory store.
-                    // We have no followers so due to optimizations in the RaftActor, it does not
-                    // attempt replication and thus we can't send an AbortTransaction message b/c
-                    // it would be processed too late after CommitTransaction completes. So we'll
-                    // simulate an AbortTransaction message occurring during replication by calling
-                    // the shard directly.
-                    //
-                    shard.underlyingActor().doAbortTransaction(transactionID, null);
-
-                    return preCommitFuture;
-                }
+                          cohort -> {
+                final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
+
+                // Simulate an AbortTransaction message occurring during replication, after
+                // persisting and before finishing the commit to the in-memory store.
+                // We have no followers so due to optimizations in the RaftActor, it does not
+                // attempt replication and thus we can't send an AbortTransaction message b/c
+                // it would be processed too late after CommitTransaction completes. So we'll
+                // simulate an AbortTransaction message occurring during replication by calling
+                // the shard directly.
+                //
+                shard.underlyingActor().doAbortTransaction(transactionID, null);
+
+                return preCommitFuture;
             };
 
             final MutableCompositeModification modification = new MutableCompositeModification();
@@ -1996,20 +1987,15 @@ public class ShardTest extends AbstractShardTest {
         new ShardTestKit(getSystem()) {{
             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
             @SuppressWarnings("serial")
-            final Creator<Shard> creator = new Creator<Shard>() {
+            final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
                 @Override
-                public Shard create() throws Exception {
-                    return new Shard(newShardBuilder()) {
-                        @Override
-                        public void handleCommand(final Object message) {
-                            super.handleCommand(message);
-                            if(TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
-                                if(cleaupCheckLatch.get() != null) {
-                                    cleaupCheckLatch.get().countDown();
-                                }
-                            }
+                public void handleCommand(final Object message) {
+                    super.handleCommand(message);
+                    if(TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
+                        if(cleaupCheckLatch.get() != null) {
+                            cleaupCheckLatch.get().countDown();
                         }
-                    };
+                    }
                 }
             };
 
@@ -2112,12 +2098,7 @@ public class ShardTest extends AbstractShardTest {
                 }
             }
 
-            final Creator<Shard> creator = new Creator<Shard>() {
-                @Override
-                public Shard create() throws Exception {
-                    return new TestShard(newShardBuilder());
-                }
-            };
+            final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
 
             final TestActorRef<Shard> shard = actorFactory.createTestActor(
                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
index c9797b5..79beb15 100644 (file)
@@ -35,6 +35,7 @@ import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.yangtools.util.StringIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
@@ -106,7 +107,8 @@ public class PreLithiumShardTest extends AbstractShardTest {
 
             NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-            ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
+            ApplyState applyState = new ApplyState(null, new StringIdentifier("test"),
+                new ReplicatedLogImplEntry(1, 2,
                     newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
 
             shard.underlyingActor().onReceiveCommand(applyState);

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.