The NormalizedNode snapshot payload is now streamed.
On the Shard side, added a new message CreateSnapsot to the
ShardReadTransaction to read the data tree root, serialize it, and
return a CaptureSnapshotReply message. On createSnapshot, the Shard
now sends the CreateSnapsot message to the read-only tx actor instead
of a ReadData message. This moves the serialization out of the Shard.
On the RaftActor side, the Snapshot class remained the same as it stores
a byte[] and is already Serializable. The CaptureSnapshotReply now
stores a byte[] instead of ByteString. The internal RaftActor code for
capture and apply snapshot was changed to use/pass byte[] to eliminate
the overhead of converting to/from ByteString.
Change-Id: Id12677441dce54bebbb5b71c68cf457d7c915ba1
Signed-off-by: tpantelis <tpanteli@brocade.com>
private final DataPersistenceProvider dataPersistenceProvider;
private long persistIdentifier = 1;
- private Optional<ActorRef> roleChangeNotifier;
+ private final Optional<ActorRef> roleChangeNotifier;
public ExampleActor(String id, Map<String, String> peerAddresses,
} catch (Exception e) {
LOG.error(e, "Exception in creating snapshot");
}
- getSelf().tell(new CaptureSnapshotReply(bs), null);
+ getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
}
- @Override protected void applySnapshot(ByteString snapshot) {
+ @Override protected void applySnapshot(byte [] snapshot) {
state.clear();
try {
state.putAll((HashMap) toObject(snapshot));
}
}
- private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
+ private Object toObject(byte [] bs) throws ClassNotFoundException, IOException {
Object obj = null;
ByteArrayInputStream bis = null;
ObjectInputStream ois = null;
try {
- bis = new ByteArrayInputStream(bs.toByteArray());
+ bis = new ByteArrayInputStream(bs);
ois = new ObjectInputStream(bis);
obj = ois.readObject();
} finally {
}
@Override
- protected void applyRecoverySnapshot(ByteString snapshot) {
+ protected void applyRecoverySnapshot(byte[] snapshot) {
}
}
timer.start();
// Apply the snapshot to the actors state
- applyRecoverySnapshot(ByteString.copyFrom(snapshot.getState()));
+ applyRecoverySnapshot(snapshot.getState());
timer.stop();
LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
snapshot.getLastAppliedTerm()
);
}
- applySnapshot(ByteString.copyFrom(snapshot.getState()));
+
+ applySnapshot(snapshot.getState());
//clears the followers log, sets the snapshot index to ensure adjusted-index works
replicatedLog = new ReplicatedLogImpl(snapshot);
} else if (message instanceof CaptureSnapshot) {
LOG.info("CaptureSnapshot received by actor");
- CaptureSnapshot cs = (CaptureSnapshot)message;
- captureSnapshot = cs;
- createSnapshot();
- } else if (message instanceof CaptureSnapshotReply){
- LOG.info("CaptureSnapshotReply received by actor");
- CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
+ if(captureSnapshot == null) {
+ captureSnapshot = (CaptureSnapshot)message;
+ createSnapshot();
+ }
- ByteString stateInBytes = csr.getSnapshot();
- LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
- handleCaptureSnapshotReply(stateInBytes);
+ } else if (message instanceof CaptureSnapshotReply){
+ handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
} else {
if (!(message instanceof AppendEntriesMessages.AppendEntries)
*
* @param snapshot A snapshot of the state of the actor
*/
- protected abstract void applyRecoverySnapshot(ByteString snapshot);
+ protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
/**
* This method is called during recovery at the end of a batch to apply the current batched
* operations when the derived actor is out of sync with it's peers
* and the only way to bring it in sync is by applying a snapshot
*
- * @param snapshot A snapshot of the state of the actor
+ * @param snapshotBytes A snapshot of the state of the actor
*/
- protected abstract void applySnapshot(ByteString snapshot);
+ protected abstract void applySnapshot(byte[] snapshotBytes);
/**
* This method will be called by the RaftActor when the state of the
return peerAddress;
}
- private void handleCaptureSnapshotReply(ByteString stateInBytes) {
+ private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
+ LOG.info("CaptureSnapshotReply received by actor: snapshot size {}", snapshotBytes.length);
+
// create a snapshot object from the state provided and save it
// when snapshot is saved async, SaveSnapshotSuccess is raised.
- Snapshot sn = Snapshot.create(stateInBytes.toByteArray(),
+ Snapshot sn = Snapshot.create(snapshotBytes,
context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
// this would be call straight to the leader and won't initiate in serialization
- currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(stateInBytes));
+ currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
+ ByteString.copyFrom(snapshotBytes)));
}
captureSnapshot = null;
*/
package org.opendaylight.controller.cluster.raft.base.messages;
-import com.google.protobuf.ByteString;
public class CaptureSnapshotReply {
- private ByteString snapshot;
+ private final byte [] snapshot;
- public CaptureSnapshotReply(ByteString snapshot) {
+ public CaptureSnapshotReply(byte [] snapshot) {
this.snapshot = snapshot;
}
- public ByteString getSnapshot() {
+ public byte [] getSnapshot() {
return snapshot;
}
-
- public void setSnapshot(ByteString snapshot) {
- this.snapshot = snapshot;
- }
}
package org.opendaylight.controller.cluster.raft;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
public class RaftActorTest extends AbstractActorTest {
}
@Override
- protected void applyRecoverySnapshot(ByteString snapshot) {
- delegate.applyRecoverySnapshot(snapshot);
+ protected void applyRecoverySnapshot(byte[] bytes) {
+ delegate.applyRecoverySnapshot(bytes);
try {
- Object data = toObject(snapshot);
- System.out.println("!!!!!applyRecoverySnapshot: "+data);
+ Object data = toObject(bytes);
if (data instanceof List) {
state.addAll((List<?>) data);
}
delegate.createSnapshot();
}
- @Override protected void applySnapshot(ByteString snapshot) {
+ @Override protected void applySnapshot(byte [] snapshot) {
delegate.applySnapshot(snapshot);
}
return this.getId();
}
- private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
+ private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
Object obj = null;
ByteArrayInputStream bis = null;
ObjectInputStream ois = null;
try {
- bis = new ByteArrayInputStream(bs.toByteArray());
+ bis = new ByteArrayInputStream(bs);
ois = new ObjectInputStream(bis);
obj = ois.readObject();
} finally {
mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
- verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes));
+ verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
- verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(ByteString.class));
+ verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
- mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
+ mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
verify(dataPersistenceProvider).saveSnapshot(anyObject());
verify(mockRaftActor.delegate).createSnapshot();
- mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
+ mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
- verify(mockRaftActor.delegate).applySnapshot(eq(snapshotBytes));
+ verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
assertTrue("The replicatedLog should have changed",
oldReplicatedLog != mockRaftActor.getReplicatedLog());
mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
- mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
+ mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
new Exception()));
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
-import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
*/
public class Shard extends RaftActor {
+ private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
+
private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
@VisibleForTesting
private SchemaContext schemaContext;
- private ActorRef createSnapshotTransaction;
-
private int createSnapshotTransactionCounter;
private final ShardCommitCoordinator commitCoordinator;
LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
}
- if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
- handleReadDataReply(message);
- } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+ if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
handleCreateTransaction(message);
} else if(message instanceof ForwardedReadyTransaction) {
handleForwardedReadyTransaction((ForwardedReadyTransaction)message);
}
}
- private void handleReadDataReply(final Object message) {
- // This must be for install snapshot. Don't want to open this up and trigger
- // deSerialization
-
- self().tell(new CaptureSnapshotReply(ReadDataReply.fromSerializableAsByteString(message)),
- self());
-
- createSnapshotTransaction = null;
-
- // Send a PoisonPill instead of sending close transaction because we do not really need
- // a response
- getSender().tell(PoisonPill.getInstance(), self());
- }
-
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
DOMStoreTransactionChain chain =
transactionChains.remove(closeTransactionChain.getTransactionChainId());
}
@Override
- protected void applyRecoverySnapshot(final ByteString snapshot) {
+ protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
if(recoveryCoordinator == null) {
recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
}
- recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction());
+ recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
if(LOG.isDebugEnabled()) {
LOG.debug("{} : submitted recovery sbapshot", persistenceId());
@Override
protected void createSnapshot() {
- if (createSnapshotTransaction == null) {
+ // Create a transaction actor. We are really going to treat the transaction as a worker
+ // so that this actor does not get block building the snapshot. THe transaction actor will
+ // after processing the CreateSnapshot message.
- // Create a transaction. We are really going to treat the transaction as a worker
- // so that this actor does not get block building the snapshot
- createSnapshotTransaction = createTransaction(
+ ActorRef createSnapshotTransaction = createTransaction(
TransactionProxy.TransactionType.READ_ONLY.ordinal(),
"createSnapshot" + ++createSnapshotTransactionCounter, "",
DataStoreVersions.CURRENT_VERSION);
- createSnapshotTransaction.tell(
- new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
-
- }
+ createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self());
}
@VisibleForTesting
@Override
- protected void applySnapshot(final ByteString snapshot) {
+ protected void applySnapshot(final byte[] snapshotBytes) {
// Since this will be done only on Recovery or when this actor is a Follower
// we can safely commit everything in here. We not need to worry about event notifications
// as they would have already been disabled on the follower
LOG.info("Applying snapshot");
try {
DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
- NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
- NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext)
- .decode(serializedNode);
+
+ NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
// delete everything first
- transaction.delete(YangInstanceIdentifier.builder().build());
+ transaction.delete(DATASTORE_ROOT);
// Add everything from the remote node back
- transaction.write(YangInstanceIdentifier.builder().build(), node);
+ transaction.write(DATASTORE_ROOT, node);
syncCommitTransaction(transaction);
- } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
+ } catch (InterruptedException | ExecutionException e) {
LOG.error(e, "An exception occurred when applying snapshot");
} finally {
LOG.info("Done applying snapshot");
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
* Date: 8/6/14
*/
public class ShardReadTransaction extends ShardTransaction {
+ private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
+
private final DOMStoreReadTransaction transaction;
public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
} else if (message instanceof DataExists) {
dataExists(transaction, (DataExists) message, !SERIALIZED_REPLY);
-
+ } else if (message instanceof CreateSnapshot) {
+ createSnapshot();
} else if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
readData(transaction, ReadData.fromSerializable(message), SERIALIZED_REPLY);
}
}
+ private void createSnapshot() {
+
+ // This is a special message sent by the shard to send back a serialized snapshot of the whole
+ // data store tree. This transaction was created for that purpose only so we can
+ // self-destruct after sending the reply.
+
+ final ActorRef sender = getSender();
+ final ActorRef self = getSelf();
+ final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = transaction.read(DATASTORE_ROOT);
+
+ Futures.addCallback(future, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+ @Override
+ public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
+ byte[] serialized = SerializationUtils.serializeNormalizedNode(result.get());
+ sender.tell(new CaptureSnapshotReply(serialized), self);
+
+ self.tell(PoisonPill.getInstance(), self);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ sender.tell(new akka.actor.Status.Failure(t), self);
+
+ self.tell(PoisonPill.getInstance(), self);
+ }
+ });
+ }
+
@Override
protected DOMStoreTransaction getDOMStoreTransaction() {
return transaction;
*/
package org.opendaylight.controller.cluster.datastore;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-
/**
* Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
* and journal log entry batch are de-serialized and applied to their own write transaction
/**
* Submits a snapshot.
*
- * @param snapshot the serialized snapshot
+ * @param snapshotBytes the serialized snapshot
* @param resultingTx the write Tx to which to apply the entries
*/
- void submit(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
- SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshot, resultingTx);
+ void submit(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
+ SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshotBytes, resultingTx);
resultingTxList.add(resultingTx);
executor.execute(task);
}
private class SnapshotRecoveryTask extends ShardRecoveryTask {
- private final ByteString snapshot;
+ private final byte[] snapshotBytes;
- SnapshotRecoveryTask(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
+ SnapshotRecoveryTask(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
super(resultingTx);
- this.snapshot = snapshot;
+ this.snapshotBytes = snapshotBytes;
}
@Override
public void run() {
- try {
- NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
- NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext).decode(
- serializedNode);
-
- // delete everything first
- resultingTx.delete(YangInstanceIdentifier.builder().build());
-
- // Add everything from the remote node back
- resultingTx.write(YangInstanceIdentifier.builder().build(), node);
- } catch (InvalidProtocolBufferException e) {
- LOG.error("Error deserializing snapshot", e);
- }
+ NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+
+ // delete everything first
+ resultingTx.delete(YangInstanceIdentifier.builder().build());
+
+ // Add everything from the remote node back
+ resultingTx.write(YangInstanceIdentifier.builder().build(), node);
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * Message sent to a transaction actor to create a snapshot of the data store.
+ *
+ * @author Thomas Pantelis
+ */
+public class CreateSnapshot {
+ // Note: This class does not need to Serializable as it's only sent locally.
+
+ public static final CreateSnapshot INSTANCE = new CreateSnapshot();
+}
package org.opendaylight.controller.cluster.datastore.utils;
import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.IOException;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
return null;
}
+ public static NormalizedNode<?, ?> deserializeNormalizedNode(byte [] bytes) {
+ NormalizedNode<?, ?> node = null;
+ try {
+ node = deserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes)));
+ } catch(Exception e) {
+ }
+
+ if(node == null) {
+ // Must be from legacy protobuf serialization - try that.
+ try {
+ NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(bytes);
+ node = new NormalizedNodeToNodeCodec(null).decode(serializedNode);
+ } catch (InvalidProtocolBufferException e) {
+ throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
+ }
+ }
+
+ return node;
+ }
+
+ public static byte [] serializeNormalizedNode(NormalizedNode<?, ?> node) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ serializeNormalizedNode(node, new DataOutputStream(bos));
+ return bos.toByteArray();
+ }
+
public static void serializePath(YangInstanceIdentifier path, DataOutput out) {
Preconditions.checkNotNull(path);
try {
import akka.dispatch.Dispatchers;
import akka.dispatch.OnComplete;
import akka.japi.Creator;
+import akka.japi.Procedure;
import akka.pattern.Patterns;
+import akka.persistence.SnapshotSelectionCriteria;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Function;
import org.mockito.InOrder;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
"testApplySnapshot");
- NormalizedNodeToNodeCodec codec =
- new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
+ InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+ store.onGlobalContextUpdated(SCHEMA_CONTEXT);
- writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
- NormalizedNode<?,?> expected = readStore(shard, root);
+ NormalizedNode<?,?> expected = readStore(store, root);
+
+ ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
+ SerializationUtils.serializeNormalizedNode(expected),
+ Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
+
+ shard.underlyingActor().onReceiveCommand(applySnapshot);
+
+ NormalizedNode<?,?> actual = readStore(shard, root);
+
+ assertEquals("Root node", expected, actual);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
+ @Test
+ public void testApplyHelium2VersionSnapshot() throws Exception {
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
+ "testApplySnapshot");
+
+ NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
+
+ InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+ store.onGlobalContextUpdated(SCHEMA_CONTEXT);
+
+ writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
+ NormalizedNode<?,?> expected = readStore(store, root);
NormalizedNodeMessages.Container encode = codec.encode(expected);
NormalizedNode<?,?> actual = readStore(shard, root);
- assertEquals(expected, actual);
+ assertEquals("Root node", expected, actual);
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
- @SuppressWarnings("serial")
@Test
public void testRecovery() throws Exception {
InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
- DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
- writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
- commitCohort.preCommit().get();
- commitCohort.commit().get();
+ writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
- NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
+ NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
- new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
- root).
- getNormalizedNode().toByteString().toByteArray(),
- Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+ SerializationUtils.serializeNormalizedNode(root),
+ Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
// Set up the InMemoryJournal.
int nListEntries = 16;
Set<Integer> listEntryKeys = new HashSet<>();
- int i = 1;
- // Add some of the legacy CompositeModificationPayload
- for(; i <= 2; i++) {
+ // Add some ModificationPayload entries
+ for(int i = 1; i <= nListEntries; i++) {
listEntryKeys.add(Integer.valueOf(i));
YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
.nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
Modification mod = new MergeModification(path,
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
- newLegacyPayload(mod)));
+ newModificationPayload(mod)));
}
- // Add some of the legacy CompositeModificationByteStringPayload
- for(; i <= 5; i++) {
+ InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
+ new ApplyLogEntries(nListEntries));
+
+ testRecovery(listEntryKeys);
+ }
+
+ @Test
+ public void testHelium2VersionRecovery() throws Exception {
+
+ // Set up the InMemorySnapshotStore.
+
+ InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
+ testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
+
+ writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
+
+ InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
+ new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root).
+ getNormalizedNode().toByteString().toByteArray(),
+ Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+
+ // Set up the InMemoryJournal.
+
+ InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
+ new WriteModification(TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
+
+ int nListEntries = 16;
+ Set<Integer> listEntryKeys = new HashSet<>();
+ int i = 1;
+
+ // Add some CompositeModificationPayload entries
+ for(; i <= 8; i++) {
listEntryKeys.add(Integer.valueOf(i));
YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
.nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
Modification mod = new MergeModification(path,
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
- newLegacyByteStringPayload(mod)));
+ newLegacyPayload(mod)));
}
- // Add some of the ModificationPayload
+ // Add some CompositeModificationByteStringPayload entries
for(; i <= nListEntries; i++) {
listEntryKeys.add(Integer.valueOf(i));
YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
Modification mod = new MergeModification(path,
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
- newModificationPayload(mod)));
+ newLegacyByteStringPayload(mod)));
}
- InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
- new ApplyLogEntries(nListEntries));
+ InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries));
+
+ testRecovery(listEntryKeys);
+ }
+ private void testRecovery(Set<Integer> listEntryKeys) throws Exception {
// Create the actor and wait for recovery complete.
+ int nListEntries = listEntryKeys.size();
+
final CountDownLatch recoveryComplete = new CountDownLatch(1);
+ @SuppressWarnings("serial")
Creator<Shard> creator = new Creator<Shard>() {
@Override
public Shard create() throws Exception {
}
@Test
- public void testCreateSnapshot() throws IOException, InterruptedException {
- testCreateSnapshot(true, "testCreateSnapshot");
+ public void testCreateSnapshot() throws Exception {
+ testCreateSnapshot(true, "testCreateSnapshot");
}
@Test
- public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
+ public void testCreateSnapshotWithNonPersistentData() throws Exception {
testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
}
@SuppressWarnings("serial")
- public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws IOException, InterruptedException {
- final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
- shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
+ public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
+
+ final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
+ class DelegatingPersistentDataProvider implements DataPersistenceProvider {
+ DataPersistenceProvider delegate;
+
+ DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public boolean isRecoveryApplicable() {
+ return delegate.isRecoveryApplicable();
+ }
+
+ @Override
+ public <T> void persist(T o, Procedure<T> procedure) {
+ delegate.persist(o, procedure);
+ }
+
+ @Override
+ public void saveSnapshot(Object o) {
+ savedSnapshot.set(o);
+ delegate.saveSnapshot(o);
+ }
+
+ @Override
+ public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+ delegate.deleteSnapshots(criteria);
+ }
+
+ @Override
+ public void deleteMessages(long sequenceNumber) {
+ delegate.deleteMessages(sequenceNumber);
+ }
+ }
+
+ dataStoreContextBuilder.persistent(persistent);
new ShardTestKit(getSystem()) {{
final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
public Shard create() throws Exception {
return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
newDatastoreContext(), SCHEMA_CONTEXT) {
+
+ DelegatingPersistentDataProvider delegating;
+
+ @Override
+ protected DataPersistenceProvider persistence() {
+ if(delegating == null) {
+ delegating = new DelegatingPersistentDataProvider(super.persistence());
+ }
+
+ return delegating;
+ }
+
@Override
protected void commitSnapshot(final long sequenceNumber) {
super.commitSnapshot(sequenceNumber);
waitUntilLeader(shard);
- shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
+ writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
+
+ CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1);
+ shard.tell(capture, getRef());
assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
+ assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
+ savedSnapshot.get() instanceof Snapshot);
+
+ verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
+
latch.set(new CountDownLatch(1));
- shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
+ savedSnapshot.set(null);
+
+ shard.tell(capture, getRef());
assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
+ assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
+ savedSnapshot.get() instanceof Snapshot);
+
+ verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
+
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
+ private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
+
+ NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
+ assertEquals("Root node", expectedRoot, actual);
+
}};
}
static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
throws ExecutionException, InterruptedException {
- DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
+ return readStore(shard.underlyingActor().getDataStore(), id);
+ }
+
+ public static NormalizedNode<?,?> readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id)
+ throws ExecutionException, InterruptedException {
+ DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
transaction.read(id);
return node;
}
- private void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id, final NormalizedNode<?,?> node)
- throws ExecutionException, InterruptedException {
- DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
+ static void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
+ final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
+ writeToStore(shard.underlyingActor().getDataStore(), id, node);
+ }
+
+ public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id,
+ final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
+ DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
transaction.write(id, node);
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
public class ShardTransactionTest extends AbstractActorTest {
- private static final InMemoryDOMDataStore store =
- new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
private static final SchemaContext testSchemaContext = TestModel.createTestContext();
private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
- @BeforeClass
- public static void staticSetup() {
+ private final InMemoryDOMDataStore store =
+ new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+
+ @Before
+ public void setup() {
store.onGlobalContextUpdated(testSchemaContext);
}
Collections.<ShardIdentifier, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
}
+ private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
+ return newTransactionActor(transaction, name, DataStoreVersions.CURRENT_VERSION);
+ }
+
+ private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name, short version) {
+ return newTransactionActor(transaction, null, name, version);
+ }
+
+ private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name) {
+ return newTransactionActor(transaction, null, name, DataStoreVersions.CURRENT_VERSION);
+ }
+
+ private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
+ short version) {
+ Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
+ testSchemaContext, datastoreContext, shardStats, "txn", version);
+ return getSystem().actorOf(props, name);
+ }
+
@Test
public void testOnReceiveReadData() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
- Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
- testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
+ testOnReceiveReadData(newTransactionActor(store.newReadOnlyTransaction(), shard, "testReadDataRO"));
- props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
-
- testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
+ testOnReceiveReadData(newTransactionActor(store.newReadWriteTransaction(), shard, "testReadDataRW"));
}
private void testOnReceiveReadData(final ActorRef transaction) {
public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
- Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
-
- testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
- props, "testReadDataWhenDataNotFoundRO"));
- props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
+ testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
+ store.newReadOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
- testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
- props, "testReadDataWhenDataNotFoundRW"));
+ testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
+ store.newReadWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
}
private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
@Test
public void testOnReceiveReadDataHeliumR1() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = createShard();
- Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.HELIUM_1_VERSION);
-
- ActorRef transaction = getSystem().actorOf(props, "testOnReceiveReadDataHeliumR1");
+ ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
+ "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
getRef());
public void testOnReceiveDataExistsPositive() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
- Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
-
- testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
- props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
+ testOnReceiveDataExistsPositive(newTransactionActor(store.newReadOnlyTransaction(), shard,
+ "testDataExistsPositiveRO"));
- testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
+ testOnReceiveDataExistsPositive(newTransactionActor(store.newReadWriteTransaction(), shard,
+ "testDataExistsPositiveRW"));
}
private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
public void testOnReceiveDataExistsNegative() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
- Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
- testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
+ testOnReceiveDataExistsNegative(newTransactionActor(store.newReadOnlyTransaction(), shard,
+ "testDataExistsNegativeRO"));
- props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
-
- testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
+ testOnReceiveDataExistsNegative(newTransactionActor(store.newReadWriteTransaction(), shard,
+ "testDataExistsNegativeRW"));
}
private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
@Test
public void testOnReceiveWriteData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
- final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveWriteData");
+ final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+ "testOnReceiveWriteData");
transaction.tell(new WriteData(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
@Test
public void testOnReceiveHeliumR1WriteData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.HELIUM_1_VERSION);
- final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1WriteData");
+ final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+ "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME));
@Test
public void testOnReceiveMergeData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
- final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
+ final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+ "testMergeData");
transaction.tell(new MergeData(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
@Test
public void testOnReceiveHeliumR1MergeData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.HELIUM_1_VERSION);
- final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1MergeData");
+ final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+ "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME));
@Test
public void testOnReceiveDeleteData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = createShard();
- final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
- final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
+ final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+ "testDeleteData");
transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(
DataStoreVersions.HELIUM_2_VERSION), getRef());
@Test
public void testOnReceiveReadyTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = createShard();
- final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
- final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
+ final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+ "testReadyTransaction");
watch(transaction);
// test
new JavaTestKit(getSystem()) {{
- final ActorRef shard = createShard();
- final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
- final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
+ final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+ "testReadyTransaction2");
watch(transaction);
expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
Terminated.class);
}};
+ }
+ @Test
+ public void testOnReceiveCreateSnapshot() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ ShardTest.writeToStore(store, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store,
+ YangInstanceIdentifier.builder().build());
+
+ final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
+ "testOnReceiveCreateSnapshot");
+
+ watch(transaction);
+
+ transaction.tell(CreateSnapshot.INSTANCE, getRef());
+
+ CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
+
+ assertNotNull("getSnapshot is null", reply.getSnapshot());
+
+ NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
+ reply.getSnapshot());
+
+ assertEquals("Root node", expectedRoot, actualRoot);
+
+ expectTerminated(duration("3 seconds"), transaction);
+ }};
}
- @SuppressWarnings("unchecked")
@Test
public void testOnReceiveCloseTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
- final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
+ final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+ "testCloseTransaction");
watch(transaction);
Duration.create(500, TimeUnit.MILLISECONDS)).build();
new JavaTestKit(getSystem()) {{
- final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
- final ActorRef transaction =
- getSystem().actorOf(props, "testShardTransactionInactivity");
+ final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+ "testShardTransactionInactivity");
watch(transaction);