import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
/**
* A sample actor showing how the RaftActor is to be extended
*/
-public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort {
+public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
private final Map<String, String> state = new HashMap();
}
}
- @Override protected void createSnapshot() {
+ @Override
+ public void createSnapshot(ActorRef actorRef) {
ByteString bs = null;
try {
bs = fromObject(state);
getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
}
- @Override protected void applySnapshot(byte [] snapshot) {
+ @Override
+ public void applySnapshot(byte [] snapshot) {
state.clear();
try {
state.putAll((HashMap) toObject(snapshot));
@Override
public void applyRecoverySnapshot(byte[] snapshot) {
}
+
+ @Override
+ protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+ return this;
+ }
}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.japi.Procedure;
-import akka.persistence.SaveSnapshotFailure;
-import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
- private static final String COMMIT_SNAPSHOT = "commit_snapshot";
-
protected final Logger LOG = LoggerFactory.getLogger(getClass());
/**
private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null);
- private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
-
private RaftActorRecoverySupport raftRecovery;
+ private RaftActorSnapshotMessageSupport snapshotSupport;
+
private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
public RaftActor(String id, Map<String, String> peerAddresses) {
@Override
public void postStop() {
- if(currentBehavior != null) {
+ if(currentBehavior.getDelegate() != null) {
try {
currentBehavior.close();
} catch (Exception e) {
handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
}
- @Override public void handleCommand(Object message) {
+ @Override
+ public void handleCommand(Object message) {
+ if(snapshotSupport == null) {
+ snapshotSupport = new RaftActorSnapshotMessageSupport(delegatingPersistenceProvider, context,
+ currentBehavior, getRaftActorSnapshotCohort(), self());
+ }
+
+ boolean handled = snapshotSupport.handleSnapshotMessage(message);
+ if(handled) {
+ return;
+ }
+
if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
persistence().persist(applyEntries, NoopProcedure.instance());
- } else if(message instanceof ApplySnapshot ) {
- Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: ApplySnapshot called on Follower Actor " +
- "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
- snapshot.getLastAppliedTerm()
- );
- }
-
- applySnapshot(snapshot.getState());
-
- //clears the followers log, sets the snapshot index to ensure adjusted-index works
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
- currentBehavior));
- context.setLastApplied(snapshot.getLastAppliedIndex());
-
} else if (message instanceof FindLeader) {
getSender().tell(
new FindLeaderReply(getLeaderAddress()),
getSelf()
);
-
- } else if (message instanceof SaveSnapshotSuccess) {
- SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
- LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
-
- long sequenceNumber = success.metadata().sequenceNr();
-
- commitSnapshot(sequenceNumber);
-
- } else if (message instanceof SaveSnapshotFailure) {
- SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
-
- LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
- persistenceId(), saveSnapshotFailure.cause());
-
- context.getSnapshotManager().rollback();
-
- } else if (message instanceof CaptureSnapshot) {
- LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
-
- context.getSnapshotManager().create(createSnapshotProcedure);
-
- } else if (message instanceof CaptureSnapshotReply) {
- handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
} else if(message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
- } else if (message.equals(COMMIT_SNAPSHOT)) {
- commitSnapshot(-1);
} else {
reusableBehaviorStateHolder.init(getCurrentBehavior());
// Make saving Snapshot successful
// Committing the snapshot here would end up calling commit in the creating state which would
// be a state violation. That's why now we send a message to commit the snapshot.
- self().tell(COMMIT_SNAPSHOT, self());
+ self().tell(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT, self());
}
});
}
context.setPeerAddress(peerId, peerAddress);
}
- protected void commitSnapshot(long sequenceNumber) {
- context.getSnapshotManager().commit(persistence(), sequenceNumber);
- }
-
/**
* The applyState method will be called by the RaftActor when some data
* needs to be applied to the actor's state
protected abstract void onRecoveryComplete();
/**
- * This method will be called by the RaftActor when a snapshot needs to be
- * created. The derived actor should respond with its current state.
- * <p/>
- * During recovery the state that is returned by the derived actor will
- * be passed back to it by calling the applySnapshot method
- *
- * @return The current state of the actor
+ * Returns the RaftActorSnapshotCohort to participate in persistence recovery.
*/
- protected abstract void createSnapshot();
-
- /**
- * This method can be called at any other point during normal
- * operations when the derived actor is out of sync with it's peers
- * and the only way to bring it in sync is by applying a snapshot
- *
- * @param snapshotBytes A snapshot of the state of the actor
- */
- protected abstract void applySnapshot(byte[] snapshotBytes);
+ @Nonnull
+ protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
/**
* This method will be called by the RaftActor when the state of the
return peerAddress;
}
- private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
- LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
-
- context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, context.getTotalMemory());
- }
-
protected boolean hasFollowers(){
return getRaftActorContext().hasFollowers();
}
}
}
- private class CreateSnapshotProcedure implements Procedure<Void> {
-
- @Override
- public void apply(Void aVoid) throws Exception {
- createSnapshot();
- }
- }
-
private static class BehaviorStateHolder {
private RaftActorBehavior behavior;
private String leaderId;
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+
+/**
+ * Interface for a class that participates in raft actor snapshotting.
+ *
+ * @author Thomas Pantelis
+ */
+public interface RaftActorSnapshotCohort {
+
+ /**
+ * This method is called by the RaftActor when a snapshot needs to be
+ * created. The implementation should send a CaptureSnapshotReply to the given actor.
+ *
+ * @param actorRef the actor to which to respond
+ */
+ void createSnapshot(ActorRef actorRef);
+
+ /**
+ * This method is called to apply a snapshot installed by the leader.
+ *
+ * @param snapshotBytes a snapshot of the state of the actor
+ */
+ void applySnapshot(byte[] snapshotBytes);
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.japi.Procedure;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+
+/**
+ * Handles snapshot related messages for a RaftActor.
+ *
+ * @author Thomas Pantelis
+ */
+class RaftActorSnapshotMessageSupport {
+ static final String COMMIT_SNAPSHOT = "commit_snapshot";
+
+ private final DataPersistenceProvider persistence;
+ private final RaftActorContext context;
+ private final RaftActorBehavior currentBehavior;
+ private final RaftActorSnapshotCohort cohort;
+ private final ActorRef raftActorRef;
+ private final Logger log;
+
+ private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
+ @Override
+ public void apply(Void notUsed) throws Exception {
+ cohort.createSnapshot(raftActorRef);
+ }
+ };
+
+ RaftActorSnapshotMessageSupport(DataPersistenceProvider persistence, RaftActorContext context,
+ RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort, ActorRef raftActorRef) {
+ this.persistence = persistence;
+ this.context = context;
+ this.currentBehavior = currentBehavior;
+ this.cohort = cohort;
+ this.raftActorRef = raftActorRef;
+ this.log = context.getLogger();
+ }
+
+ boolean handleSnapshotMessage(Object message) {
+ if(message instanceof ApplySnapshot ) {
+ onApplySnapshot(((ApplySnapshot) message).getSnapshot());
+ return true;
+ } else if (message instanceof SaveSnapshotSuccess) {
+ onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
+ return true;
+ } else if (message instanceof SaveSnapshotFailure) {
+ onSaveSnapshotFailure((SaveSnapshotFailure) message);
+ return true;
+ } else if (message instanceof CaptureSnapshot) {
+ onCaptureSnapshot(message);
+ return true;
+ } else if (message instanceof CaptureSnapshotReply) {
+ onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
+ return true;
+ } else if (message.equals(COMMIT_SNAPSHOT)) {
+ context.getSnapshotManager().commit(persistence, -1);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private void onCaptureSnapshotReply(byte[] snapshotBytes) {
+ log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
+
+ context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory());
+ }
+
+ private void onCaptureSnapshot(Object message) {
+ log.debug("{}: CaptureSnapshot received by actor: {}", context.getId(), message);
+
+ context.getSnapshotManager().create(createSnapshotProcedure);
+ }
+
+ private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
+ log.error("{}: SaveSnapshotFailure received for snapshot Cause:",
+ context.getId(), saveSnapshotFailure.cause());
+
+ context.getSnapshotManager().rollback();
+ }
+
+ private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) {
+ log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId());
+
+ long sequenceNumber = success.metadata().sequenceNr();
+
+ context.getSnapshotManager().commit(persistence, sequenceNumber);
+ }
+
+ private void onApplySnapshot(Snapshot snapshot) {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: ApplySnapshot called on Follower Actor " +
+ "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(),
+ snapshot.getLastAppliedTerm());
+ }
+
+ cohort.applySnapshot(snapshot.getState());
+
+ //clears the followers log, sets the snapshot index to ensure adjusted-index works
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence,
+ currentBehavior));
+ context.setLastApplied(snapshot.getLastAppliedIndex());
+ }
+}
}
@Override
- protected void createSnapshot() {
+ public void createSnapshot(ActorRef actorRef) {
if(snapshot != null) {
getSelf().tell(new CaptureSnapshotReply(snapshot), ActorRef.noSender());
}
InMemorySnapshotStore.clear();
}
- public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort {
+ public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
private final RaftActor actorDelegate;
- private final RaftActorRecoveryCohort cohortDelegate;
+ private final RaftActorRecoveryCohort recoveryCohortDelegate;
+ private final RaftActorSnapshotCohort snapshotCohortDelegate;
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final List<Object> state;
private ActorRef roleChangeNotifier;
super(id, peerAddresses, config);
state = new ArrayList<>();
this.actorDelegate = mock(RaftActor.class);
- this.cohortDelegate = mock(RaftActorRecoveryCohort.class);
+ this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
+ this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
if(dataPersistenceProvider == null){
setPersistence(true);
} else {
return this;
}
+ @Override
+ protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+ return this;
+ }
+
@Override
public void startLogRecoveryBatch(int maxBatchSize) {
}
@Override
public void applyRecoverySnapshot(byte[] bytes) {
- cohortDelegate.applyRecoverySnapshot(bytes);
+ recoveryCohortDelegate.applyRecoverySnapshot(bytes);
try {
Object data = toObject(bytes);
if (data instanceof List) {
}
}
- @Override protected void createSnapshot() {
+ @Override
+ public void createSnapshot(ActorRef actorRef) {
LOG.info("{}: createSnapshot called", persistenceId());
- actorDelegate.createSnapshot();
+ snapshotCohortDelegate.createSnapshot(actorRef);
}
- @Override protected void applySnapshot(byte [] snapshot) {
+ @Override
+ public void applySnapshot(byte [] snapshot) {
LOG.info("{}: applySnapshot called", persistenceId());
- actorDelegate.applySnapshot(snapshot);
+ snapshotCohortDelegate.applySnapshot(snapshot);
}
- @Override protected void onStateChanged() {
+ @Override
+ protected void onStateChanged() {
actorDelegate.onStateChanged();
}
public ReplicatedLog getReplicatedLog(){
return this.getRaftActorContext().getReplicatedLog();
}
-
}
mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
- verify(mockRaftActor.cohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
+ verify(mockRaftActor.recoveryCohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
- verify(mockRaftActor.cohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class));
+ verify(mockRaftActor.recoveryCohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class));
mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
- verify(mockRaftActor.actorDelegate).createSnapshot();
+ verify(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
- verify(mockRaftActor.actorDelegate).applySnapshot(eq(snapshot.getState()));
+ verify(mockRaftActor.snapshotCohortDelegate).applySnapshot(eq(snapshot.getState()));
assertTrue("The replicatedLog should have changed",
oldReplicatedLog != mockRaftActor.getReplicatedLog());
.capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
new MockRaftActorContext.MockPayload("x")), 4);
- verify(leaderActor.actorDelegate).createSnapshot();
+ verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
assertEquals(8, leaderActor.getReplicatedLog().size());
new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
new MockRaftActorContext.MockPayload("D")), 4);
- verify(followerActor.actorDelegate).createSnapshot();
+ verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
assertEquals(6, followerActor.getReplicatedLog().size());
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
*/
public class Shard extends RaftActor {
- private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
-
private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
@VisibleForTesting
private DatastoreContext datastoreContext;
- private SchemaContext schemaContext;
-
- private int createSnapshotTransactionCounter;
-
private final ShardCommitCoordinator commitCoordinator;
private long transactionCommitTimeout;
private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
Serialization.serializedActorPath(getSelf()));
- private final DOMTransactionFactory transactionFactory;
+ private final DOMTransactionFactory domTransactionFactory;
+
+ private final ShardTransactionActorFactory transactionActorFactory;
- private final String txnDispatcherPath;
+ private final ShardSnapshotCohort snapshotCohort;
private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
this.name = name.toString();
this.datastoreContext = datastoreContext;
- this.schemaContext = schemaContext;
- this.txnDispatcherPath = new Dispatchers(context().system().dispatchers())
- .getDispatcherPath(Dispatchers.DispatcherType.Transaction);
setPersistence(datastoreContext.isPersistent());
getContext().become(new MeteringBehavior(this));
}
- transactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
+ domTransactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
- commitCoordinator = new ShardCommitCoordinator(transactionFactory,
+ commitCoordinator = new ShardCommitCoordinator(domTransactionFactory,
TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
+
+ transactionActorFactory = new ShardTransactionActorFactory(domTransactionFactory, datastoreContext,
+ new Dispatchers(context().system().dispatchers()).getDispatcherPath(
+ Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
+
+ snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
}
private void setTransactionCommitTimeout() {
}
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
- transactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
+ domTransactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
}
private ActorRef createTypedTransactionActor(int transactionType,
ShardTransactionIdentifier transactionId, String transactionChainId,
short clientVersion ) {
- DOMStoreTransaction transaction = transactionFactory.newTransaction(
- TransactionProxy.TransactionType.fromInt(transactionType), transactionId.toString(),
- transactionChainId);
-
- return createShardTransaction(transaction, transactionId, clientVersion);
- }
-
- private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId,
- short clientVersion){
- return getContext().actorOf(
- ShardTransaction.props(transaction, getSelf(),
- schemaContext, datastoreContext, shardMBean,
- transactionId.getRemoteTransactionId(), clientVersion)
- .withDispatcher(txnDispatcherPath),
- transactionId.toString());
-
+ return transactionActorFactory.newShardTransaction(TransactionProxy.TransactionType.fromInt(transactionType),
+ transactionId, transactionChainId, clientVersion);
}
private void createTransaction(CreateTransaction createTransaction) {
return transactionActor;
}
- private void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
- throws ExecutionException, InterruptedException {
- DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
- commitCohort.preCommit().get();
- commitCohort.commit().get();
- }
-
private void commitWithNewTransaction(final Modification modification) {
DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
modification.apply(tx);
try {
- syncCommitTransaction(tx);
+ snapshotCohort.syncCommitTransaction(tx);
shardMBean.incrementCommittedTransactionCount();
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
} catch (InterruptedException | ExecutionException e) {
}
private void updateSchemaContext(final UpdateSchemaContext message) {
- this.schemaContext = message.getSchemaContext();
updateSchemaContext(message.getSchemaContext());
- store.onGlobalContextUpdated(message.getSchemaContext());
}
@VisibleForTesting
return config.isMetricCaptureEnabled();
}
+ @Override
+ protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+ return snapshotCohort;
+ }
+
@Override
@Nonnull
protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
}
}
- @Override
- protected void createSnapshot() {
- // Create a transaction actor. We are really going to treat the transaction as a worker
- // so that this actor does not get block building the snapshot. THe transaction actor will
- // after processing the CreateSnapshot message.
-
- ActorRef createSnapshotTransaction = createTransaction(
- TransactionProxy.TransactionType.READ_ONLY.ordinal(),
- "createSnapshot" + ++createSnapshotTransactionCounter, "",
- DataStoreVersions.CURRENT_VERSION);
-
- createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self());
- }
-
- @VisibleForTesting
- @Override
- protected void applySnapshot(final byte[] snapshotBytes) {
- // Since this will be done only on Recovery or when this actor is a Follower
- // we can safely commit everything in here. We not need to worry about event notifications
- // as they would have already been disabled on the follower
-
- LOG.info("{}: Applying snapshot", persistenceId());
- try {
- DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
-
- NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
-
- // delete everything first
- transaction.delete(DATASTORE_ROOT);
-
- // Add everything from the remote node back
- transaction.write(DATASTORE_ROOT, node);
- syncCommitTransaction(transaction);
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("{}: An exception occurred when applying snapshot", persistenceId(), e);
- } finally {
- LOG.info("{}: Done applying snapshot", persistenceId());
- }
- }
-
@Override
protected void onStateChanged() {
boolean isLeader = isLeader();
persistenceId(), getId());
}
- transactionFactory.closeAllTransactionChains();
+ domTransactionFactory.closeAllTransactionChains();
}
}
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
* @author: syedbahm
private final DOMStoreReadTransaction transaction;
public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext, ShardStats shardStats, String transactionID,
- short clientTxVersion) {
- super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
+ ShardStats shardStats, String transactionID, short clientTxVersion) {
+ super(shardActor, shardStats, transactionID, clientTxVersion);
this.transaction = transaction;
}
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
* @author: syedbahm
private final DOMStoreReadWriteTransaction transaction;
public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext, ShardStats shardStats, String transactionID,
- short clientTxVersion) {
- super(transaction, shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
+ ShardStats shardStats, String transactionID, short clientTxVersion) {
+ super(transaction, shardActor, shardStats, transactionID, clientTxVersion);
this.transaction = transaction;
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+
+/**
+ * Participates in raft snapshotting on behalf of a Shard actor.
+ *
+ * @author Thomas Pantelis
+ */
+class ShardSnapshotCohort implements RaftActorSnapshotCohort {
+
+ private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
+
+ private int createSnapshotTransactionCounter;
+ private final ShardTransactionActorFactory transactionActorFactory;
+ private final InMemoryDOMDataStore store;
+ private final Logger log;
+ private final String logId;
+
+ ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, InMemoryDOMDataStore store,
+ Logger log, String logId) {
+ this.transactionActorFactory = transactionActorFactory;
+ this.store = store;
+ this.log = log;
+ this.logId = logId;
+ }
+
+ @Override
+ public void createSnapshot(ActorRef actorRef) {
+ // Create a transaction actor. We are really going to treat the transaction as a worker
+ // so that this actor does not get block building the snapshot. THe transaction actor will
+ // after processing the CreateSnapshot message.
+
+ ShardTransactionIdentifier transactionID = new ShardTransactionIdentifier(
+ "createSnapshot" + ++createSnapshotTransactionCounter);
+
+ ActorRef createSnapshotTransaction = transactionActorFactory.newShardTransaction(
+ TransactionProxy.TransactionType.READ_ONLY, transactionID, "", DataStoreVersions.CURRENT_VERSION);
+
+ createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, actorRef);
+ }
+
+ @Override
+ public void applySnapshot(byte[] snapshotBytes) {
+ // Since this will be done only on Recovery or when this actor is a Follower
+ // we can safely commit everything in here. We not need to worry about event notifications
+ // as they would have already been disabled on the follower
+
+ log.info("{}: Applying snapshot", logId);
+
+ try {
+ DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+
+ NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+
+ // delete everything first
+ transaction.delete(DATASTORE_ROOT);
+
+ // Add everything from the remote node back
+ transaction.write(DATASTORE_ROOT, node);
+ syncCommitTransaction(transaction);
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("{}: An exception occurred when applying snapshot", logId, e);
+ } finally {
+ log.info("{}: Done applying snapshot", logId);
+ }
+
+ }
+
+ void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
+ throws ExecutionException, InterruptedException {
+ DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+ commitCohort.preCommit().get();
+ commitCohort.commit().get();
+ }
+}
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
* The ShardTransaction Actor represents a remote transaction
protected static final boolean SERIALIZED_REPLY = true;
private final ActorRef shardActor;
- private final SchemaContext schemaContext;
private final ShardStats shardStats;
private final String transactionID;
private final short clientTxVersion;
- protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
- ShardStats shardStats, String transactionID, short clientTxVersion) {
+ protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, String transactionID,
+ short clientTxVersion) {
super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
this.shardActor = shardActor;
- this.schemaContext = schemaContext;
this.shardStats = shardStats;
this.transactionID = transactionID;
this.clientTxVersion = clientTxVersion;
}
public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
- String transactionID, short txnClientVersion) {
- return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
+ DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
+ return Props.create(new ShardTransactionCreator(transaction, shardActor,
datastoreContext, shardStats, transactionID, txnClientVersion));
}
return transactionID;
}
- protected SchemaContext getSchemaContext() {
- return schemaContext;
- }
-
protected short getClientTxVersion() {
return clientTxVersion;
}
final DOMStoreTransaction transaction;
final ActorRef shardActor;
- final SchemaContext schemaContext;
final DatastoreContext datastoreContext;
final ShardStats shardStats;
final String transactionID;
final short txnClientVersion;
ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext, DatastoreContext datastoreContext,
- ShardStats shardStats, String transactionID, short txnClientVersion) {
+ DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
this.transaction = transaction;
this.shardActor = shardActor;
this.shardStats = shardStats;
- this.schemaContext = schemaContext;
this.datastoreContext = datastoreContext;
this.transactionID = transactionID;
this.txnClientVersion = txnClientVersion;
ShardTransaction tx;
if(transaction instanceof DOMStoreReadWriteTransaction) {
tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
- shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+ shardActor, shardStats, transactionID, txnClientVersion);
} else if(transaction instanceof DOMStoreReadTransaction) {
tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
- schemaContext, shardStats, transactionID, txnClientVersion);
+ shardStats, transactionID, txnClientVersion);
} else {
tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
- shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+ shardActor, shardStats, transactionID, txnClientVersion);
}
tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
private final DOMStoreTransactionChain chain;
private final DatastoreContext datastoreContext;
- private final SchemaContext schemaContext;
private final ShardStats shardStats;
- public ShardTransactionChain(DOMStoreTransactionChain chain, SchemaContext schemaContext,
- DatastoreContext datastoreContext, ShardStats shardStats) {
+ public ShardTransactionChain(DOMStoreTransactionChain chain, DatastoreContext datastoreContext,
+ ShardStats shardStats) {
this.chain = chain;
this.datastoreContext = datastoreContext;
- this.schemaContext = schemaContext;
this.shardStats = shardStats;
}
TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
return getContext().actorOf(
ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
- schemaContext, datastoreContext, shardStats,
- createTransaction.getTransactionId(),
+ datastoreContext, shardStats, createTransaction.getTransactionId(),
createTransaction.getVersion()), transactionName);
} else if (createTransaction.getTransactionType() ==
TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
return getContext().actorOf(
ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
- schemaContext, datastoreContext, shardStats,
- createTransaction.getTransactionId(),
+ datastoreContext, shardStats, createTransaction.getTransactionId(),
createTransaction.getVersion()), transactionName);
} else if (createTransaction.getTransactionType() ==
TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
return getContext().actorOf(
ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
- schemaContext, datastoreContext, shardStats,
- createTransaction.getTransactionId(),
+ datastoreContext, shardStats, createTransaction.getTransactionId(),
createTransaction.getVersion()), transactionName);
} else {
throw new IllegalArgumentException (
public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext,
DatastoreContext datastoreContext, ShardStats shardStats) {
- return Props.create(new ShardTransactionChainCreator(chain, schemaContext,
- datastoreContext, shardStats));
+ return Props.create(new ShardTransactionChainCreator(chain, datastoreContext, shardStats));
}
private static class ShardTransactionChainCreator implements Creator<ShardTransactionChain> {
final DOMStoreTransactionChain chain;
final DatastoreContext datastoreContext;
- final SchemaContext schemaContext;
final ShardStats shardStats;
- ShardTransactionChainCreator(DOMStoreTransactionChain chain, SchemaContext schemaContext,
- DatastoreContext datastoreContext, ShardStats shardStats) {
+ ShardTransactionChainCreator(DOMStoreTransactionChain chain, DatastoreContext datastoreContext,
+ ShardStats shardStats) {
this.chain = chain;
this.datastoreContext = datastoreContext;
- this.schemaContext = schemaContext;
this.shardStats = shardStats;
}
@Override
public ShardTransactionChain create() throws Exception {
- return new ShardTransactionChain(chain, schemaContext, datastoreContext, shardStats);
+ return new ShardTransactionChain(chain, datastoreContext, shardStats);
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.UntypedActorContext;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+
+/**
+ * A factory for creating ShardTransaction actors.
+ *
+ * @author Thomas Pantelis
+ */
+class ShardTransactionActorFactory {
+
+ private final DOMTransactionFactory domTransactionFactory;
+ private final DatastoreContext datastoreContext;
+ private final String txnDispatcherPath;
+ private final ShardStats shardMBean;
+ private final UntypedActorContext actorContext;
+ private final ActorRef shardActor;
+
+ ShardTransactionActorFactory(DOMTransactionFactory domTransactionFactory, DatastoreContext datastoreContext,
+ String txnDispatcherPath, ActorRef shardActor, UntypedActorContext actorContext, ShardStats shardMBean) {
+ this.domTransactionFactory = domTransactionFactory;
+ this.datastoreContext = datastoreContext;
+ this.txnDispatcherPath = txnDispatcherPath;
+ this.shardMBean = shardMBean;
+ this.actorContext = actorContext;
+ this.shardActor = shardActor;
+ }
+
+ ActorRef newShardTransaction(TransactionProxy.TransactionType type, ShardTransactionIdentifier transactionID,
+ String transactionChainID, short clientVersion) {
+
+ DOMStoreTransaction transaction = domTransactionFactory.newTransaction(type, transactionID.toString(),
+ transactionChainID);
+
+ return actorContext.actorOf(ShardTransaction.props(transaction, shardActor, datastoreContext, shardMBean,
+ transactionID.getRemoteTransactionId(), clientVersion).withDispatcher(txnDispatcherPath),
+ transactionID.toString());
+ }
+}
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
* @author: syedbahm
private final DOMStoreWriteTransaction transaction;
public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext, ShardStats shardStats, String transactionID,
- short clientTxVersion) {
- super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
+ ShardStats shardStats, String transactionID, short clientTxVersion) {
+ super(shardActor, shardStats, transactionID, clientTxVersion);
this.transaction = transaction;
}
import akka.dispatch.OnComplete;
import akka.japi.Creator;
import akka.pattern.Patterns;
+import akka.persistence.SaveSnapshotSuccess;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Function;
@SuppressWarnings("serial")
public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
+ final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
+
final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
TestPersistentDataProvider(DataPersistenceProvider delegate) {
dataStoreContextBuilder.persistent(persistent);
new ShardTestKit(getSystem()) {{
- final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
-
class TestShard extends Shard {
protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
}
@Override
- protected void commitSnapshot(final long sequenceNumber) {
- super.commitSnapshot(sequenceNumber);
- latch.get().countDown();
+ public void handleCommand(Object message) {
+ super.handleCommand(message);
+
+ if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
+ latch.get().countDown();
+ }
}
@Override
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
+ datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
+ datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
+ datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
+ datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
+ datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
+ datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props, "testNegativeMergeTransactionReady");
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
+ datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
short version) {
Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
- testSchemaContext, datastoreContext, shardStats, "txn", version);
+ datastoreContext, shardStats, "txn", version);
return getSystem().actorOf(props, name);
}
public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
+ datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).