BUG-5626: use Identifier instead of String 32/38632/7
authorRobert Varga <rovarga@cisco.com>
Tue, 10 May 2016 12:16:27 +0000 (14:16 +0200)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 13 May 2016 11:24:59 +0000 (11:24 +0000)
Using a String for identifying replication entries is not flexible
enough. Yangtools has an Identifier concept and utility classes that
make this more flexible. Use these so we can evolve the identifiers
we use to track requests.

Change-Id: Ie5794d1e929300928c57cbec6d2fe22329fe6a5e
Signed-off-by: Robert Varga <rovarga@cisco.com>
19 files changed:
opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTrackerImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/Replicate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java

index 87dede1086901d3c930be777d3d0fbd8642baa07..ff57bfd1d5ae21db8f8ae1646c9a8f6e1f08b3a5 100644 (file)
@@ -33,6 +33,8 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
 
 /**
  * A sample actor showing how the RaftActor is to be extended
@@ -61,8 +63,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
     protected void handleNonRaftCommand(Object message) {
         if(message instanceof KeyValue){
             if(isLeader()) {
-                String persistId = Long.toString(persistIdentifier++);
-                persistData(getSender(), persistId, (Payload) message);
+                persistData(getSender(), new StringIdentifier(String.valueOf(persistIdentifier++)), (Payload) message);
             } else {
                 if(getLeader() != null) {
                     getLeader().forward(message, getContext());
@@ -111,8 +112,8 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
         return roleChangeNotifier;
     }
 
-    @Override protected void applyState(final ActorRef clientActor, final String identifier,
-        final Object data) {
+    @Override
+    protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
         if(data instanceof KeyValue){
             KeyValue kv = (KeyValue) data;
             state.put(kv.getKey(), kv.getValue());
index af6f1d453ab59e6d4a1dc8212e9678c17eb01a73..afe680cc21b03c660c463628365f1873186c6278 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.raft;
 
 import akka.actor.ActorRef;
+import org.opendaylight.yangtools.concepts.Identifier;
 
 public interface ClientRequestTracker {
     /**
@@ -22,7 +23,7 @@ public interface ClientRequestTracker {
      * @return the identifier of the object that is to be replicated. For example a transaction identifier in the case
      * of a transaction
      */
-    String getIdentifier();
+    Identifier getIdentifier();
 
     /**
      *
index 15de6d01a7006279a2b174ecaa2333545e2a1f0f..6ffb9228dcb4ef729965453c7df60be064b56197 100644 (file)
@@ -9,15 +9,15 @@
 package org.opendaylight.controller.cluster.raft;
 
 import akka.actor.ActorRef;
+import org.opendaylight.yangtools.concepts.Identifier;
 
 public class ClientRequestTrackerImpl implements ClientRequestTracker {
 
     private final ActorRef clientActor;
-    private final String identifier;
+    private final Identifier identifier;
     private final long logIndex;
 
-    public ClientRequestTrackerImpl(ActorRef clientActor, String identifier,
-        long logIndex) {
+    public ClientRequestTrackerImpl(ActorRef clientActor, Identifier identifier, long logIndex) {
 
         this.clientActor = clientActor;
 
@@ -26,15 +26,18 @@ public class ClientRequestTrackerImpl implements ClientRequestTracker {
         this.logIndex = logIndex;
     }
 
-    @Override public ActorRef getClientActor() {
+    @Override
+    public ActorRef getClientActor() {
         return clientActor;
     }
 
-    @Override public long getIndex() {
+    @Override
+    public long getIndex() {
         return logIndex;
     }
 
-    public String getIdentifier() {
+    @Override
+    public Identifier getIdentifier() {
         return identifier;
     }
 }
index 47c8db6006544b13c11b42f7abdb370590fd3b8b..350fa54c8302c800e21ed065734bd10ab0e7badd 100644 (file)
@@ -12,7 +12,6 @@ package org.opendaylight.controller.cluster.raft;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
-import akka.japi.Procedure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -52,6 +51,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftS
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.Immutable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -512,8 +512,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * @param identifier
      * @param data
      */
-    protected void persistData(final ActorRef clientActor, final String identifier,
-        final Payload data) {
+    protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) {
 
         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
             context.getReplicatedLog().lastIndex() + 1,
@@ -525,28 +524,25 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         final RaftActorContext raftContext = getRaftActorContext();
 
-        replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
-            @Override
-            public void apply(ReplicatedLogEntry replicatedLogEntry) {
-                if (!hasFollowers()){
-                    // Increment the Commit Index and the Last Applied values
-                    raftContext.setCommitIndex(replicatedLogEntry.getIndex());
-                    raftContext.setLastApplied(replicatedLogEntry.getIndex());
+        replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> {
+            if (!hasFollowers()){
+                // Increment the Commit Index and the Last Applied values
+                raftContext.setCommitIndex(replicatedLogEntry1.getIndex());
+                raftContext.setLastApplied(replicatedLogEntry1.getIndex());
 
-                    // Apply the state immediately.
-                    self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self());
+                // Apply the state immediately.
+                self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry1), self());
 
-                    // Send a ApplyJournalEntries message so that we write the fact that we applied
-                    // the state to durable storage
-                    self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
+                // Send a ApplyJournalEntries message so that we write the fact that we applied
+                // the state to durable storage
+                self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self());
 
-                } else if (clientActor != null) {
-                    context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
+            } else if (clientActor != null) {
+                context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1);
 
-                    // Send message for replication
-                    getCurrentBehavior().handleMessage(getSelf(),
-                            new Replicate(clientActor, identifier, replicatedLogEntry));
-                }
+                // Send message for replication
+                getCurrentBehavior().handleMessage(getSelf(),
+                        new Replicate(clientActor, identifier, replicatedLogEntry1));
             }
         });
     }
@@ -724,8 +720,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * @param data        A piece of data that was persisted by the persistData call.
      *                    This should NEVER be null.
      */
-    protected abstract void applyState(ActorRef clientActor, String identifier,
-        Object data);
+    protected abstract void applyState(ActorRef clientActor, Identifier identifier, Object data);
 
     /**
      * Returns the RaftActorRecoveryCohort to participate in persistence recovery.
index 46fe6269e37a907db36fe3d0ca7a5b9b9b52bd58..4cad0d0519c2d32eaeacfcda92e2b83559ab9922 100644 (file)
@@ -35,6 +35,8 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.UUIDIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
@@ -530,15 +532,15 @@ class RaftActorServerConfigurationSupport {
     private static abstract class ServerOperationContext<T> {
         private final T operation;
         private final ActorRef clientRequestor;
-        private final String contextId;
+        private final Identifier contextId;
 
         ServerOperationContext(T operation, ActorRef clientRequestor){
             this.operation = operation;
             this.clientRequestor = clientRequestor;
-            contextId = UUID.randomUUID().toString();
+            contextId = new UUIDIdentifier(UUID.randomUUID());
         }
 
-        String getContextId() {
+        Identifier getContextId() {
             return contextId;
         }
 
index 9477eb2c52a9f90d6f082d6d81747f99be44d7bd..7463a93cbd54f2a6bd23b1f9c8ea1b81a301f016 100644 (file)
@@ -11,15 +11,15 @@ package org.opendaylight.controller.cluster.raft.base.messages;
 import akka.actor.ActorRef;
 import java.io.Serializable;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.yangtools.concepts.Identifier;
 
 public class ApplyState implements Serializable {
     private static final long serialVersionUID = 1L;
     private final ActorRef clientActor;
-    private final String identifier;
+    private final Identifier identifier;
     private final ReplicatedLogEntry replicatedLogEntry;
 
-    public ApplyState(ActorRef clientActor, String identifier,
-        ReplicatedLogEntry replicatedLogEntry) {
+    public ApplyState(ActorRef clientActor, Identifier identifier, ReplicatedLogEntry replicatedLogEntry) {
         this.clientActor = clientActor;
         this.identifier = identifier;
         this.replicatedLogEntry = replicatedLogEntry;
@@ -29,7 +29,7 @@ public class ApplyState implements Serializable {
         return clientActor;
     }
 
-    public String getIdentifier() {
+    public Identifier getIdentifier() {
         return identifier;
     }
 
index 611c6eaf1f38ba9c8e2c43fdfa8b5325d9f56760..a5381d5f22c06e8b539aba3f21f88a07629931d0 100644 (file)
@@ -9,18 +9,17 @@
 package org.opendaylight.controller.cluster.raft.base.messages;
 
 import akka.actor.ActorRef;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-
 import java.io.Serializable;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.yangtools.concepts.Identifier;
 
 public class Replicate implements Serializable {
     private static final long serialVersionUID = 1L;
     private final ActorRef clientActor;
-    private final String identifier;
+    private final Identifier identifier;
     private final ReplicatedLogEntry replicatedLogEntry;
 
-    public Replicate(ActorRef clientActor, String identifier,
-        ReplicatedLogEntry replicatedLogEntry) {
+    public Replicate(ActorRef clientActor, Identifier identifier, ReplicatedLogEntry replicatedLogEntry) {
 
         this.clientActor = clientActor;
         this.identifier = identifier;
@@ -31,7 +30,7 @@ public class Replicate implements Serializable {
         return clientActor;
     }
 
-    public String getIdentifier() {
+    public Identifier getIdentifier() {
         return identifier;
     }
 
index ab09ba3967955bad54139dd92c7c6bc977714d2c..106678e4742a666617a280f79399f91c73286126 100644 (file)
@@ -25,6 +25,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.yangtools.concepts.Identifier;
 import org.slf4j.Logger;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -363,30 +364,28 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     protected void applyLogToStateMachine(final long index) {
         long newLastApplied = context.getLastApplied();
         // Now maybe we apply to the state machine
-        for (long i = context.getLastApplied() + 1;
-             i < index + 1; i++) {
-            ActorRef clientActor = null;
-            String identifier = null;
-            ClientRequestTracker tracker = removeClientRequestTracker(i);
-
+        for (long i = context.getLastApplied() + 1; i < index + 1; i++) {
+            final ActorRef clientActor;
+            final Identifier identifier;
+            final ClientRequestTracker tracker = removeClientRequestTracker(i);
             if (tracker != null) {
                 clientActor = tracker.getClientActor();
                 identifier = tracker.getIdentifier();
+            } else {
+                clientActor = null;
+                identifier = null;
             }
-            ReplicatedLogEntry replicatedLogEntry =
-                context.getReplicatedLog().get(i);
 
+            ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(i);
             if (replicatedLogEntry != null) {
                 // Send a local message to the local RaftActor (it's derived class to be
                 // specific to apply the log to it's index)
-                actor().tell(new ApplyState(clientActor, identifier,
-                    replicatedLogEntry), actor());
+                actor().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), actor());
                 newLastApplied = i;
             } else {
                 //if one index is not present in the log, no point in looping
                 // around as the rest wont be present either
-                LOG.warn(
-                        "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
+                LOG.warn("{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
                         logName(), i, i, index);
                 break;
             }
index b30412fc40810d5fc04e833ce4a21423c1533a29..ab44654b6fcefcee29164a7b772f91b971191286 100644 (file)
@@ -16,7 +16,6 @@ import akka.actor.Terminated;
 import akka.dispatch.Dispatchers;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.ArrayList;
@@ -36,6 +35,8 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
@@ -91,7 +92,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         public void handleCommand(Object message) {
             if(message instanceof MockPayload) {
                 MockPayload payload = (MockPayload)message;
-                super.persistData(collectorActor, payload.toString(), payload);
+                super.persistData(collectorActor, new StringIdentifier(payload.toString()), payload);
                 return;
             }
 
@@ -246,12 +247,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     }
 
     protected void verifyApplyJournalEntries(ActorRef actor, final long expIndex) {
-        MessageCollectorActor.expectFirstMatching(actor, ApplyJournalEntries.class, new Predicate<ApplyJournalEntries>() {
-            @Override
-            public boolean apply(ApplyJournalEntries msg) {
-                return msg.getToIndex() == expIndex;
-            }
-        });
+        MessageCollectorActor.expectFirstMatching(actor, ApplyJournalEntries.class, msg -> msg.getToIndex() == expIndex);
     }
 
     @SuppressWarnings("unchecked")
@@ -300,7 +296,9 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     protected void verifyApplyState(ApplyState applyState, ActorRef expClientActor,
             String expId, long expTerm, long expIndex, MockPayload payload) {
         assertEquals("ApplyState getClientActor", expClientActor, applyState.getClientActor());
-        assertEquals("ApplyState getIdentifier", expId, applyState.getIdentifier());
+
+        final Identifier id = expId == null ? null : new StringIdentifier(expId);
+        assertEquals("ApplyState getIdentifier", id, applyState.getIdentifier());
         ReplicatedLogEntry replicatedLogEntry = applyState.getReplicatedLogEntry();
         verifyReplicatedLogEntry(replicatedLogEntry, expTerm, expIndex, payload);
     }
index c96ab9a245d5d0cb40cb2bbfb4fd83bbab1c744b..411b23d67290a309e793028fe8ef612d4864f6a4 100644 (file)
@@ -28,6 +28,7 @@ import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
 
 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
     public static final short PAYLOAD_VERSION = 5;
@@ -114,7 +115,8 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
     }
 
 
-    @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
+    @Override
+    protected void applyState(ActorRef clientActor, Identifier identifier, Object data) {
         actorDelegate.applyState(clientActor, identifier, data);
         LOG.info("{}: applyState called: {}", persistenceId(), data);
 
index 671d875b393f328f3a1003d4f91ef1d686f92884..319970ef5846f0be66fbfcfabb936057e410f9f1 100644 (file)
@@ -86,6 +86,8 @@ import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolic
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
@@ -443,9 +445,10 @@ public class RaftActorTest extends AbstractActorTest {
                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
                         new MockRaftActorContext.MockPayload("F"));
 
-                mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
+                final Identifier id = new StringIdentifier("apply-state");
+                mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry));
 
-                verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
+                verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), anyObject());
 
             }
         };
@@ -949,7 +952,8 @@ public class RaftActorTest extends AbstractActorTest {
             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
             // Persist another entry (this will cause a CaptureSnapshot to be triggered
-            leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
+            leaderActor.persistData(mockActorRef, new StringIdentifier("x"),
+                new MockRaftActorContext.MockPayload("duh"));
 
             // Now send a CaptureSnapshotReply
             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
index 1f42af7b4511159b89a4e2a2b441042ac8a2d20e..0937f65565ccb56a2c8cc36c39780f2bc8ef2dc8 100644 (file)
@@ -63,6 +63,8 @@ import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
 import scala.concurrent.duration.FiniteDuration;
 
 public class LeaderTest extends AbstractLeaderTest<Leader> {
@@ -514,8 +516,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         actorContext.getReplicatedLog().append(newEntry);
 
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
-                new Replicate(leaderActor, "state-id", newEntry));
+        final Identifier id = new StringIdentifier("state-id");
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
 
         // State should not change
         assertTrue(raftBehavior instanceof Leader);
@@ -536,7 +538,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         ApplyState last = applyStateList.get((int) newLogIndex - 1);
         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
-        assertEquals("getIdentifier", "state-id", last.getIdentifier());
+        assertEquals("getIdentifier", id, last.getIdentifier());
     }
 
     @Test
@@ -652,7 +654,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
         RaftActorBehavior raftBehavior = leader.handleMessage(
-                leaderActor, new Replicate(null, "state-id", entry));
+                leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry));
 
         assertTrue(raftBehavior instanceof Leader);
 
@@ -697,7 +699,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         //update follower timestamp
         leader.markFollowerActive(FOLLOWER_ID);
 
-        leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+        leader.handleMessage(leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry));
 
         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
 
@@ -710,7 +712,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals(2, cs.getLastTerm());
 
         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
-        leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+        leader.handleMessage(leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry));
 
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
@@ -771,8 +773,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals(4, cs.getLastIndex());
         assertEquals(2, cs.getLastTerm());
 
-        // if an initiate is started again when first is in progress, it shouldnt initiate Capture
-        leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+        // if an initiate is started again when first is in progress, it should not initiate Capture
+        leader.handleMessage(leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry));
 
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
index 0fa67880be4002083ae39bcb4c16153493a85d6f..d13ecd2498c509a226eef917762da1afbd5f5396 100644 (file)
@@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.yangtools.util.StringIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -34,7 +35,7 @@ final class CohortEntry {
 
     private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
     private final ReadWriteShardDataTreeTransaction transaction;
-    private final String transactionID;
+    private final StringIdentifier transactionID;
     private final CompositeDataTreeCohort userCohorts;
     private final short clientVersion;
 
@@ -49,14 +50,14 @@ final class CohortEntry {
     CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction,
             DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
         this.transaction = Preconditions.checkNotNull(transaction);
-        this.transactionID = transactionID;
+        this.transactionID = new StringIdentifier(transactionID);
         this.clientVersion = clientVersion;
         this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
     }
 
     CohortEntry(String transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
             SchemaContext schema, short clientVersion) {
-        this.transactionID = transactionID;
+        this.transactionID = new StringIdentifier(transactionID);
         this.cohort = cohort;
         this.transaction = null;
         this.clientVersion = clientVersion;
@@ -68,7 +69,7 @@ final class CohortEntry {
         lastAccessTimer.start();
     }
 
-    String getTransactionID() {
+    StringIdentifier getTransactionID() {
         return transactionID;
     }
 
index 7165581d8103fc64148e745e477bc1ec7f2278d6..94753e553d760c5787055682b9c70f8e8b0e813b 100644 (file)
@@ -66,6 +66,8 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
@@ -330,16 +332,16 @@ public class Shard extends RaftActor {
         if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
             applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
         } else {
-            Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
+            persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
                     DataTreeCandidatePayload.create(candidate));
         }
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {
         if (isLeader()) {
-        if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
-            shardMBean.incrementFailedTransactionsCount();
-        }
+            if(!commitCoordinator.handleCommit(new StringIdentifier(commit.getTransactionID()), getSender(), this)) {
+                shardMBean.incrementFailedTransactionsCount();
+            }
         } else {
             ActorSelection leader = getLeader();
             if (leader == null) {
@@ -352,7 +354,8 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID, @Nonnull final CohortEntry cohortEntry) {
+    private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final Identifier transactionID,
+            @Nonnull final CohortEntry cohortEntry) {
         LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
 
         try {
@@ -394,7 +397,7 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
+    private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull Identifier transactionID) {
         // With persistence enabled, this method is called via applyState by the leader strategy
         // after the commit has been replicated to a majority of the followers.
 
@@ -435,7 +438,7 @@ public class Shard extends RaftActor {
         LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
 
         if (isLeader()) {
-        commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+        commitCoordinator.handleCanCommit(new StringIdentifier(canCommit.getTransactionID()), getSender(), this);
         } else {
             ActorSelection leader = getLeader();
             if (leader == null) {
@@ -566,7 +569,7 @@ public class Shard extends RaftActor {
     }
 
     void doAbortTransaction(final String transactionID, final ActorRef sender) {
-        commitCoordinator.handleAbort(transactionID, sender, this);
+        commitCoordinator.handleAbort(new StringIdentifier(transactionID), sender, this);
     }
 
     private void handleCreateTransaction(final Object message) {
@@ -683,7 +686,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
+    protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
         if (data instanceof DataTreeCandidatePayload) {
             if (clientActor == null) {
                 // No clientActor indicates a replica coming from the leader
@@ -711,7 +714,7 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
+    private void applyModificationToState(ActorRef clientActor, Identifier identifier, Object modification) {
         if(modification == null) {
             LOG.error(
                     "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
index f313329c7070a1fd582bf045888321493ee4074a..3451934e25109aae47ef9b6612b47551338ab447 100644 (file)
@@ -34,6 +34,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 
@@ -46,10 +48,10 @@ final class ShardCommitCoordinator {
 
     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
     public interface CohortDecorator {
-        ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
+        ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual);
     }
 
-    private final Map<String, CohortEntry> cohortCache = new HashMap<>();
+    private final Map<Identifier, CohortEntry> cohortCache = new HashMap<>();
 
     private CohortEntry currentCohortEntry;
 
@@ -143,7 +145,7 @@ final class ShardCommitCoordinator {
 
         final ShardDataTreeCohort cohort = ready.getTransaction().ready();
         final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion());
-        cohortCache.put(ready.getTransactionID(), cohortEntry);
+        cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
 
         if(!queueCohortEntry(cohortEntry, sender, shard)) {
             return;
@@ -172,12 +174,12 @@ final class ShardCommitCoordinator {
      * @param shard the transaction's shard actor
      */
     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard, SchemaContext schema) {
-        CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
+        CohortEntry cohortEntry = cohortCache.get(new StringIdentifier(batched.getTransactionID()));
         if(cohortEntry == null) {
             cohortEntry = new CohortEntry(batched.getTransactionID(),
                     dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()),
                     cohortRegistry, schema,  batched.getVersion());
-            cohortCache.put(batched.getTransactionID(), cohortEntry);
+            cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
         }
 
         if(log.isDebugEnabled()) {
@@ -238,7 +240,7 @@ final class ShardCommitCoordinator {
                 message.getTransactionID());
         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema,
                 DataStoreVersions.CURRENT_VERSION);
-        cohortCache.put(message.getTransactionID(), cohortEntry);
+        cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
 
         if(!queueCohortEntry(cohortEntry, sender, shard)) {
@@ -258,7 +260,7 @@ final class ShardCommitCoordinator {
 
     Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
             final int maxModificationsPerBatch) {
-        CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID());
+        CohortEntry cohortEntry = getAndRemoveCohortEntry(new StringIdentifier(from.getTransactionID()));
         if(cohortEntry == null || cohortEntry.getTransaction() == null) {
             return Collections.singletonList(from);
         }
@@ -287,8 +289,6 @@ final class ShardCommitCoordinator {
     }
 
     private void handleCanCommit(CohortEntry cohortEntry) {
-        String transactionID = cohortEntry.getTransactionID();
-
         cohortEntry.updateLastAccessTime();
 
         if(currentCohortEntry != null) {
@@ -297,7 +297,7 @@ final class ShardCommitCoordinator {
 
             if(log.isDebugEnabled()) {
                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
-                        name, currentCohortEntry.getTransactionID(), transactionID);
+                        name, currentCohortEntry.getTransactionID(), cohortEntry.getTransactionID());
             }
 
             return;
@@ -313,7 +313,7 @@ final class ShardCommitCoordinator {
             if(log.isDebugEnabled()) {
                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
                         queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
-                                transactionID);
+                                cohortEntry.getTransactionID());
             }
         }
     }
@@ -325,7 +325,7 @@ final class ShardCommitCoordinator {
      * @param sender the actor to which to send the response
      * @param shard the transaction's shard actor
      */
-    void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
+    void handleCanCommit(Identifier transactionID, final ActorRef sender, final Shard shard) {
         // Lookup the cohort entry that was cached previously (or should have been) by
         // transactionReady (via the ForwardedReadyTransaction message).
         final CohortEntry cohortEntry = cohortCache.get(transactionID);
@@ -419,7 +419,7 @@ final class ShardCommitCoordinator {
      * @param shard the transaction's shard actor
      * @return true if the transaction was successfully prepared, false otherwise.
      */
-    boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
+    boolean handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
         // this transaction.
         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
@@ -438,7 +438,7 @@ final class ShardCommitCoordinator {
         return doCommit(cohortEntry);
     }
 
-    void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
+    void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) {
         CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
         if(cohortEntry != null) {
             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
@@ -539,7 +539,7 @@ final class ShardCommitCoordinator {
                 protected BatchedModifications getModifications() {
                     if(newModifications.isEmpty() ||
                             newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
-                        newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
+                        newModifications.add(new BatchedModifications(cohortEntry.getTransactionID().getString(),
                                 cohortEntry.getClientVersion(), ""));
         }
 
@@ -555,12 +555,12 @@ final class ShardCommitCoordinator {
                 messages.addAll(newModifications);
 
                 if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.CAN_COMMITTED) {
-                    messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
+                    messages.add(new CanCommitTransaction(cohortEntry.getTransactionID().getString(),
                             cohortEntry.getClientVersion()));
                 }
 
                 if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.PRE_COMMITTED) {
-                    messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
+                    messages.add(new CommitTransaction(cohortEntry.getTransactionID().getString(),
                             cohortEntry.getClientVersion()));
                 }
             }
@@ -577,7 +577,7 @@ final class ShardCommitCoordinator {
      * @return the current CohortEntry or null if the given transaction ID does not match the
      *         current entry.
      */
-    CohortEntry getCohortEntryIfCurrent(String transactionID) {
+    CohortEntry getCohortEntryIfCurrent(Identifier transactionID) {
         if(isCurrentTransaction(transactionID)) {
             return currentCohortEntry;
         }
@@ -589,11 +589,11 @@ final class ShardCommitCoordinator {
         return currentCohortEntry;
     }
 
-    CohortEntry getAndRemoveCohortEntry(String transactionID) {
+    CohortEntry getAndRemoveCohortEntry(Identifier transactionID) {
         return cohortCache.remove(transactionID);
     }
 
-    boolean isCurrentTransaction(String transactionID) {
+    boolean isCurrentTransaction(Identifier transactionID) {
         return currentCohortEntry != null &&
                 currentCohortEntry.getTransactionID().equals(transactionID);
     }
@@ -607,7 +607,7 @@ final class ShardCommitCoordinator {
      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
      *        the cache.
      */
-    void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
+    void currentTransactionComplete(Identifier transactionID, boolean removeCohortEntry) {
         if(removeCohortEntry) {
             cohortCache.remove(transactionID);
         }
index 618ea90e265c33b13c3df266b6ab2f38bb43c52b..210d7f475a7c782ec3bd37e121117aace38579ac 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataCh
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -176,7 +177,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return new SimpleEntry<>(reg, readCurrentData());
     }
 
-    void applyForeignCandidate(final String identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
+    void applyForeignCandidate(final Identifier identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
         LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
 
         final DataTreeModification mod = dataTree.takeSnapshot().newModification();
index 163c15baf6c09f23d3b11a6fc7ccf243a04f2871..76aa1cbcad5aa2adb872e2432cbcc3572a9877a8 100644 (file)
@@ -56,6 +56,7 @@ import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
@@ -313,7 +314,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{
     protected void setupCohortDecorator(Shard shard, final ShardDataTreeCohort cohort) {
         shard.getCommitCoordinator().setCohortDecorator(new ShardCommitCoordinator.CohortDecorator() {
             @Override
-            public ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual) {
+            public ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual) {
                 return cohort;
             }
         });
index ade5b311f2f3691248faa6ecdf8311fa6d07ae70..e056e60888aee4caa7fe9b165bddab5cafe41db3 100644 (file)
@@ -20,7 +20,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
-
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
@@ -102,6 +101,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.yangtools.util.StringIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
@@ -446,8 +446,8 @@ public class ShardTest extends AbstractShardTest {
         writeMod.write(TestModel.TEST_PATH, node);
         writeMod.ready();
 
-        final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
-                payloadForModification(source, writeMod)));
+        final ApplyState applyState = new ApplyState(null, new StringIdentifier("test"),
+            new ReplicatedLogImplEntry(1, 2, payloadForModification(source, writeMod)));
 
         shard.tell(applyState, shard);
 
@@ -679,15 +679,12 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
 
             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
-            final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
-                    if(mockCohort.get() == null) {
-                        mockCohort.set(createDelegatingMockCohort("cohort", actual));
-                    }
-
-                    return mockCohort.get();
+            final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> {
+                if(mockCohort.get() == null) {
+                    mockCohort.set(createDelegatingMockCohort("cohort", actual));
                 }
+
+                return mockCohort.get();
             };
 
             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
@@ -745,15 +742,12 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
 
             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
-            final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
-                    if(mockCohort.get() == null) {
-                        mockCohort.set(createDelegatingMockCohort("cohort", actual));
-                    }
-
-                    return mockCohort.get();
+            final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> {
+                if(mockCohort.get() == null) {
+                    mockCohort.set(createDelegatingMockCohort("cohort", actual));
                 }
+
+                return mockCohort.get();
             };
 
             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
@@ -1614,23 +1608,20 @@ public class ShardTest extends AbstractShardTest {
 
             final String transactionID = "tx1";
             final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
-                          new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
-                @Override
-                public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
-                    final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
-
-                    // Simulate an AbortTransaction message occurring during replication, after
-                    // persisting and before finishing the commit to the in-memory store.
-                    // We have no followers so due to optimizations in the RaftActor, it does not
-                    // attempt replication and thus we can't send an AbortTransaction message b/c
-                    // it would be processed too late after CommitTransaction completes. So we'll
-                    // simulate an AbortTransaction message occurring during replication by calling
-                    // the shard directly.
-                    //
-                    shard.underlyingActor().doAbortTransaction(transactionID, null);
-
-                    return preCommitFuture;
-                }
+                          cohort -> {
+                final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
+
+                // Simulate an AbortTransaction message occurring during replication, after
+                // persisting and before finishing the commit to the in-memory store.
+                // We have no followers so due to optimizations in the RaftActor, it does not
+                // attempt replication and thus we can't send an AbortTransaction message b/c
+                // it would be processed too late after CommitTransaction completes. So we'll
+                // simulate an AbortTransaction message occurring during replication by calling
+                // the shard directly.
+                //
+                shard.underlyingActor().doAbortTransaction(transactionID, null);
+
+                return preCommitFuture;
             };
 
             final MutableCompositeModification modification = new MutableCompositeModification();
@@ -1996,20 +1987,15 @@ public class ShardTest extends AbstractShardTest {
         new ShardTestKit(getSystem()) {{
             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
             @SuppressWarnings("serial")
-            final Creator<Shard> creator = new Creator<Shard>() {
+            final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
                 @Override
-                public Shard create() throws Exception {
-                    return new Shard(newShardBuilder()) {
-                        @Override
-                        public void handleCommand(final Object message) {
-                            super.handleCommand(message);
-                            if(TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
-                                if(cleaupCheckLatch.get() != null) {
-                                    cleaupCheckLatch.get().countDown();
-                                }
-                            }
+                public void handleCommand(final Object message) {
+                    super.handleCommand(message);
+                    if(TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
+                        if(cleaupCheckLatch.get() != null) {
+                            cleaupCheckLatch.get().countDown();
                         }
-                    };
+                    }
                 }
             };
 
@@ -2112,12 +2098,7 @@ public class ShardTest extends AbstractShardTest {
                 }
             }
 
-            final Creator<Shard> creator = new Creator<Shard>() {
-                @Override
-                public Shard create() throws Exception {
-                    return new TestShard(newShardBuilder());
-                }
-            };
+            final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
 
             final TestActorRef<Shard> shard = actorFactory.createTestActor(
                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
index c9797b5414589018b83baeffeb307b5fec81585f..79beb15a5abb4e3f43de89f9e7e77bb6e4c129ed 100644 (file)
@@ -35,6 +35,7 @@ import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.yangtools.util.StringIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
@@ -106,7 +107,8 @@ public class PreLithiumShardTest extends AbstractShardTest {
 
             NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-            ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
+            ApplyState applyState = new ApplyState(null, new StringIdentifier("test"),
+                new ReplicatedLogImplEntry(1, 2,
                     newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
 
             shard.underlyingActor().onReceiveCommand(applyState);