X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=f9f4350e9eec1b3f8d8071e35c246b55a03f1003;hb=18ecb07132281d3152a49baf50e8c020403588f9;hp=a647554806034f5b21d52e04dd8530dd6bb4d51d;hpb=6313c088fc7db266cc25b691e0cd909300fc8425;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
index a647554806..f9f4350e9e 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
@@ -16,6 +16,7 @@ import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
@@ -23,14 +24,13 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MessageTracker;
import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
-import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
@@ -52,8 +52,8 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
@@ -65,8 +65,8 @@ import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
@@ -82,6 +82,7 @@ import scala.concurrent.duration.FiniteDuration;
*
*/
public class Shard extends RaftActor {
+
@VisibleForTesting
static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = new Object() {
@Override
@@ -173,7 +174,8 @@ public class Shard extends RaftActor {
new Dispatchers(context().system().dispatchers()).getDispatcherPath(
Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
- snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
+ snapshotCohort = new ShardSnapshotCohort(builder.getId().getMemberName(), transactionActorFactory, store,
+ LOG, this.name);
messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
}
@@ -218,7 +220,7 @@ public class Shard extends RaftActor {
}
@Override
- protected void handleCommand(final Object message) {
+ protected void handleNonRaftCommand(final Object message) {
try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
final Optional maybeError = context.error();
if (maybeError.isPresent()) {
@@ -269,8 +271,11 @@ public class Shard extends RaftActor {
context().parent().forward(message, context());
} else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
messageRetrySupport.onTimerMessage(message);
+ } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
+ commitCoordinator.processCohortRegistryCommand(getSender(),
+ (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
} else {
- super.handleCommand(message);
+ super.handleNonRaftCommand(message);
}
}
}
@@ -320,16 +325,27 @@ public class Shard extends RaftActor {
void continueCommit(final CohortEntry cohortEntry) {
final DataTreeCandidate candidate = cohortEntry.getCandidate();
+ final TransactionIdentifier transactionId = cohortEntry.getTransactionID();
// If we do not have any followers and we are not using persistence
// or if cohortEntry has no modifications
// we can apply modification to the state immediately
if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
- applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
- } else {
- Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
- DataTreeCandidatePayload.create(candidate));
+ applyModificationToState(cohortEntry.getReplySender(), transactionId, candidate);
+ return;
+ }
+
+ final Payload payload;
+ try {
+ payload = CommitTransactionPayload.create(transactionId, candidate);
+ } catch (IOException e) {
+ LOG.error("{}: failed to encode transaction {} candidate {}", persistenceId(), transactionId, candidate,
+ e);
+ // TODO: do we need to do something smarter here?
+ throw Throwables.propagate(e);
}
+
+ persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), payload);
}
private void handleCommitTransaction(final CommitTransaction commit) {
@@ -349,12 +365,13 @@ public class Shard extends RaftActor {
}
}
- private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID, @Nonnull final CohortEntry cohortEntry) {
+ private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final Identifier transactionID,
+ @Nonnull final CohortEntry cohortEntry) {
LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
try {
try {
- cohortEntry.commit();
+ cohortEntry.commit();
} catch(ExecutionException e) {
// We may get a "store tree and candidate base differ" IllegalStateException from commit under
// certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
@@ -391,7 +408,7 @@ public class Shard extends RaftActor {
}
}
- private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
+ private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull Identifier transactionID) {
// With persistence enabled, this method is called via applyState by the leader strategy
// after the commit has been replicated to a majority of the followers.
@@ -432,7 +449,7 @@ public class Shard extends RaftActor {
LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
if (isLeader()) {
- commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+ commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
} else {
ActorSelection leader = getLeader();
if (leader == null) {
@@ -540,7 +557,8 @@ public class Shard extends RaftActor {
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
- commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
+ commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this,
+ store.getSchemaContext());
} else {
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {
@@ -561,7 +579,7 @@ public class Shard extends RaftActor {
doAbortTransaction(abort.getTransactionID(), getSender());
}
- void doAbortTransaction(final String transactionID, final ActorRef sender) {
+ void doAbortTransaction(final TransactionIdentifier transactionID, final ActorRef sender) {
commitCoordinator.handleAbort(transactionID, sender, this);
}
@@ -577,14 +595,7 @@ public class Shard extends RaftActor {
}
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
- store.closeTransactionChain(closeTransactionChain.getTransactionChainId());
- }
-
- private ActorRef createTypedTransactionActor(int transactionType,
- ShardTransactionIdentifier transactionId, String transactionChainId) {
-
- return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
- transactionId, transactionChainId);
+ store.closeTransactionChain(closeTransactionChain.getIdentifier());
}
private void createTransaction(CreateTransaction createTransaction) {
@@ -595,7 +606,7 @@ public class Shard extends RaftActor {
}
ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
- createTransaction.getTransactionId(), createTransaction.getTransactionChainId());
+ createTransaction.getTransactionId());
getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
createTransaction.getTransactionId(), createTransaction.getVersion()).toSerializable(), getSelf());
@@ -604,24 +615,14 @@ public class Shard extends RaftActor {
}
}
- private ActorRef createTransaction(int transactionType, String remoteTransactionId,
- String transactionChainId) {
-
-
- ShardTransactionIdentifier transactionId = new ShardTransactionIdentifier(remoteTransactionId);
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
- }
-
- ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
- transactionChainId);
-
- return transactionActor;
+ private ActorRef createTransaction(int transactionType, TransactionIdentifier transactionId) {
+ LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
+ return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
+ transactionId);
}
- private void commitWithNewTransaction(final Modification modification) {
- ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.toString(), null);
+ private void commitWithNewTransaction(final BatchedModifications modification) {
+ ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.getTransactionID());
modification.apply(tx.getSnapshot());
try {
snapshotCohort.syncCommitTransaction(tx);
@@ -679,12 +680,12 @@ public class Shard extends RaftActor {
}
@Override
- protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
- if (data instanceof DataTreeCandidatePayload) {
+ protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
+ if (data instanceof DataTreeCandidateSupplier) {
if (clientActor == null) {
// No clientActor indicates a replica coming from the leader
try {
- store.applyForeignCandidate(identifier, ((DataTreeCandidatePayload)data).getCandidate());
+ store.applyForeignCandidate(identifier, ((DataTreeCandidateSupplier)data).getCandidate().getValue());
} catch (DataValidationFailedException | IOException e) {
LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
}
@@ -692,22 +693,13 @@ public class Shard extends RaftActor {
// Replication consensus reached, proceed to commit
finishCommit(clientActor, identifier);
}
- } else if (data instanceof CompositeModificationPayload) {
- Object modification = ((CompositeModificationPayload) data).getModification();
-
- applyModificationToState(clientActor, identifier, modification);
- } else if(data instanceof CompositeModificationByteStringPayload ){
- Object modification = ((CompositeModificationByteStringPayload) data).getModification();
-
- applyModificationToState(clientActor, identifier, modification);
} else {
- LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
- persistenceId(), data, data.getClass().getClassLoader(),
- CompositeModificationPayload.class.getClassLoader());
+ LOG.error("{}: Unknown state received {} ClassLoader {}", persistenceId(), data,
+ data.getClass().getClassLoader());
}
}
- private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
+ private void applyModificationToState(ActorRef clientActor, Identifier identifier, Object modification) {
if(modification == null) {
LOG.error(
"{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
@@ -715,7 +707,11 @@ public class Shard extends RaftActor {
} else if(clientActor == null) {
// There's no clientActor to which to send a commit reply so we must be applying
// replicated state from the leader.
- commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification));
+
+ // The only implementation we know of is BatchedModifications, which also carries a transaction
+ // identifier -- which we really need that.
+ Preconditions.checkArgument(modification instanceof BatchedModifications);
+ commitWithNewTransaction((BatchedModifications)modification);
} else {
// This must be the OK to commit after replication consensus.
finishCommit(clientActor, identifier);