From cfc94248aa44307e7dc9aaefcb6748c478f93138 Mon Sep 17 00:00:00 2001 From: tpantelis Date: Thu, 29 Jan 2015 08:35:59 -0500 Subject: [PATCH] Bug 2264: Use streaming for snapshots The NormalizedNode snapshot payload is now streamed. On the Shard side, added a new message CreateSnapsot to the ShardReadTransaction to read the data tree root, serialize it, and return a CaptureSnapshotReply message. On createSnapshot, the Shard now sends the CreateSnapsot message to the read-only tx actor instead of a ReadData message. This moves the serialization out of the Shard. On the RaftActor side, the Snapshot class remained the same as it stores a byte[] and is already Serializable. The CaptureSnapshotReply now stores a byte[] instead of ByteString. The internal RaftActor code for capture and apply snapshot was changed to use/pass byte[] to eliminate the overhead of converting to/from ByteString. Change-Id: Id12677441dce54bebbb5b71c68cf457d7c915ba1 Signed-off-by: tpantelis --- .../cluster/example/ExampleActor.java | 12 +- .../controller/cluster/raft/RaftActor.java | 35 +-- .../base/messages/CaptureSnapshotReply.java | 11 +- .../cluster/raft/RaftActorTest.java | 49 ++-- .../controller/cluster/datastore/Shard.java | 62 ++--- .../datastore/ShardReadTransaction.java | 43 +++- .../datastore/ShardRecoveryCoordinator.java | 43 ++-- .../datastore/messages/CreateSnapshot.java | 19 ++ .../datastore/utils/SerializationUtils.java | 33 +++ .../cluster/datastore/ShardTest.java | 223 ++++++++++++++---- .../datastore/ShardTransactionTest.java | 188 ++++++++------- 11 files changed, 454 insertions(+), 264 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateSnapshot.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 8f416b3abc..9aff86ba2b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -41,7 +41,7 @@ public class ExampleActor extends RaftActor { private final DataPersistenceProvider dataPersistenceProvider; private long persistIdentifier = 1; - private Optional roleChangeNotifier; + private final Optional roleChangeNotifier; public ExampleActor(String id, Map peerAddresses, @@ -127,10 +127,10 @@ public class ExampleActor extends RaftActor { } catch (Exception e) { LOG.error(e, "Exception in creating snapshot"); } - getSelf().tell(new CaptureSnapshotReply(bs), null); + getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null); } - @Override protected void applySnapshot(ByteString snapshot) { + @Override protected void applySnapshot(byte [] snapshot) { state.clear(); try { state.putAll((HashMap) toObject(snapshot)); @@ -162,12 +162,12 @@ public class ExampleActor extends RaftActor { } } - private Object toObject(ByteString bs) throws ClassNotFoundException, IOException { + private Object toObject(byte [] bs) throws ClassNotFoundException, IOException { Object obj = null; ByteArrayInputStream bis = null; ObjectInputStream ois = null; try { - bis = new ByteArrayInputStream(bs.toByteArray()); + bis = new ByteArrayInputStream(bs); ois = new ObjectInputStream(bis); obj = ois.readObject(); } finally { @@ -215,6 +215,6 @@ public class ExampleActor extends RaftActor { } @Override - protected void applyRecoverySnapshot(ByteString snapshot) { + protected void applyRecoverySnapshot(byte[] snapshot) { } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 164c2cea56..c256c822a4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -199,7 +199,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { timer.start(); // Apply the snapshot to the actors state - applyRecoverySnapshot(ByteString.copyFrom(snapshot.getState())); + applyRecoverySnapshot(snapshot.getState()); timer.stop(); LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" + @@ -317,7 +317,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { snapshot.getLastAppliedTerm() ); } - applySnapshot(ByteString.copyFrom(snapshot.getState())); + + applySnapshot(snapshot.getState()); //clears the followers log, sets the snapshot index to ensure adjusted-index works replicatedLog = new ReplicatedLogImpl(snapshot); @@ -354,17 +355,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof CaptureSnapshot) { LOG.info("CaptureSnapshot received by actor"); - CaptureSnapshot cs = (CaptureSnapshot)message; - captureSnapshot = cs; - createSnapshot(); - } else if (message instanceof CaptureSnapshotReply){ - LOG.info("CaptureSnapshotReply received by actor"); - CaptureSnapshotReply csr = (CaptureSnapshotReply) message; + if(captureSnapshot == null) { + captureSnapshot = (CaptureSnapshot)message; + createSnapshot(); + } - ByteString stateInBytes = csr.getSnapshot(); - LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size()); - handleCaptureSnapshotReply(stateInBytes); + } else if (message instanceof CaptureSnapshotReply){ + handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot()); } else { if (!(message instanceof AppendEntriesMessages.AppendEntries) @@ -583,7 +581,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * * @param snapshot A snapshot of the state of the actor */ - protected abstract void applyRecoverySnapshot(ByteString snapshot); + protected abstract void applyRecoverySnapshot(byte[] snapshotBytes); /** * This method is called during recovery at the end of a batch to apply the current batched @@ -612,9 +610,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * operations when the derived actor is out of sync with it's peers * and the only way to bring it in sync is by applying a snapshot * - * @param snapshot A snapshot of the state of the actor + * @param snapshotBytes A snapshot of the state of the actor */ - protected abstract void applySnapshot(ByteString snapshot); + protected abstract void applySnapshot(byte[] snapshotBytes); /** * This method will be called by the RaftActor when the state of the @@ -661,11 +659,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return peerAddress; } - private void handleCaptureSnapshotReply(ByteString stateInBytes) { + private void handleCaptureSnapshotReply(byte[] snapshotBytes) { + LOG.info("CaptureSnapshotReply received by actor: snapshot size {}", snapshotBytes.length); + // create a snapshot object from the state provided and save it // when snapshot is saved async, SaveSnapshotSuccess is raised. - Snapshot sn = Snapshot.create(stateInBytes.toByteArray(), + Snapshot sn = Snapshot.create(snapshotBytes, context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1), captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); @@ -687,7 +687,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) { // this would be call straight to the leader and won't initiate in serialization - currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(stateInBytes)); + currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot( + ByteString.copyFrom(snapshotBytes))); } captureSnapshot = null; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java index 96150db689..82f3e0dce0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java @@ -7,20 +7,15 @@ */ package org.opendaylight.controller.cluster.raft.base.messages; -import com.google.protobuf.ByteString; public class CaptureSnapshotReply { - private ByteString snapshot; + private final byte [] snapshot; - public CaptureSnapshotReply(ByteString snapshot) { + public CaptureSnapshotReply(byte [] snapshot) { this.snapshot = snapshot; } - public ByteString getSnapshot() { + public byte [] getSnapshot() { return snapshot; } - - public void setSnapshot(ByteString snapshot) { - this.snapshot = snapshot; - } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index d999bb2ba1..6b266d710e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -1,5 +1,17 @@ package org.opendaylight.controller.cluster.raft; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; @@ -57,18 +69,6 @@ import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; public class RaftActorTest extends AbstractActorTest { @@ -177,11 +177,10 @@ public class RaftActorTest extends AbstractActorTest { } @Override - protected void applyRecoverySnapshot(ByteString snapshot) { - delegate.applyRecoverySnapshot(snapshot); + protected void applyRecoverySnapshot(byte[] bytes) { + delegate.applyRecoverySnapshot(bytes); try { - Object data = toObject(snapshot); - System.out.println("!!!!!applyRecoverySnapshot: "+data); + Object data = toObject(bytes); if (data instanceof List) { state.addAll((List) data); } @@ -194,7 +193,7 @@ public class RaftActorTest extends AbstractActorTest { delegate.createSnapshot(); } - @Override protected void applySnapshot(ByteString snapshot) { + @Override protected void applySnapshot(byte [] snapshot) { delegate.applySnapshot(snapshot); } @@ -216,12 +215,12 @@ public class RaftActorTest extends AbstractActorTest { return this.getId(); } - private Object toObject(ByteString bs) throws ClassNotFoundException, IOException { + private Object toObject(byte[] bs) throws ClassNotFoundException, IOException { Object obj = null; ByteArrayInputStream bis = null; ObjectInputStream ois = null; try { - bis = new ByteArrayInputStream(bs.toByteArray()); + bis = new ByteArrayInputStream(bs); ois = new ObjectInputStream(bis); obj = ois.readObject(); } finally { @@ -431,7 +430,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); - verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes)); + verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray())); mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); @@ -500,7 +499,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); - verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(ByteString.class)); + verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class)); mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); @@ -676,7 +675,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); - mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes)); + mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); verify(dataPersistenceProvider).saveSnapshot(anyObject()); @@ -722,7 +721,7 @@ public class RaftActorTest extends AbstractActorTest { verify(mockRaftActor.delegate).createSnapshot(); - mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes)); + mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100))); @@ -814,7 +813,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot)); - verify(mockRaftActor.delegate).applySnapshot(eq(snapshotBytes)); + verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState())); assertTrue("The replicatedLog should have changed", oldReplicatedLog != mockRaftActor.getReplicatedLog()); @@ -859,7 +858,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1)); - mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes)); + mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L), new Exception())); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index dea377a810..9cd758ba30 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; -import akka.actor.PoisonPill; import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; @@ -25,8 +24,6 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; import java.util.Collection; import java.util.HashMap; @@ -52,13 +49,12 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; -import org.opendaylight.controller.cluster.datastore.messages.ReadData; -import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; @@ -66,18 +62,16 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; -import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; +import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; -import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory; @@ -97,6 +91,8 @@ import scala.concurrent.duration.FiniteDuration; */ public class Shard extends RaftActor { + private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build(); + private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck"; @VisibleForTesting @@ -124,8 +120,6 @@ public class Shard extends RaftActor { private SchemaContext schemaContext; - private ActorRef createSnapshotTransaction; - private int createSnapshotTransactionCounter; private final ShardCommitCoordinator commitCoordinator; @@ -244,9 +238,7 @@ public class Shard extends RaftActor { LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender()); } - if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { - handleReadDataReply(message); - } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) { + if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) { handleCreateTransaction(message); } else if(message instanceof ForwardedReadyTransaction) { handleForwardedReadyTransaction((ForwardedReadyTransaction)message); @@ -477,20 +469,6 @@ public class Shard extends RaftActor { } } - private void handleReadDataReply(final Object message) { - // This must be for install snapshot. Don't want to open this up and trigger - // deSerialization - - self().tell(new CaptureSnapshotReply(ReadDataReply.fromSerializableAsByteString(message)), - self()); - - createSnapshotTransaction = null; - - // Send a PoisonPill instead of sending close transaction because we do not really need - // a response - getSender().tell(PoisonPill.getInstance(), self()); - } - private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) { DOMStoreTransactionChain chain = transactionChains.remove(closeTransactionChain.getTransactionChainId()); @@ -702,12 +680,12 @@ public class Shard extends RaftActor { } @Override - protected void applyRecoverySnapshot(final ByteString snapshot) { + protected void applyRecoverySnapshot(final byte[] snapshotBytes) { if(recoveryCoordinator == null) { recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext); } - recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction()); + recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction()); if(LOG.isDebugEnabled()) { LOG.debug("{} : submitted recovery sbapshot", persistenceId()); @@ -824,24 +802,21 @@ public class Shard extends RaftActor { @Override protected void createSnapshot() { - if (createSnapshotTransaction == null) { + // Create a transaction actor. We are really going to treat the transaction as a worker + // so that this actor does not get block building the snapshot. THe transaction actor will + // after processing the CreateSnapshot message. - // Create a transaction. We are really going to treat the transaction as a worker - // so that this actor does not get block building the snapshot - createSnapshotTransaction = createTransaction( + ActorRef createSnapshotTransaction = createTransaction( TransactionProxy.TransactionType.READ_ONLY.ordinal(), "createSnapshot" + ++createSnapshotTransactionCounter, "", DataStoreVersions.CURRENT_VERSION); - createSnapshotTransaction.tell( - new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self()); - - } + createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self()); } @VisibleForTesting @Override - protected void applySnapshot(final ByteString snapshot) { + protected void applySnapshot(final byte[] snapshotBytes) { // Since this will be done only on Recovery or when this actor is a Follower // we can safely commit everything in here. We not need to worry about event notifications // as they would have already been disabled on the follower @@ -849,17 +824,16 @@ public class Shard extends RaftActor { LOG.info("Applying snapshot"); try { DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); - NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot); - NormalizedNode node = new NormalizedNodeToNodeCodec(schemaContext) - .decode(serializedNode); + + NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); // delete everything first - transaction.delete(YangInstanceIdentifier.builder().build()); + transaction.delete(DATASTORE_ROOT); // Add everything from the remote node back - transaction.write(YangInstanceIdentifier.builder().build(), node); + transaction.write(DATASTORE_ROOT, node); syncCommitTransaction(transaction); - } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) { + } catch (InterruptedException | ExecutionException e) { LOG.error(e, "An exception occurred when applying snapshot"); } finally { LOG.info("Done applying snapshot"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java index be9c4d80e3..6f8d0567d9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java @@ -11,11 +11,21 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; +import akka.actor.PoisonPill; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; +import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.ReadData; +import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; /** @@ -23,6 +33,8 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; * Date: 8/6/14 */ public class ShardReadTransaction extends ShardTransaction { + private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build(); + private final DOMStoreReadTransaction transaction; public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor, @@ -39,7 +51,8 @@ public class ShardReadTransaction extends ShardTransaction { } else if (message instanceof DataExists) { dataExists(transaction, (DataExists) message, !SERIALIZED_REPLY); - + } else if (message instanceof CreateSnapshot) { + createSnapshot(); } else if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) { readData(transaction, ReadData.fromSerializable(message), SERIALIZED_REPLY); @@ -51,6 +64,34 @@ public class ShardReadTransaction extends ShardTransaction { } } + private void createSnapshot() { + + // This is a special message sent by the shard to send back a serialized snapshot of the whole + // data store tree. This transaction was created for that purpose only so we can + // self-destruct after sending the reply. + + final ActorRef sender = getSender(); + final ActorRef self = getSelf(); + final ListenableFuture>> future = transaction.read(DATASTORE_ROOT); + + Futures.addCallback(future, new FutureCallback>>() { + @Override + public void onSuccess(Optional> result) { + byte[] serialized = SerializationUtils.serializeNormalizedNode(result.get()); + sender.tell(new CaptureSnapshotReply(serialized), self); + + self.tell(PoisonPill.getInstance(), self); + } + + @Override + public void onFailure(Throwable t) { + sender.tell(new akka.actor.Status.Failure(t), self); + + self.tell(PoisonPill.getInstance(), self); + } + }); + } + @Override protected DOMStoreTransaction getDOMStoreTransaction() { return transaction; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java index 9457456205..238b4e46dc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java @@ -7,16 +7,16 @@ */ package org.opendaylight.controller.cluster.datastore; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; - import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; -import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; -import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; +import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -24,11 +24,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; - /** * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot * and journal log entry batch are de-serialized and applied to their own write transaction @@ -73,11 +68,11 @@ class ShardRecoveryCoordinator { /** * Submits a snapshot. * - * @param snapshot the serialized snapshot + * @param snapshotBytes the serialized snapshot * @param resultingTx the write Tx to which to apply the entries */ - void submit(ByteString snapshot, DOMStoreWriteTransaction resultingTx) { - SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshot, resultingTx); + void submit(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) { + SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshotBytes, resultingTx); resultingTxList.add(resultingTx); executor.execute(task); } @@ -130,28 +125,22 @@ class ShardRecoveryCoordinator { private class SnapshotRecoveryTask extends ShardRecoveryTask { - private final ByteString snapshot; + private final byte[] snapshotBytes; - SnapshotRecoveryTask(ByteString snapshot, DOMStoreWriteTransaction resultingTx) { + SnapshotRecoveryTask(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) { super(resultingTx); - this.snapshot = snapshot; + this.snapshotBytes = snapshotBytes; } @Override public void run() { - try { - NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot); - NormalizedNode node = new NormalizedNodeToNodeCodec(schemaContext).decode( - serializedNode); - - // delete everything first - resultingTx.delete(YangInstanceIdentifier.builder().build()); - - // Add everything from the remote node back - resultingTx.write(YangInstanceIdentifier.builder().build(), node); - } catch (InvalidProtocolBufferException e) { - LOG.error("Error deserializing snapshot", e); - } + NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); + + // delete everything first + resultingTx.delete(YangInstanceIdentifier.builder().build()); + + // Add everything from the remote node back + resultingTx.write(YangInstanceIdentifier.builder().build(), node); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateSnapshot.java new file mode 100644 index 0000000000..c0d19afe10 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateSnapshot.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +/** + * Message sent to a transaction actor to create a snapshot of the data store. + * + * @author Thomas Pantelis + */ +public class CreateSnapshot { + // Note: This class does not need to Serializable as it's only sent locally. + + public static final CreateSnapshot INSTANCE = new CreateSnapshot(); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java index 189bbea2ef..5854932a6f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java @@ -8,11 +8,18 @@ package org.opendaylight.controller.cluster.datastore.utils; import com.google.common.base.Preconditions; +import com.google.protobuf.InvalidProtocolBufferException; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.IOException; +import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader; import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter; +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -99,6 +106,32 @@ public final class SerializationUtils { return null; } + public static NormalizedNode deserializeNormalizedNode(byte [] bytes) { + NormalizedNode node = null; + try { + node = deserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes))); + } catch(Exception e) { + } + + if(node == null) { + // Must be from legacy protobuf serialization - try that. + try { + NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(bytes); + node = new NormalizedNodeToNodeCodec(null).decode(serializedNode); + } catch (InvalidProtocolBufferException e) { + throw new IllegalArgumentException("Error deserializing NormalizedNode", e); + } + } + + return node; + } + + public static byte [] serializeNormalizedNode(NormalizedNode node) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + serializeNormalizedNode(node, new DataOutputStream(bos)); + return bos.toByteArray(); + } + public static void serializePath(YangInstanceIdentifier path, DataOutput out) { Preconditions.checkNotNull(path); try { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 14fc3a12bd..94b9698abf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -17,7 +17,9 @@ import akka.actor.Props; import akka.dispatch.Dispatchers; import akka.dispatch.OnComplete; import akka.japi.Creator; +import akka.japi.Procedure; import akka.pattern.Patterns; +import akka.persistence.SnapshotSelectionCriteria; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Function; @@ -43,6 +45,7 @@ import org.junit.Test; import org.mockito.InOrder; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; @@ -67,6 +70,7 @@ import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCo import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal; import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; +import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.Snapshot; @@ -364,13 +368,41 @@ public class ShardTest extends AbstractActorTest { TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplySnapshot"); - NormalizedNodeToNodeCodec codec = - new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT); + InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); + store.onGlobalContextUpdated(SCHEMA_CONTEXT); - writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); YangInstanceIdentifier root = YangInstanceIdentifier.builder().build(); - NormalizedNode expected = readStore(shard, root); + NormalizedNode expected = readStore(store, root); + + ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create( + SerializationUtils.serializeNormalizedNode(expected), + Collections.emptyList(), 1, 2, 3, 4)); + + shard.underlyingActor().onReceiveCommand(applySnapshot); + + NormalizedNode actual = readStore(shard, root); + + assertEquals("Root node", expected, actual); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + @Test + public void testApplyHelium2VersionSnapshot() throws Exception { + TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), + "testApplySnapshot"); + + NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT); + + InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); + store.onGlobalContextUpdated(SCHEMA_CONTEXT); + + writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + YangInstanceIdentifier root = YangInstanceIdentifier.builder().build(); + NormalizedNode expected = readStore(store, root); NormalizedNodeMessages.Container encode = codec.encode(expected); @@ -382,7 +414,7 @@ public class ShardTest extends AbstractActorTest { NormalizedNode actual = readStore(shard, root); - assertEquals(expected, actual); + assertEquals("Root node", expected, actual); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } @@ -423,7 +455,6 @@ public class ShardTest extends AbstractActorTest { shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } - @SuppressWarnings("serial") @Test public void testRecovery() throws Exception { @@ -432,20 +463,13 @@ public class ShardTest extends AbstractActorTest { InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null); testStore.onGlobalContextUpdated(SCHEMA_CONTEXT); - DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction(); - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready(); - commitCohort.preCommit().get(); - commitCohort.commit().get(); + writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction(); - NormalizedNode root = readTx.read(YangInstanceIdentifier.builder().build()).get().get(); + NormalizedNode root = readStore(testStore, YangInstanceIdentifier.builder().build()); InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create( - new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode( - root). - getNormalizedNode().toByteString().toByteArray(), - Collections.emptyList(), 0, 1, -1, -1)); + SerializationUtils.serializeNormalizedNode(root), + Collections.emptyList(), 0, 1, -1, -1)); // Set up the InMemoryJournal. @@ -455,31 +479,63 @@ public class ShardTest extends AbstractActorTest { int nListEntries = 16; Set listEntryKeys = new HashSet<>(); - int i = 1; - // Add some of the legacy CompositeModificationPayload - for(; i <= 2; i++) { + // Add some ModificationPayload entries + for(int i = 1; i <= nListEntries; i++) { listEntryKeys.add(Integer.valueOf(i)); YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); Modification mod = new MergeModification(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, - newLegacyPayload(mod))); + newModificationPayload(mod))); } - // Add some of the legacy CompositeModificationByteStringPayload - for(; i <= 5; i++) { + InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, + new ApplyLogEntries(nListEntries)); + + testRecovery(listEntryKeys); + } + + @Test + public void testHelium2VersionRecovery() throws Exception { + + // Set up the InMemorySnapshotStore. + + InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null); + testStore.onGlobalContextUpdated(SCHEMA_CONTEXT); + + writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + NormalizedNode root = readStore(testStore, YangInstanceIdentifier.builder().build()); + + InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create( + new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root). + getNormalizedNode().toByteString().toByteArray(), + Collections.emptyList(), 0, 1, -1, -1)); + + // Set up the InMemoryJournal. + + InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload( + new WriteModification(TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build())))); + + int nListEntries = 16; + Set listEntryKeys = new HashSet<>(); + int i = 1; + + // Add some CompositeModificationPayload entries + for(; i <= 8; i++) { listEntryKeys.add(Integer.valueOf(i)); YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); Modification mod = new MergeModification(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, - newLegacyByteStringPayload(mod))); + newLegacyPayload(mod))); } - // Add some of the ModificationPayload + // Add some CompositeModificationByteStringPayload entries for(; i <= nListEntries; i++) { listEntryKeys.add(Integer.valueOf(i)); YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) @@ -487,16 +543,22 @@ public class ShardTest extends AbstractActorTest { Modification mod = new MergeModification(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, - newModificationPayload(mod))); + newLegacyByteStringPayload(mod))); } - InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, - new ApplyLogEntries(nListEntries)); + InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries)); + + testRecovery(listEntryKeys); + } + private void testRecovery(Set listEntryKeys) throws Exception { // Create the actor and wait for recovery complete. + int nListEntries = listEntryKeys.size(); + final CountDownLatch recoveryComplete = new CountDownLatch(1); + @SuppressWarnings("serial") Creator creator = new Creator() { @Override public Shard create() throws Exception { @@ -1319,19 +1381,54 @@ public class ShardTest extends AbstractActorTest { } @Test - public void testCreateSnapshot() throws IOException, InterruptedException { - testCreateSnapshot(true, "testCreateSnapshot"); + public void testCreateSnapshot() throws Exception { + testCreateSnapshot(true, "testCreateSnapshot"); } @Test - public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException { + public void testCreateSnapshotWithNonPersistentData() throws Exception { testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData"); } @SuppressWarnings("serial") - public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws IOException, InterruptedException { - final DatastoreContext dataStoreContext = DatastoreContext.newBuilder(). - shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build(); + public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{ + + final AtomicReference savedSnapshot = new AtomicReference<>(); + class DelegatingPersistentDataProvider implements DataPersistenceProvider { + DataPersistenceProvider delegate; + + DelegatingPersistentDataProvider(DataPersistenceProvider delegate) { + this.delegate = delegate; + } + + @Override + public boolean isRecoveryApplicable() { + return delegate.isRecoveryApplicable(); + } + + @Override + public void persist(T o, Procedure procedure) { + delegate.persist(o, procedure); + } + + @Override + public void saveSnapshot(Object o) { + savedSnapshot.set(o); + delegate.saveSnapshot(o); + } + + @Override + public void deleteSnapshots(SnapshotSelectionCriteria criteria) { + delegate.deleteSnapshots(criteria); + } + + @Override + public void deleteMessages(long sequenceNumber) { + delegate.deleteMessages(sequenceNumber); + } + } + + dataStoreContextBuilder.persistent(persistent); new ShardTestKit(getSystem()) {{ final AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); @@ -1340,6 +1437,18 @@ public class ShardTest extends AbstractActorTest { public Shard create() throws Exception { return new Shard(shardID, Collections.emptyMap(), newDatastoreContext(), SCHEMA_CONTEXT) { + + DelegatingPersistentDataProvider delegating; + + @Override + protected DataPersistenceProvider persistence() { + if(delegating == null) { + delegating = new DelegatingPersistentDataProvider(super.persistence()); + } + + return delegating; + } + @Override protected void commitSnapshot(final long sequenceNumber) { super.commitSnapshot(sequenceNumber); @@ -1354,16 +1463,40 @@ public class ShardTest extends AbstractActorTest { waitUntilLeader(shard); - shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef()); + writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + NormalizedNode expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build()); + + CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1); + shard.tell(capture, getRef()); assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); + assertTrue("Invalid saved snapshot " + savedSnapshot.get(), + savedSnapshot.get() instanceof Snapshot); + + verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot); + latch.set(new CountDownLatch(1)); - shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef()); + savedSnapshot.set(null); + + shard.tell(capture, getRef()); assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); + assertTrue("Invalid saved snapshot " + savedSnapshot.get(), + savedSnapshot.get() instanceof Snapshot); + + verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot); + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + private void verifySnapshot(Snapshot snapshot, NormalizedNode expectedRoot) { + + NormalizedNode actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState()); + assertEquals("Root node", expectedRoot, actual); + }}; } @@ -1470,7 +1603,12 @@ public class ShardTest extends AbstractActorTest { static NormalizedNode readStore(final TestActorRef shard, final YangInstanceIdentifier id) throws ExecutionException, InterruptedException { - DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction(); + return readStore(shard.underlyingActor().getDataStore(), id); + } + + public static NormalizedNode readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id) + throws ExecutionException, InterruptedException { + DOMStoreReadTransaction transaction = store.newReadOnlyTransaction(); CheckedFuture>, ReadFailedException> future = transaction.read(id); @@ -1483,9 +1621,14 @@ public class ShardTest extends AbstractActorTest { return node; } - private void writeToStore(final TestActorRef shard, final YangInstanceIdentifier id, final NormalizedNode node) - throws ExecutionException, InterruptedException { - DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction(); + static void writeToStore(final TestActorRef shard, final YangInstanceIdentifier id, + final NormalizedNode node) throws ExecutionException, InterruptedException { + writeToStore(shard.underlyingActor().getDataStore(), id, node); + } + + public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id, + final NormalizedNode node) throws ExecutionException, InterruptedException { + DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); transaction.write(id, node); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index efae106617..69dd706f37 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -12,7 +12,7 @@ import akka.testkit.TestActorRef; import com.google.common.util.concurrent.MoreExecutors; import java.util.Collections; import java.util.concurrent.TimeUnit; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply; import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException; @@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; @@ -39,17 +40,19 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded; +import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; public class ShardTransactionTest extends AbstractActorTest { - private static final InMemoryDOMDataStore store = - new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); private static final SchemaContext testSchemaContext = TestModel.createTestContext(); @@ -61,8 +64,11 @@ public class ShardTransactionTest extends AbstractActorTest { private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore"); - @BeforeClass - public static void staticSetup() { + private final InMemoryDOMDataStore store = + new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); + + @Before + public void setup() { store.onGlobalContextUpdated(testSchemaContext); } @@ -71,21 +77,33 @@ public class ShardTransactionTest extends AbstractActorTest { Collections.emptyMap(), datastoreContext, TestModel.createTestContext())); } + private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) { + return newTransactionActor(transaction, name, DataStoreVersions.CURRENT_VERSION); + } + + private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name, short version) { + return newTransactionActor(transaction, null, name, version); + } + + private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name) { + return newTransactionActor(transaction, null, name, DataStoreVersions.CURRENT_VERSION); + } + + private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name, + short version) { + Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(), + testSchemaContext, datastoreContext, shardStats, "txn", version); + return getSystem().actorOf(props, name); + } + @Test public void testOnReceiveReadData() throws Exception { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); - Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); - testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO")); + testOnReceiveReadData(newTransactionActor(store.newReadOnlyTransaction(), shard, "testReadDataRO")); - props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); - - testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW")); + testOnReceiveReadData(newTransactionActor(store.newReadWriteTransaction(), shard, "testReadDataRW")); } private void testOnReceiveReadData(final ActorRef transaction) { @@ -111,19 +129,12 @@ public class ShardTransactionTest extends AbstractActorTest { public void testOnReceiveReadDataWhenDataNotFound() throws Exception { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); - Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); - - testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf( - props, "testReadDataWhenDataNotFoundRO")); - props = ShardTransaction.props( store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); + testOnReceiveReadDataWhenDataNotFound(newTransactionActor( + store.newReadOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO")); - testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf( - props, "testReadDataWhenDataNotFoundRW")); + testOnReceiveReadDataWhenDataNotFound(newTransactionActor( + store.newReadWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW")); } private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) { @@ -147,12 +158,8 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveReadDataHeliumR1() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = createShard(); - Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.HELIUM_1_VERSION); - - ActorRef transaction = getSystem().actorOf(props, "testOnReceiveReadDataHeliumR1"); + ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(), + "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION); transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), getRef()); @@ -168,17 +175,12 @@ public class ShardTransactionTest extends AbstractActorTest { public void testOnReceiveDataExistsPositive() throws Exception { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); - Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); - - testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO")); - props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); + testOnReceiveDataExistsPositive(newTransactionActor(store.newReadOnlyTransaction(), shard, + "testDataExistsPositiveRO")); - testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW")); + testOnReceiveDataExistsPositive(newTransactionActor(store.newReadWriteTransaction(), shard, + "testDataExistsPositiveRW")); } private void testOnReceiveDataExistsPositive(final ActorRef transaction) { @@ -203,17 +205,12 @@ public class ShardTransactionTest extends AbstractActorTest { public void testOnReceiveDataExistsNegative() throws Exception { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); - Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); - testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO")); + testOnReceiveDataExistsNegative(newTransactionActor(store.newReadOnlyTransaction(), shard, + "testDataExistsNegativeRO")); - props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); - - testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW")); + testOnReceiveDataExistsNegative(newTransactionActor(store.newReadWriteTransaction(), shard, + "testDataExistsNegativeRW")); } private void testOnReceiveDataExistsNegative(final ActorRef transaction) { @@ -249,11 +246,8 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveWriteData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = createShard(); - final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); - final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveWriteData"); + final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + "testOnReceiveWriteData"); transaction.tell(new WriteData(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable( @@ -275,11 +269,8 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveHeliumR1WriteData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = createShard(); - final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.HELIUM_1_VERSION); - final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1WriteData"); + final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION); Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -298,11 +289,8 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveMergeData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = createShard(); - final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); - final ActorRef transaction = getSystem().actorOf(props, "testMergeData"); + final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(), + "testMergeData"); transaction.tell(new MergeData(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable( @@ -324,11 +312,8 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveHeliumR1MergeData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = createShard(); - final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.HELIUM_1_VERSION); - final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1MergeData"); + final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION); Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -347,11 +332,8 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveDeleteData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = createShard(); - final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); - final ActorRef transaction = getSystem().actorOf(props, "testDeleteData"); + final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + "testDeleteData"); transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable( DataStoreVersions.HELIUM_2_VERSION), getRef()); @@ -371,11 +353,8 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveReadyTransaction() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = createShard(); - final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); - final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction"); + final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(), + "testReadyTransaction"); watch(transaction); @@ -389,11 +368,8 @@ public class ShardTransactionTest extends AbstractActorTest { // test new JavaTestKit(getSystem()) {{ - final ActorRef shard = createShard(); - final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); - final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2"); + final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(), + "testReadyTransaction2"); watch(transaction); @@ -404,18 +380,42 @@ public class ShardTransactionTest extends AbstractActorTest { expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class, Terminated.class); }}; + } + @Test + public void testOnReceiveCreateSnapshot() throws Exception { + new JavaTestKit(getSystem()) {{ + ShardTest.writeToStore(store, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + NormalizedNode expectedRoot = ShardTest.readStore(store, + YangInstanceIdentifier.builder().build()); + + final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(), + "testOnReceiveCreateSnapshot"); + + watch(transaction); + + transaction.tell(CreateSnapshot.INSTANCE, getRef()); + + CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class); + + assertNotNull("getSnapshot is null", reply.getSnapshot()); + + NormalizedNode actualRoot = SerializationUtils.deserializeNormalizedNode( + reply.getSnapshot()); + + assertEquals("Root node", expectedRoot, actualRoot); + + expectTerminated(duration("3 seconds"), transaction); + }}; } - @SuppressWarnings("unchecked") @Test public void testOnReceiveCloseTransaction() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = createShard(); - final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); - final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction"); + final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(), + "testCloseTransaction"); watch(transaction); @@ -445,12 +445,8 @@ public class ShardTransactionTest extends AbstractActorTest { Duration.create(500, TimeUnit.MILLISECONDS)).build(); new JavaTestKit(getSystem()) {{ - final ActorRef shard = createShard(); - final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); - final ActorRef transaction = - getSystem().actorOf(props, "testShardTransactionInactivity"); + final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(), + "testShardTransactionInactivity"); watch(transaction); -- 2.36.6