import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
/**
* A sample actor showing how the RaftActor is to be extended
protected void handleNonRaftCommand(Object message) {
if(message instanceof KeyValue){
if(isLeader()) {
- String persistId = Long.toString(persistIdentifier++);
- persistData(getSender(), persistId, (Payload) message);
+ persistData(getSender(), new StringIdentifier(String.valueOf(persistIdentifier++)), (Payload) message);
} else {
if(getLeader() != null) {
getLeader().forward(message, getContext());
return roleChangeNotifier;
}
- @Override protected void applyState(final ActorRef clientActor, final String identifier,
- final Object data) {
+ @Override
+ protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
if(data instanceof KeyValue){
KeyValue kv = (KeyValue) data;
state.put(kv.getKey(), kv.getValue());
package org.opendaylight.controller.cluster.raft;
import akka.actor.ActorRef;
+import org.opendaylight.yangtools.concepts.Identifier;
public interface ClientRequestTracker {
/**
* @return the identifier of the object that is to be replicated. For example a transaction identifier in the case
* of a transaction
*/
- String getIdentifier();
+ Identifier getIdentifier();
/**
*
package org.opendaylight.controller.cluster.raft;
import akka.actor.ActorRef;
+import org.opendaylight.yangtools.concepts.Identifier;
public class ClientRequestTrackerImpl implements ClientRequestTracker {
private final ActorRef clientActor;
- private final String identifier;
+ private final Identifier identifier;
private final long logIndex;
- public ClientRequestTrackerImpl(ActorRef clientActor, String identifier,
- long logIndex) {
+ public ClientRequestTrackerImpl(ActorRef clientActor, Identifier identifier, long logIndex) {
this.clientActor = clientActor;
this.logIndex = logIndex;
}
- @Override public ActorRef getClientActor() {
+ @Override
+ public ActorRef getClientActor() {
return clientActor;
}
- @Override public long getIndex() {
+ @Override
+ public long getIndex() {
return logIndex;
}
- public String getIdentifier() {
+ @Override
+ public Identifier getIdentifier() {
return identifier;
}
}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
-import akka.japi.Procedure;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.concepts.Immutable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* @param identifier
* @param data
*/
- protected void persistData(final ActorRef clientActor, final String identifier,
- final Payload data) {
+ protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) {
ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
context.getReplicatedLog().lastIndex() + 1,
final RaftActorContext raftContext = getRaftActorContext();
- replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
- @Override
- public void apply(ReplicatedLogEntry replicatedLogEntry) {
- if (!hasFollowers()){
- // Increment the Commit Index and the Last Applied values
- raftContext.setCommitIndex(replicatedLogEntry.getIndex());
- raftContext.setLastApplied(replicatedLogEntry.getIndex());
+ replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> {
+ if (!hasFollowers()){
+ // Increment the Commit Index and the Last Applied values
+ raftContext.setCommitIndex(replicatedLogEntry1.getIndex());
+ raftContext.setLastApplied(replicatedLogEntry1.getIndex());
- // Apply the state immediately.
- self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self());
+ // Apply the state immediately.
+ self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry1), self());
- // Send a ApplyJournalEntries message so that we write the fact that we applied
- // the state to durable storage
- self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
+ // Send a ApplyJournalEntries message so that we write the fact that we applied
+ // the state to durable storage
+ self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self());
- } else if (clientActor != null) {
- context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
+ } else if (clientActor != null) {
+ context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1);
- // Send message for replication
- getCurrentBehavior().handleMessage(getSelf(),
- new Replicate(clientActor, identifier, replicatedLogEntry));
- }
+ // Send message for replication
+ getCurrentBehavior().handleMessage(getSelf(),
+ new Replicate(clientActor, identifier, replicatedLogEntry1));
}
});
}
* @param data A piece of data that was persisted by the persistData call.
* This should NEVER be null.
*/
- protected abstract void applyState(ActorRef clientActor, String identifier,
- Object data);
+ protected abstract void applyState(ActorRef clientActor, Identifier identifier, Object data);
/**
* Returns the RaftActorRecoveryCohort to participate in persistence recovery.
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.UUIDIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
private static abstract class ServerOperationContext<T> {
private final T operation;
private final ActorRef clientRequestor;
- private final String contextId;
+ private final Identifier contextId;
ServerOperationContext(T operation, ActorRef clientRequestor){
this.operation = operation;
this.clientRequestor = clientRequestor;
- contextId = UUID.randomUUID().toString();
+ contextId = new UUIDIdentifier(UUID.randomUUID());
}
- String getContextId() {
+ Identifier getContextId() {
return contextId;
}
import akka.actor.ActorRef;
import java.io.Serializable;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.yangtools.concepts.Identifier;
public class ApplyState implements Serializable {
private static final long serialVersionUID = 1L;
private final ActorRef clientActor;
- private final String identifier;
+ private final Identifier identifier;
private final ReplicatedLogEntry replicatedLogEntry;
- public ApplyState(ActorRef clientActor, String identifier,
- ReplicatedLogEntry replicatedLogEntry) {
+ public ApplyState(ActorRef clientActor, Identifier identifier, ReplicatedLogEntry replicatedLogEntry) {
this.clientActor = clientActor;
this.identifier = identifier;
this.replicatedLogEntry = replicatedLogEntry;
return clientActor;
}
- public String getIdentifier() {
+ public Identifier getIdentifier() {
return identifier;
}
package org.opendaylight.controller.cluster.raft.base.messages;
import akka.actor.ActorRef;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-
import java.io.Serializable;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.yangtools.concepts.Identifier;
public class Replicate implements Serializable {
private static final long serialVersionUID = 1L;
private final ActorRef clientActor;
- private final String identifier;
+ private final Identifier identifier;
private final ReplicatedLogEntry replicatedLogEntry;
- public Replicate(ActorRef clientActor, String identifier,
- ReplicatedLogEntry replicatedLogEntry) {
+ public Replicate(ActorRef clientActor, Identifier identifier, ReplicatedLogEntry replicatedLogEntry) {
this.clientActor = clientActor;
this.identifier = identifier;
return clientActor;
}
- public String getIdentifier() {
+ public Identifier getIdentifier() {
return identifier;
}
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.yangtools.concepts.Identifier;
import org.slf4j.Logger;
import scala.concurrent.duration.FiniteDuration;
protected void applyLogToStateMachine(final long index) {
long newLastApplied = context.getLastApplied();
// Now maybe we apply to the state machine
- for (long i = context.getLastApplied() + 1;
- i < index + 1; i++) {
- ActorRef clientActor = null;
- String identifier = null;
- ClientRequestTracker tracker = removeClientRequestTracker(i);
-
+ for (long i = context.getLastApplied() + 1; i < index + 1; i++) {
+ final ActorRef clientActor;
+ final Identifier identifier;
+ final ClientRequestTracker tracker = removeClientRequestTracker(i);
if (tracker != null) {
clientActor = tracker.getClientActor();
identifier = tracker.getIdentifier();
+ } else {
+ clientActor = null;
+ identifier = null;
}
- ReplicatedLogEntry replicatedLogEntry =
- context.getReplicatedLog().get(i);
+ ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(i);
if (replicatedLogEntry != null) {
// Send a local message to the local RaftActor (it's derived class to be
// specific to apply the log to it's index)
- actor().tell(new ApplyState(clientActor, identifier,
- replicatedLogEntry), actor());
+ actor().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), actor());
newLastApplied = i;
} else {
//if one index is not present in the log, no point in looping
// around as the rest wont be present either
- LOG.warn(
- "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
+ LOG.warn("{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
logName(), i, i, index);
break;
}
import akka.dispatch.Dispatchers;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
-import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
public void handleCommand(Object message) {
if(message instanceof MockPayload) {
MockPayload payload = (MockPayload)message;
- super.persistData(collectorActor, payload.toString(), payload);
+ super.persistData(collectorActor, new StringIdentifier(payload.toString()), payload);
return;
}
}
protected void verifyApplyJournalEntries(ActorRef actor, final long expIndex) {
- MessageCollectorActor.expectFirstMatching(actor, ApplyJournalEntries.class, new Predicate<ApplyJournalEntries>() {
- @Override
- public boolean apply(ApplyJournalEntries msg) {
- return msg.getToIndex() == expIndex;
- }
- });
+ MessageCollectorActor.expectFirstMatching(actor, ApplyJournalEntries.class, msg -> msg.getToIndex() == expIndex);
}
@SuppressWarnings("unchecked")
protected void verifyApplyState(ApplyState applyState, ActorRef expClientActor,
String expId, long expTerm, long expIndex, MockPayload payload) {
assertEquals("ApplyState getClientActor", expClientActor, applyState.getClientActor());
- assertEquals("ApplyState getIdentifier", expId, applyState.getIdentifier());
+
+ final Identifier id = expId == null ? null : new StringIdentifier(expId);
+ assertEquals("ApplyState getIdentifier", id, applyState.getIdentifier());
ReplicatedLogEntry replicatedLogEntry = applyState.getReplicatedLogEntry();
verifyReplicatedLogEntry(replicatedLogEntry, expTerm, expIndex, payload);
}
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
public static final short PAYLOAD_VERSION = 5;
}
- @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
+ @Override
+ protected void applyState(ActorRef clientActor, Identifier identifier, Object data) {
actorDelegate.applyState(clientActor, identifier, data);
LOG.info("{}: applyState called: {}", persistenceId(), data);
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
new MockRaftActorContext.MockPayload("F"));
- mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
+ final Identifier id = new StringIdentifier("apply-state");
+ mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry));
- verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
+ verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), anyObject());
}
};
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// Persist another entry (this will cause a CaptureSnapshot to be triggered
- leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
+ leaderActor.persistData(mockActorRef, new StringIdentifier("x"),
+ new MockRaftActorContext.MockPayload("duh"));
// Now send a CaptureSnapshotReply
mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
import scala.concurrent.duration.FiniteDuration;
public class LeaderTest extends AbstractLeaderTest<Leader> {
actorContext.getReplicatedLog().append(newEntry);
- RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
- new Replicate(leaderActor, "state-id", newEntry));
+ final Identifier id = new StringIdentifier("state-id");
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
// State should not change
assertTrue(raftBehavior instanceof Leader);
ApplyState last = applyStateList.get((int) newLogIndex - 1);
assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
- assertEquals("getIdentifier", "state-id", last.getIdentifier());
+ assertEquals("getIdentifier", id, last.getIdentifier());
}
@Test
// this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
RaftActorBehavior raftBehavior = leader.handleMessage(
- leaderActor, new Replicate(null, "state-id", entry));
+ leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry));
assertTrue(raftBehavior instanceof Leader);
//update follower timestamp
leader.markFollowerActive(FOLLOWER_ID);
- leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+ leader.handleMessage(leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry));
assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
assertEquals(2, cs.getLastTerm());
// if an initiate is started again when first is in progress, it shouldnt initiate Capture
- leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+ leader.handleMessage(leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry));
assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
}
assertEquals(4, cs.getLastIndex());
assertEquals(2, cs.getLastTerm());
- // if an initiate is started again when first is in progress, it shouldnt initiate Capture
- leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+ // if an initiate is started again when first is in progress, it should not initiate Capture
+ leader.handleMessage(leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry));
assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
}
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.yangtools.util.StringIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
private final ReadWriteShardDataTreeTransaction transaction;
- private final String transactionID;
+ private final StringIdentifier transactionID;
private final CompositeDataTreeCohort userCohorts;
private final short clientVersion;
CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction,
DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
this.transaction = Preconditions.checkNotNull(transaction);
- this.transactionID = transactionID;
+ this.transactionID = new StringIdentifier(transactionID);
this.clientVersion = clientVersion;
this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
}
CohortEntry(String transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
SchemaContext schema, short clientVersion) {
- this.transactionID = transactionID;
+ this.transactionID = new StringIdentifier(transactionID);
this.cohort = cohort;
this.transaction = null;
this.clientVersion = clientVersion;
lastAccessTimer.start();
}
- String getTransactionID() {
+ StringIdentifier getTransactionID() {
return transactionID;
}
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
} else {
- Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
+ persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
DataTreeCandidatePayload.create(candidate));
}
}
private void handleCommitTransaction(final CommitTransaction commit) {
if (isLeader()) {
- if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
- shardMBean.incrementFailedTransactionsCount();
- }
+ if(!commitCoordinator.handleCommit(new StringIdentifier(commit.getTransactionID()), getSender(), this)) {
+ shardMBean.incrementFailedTransactionsCount();
+ }
} else {
ActorSelection leader = getLeader();
if (leader == null) {
}
}
- private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID, @Nonnull final CohortEntry cohortEntry) {
+ private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final Identifier transactionID,
+ @Nonnull final CohortEntry cohortEntry) {
LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
try {
}
}
- private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
+ private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull Identifier transactionID) {
// With persistence enabled, this method is called via applyState by the leader strategy
// after the commit has been replicated to a majority of the followers.
LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
if (isLeader()) {
- commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+ commitCoordinator.handleCanCommit(new StringIdentifier(canCommit.getTransactionID()), getSender(), this);
} else {
ActorSelection leader = getLeader();
if (leader == null) {
}
void doAbortTransaction(final String transactionID, final ActorRef sender) {
- commitCoordinator.handleAbort(transactionID, sender, this);
+ commitCoordinator.handleAbort(new StringIdentifier(transactionID), sender, this);
}
private void handleCreateTransaction(final Object message) {
}
@Override
- protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
+ protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
if (data instanceof DataTreeCandidatePayload) {
if (clientActor == null) {
// No clientActor indicates a replica coming from the leader
}
}
- private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
+ private void applyModificationToState(ActorRef clientActor, Identifier identifier, Object modification) {
if(modification == null) {
LOG.error(
"{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
// Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
public interface CohortDecorator {
- ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
+ ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual);
}
- private final Map<String, CohortEntry> cohortCache = new HashMap<>();
+ private final Map<Identifier, CohortEntry> cohortCache = new HashMap<>();
private CohortEntry currentCohortEntry;
final ShardDataTreeCohort cohort = ready.getTransaction().ready();
final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion());
- cohortCache.put(ready.getTransactionID(), cohortEntry);
+ cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
if(!queueCohortEntry(cohortEntry, sender, shard)) {
return;
* @param shard the transaction's shard actor
*/
void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard, SchemaContext schema) {
- CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
+ CohortEntry cohortEntry = cohortCache.get(new StringIdentifier(batched.getTransactionID()));
if(cohortEntry == null) {
cohortEntry = new CohortEntry(batched.getTransactionID(),
dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()),
cohortRegistry, schema, batched.getVersion());
- cohortCache.put(batched.getTransactionID(), cohortEntry);
+ cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
}
if(log.isDebugEnabled()) {
message.getTransactionID());
final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema,
DataStoreVersions.CURRENT_VERSION);
- cohortCache.put(message.getTransactionID(), cohortEntry);
+ cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
if(!queueCohortEntry(cohortEntry, sender, shard)) {
Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
final int maxModificationsPerBatch) {
- CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID());
+ CohortEntry cohortEntry = getAndRemoveCohortEntry(new StringIdentifier(from.getTransactionID()));
if(cohortEntry == null || cohortEntry.getTransaction() == null) {
return Collections.singletonList(from);
}
}
private void handleCanCommit(CohortEntry cohortEntry) {
- String transactionID = cohortEntry.getTransactionID();
-
cohortEntry.updateLastAccessTime();
if(currentCohortEntry != null) {
if(log.isDebugEnabled()) {
log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
- name, currentCohortEntry.getTransactionID(), transactionID);
+ name, currentCohortEntry.getTransactionID(), cohortEntry.getTransactionID());
}
return;
if(log.isDebugEnabled()) {
log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
- transactionID);
+ cohortEntry.getTransactionID());
}
}
}
* @param sender the actor to which to send the response
* @param shard the transaction's shard actor
*/
- void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
+ void handleCanCommit(Identifier transactionID, final ActorRef sender, final Shard shard) {
// Lookup the cohort entry that was cached previously (or should have been) by
// transactionReady (via the ForwardedReadyTransaction message).
final CohortEntry cohortEntry = cohortCache.get(transactionID);
* @param shard the transaction's shard actor
* @return true if the transaction was successfully prepared, false otherwise.
*/
- boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
+ boolean handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
// Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
// this transaction.
final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
return doCommit(cohortEntry);
}
- void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
+ void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) {
CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
if(cohortEntry != null) {
// We don't remove the cached cohort entry here (ie pass false) in case the Tx was
protected BatchedModifications getModifications() {
if(newModifications.isEmpty() ||
newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
- newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
+ newModifications.add(new BatchedModifications(cohortEntry.getTransactionID().getString(),
cohortEntry.getClientVersion(), ""));
}
messages.addAll(newModifications);
if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.CAN_COMMITTED) {
- messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
+ messages.add(new CanCommitTransaction(cohortEntry.getTransactionID().getString(),
cohortEntry.getClientVersion()));
}
if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.PRE_COMMITTED) {
- messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
+ messages.add(new CommitTransaction(cohortEntry.getTransactionID().getString(),
cohortEntry.getClientVersion()));
}
}
* @return the current CohortEntry or null if the given transaction ID does not match the
* current entry.
*/
- CohortEntry getCohortEntryIfCurrent(String transactionID) {
+ CohortEntry getCohortEntryIfCurrent(Identifier transactionID) {
if(isCurrentTransaction(transactionID)) {
return currentCohortEntry;
}
return currentCohortEntry;
}
- CohortEntry getAndRemoveCohortEntry(String transactionID) {
+ CohortEntry getAndRemoveCohortEntry(Identifier transactionID) {
return cohortCache.remove(transactionID);
}
- boolean isCurrentTransaction(String transactionID) {
+ boolean isCurrentTransaction(Identifier transactionID) {
return currentCohortEntry != null &&
currentCohortEntry.getTransactionID().equals(transactionID);
}
* @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
* the cache.
*/
- void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
+ void currentTransactionComplete(Identifier transactionID, boolean removeCohortEntry) {
if(removeCohortEntry) {
cohortCache.remove(transactionID);
}
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
return new SimpleEntry<>(reg, readCurrentData());
}
- void applyForeignCandidate(final String identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
+ void applyForeignCandidate(final Identifier identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
final DataTreeModification mod = dataTree.takeSnapshot().newModification();
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
protected void setupCohortDecorator(Shard shard, final ShardDataTreeCohort cohort) {
shard.getCommitCoordinator().setCohortDecorator(new ShardCommitCoordinator.CohortDecorator() {
@Override
- public ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual) {
+ public ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual) {
return cohort;
}
});
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
-
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.yangtools.util.StringIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
writeMod.write(TestModel.TEST_PATH, node);
writeMod.ready();
- final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
- payloadForModification(source, writeMod)));
+ final ApplyState applyState = new ApplyState(null, new StringIdentifier("test"),
+ new ReplicatedLogImplEntry(1, 2, payloadForModification(source, writeMod)));
shard.tell(applyState, shard);
final FiniteDuration duration = duration("5 seconds");
final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
- final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
- @Override
- public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
- if(mockCohort.get() == null) {
- mockCohort.set(createDelegatingMockCohort("cohort", actual));
- }
-
- return mockCohort.get();
+ final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> {
+ if(mockCohort.get() == null) {
+ mockCohort.set(createDelegatingMockCohort("cohort", actual));
}
+
+ return mockCohort.get();
};
shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
final FiniteDuration duration = duration("5 seconds");
final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
- final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
- @Override
- public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
- if(mockCohort.get() == null) {
- mockCohort.set(createDelegatingMockCohort("cohort", actual));
- }
-
- return mockCohort.get();
+ final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> {
+ if(mockCohort.get() == null) {
+ mockCohort.set(createDelegatingMockCohort("cohort", actual));
}
+
+ return mockCohort.get();
};
shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
final String transactionID = "tx1";
final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
- new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
- @Override
- public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
- final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
-
- // Simulate an AbortTransaction message occurring during replication, after
- // persisting and before finishing the commit to the in-memory store.
- // We have no followers so due to optimizations in the RaftActor, it does not
- // attempt replication and thus we can't send an AbortTransaction message b/c
- // it would be processed too late after CommitTransaction completes. So we'll
- // simulate an AbortTransaction message occurring during replication by calling
- // the shard directly.
- //
- shard.underlyingActor().doAbortTransaction(transactionID, null);
-
- return preCommitFuture;
- }
+ cohort -> {
+ final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
+
+ // Simulate an AbortTransaction message occurring during replication, after
+ // persisting and before finishing the commit to the in-memory store.
+ // We have no followers so due to optimizations in the RaftActor, it does not
+ // attempt replication and thus we can't send an AbortTransaction message b/c
+ // it would be processed too late after CommitTransaction completes. So we'll
+ // simulate an AbortTransaction message occurring during replication by calling
+ // the shard directly.
+ //
+ shard.underlyingActor().doAbortTransaction(transactionID, null);
+
+ return preCommitFuture;
};
final MutableCompositeModification modification = new MutableCompositeModification();
new ShardTestKit(getSystem()) {{
final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
@SuppressWarnings("serial")
- final Creator<Shard> creator = new Creator<Shard>() {
+ final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
@Override
- public Shard create() throws Exception {
- return new Shard(newShardBuilder()) {
- @Override
- public void handleCommand(final Object message) {
- super.handleCommand(message);
- if(TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
- if(cleaupCheckLatch.get() != null) {
- cleaupCheckLatch.get().countDown();
- }
- }
+ public void handleCommand(final Object message) {
+ super.handleCommand(message);
+ if(TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
+ if(cleaupCheckLatch.get() != null) {
+ cleaupCheckLatch.get().countDown();
}
- };
+ }
}
};
}
}
- final Creator<Shard> creator = new Creator<Shard>() {
- @Override
- public Shard create() throws Exception {
- return new TestShard(newShardBuilder());
- }
- };
+ final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
final TestActorRef<Shard> shard = actorFactory.createTestActor(
Props.create(new DelegatingShardCreator(creator)), shardActorName);
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.yangtools.util.StringIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
+ ApplyState applyState = new ApplyState(null, new StringIdentifier("test"),
+ new ReplicatedLogImplEntry(1, 2,
newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
shard.underlyingActor().onReceiveCommand(applyState);