From: Robert Varga Date: Tue, 10 May 2016 12:16:27 +0000 (+0200) Subject: BUG-5626: use Identifier instead of String X-Git-Tag: release/boron~187 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=348a37f613ef444b10a0e65b400390396552fc48 BUG-5626: use Identifier instead of String Using a String for identifying replication entries is not flexible enough. Yangtools has an Identifier concept and utility classes that make this more flexible. Use these so we can evolve the identifiers we use to track requests. Change-Id: Ie5794d1e929300928c57cbec6d2fe22329fe6a5e Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 87dede1086..ff57bfd1d5 100644 --- a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -33,6 +33,8 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.util.StringIdentifier; /** * A sample actor showing how the RaftActor is to be extended @@ -61,8 +63,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, protected void handleNonRaftCommand(Object message) { if(message instanceof KeyValue){ if(isLeader()) { - String persistId = Long.toString(persistIdentifier++); - persistData(getSender(), persistId, (Payload) message); + persistData(getSender(), new StringIdentifier(String.valueOf(persistIdentifier++)), (Payload) message); } else { if(getLeader() != null) { getLeader().forward(message, getContext()); @@ -111,8 +112,8 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, return roleChangeNotifier; } - @Override protected void applyState(final ActorRef clientActor, final String identifier, - final Object data) { + @Override + protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { if(data instanceof KeyValue){ KeyValue kv = (KeyValue) data; state.put(kv.getKey(), kv.getValue()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java index af6f1d453a..afe680cc21 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; +import org.opendaylight.yangtools.concepts.Identifier; public interface ClientRequestTracker { /** @@ -22,7 +23,7 @@ public interface ClientRequestTracker { * @return the identifier of the object that is to be replicated. For example a transaction identifier in the case * of a transaction */ - String getIdentifier(); + Identifier getIdentifier(); /** * diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTrackerImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTrackerImpl.java index 15de6d01a7..6ffb9228dc 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTrackerImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTrackerImpl.java @@ -9,15 +9,15 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; +import org.opendaylight.yangtools.concepts.Identifier; public class ClientRequestTrackerImpl implements ClientRequestTracker { private final ActorRef clientActor; - private final String identifier; + private final Identifier identifier; private final long logIndex; - public ClientRequestTrackerImpl(ActorRef clientActor, String identifier, - long logIndex) { + public ClientRequestTrackerImpl(ActorRef clientActor, Identifier identifier, long logIndex) { this.clientActor = clientActor; @@ -26,15 +26,18 @@ public class ClientRequestTrackerImpl implements ClientRequestTracker { this.logIndex = logIndex; } - @Override public ActorRef getClientActor() { + @Override + public ActorRef getClientActor() { return clientActor; } - @Override public long getIndex() { + @Override + public long getIndex() { return logIndex; } - public String getIdentifier() { + @Override + public Identifier getIdentifier() { return identifier; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 47c8db6006..350fa54c83 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -12,7 +12,6 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; -import akka.japi.Procedure; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -52,6 +51,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftS import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.Immutable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -512,8 +512,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * @param identifier * @param data */ - protected void persistData(final ActorRef clientActor, final String identifier, - final Payload data) { + protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) { ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry( context.getReplicatedLog().lastIndex() + 1, @@ -525,28 +524,25 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { final RaftActorContext raftContext = getRaftActorContext(); - replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure() { - @Override - public void apply(ReplicatedLogEntry replicatedLogEntry) { - if (!hasFollowers()){ - // Increment the Commit Index and the Last Applied values - raftContext.setCommitIndex(replicatedLogEntry.getIndex()); - raftContext.setLastApplied(replicatedLogEntry.getIndex()); + replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> { + if (!hasFollowers()){ + // Increment the Commit Index and the Last Applied values + raftContext.setCommitIndex(replicatedLogEntry1.getIndex()); + raftContext.setLastApplied(replicatedLogEntry1.getIndex()); - // Apply the state immediately. - self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self()); + // Apply the state immediately. + self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry1), self()); - // Send a ApplyJournalEntries message so that we write the fact that we applied - // the state to durable storage - self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self()); + // Send a ApplyJournalEntries message so that we write the fact that we applied + // the state to durable storage + self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self()); - } else if (clientActor != null) { - context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry); + } else if (clientActor != null) { + context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1); - // Send message for replication - getCurrentBehavior().handleMessage(getSelf(), - new Replicate(clientActor, identifier, replicatedLogEntry)); - } + // Send message for replication + getCurrentBehavior().handleMessage(getSelf(), + new Replicate(clientActor, identifier, replicatedLogEntry1)); } }); } @@ -724,8 +720,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * @param data A piece of data that was persisted by the persistData call. * This should NEVER be null. */ - protected abstract void applyState(ActorRef clientActor, String identifier, - Object data); + protected abstract void applyState(ActorRef clientActor, Identifier identifier, Object data); /** * Returns the RaftActorRecoveryCohort to participate in persistence recovery. diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index 46fe6269e3..4cad0d0519 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -35,6 +35,8 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.util.UUIDIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; @@ -530,15 +532,15 @@ class RaftActorServerConfigurationSupport { private static abstract class ServerOperationContext { private final T operation; private final ActorRef clientRequestor; - private final String contextId; + private final Identifier contextId; ServerOperationContext(T operation, ActorRef clientRequestor){ this.operation = operation; this.clientRequestor = clientRequestor; - contextId = UUID.randomUUID().toString(); + contextId = new UUIDIdentifier(UUID.randomUUID()); } - String getContextId() { + Identifier getContextId() { return contextId; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyState.java index 9477eb2c52..7463a93cbd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyState.java @@ -11,15 +11,15 @@ package org.opendaylight.controller.cluster.raft.base.messages; import akka.actor.ActorRef; import java.io.Serializable; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.yangtools.concepts.Identifier; public class ApplyState implements Serializable { private static final long serialVersionUID = 1L; private final ActorRef clientActor; - private final String identifier; + private final Identifier identifier; private final ReplicatedLogEntry replicatedLogEntry; - public ApplyState(ActorRef clientActor, String identifier, - ReplicatedLogEntry replicatedLogEntry) { + public ApplyState(ActorRef clientActor, Identifier identifier, ReplicatedLogEntry replicatedLogEntry) { this.clientActor = clientActor; this.identifier = identifier; this.replicatedLogEntry = replicatedLogEntry; @@ -29,7 +29,7 @@ public class ApplyState implements Serializable { return clientActor; } - public String getIdentifier() { + public Identifier getIdentifier() { return identifier; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/Replicate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/Replicate.java index 611c6eaf1f..a5381d5f22 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/Replicate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/Replicate.java @@ -9,18 +9,17 @@ package org.opendaylight.controller.cluster.raft.base.messages; import akka.actor.ActorRef; -import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; - import java.io.Serializable; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.yangtools.concepts.Identifier; public class Replicate implements Serializable { private static final long serialVersionUID = 1L; private final ActorRef clientActor; - private final String identifier; + private final Identifier identifier; private final ReplicatedLogEntry replicatedLogEntry; - public Replicate(ActorRef clientActor, String identifier, - ReplicatedLogEntry replicatedLogEntry) { + public Replicate(ActorRef clientActor, Identifier identifier, ReplicatedLogEntry replicatedLogEntry) { this.clientActor = clientActor; this.identifier = identifier; @@ -31,7 +30,7 @@ public class Replicate implements Serializable { return clientActor; } - public String getIdentifier() { + public Identifier getIdentifier() { return identifier; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index ab09ba3967..106678e474 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -25,6 +25,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.yangtools.concepts.Identifier; import org.slf4j.Logger; import scala.concurrent.duration.FiniteDuration; @@ -363,30 +364,28 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected void applyLogToStateMachine(final long index) { long newLastApplied = context.getLastApplied(); // Now maybe we apply to the state machine - for (long i = context.getLastApplied() + 1; - i < index + 1; i++) { - ActorRef clientActor = null; - String identifier = null; - ClientRequestTracker tracker = removeClientRequestTracker(i); - + for (long i = context.getLastApplied() + 1; i < index + 1; i++) { + final ActorRef clientActor; + final Identifier identifier; + final ClientRequestTracker tracker = removeClientRequestTracker(i); if (tracker != null) { clientActor = tracker.getClientActor(); identifier = tracker.getIdentifier(); + } else { + clientActor = null; + identifier = null; } - ReplicatedLogEntry replicatedLogEntry = - context.getReplicatedLog().get(i); + ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(i); if (replicatedLogEntry != null) { // Send a local message to the local RaftActor (it's derived class to be // specific to apply the log to it's index) - actor().tell(new ApplyState(clientActor, identifier, - replicatedLogEntry), actor()); + actor().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), actor()); newLastApplied = i; } else { //if one index is not present in the log, no point in looping // around as the rest wont be present either - LOG.warn( - "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}", + LOG.warn("{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}", logName(), i, i, index); break; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index b30412fc40..ab44654b6f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -16,7 +16,6 @@ import akka.actor.Terminated; import akka.dispatch.Dispatchers; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; -import com.google.common.base.Predicate; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; @@ -36,6 +35,8 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.util.StringIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; @@ -91,7 +92,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest public void handleCommand(Object message) { if(message instanceof MockPayload) { MockPayload payload = (MockPayload)message; - super.persistData(collectorActor, payload.toString(), payload); + super.persistData(collectorActor, new StringIdentifier(payload.toString()), payload); return; } @@ -246,12 +247,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest } protected void verifyApplyJournalEntries(ActorRef actor, final long expIndex) { - MessageCollectorActor.expectFirstMatching(actor, ApplyJournalEntries.class, new Predicate() { - @Override - public boolean apply(ApplyJournalEntries msg) { - return msg.getToIndex() == expIndex; - } - }); + MessageCollectorActor.expectFirstMatching(actor, ApplyJournalEntries.class, msg -> msg.getToIndex() == expIndex); } @SuppressWarnings("unchecked") @@ -300,7 +296,9 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest protected void verifyApplyState(ApplyState applyState, ActorRef expClientActor, String expId, long expTerm, long expIndex, MockPayload payload) { assertEquals("ApplyState getClientActor", expClientActor, applyState.getClientActor()); - assertEquals("ApplyState getIdentifier", expId, applyState.getIdentifier()); + + final Identifier id = expId == null ? null : new StringIdentifier(expId); + assertEquals("ApplyState getIdentifier", id, applyState.getIdentifier()); ReplicatedLogEntry replicatedLogEntry = applyState.getReplicatedLogEntry(); verifyReplicatedLogEntry(replicatedLogEntry, expTerm, expIndex, payload); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java index c96ab9a245..411b23d672 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java @@ -28,6 +28,7 @@ import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.yangtools.concepts.Identifier; public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort { public static final short PAYLOAD_VERSION = 5; @@ -114,7 +115,8 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, } - @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { + @Override + protected void applyState(ActorRef clientActor, Identifier identifier, Object data) { actorDelegate.applyState(clientActor, identifier, data); LOG.info("{}: applyState called: {}", persistenceId(), data); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 671d875b39..319970ef58 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -86,6 +86,8 @@ import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolic import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.util.StringIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; @@ -443,9 +445,10 @@ public class RaftActorTest extends AbstractActorTest { ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F")); - mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry)); + final Identifier id = new StringIdentifier("apply-state"); + mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry)); - verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject()); + verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), anyObject()); } }; @@ -949,7 +952,8 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); // Persist another entry (this will cause a CaptureSnapshot to be triggered - leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh")); + leaderActor.persistData(mockActorRef, new StringIdentifier("x"), + new MockRaftActorContext.MockPayload("duh")); // Now send a CaptureSnapshotReply mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 1f42af7b45..0937f65565 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -63,6 +63,8 @@ import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.util.StringIdentifier; import scala.concurrent.duration.FiniteDuration; public class LeaderTest extends AbstractLeaderTest { @@ -514,8 +516,8 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getReplicatedLog().append(newEntry); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, - new Replicate(leaderActor, "state-id", newEntry)); + final Identifier id = new StringIdentifier("state-id"); + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry)); // State should not change assertTrue(raftBehavior instanceof Leader); @@ -536,7 +538,7 @@ public class LeaderTest extends AbstractLeaderTest { ApplyState last = applyStateList.get((int) newLogIndex - 1); assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData()); - assertEquals("getIdentifier", "state-id", last.getIdentifier()); + assertEquals("getIdentifier", id, last.getIdentifier()); } @Test @@ -652,7 +654,7 @@ public class LeaderTest extends AbstractLeaderTest { // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex RaftActorBehavior raftBehavior = leader.handleMessage( - leaderActor, new Replicate(null, "state-id", entry)); + leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry)); assertTrue(raftBehavior instanceof Leader); @@ -697,7 +699,7 @@ public class LeaderTest extends AbstractLeaderTest { //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); - leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); + leader.handleMessage(leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry)); assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); @@ -710,7 +712,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(2, cs.getLastTerm()); // if an initiate is started again when first is in progress, it shouldnt initiate Capture - leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); + leader.handleMessage(leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry)); assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } @@ -771,8 +773,8 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(4, cs.getLastIndex()); assertEquals(2, cs.getLastTerm()); - // if an initiate is started again when first is in progress, it shouldnt initiate Capture - leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); + // if an initiate is started again when first is in progress, it should not initiate Capture + leader.handleMessage(leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry)); assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java index 0fa67880be..d13ecd2498 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java @@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator; import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.yangtools.util.StringIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -34,7 +35,7 @@ final class CohortEntry { private final Stopwatch lastAccessTimer = Stopwatch.createStarted(); private final ReadWriteShardDataTreeTransaction transaction; - private final String transactionID; + private final StringIdentifier transactionID; private final CompositeDataTreeCohort userCohorts; private final short clientVersion; @@ -49,14 +50,14 @@ final class CohortEntry { CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) { this.transaction = Preconditions.checkNotNull(transaction); - this.transactionID = transactionID; + this.transactionID = new StringIdentifier(transactionID); this.clientVersion = clientVersion; this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT); } CohortEntry(String transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) { - this.transactionID = transactionID; + this.transactionID = new StringIdentifier(transactionID); this.cohort = cohort; this.transaction = null; this.clientVersion = clientVersion; @@ -68,7 +69,7 @@ final class CohortEntry { lastAccessTimer.start(); } - String getTransactionID() { + StringIdentifier getTransactionID() { return transactionID; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 7165581d81..94753e553d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -66,6 +66,8 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; +import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.util.StringIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; @@ -330,16 +332,16 @@ public class Shard extends RaftActor { if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) { applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate); } else { - Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), + persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), DataTreeCandidatePayload.create(candidate)); } } private void handleCommitTransaction(final CommitTransaction commit) { if (isLeader()) { - if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) { - shardMBean.incrementFailedTransactionsCount(); - } + if(!commitCoordinator.handleCommit(new StringIdentifier(commit.getTransactionID()), getSender(), this)) { + shardMBean.incrementFailedTransactionsCount(); + } } else { ActorSelection leader = getLeader(); if (leader == null) { @@ -352,7 +354,8 @@ public class Shard extends RaftActor { } } - private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID, @Nonnull final CohortEntry cohortEntry) { + private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final Identifier transactionID, + @Nonnull final CohortEntry cohortEntry) { LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID()); try { @@ -394,7 +397,7 @@ public class Shard extends RaftActor { } } - private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) { + private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull Identifier transactionID) { // With persistence enabled, this method is called via applyState by the leader strategy // after the commit has been replicated to a majority of the followers. @@ -435,7 +438,7 @@ public class Shard extends RaftActor { LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID()); if (isLeader()) { - commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this); + commitCoordinator.handleCanCommit(new StringIdentifier(canCommit.getTransactionID()), getSender(), this); } else { ActorSelection leader = getLeader(); if (leader == null) { @@ -566,7 +569,7 @@ public class Shard extends RaftActor { } void doAbortTransaction(final String transactionID, final ActorRef sender) { - commitCoordinator.handleAbort(transactionID, sender, this); + commitCoordinator.handleAbort(new StringIdentifier(transactionID), sender, this); } private void handleCreateTransaction(final Object message) { @@ -683,7 +686,7 @@ public class Shard extends RaftActor { } @Override - protected void applyState(final ActorRef clientActor, final String identifier, final Object data) { + protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { if (data instanceof DataTreeCandidatePayload) { if (clientActor == null) { // No clientActor indicates a replica coming from the leader @@ -711,7 +714,7 @@ public class Shard extends RaftActor { } } - private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) { + private void applyModificationToState(ActorRef clientActor, Identifier identifier, Object modification) { if(modification == null) { LOG.error( "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}", diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index f313329c70..3451934e25 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -34,6 +34,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.util.StringIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -46,10 +48,10 @@ final class ShardCommitCoordinator { // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts. public interface CohortDecorator { - ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual); + ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual); } - private final Map cohortCache = new HashMap<>(); + private final Map cohortCache = new HashMap<>(); private CohortEntry currentCohortEntry; @@ -143,7 +145,7 @@ final class ShardCommitCoordinator { final ShardDataTreeCohort cohort = ready.getTransaction().ready(); final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion()); - cohortCache.put(ready.getTransactionID(), cohortEntry); + cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); if(!queueCohortEntry(cohortEntry, sender, shard)) { return; @@ -172,12 +174,12 @@ final class ShardCommitCoordinator { * @param shard the transaction's shard actor */ void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard, SchemaContext schema) { - CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID()); + CohortEntry cohortEntry = cohortCache.get(new StringIdentifier(batched.getTransactionID())); if(cohortEntry == null) { cohortEntry = new CohortEntry(batched.getTransactionID(), dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()), cohortRegistry, schema, batched.getVersion()); - cohortCache.put(batched.getTransactionID(), cohortEntry); + cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); } if(log.isDebugEnabled()) { @@ -238,7 +240,7 @@ final class ShardCommitCoordinator { message.getTransactionID()); final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema, DataStoreVersions.CURRENT_VERSION); - cohortCache.put(message.getTransactionID(), cohortEntry); + cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady()); if(!queueCohortEntry(cohortEntry, sender, shard)) { @@ -258,7 +260,7 @@ final class ShardCommitCoordinator { Collection createForwardedBatchedModifications(final BatchedModifications from, final int maxModificationsPerBatch) { - CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID()); + CohortEntry cohortEntry = getAndRemoveCohortEntry(new StringIdentifier(from.getTransactionID())); if(cohortEntry == null || cohortEntry.getTransaction() == null) { return Collections.singletonList(from); } @@ -287,8 +289,6 @@ final class ShardCommitCoordinator { } private void handleCanCommit(CohortEntry cohortEntry) { - String transactionID = cohortEntry.getTransactionID(); - cohortEntry.updateLastAccessTime(); if(currentCohortEntry != null) { @@ -297,7 +297,7 @@ final class ShardCommitCoordinator { if(log.isDebugEnabled()) { log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now", - name, currentCohortEntry.getTransactionID(), transactionID); + name, currentCohortEntry.getTransactionID(), cohortEntry.getTransactionID()); } return; @@ -313,7 +313,7 @@ final class ShardCommitCoordinator { if(log.isDebugEnabled()) { log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name, queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???", - transactionID); + cohortEntry.getTransactionID()); } } } @@ -325,7 +325,7 @@ final class ShardCommitCoordinator { * @param sender the actor to which to send the response * @param shard the transaction's shard actor */ - void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) { + void handleCanCommit(Identifier transactionID, final ActorRef sender, final Shard shard) { // Lookup the cohort entry that was cached previously (or should have been) by // transactionReady (via the ForwardedReadyTransaction message). final CohortEntry cohortEntry = cohortCache.get(transactionID); @@ -419,7 +419,7 @@ final class ShardCommitCoordinator { * @param shard the transaction's shard actor * @return true if the transaction was successfully prepared, false otherwise. */ - boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) { + boolean handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) { // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to // this transaction. final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID); @@ -438,7 +438,7 @@ final class ShardCommitCoordinator { return doCommit(cohortEntry); } - void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) { + void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) { CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID); if(cohortEntry != null) { // We don't remove the cached cohort entry here (ie pass false) in case the Tx was @@ -539,7 +539,7 @@ final class ShardCommitCoordinator { protected BatchedModifications getModifications() { if(newModifications.isEmpty() || newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) { - newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(), + newModifications.add(new BatchedModifications(cohortEntry.getTransactionID().getString(), cohortEntry.getClientVersion(), "")); } @@ -555,12 +555,12 @@ final class ShardCommitCoordinator { messages.addAll(newModifications); if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.CAN_COMMITTED) { - messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(), + messages.add(new CanCommitTransaction(cohortEntry.getTransactionID().getString(), cohortEntry.getClientVersion())); } if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.PRE_COMMITTED) { - messages.add(new CommitTransaction(cohortEntry.getTransactionID(), + messages.add(new CommitTransaction(cohortEntry.getTransactionID().getString(), cohortEntry.getClientVersion())); } } @@ -577,7 +577,7 @@ final class ShardCommitCoordinator { * @return the current CohortEntry or null if the given transaction ID does not match the * current entry. */ - CohortEntry getCohortEntryIfCurrent(String transactionID) { + CohortEntry getCohortEntryIfCurrent(Identifier transactionID) { if(isCurrentTransaction(transactionID)) { return currentCohortEntry; } @@ -589,11 +589,11 @@ final class ShardCommitCoordinator { return currentCohortEntry; } - CohortEntry getAndRemoveCohortEntry(String transactionID) { + CohortEntry getAndRemoveCohortEntry(Identifier transactionID) { return cohortCache.remove(transactionID); } - boolean isCurrentTransaction(String transactionID) { + boolean isCurrentTransaction(Identifier transactionID) { return currentCohortEntry != null && currentCohortEntry.getTransactionID().equals(transactionID); } @@ -607,7 +607,7 @@ final class ShardCommitCoordinator { * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from * the cache. */ - void currentTransactionComplete(String transactionID, boolean removeCohortEntry) { + void currentTransactionComplete(Identifier transactionID, boolean removeCohortEntry) { if(removeCohortEntry) { cohortCache.remove(transactionID); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 618ea90e26..210d7f475a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -19,6 +19,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataCh import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; +import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -176,7 +177,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return new SimpleEntry<>(reg, readCurrentData()); } - void applyForeignCandidate(final String identifier, final DataTreeCandidate foreign) throws DataValidationFailedException { + void applyForeignCandidate(final Identifier identifier, final DataTreeCandidate foreign) throws DataValidationFailedException { LOG.debug("{}: Applying foreign transaction {}", logContext, identifier); final DataTreeModification mod = dataTree.takeSnapshot().newModification(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 163c15baf6..76aa1cbcad 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -56,6 +56,7 @@ import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; @@ -313,7 +314,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{ protected void setupCohortDecorator(Shard shard, final ShardDataTreeCohort cohort) { shard.getCommitCoordinator().setCohortDecorator(new ShardCommitCoordinator.CohortDecorator() { @Override - public ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual) { + public ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual) { return cohort; } }); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index ade5b311f2..e056e60888 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -20,7 +20,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; - import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Props; @@ -102,6 +101,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.yangtools.util.StringIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; @@ -446,8 +446,8 @@ public class ShardTest extends AbstractShardTest { writeMod.write(TestModel.TEST_PATH, node); writeMod.ready(); - final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2, - payloadForModification(source, writeMod))); + final ApplyState applyState = new ApplyState(null, new StringIdentifier("test"), + new ReplicatedLogImplEntry(1, 2, payloadForModification(source, writeMod))); shard.tell(applyState, shard); @@ -679,15 +679,12 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); final AtomicReference mockCohort = new AtomicReference<>(); - final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { - @Override - public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) { - if(mockCohort.get() == null) { - mockCohort.set(createDelegatingMockCohort("cohort", actual)); - } - - return mockCohort.get(); + final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> { + if(mockCohort.get() == null) { + mockCohort.set(createDelegatingMockCohort("cohort", actual)); } + + return mockCohort.get(); }; shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); @@ -745,15 +742,12 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); final AtomicReference mockCohort = new AtomicReference<>(); - final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { - @Override - public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) { - if(mockCohort.get() == null) { - mockCohort.set(createDelegatingMockCohort("cohort", actual)); - } - - return mockCohort.get(); + final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> { + if(mockCohort.get() == null) { + mockCohort.set(createDelegatingMockCohort("cohort", actual)); } + + return mockCohort.get(); }; shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); @@ -1614,23 +1608,20 @@ public class ShardTest extends AbstractShardTest { final String transactionID = "tx1"; final Function> preCommit = - new Function>() { - @Override - public ListenableFuture apply(final ShardDataTreeCohort cohort) { - final ListenableFuture preCommitFuture = cohort.preCommit(); - - // Simulate an AbortTransaction message occurring during replication, after - // persisting and before finishing the commit to the in-memory store. - // We have no followers so due to optimizations in the RaftActor, it does not - // attempt replication and thus we can't send an AbortTransaction message b/c - // it would be processed too late after CommitTransaction completes. So we'll - // simulate an AbortTransaction message occurring during replication by calling - // the shard directly. - // - shard.underlyingActor().doAbortTransaction(transactionID, null); - - return preCommitFuture; - } + cohort -> { + final ListenableFuture preCommitFuture = cohort.preCommit(); + + // Simulate an AbortTransaction message occurring during replication, after + // persisting and before finishing the commit to the in-memory store. + // We have no followers so due to optimizations in the RaftActor, it does not + // attempt replication and thus we can't send an AbortTransaction message b/c + // it would be processed too late after CommitTransaction completes. So we'll + // simulate an AbortTransaction message occurring during replication by calling + // the shard directly. + // + shard.underlyingActor().doAbortTransaction(transactionID, null); + + return preCommitFuture; }; final MutableCompositeModification modification = new MutableCompositeModification(); @@ -1996,20 +1987,15 @@ public class ShardTest extends AbstractShardTest { new ShardTestKit(getSystem()) {{ final AtomicReference cleaupCheckLatch = new AtomicReference<>(); @SuppressWarnings("serial") - final Creator creator = new Creator() { + final Creator creator = () -> new Shard(newShardBuilder()) { @Override - public Shard create() throws Exception { - return new Shard(newShardBuilder()) { - @Override - public void handleCommand(final Object message) { - super.handleCommand(message); - if(TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) { - if(cleaupCheckLatch.get() != null) { - cleaupCheckLatch.get().countDown(); - } - } + public void handleCommand(final Object message) { + super.handleCommand(message); + if(TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) { + if(cleaupCheckLatch.get() != null) { + cleaupCheckLatch.get().countDown(); } - }; + } } }; @@ -2112,12 +2098,7 @@ public class ShardTest extends AbstractShardTest { } } - final Creator creator = new Creator() { - @Override - public Shard create() throws Exception { - return new TestShard(newShardBuilder()); - } - }; + final Creator creator = () -> new TestShard(newShardBuilder()); final TestActorRef shard = actorFactory.createTestActor( Props.create(new DelegatingShardCreator(creator)), shardActorName); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java index c9797b5414..79beb15a5a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java @@ -35,6 +35,7 @@ import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; +import org.opendaylight.yangtools.util.StringIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; @@ -106,7 +107,8 @@ public class PreLithiumShardTest extends AbstractShardTest { NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2, + ApplyState applyState = new ApplyState(null, new StringIdentifier("test"), + new ReplicatedLogImplEntry(1, 2, newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node)))); shard.underlyingActor().onReceiveCommand(applyState);