From 2faf656bf68dd3843fd59520b27a7ec2abbdcc68 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 17 Jan 2017 13:24:28 -0500 Subject: [PATCH] Bug 7521: Convert Snapshot to store a State instance Created a new Snapshot class that stores the state as a Serializable State instance instead of a byte[] to lay the groundwork for handling snapshots > 2G. In this manner, State implementations can serialize their data directly to the ObjectOutputStream. The previous Snapshot class was deprecated but, unfortunately, it can't simply "read resolve" to the new class on de-serialization as the deserialization code doesn't know how to convert the data from byte[] to a State instance. Therefore a new method, deserializePreCarbonSnapshot, was added to RaftActorRecoveryCohort. The introduction of the State interface necessitated changes throughout the snapshot capture and snapshot recovery code paths. The Shard implementations were also changed accordingly. For follower snapshot install, an optional OutputStream is also passed to createSnapshot. The new API contract is: if the OutputStream is present, the implementation must serialize its state to the OutputStream and return the stream instance in the CaptureSnapshotReply along with the snapshot State instance. The snapshot State is serialized directly to the snapshot store while the OutputStream is used to send the state data to follower(s) in chunks. The deserializeSnapshot method is used to convert the serialized data back to a State instance on the follower end. The serialization for snapshot install is passed off so the cost of serialization is not charged to the raft actor's thread. The OutputStream is converted to a ByteSource when passed to AbstractLeader. AbstractLeader still converts it to a byte[] but will be changed to use the ByteSource's InputStream to chunk the data in a subsequent patch. Also the SnapshotManager currently passes a ByteArrayOutputStream to createSnapshot but this will be changed to a FileBackedOutputStream in a subsequent patch. Change-Id: I2ea30870b54478d7ef5669335ca4b444539f8d56 Signed-off-by: Tom Pantelis --- .../md-sal/sal-akka-raft-example/pom.xml | 5 + .../cluster/example/ExampleActor.java | 91 ++++----- .../cluster/raft/GetSnapshotReplyActor.java | 4 +- .../raft/NoopRaftActorSnapshotCohort.java | 41 ++++ .../controller/cluster/raft/RaftActor.java | 2 +- .../cluster/raft/RaftActorRecoveryCohort.java | 16 +- .../raft/RaftActorRecoverySupport.java | 28 ++- .../cluster/raft/RaftActorSnapshotCohort.java | 30 ++- .../raft/RaftActorSnapshotMessageSupport.java | 29 ++- .../cluster/raft/ReplicatedLogImpl.java | 1 + .../controller/cluster/raft/Snapshot.java | 3 + .../cluster/raft/SnapshotManager.java | 72 ++++--- .../cluster/raft/SnapshotState.java | 9 +- .../raft/base/messages/ApplySnapshot.java | 2 +- .../raft/base/messages/CaptureSnapshot.java | 16 +- .../base/messages/CaptureSnapshotReply.java | 31 +-- .../base/messages/SendInstallSnapshot.java | 11 +- .../raft/behaviors/AbstractLeader.java | 44 +++-- .../cluster/raft/behaviors/Follower.java | 10 +- .../cluster/raft/persisted/EmptyState.java | 27 +++ .../cluster/raft/persisted/Snapshot.java | 180 ++++++++++++++++++ .../AbstractRaftActorIntegrationTest.java | 19 +- .../cluster/raft/MigratedMessagesTest.java | 83 ++++++-- .../cluster/raft/MockRaftActor.java | 123 ++++++++---- .../cluster/raft/MockRaftActorContext.java | 23 ++- .../raft/RaftActorRecoverySupportTest.java | 76 +++++++- ...ftActorServerConfigurationSupportTest.java | 35 ++-- .../RaftActorSnapshotMessageSupportTest.java | 18 +- .../cluster/raft/RaftActorTest.java | 81 ++++---- .../RecoveryIntegrationSingleNodeTest.java | 4 +- ...eplicationAndSnapshotsIntegrationTest.java | 1 + ...otsWithLaggingFollowerIntegrationTest.java | 4 +- .../cluster/raft/SnapshotManagerTest.java | 115 ++++++----- .../controller/cluster/raft/SnapshotTest.java | 1 + .../cluster/raft/behaviors/FollowerTest.java | 32 ++-- .../cluster/raft/behaviors/LeaderTest.java | 59 +++--- .../cluster/raft/persisted/ByteState.java | 70 +++++++ .../raft/persisted/EmptyStateTest.java | 28 +++ .../cluster/raft/persisted/SnapshotTest.java | 59 ++++++ .../datastore/ShardRecoveryCoordinator.java | 36 ++-- .../datastore/ShardSnapshotCohort.java | 34 ++-- .../datastore/actors/ShardSnapshotActor.java | 50 ++++- ...bstractVersionedShardDataTreeSnapshot.java | 16 +- .../PreBoronShardDataTreeSnapshot.java | 11 +- .../persisted/ShardDataTreeSnapshot.java | 40 ++-- .../persisted/ShardSnapshotState.java | 104 ++++++++++ .../cluster/datastore/AbstractShardTest.java | 7 +- .../DistributedDataStoreIntegrationTest.java | 53 +++++- ...butedDataStoreRemotingIntegrationTest.java | 61 ++++++ .../ShardRecoveryCoordinatorTest.java | 9 +- .../cluster/datastore/ShardTest.java | 13 +- .../actors/ShardSnapshotActorTest.java | 44 +++-- .../persisted/ShardDataTreeSnapshotTest.java | 20 +- .../persisted/ShardSnapshotStateTest.java | 42 ++++ 54 files changed, 1549 insertions(+), 474 deletions(-) create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/NoopRaftActorSnapshotCohort.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/EmptyState.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/Snapshot.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/ByteState.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/EmptyStateTest.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/SnapshotTest.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotState.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotStateTest.java diff --git a/opendaylight/md-sal/sal-akka-raft-example/pom.xml b/opendaylight/md-sal/sal-akka-raft-example/pom.xml index 315349a90c..8667253a2d 100644 --- a/opendaylight/md-sal/sal-akka-raft-example/pom.xml +++ b/opendaylight/md-sal/sal-akka-raft-example/pom.xml @@ -19,6 +19,11 @@ org.slf4j slf4j-simple + + + com.google.guava + guava + diff --git a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 6e8051fe55..663d400b53 100644 --- a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -11,15 +11,15 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.Props; import com.google.common.base.Optional; -import com.google.protobuf.ByteString; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; +import com.google.common.base.Throwables; +import com.google.common.io.ByteSource; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.io.Serializable; import java.util.HashMap; import java.util.Map; import javax.annotation.Nonnull; +import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; @@ -32,6 +32,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.behaviors.Leader; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.util.AbstractStringIdentifier; @@ -131,21 +132,23 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, } @Override - public void createSnapshot(ActorRef actorRef) { - ByteString bs = null; + public void createSnapshot(ActorRef actorRef, java.util.Optional installSnapshotStream) { try { - bs = fromObject(state); + if (installSnapshotStream.isPresent()) { + SerializationUtils.serialize((Serializable) state, installSnapshotStream.get()); + } } catch (Exception e) { LOG.error("Exception in creating snapshot", e); } - getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null); + + getSelf().tell(new CaptureSnapshotReply(new MapState(state), installSnapshotStream), null); } @Override - public void applySnapshot(byte [] snapshot) { + public void applySnapshot(Snapshot.State snapshotState) { state.clear(); try { - state.putAll((HashMap) toObject(snapshot)); + state.putAll(((MapState)snapshotState).state); } catch (Exception e) { LOG.error("Exception in applying snapshot", e); } @@ -154,45 +157,6 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, } } - private static ByteString fromObject(Object snapshot) throws Exception { - ByteArrayOutputStream b = null; - ObjectOutputStream o = null; - try { - b = new ByteArrayOutputStream(); - o = new ObjectOutputStream(b); - o.writeObject(snapshot); - byte[] snapshotBytes = b.toByteArray(); - return ByteString.copyFrom(snapshotBytes); - } finally { - if (o != null) { - o.flush(); - o.close(); - } - if (b != null) { - b.close(); - } - } - } - - private static Object toObject(byte [] bs) throws ClassNotFoundException, IOException { - Object obj = null; - ByteArrayInputStream bis = null; - ObjectInputStream ois = null; - try { - bis = new ByteArrayInputStream(bs); - ois = new ObjectInputStream(bis); - obj = ois.readObject(); - } finally { - if (bis != null) { - bis.close(); - } - if (ois != null) { - ois.close(); - } - } - return obj; - } - @Override protected void onStateChanged() { } @@ -224,7 +188,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, } @Override - public void applyRecoverySnapshot(byte[] snapshot) { + public void applyRecoverySnapshot(Snapshot.State snapshotState) { } @Override @@ -236,4 +200,29 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, public byte[] getRestoreFromSnapshot() { return null; } + + @Override + public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) { + try { + return deserializePreCarbonSnapshot(snapshotBytes.read()); + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + + @SuppressWarnings("unchecked") + @Override + public Snapshot.State deserializePreCarbonSnapshot(byte[] from) { + return new MapState((Map) SerializationUtils.deserialize(from)); + } + + private static class MapState implements Snapshot.State { + private static final long serialVersionUID = 1L; + + Map state; + + MapState(Map state) { + this.state = state; + } + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java index 3aac07f6da..c94b780b04 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java @@ -19,6 +19,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; @@ -42,7 +43,8 @@ class GetSnapshotReplyActor extends UntypedActor { @Override public void onReceive(Object message) { if (message instanceof CaptureSnapshotReply) { - Snapshot snapshot = Snapshot.create(((CaptureSnapshotReply)message).getSnapshot(), + Snapshot snapshot = Snapshot.create( + ((CaptureSnapshotReply)message).getSnapshotState(), params.captureSnapshot.getUnAppliedEntries(), params.captureSnapshot.getLastIndex(), params.captureSnapshot.getLastTerm(), params.captureSnapshot.getLastAppliedIndex(), params.captureSnapshot.getLastAppliedTerm(), diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/NoopRaftActorSnapshotCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/NoopRaftActorSnapshotCohort.java new file mode 100644 index 0000000000..42b226f2af --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/NoopRaftActorSnapshotCohort.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import akka.actor.ActorRef; +import com.google.common.io.ByteSource; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Optional; +import org.opendaylight.controller.cluster.raft.persisted.EmptyState; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State; + +/** + * RaftActorSnapshotCohort implementation that does nothing. + * + * @author Thomas Pantelis + */ +public final class NoopRaftActorSnapshotCohort implements RaftActorSnapshotCohort { + public static final NoopRaftActorSnapshotCohort INSTANCE = new NoopRaftActorSnapshotCohort(); + + private NoopRaftActorSnapshotCohort() { + } + + @Override + public void createSnapshot(ActorRef actorRef, Optional installSnapshotStream) { + } + + @Override + public void applySnapshot(State snapshotState) { + } + + @Override + public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException { + return EmptyState.INSTANCE; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 2d2fce22f9..b007e2d8f5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -762,7 +762,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected abstract void onRecoveryComplete(); /** - * Returns the RaftActorSnapshotCohort to participate in persistence recovery. + * Returns the RaftActorSnapshotCohort to participate in snapshot captures. */ @Nonnull protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java index c3760472ac..9803f1eae7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java @@ -7,7 +7,9 @@ */ package org.opendaylight.controller.cluster.raft; +import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; /** @@ -36,9 +38,9 @@ public interface RaftActorRecoveryCohort { /** * This method is called during recovery to reconstruct the state of the actor. * - * @param snapshotBytes A snapshot of the state of the actor + * @param snapshotState A snapshot of the state of the actor */ - void applyRecoverySnapshot(byte[] snapshotBytes); + void applyRecoverySnapshot(Snapshot.State snapshotState); /** * This method is called during recovery at the end of a batch to apply the current batched @@ -53,4 +55,14 @@ public interface RaftActorRecoveryCohort { */ @Nullable byte[] getRestoreFromSnapshot(); + + /** + * This method is called during recovery to de-serialize a snapshot that was persisted in the pre-Carbon format. + * + * @param from the snaphot bytes + * @return a Snapshot.State instance + */ + @Deprecated + @Nonnull + Snapshot.State deserializePreCarbonSnapshot(byte [] from); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java index 5e4e6571a1..df207670d9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java @@ -18,8 +18,10 @@ import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries; +import org.opendaylight.controller.cluster.raft.persisted.EmptyState; import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload; import org.slf4j.Logger; @@ -113,11 +115,23 @@ class RaftActorRecoverySupport { } private void onRecoveredSnapshot(SnapshotOffer offer) { - log.debug("{}: SnapshotOffer called..", context.getId()); + log.debug("{}: SnapshotOffer called.", context.getId()); initRecoveryTimer(); - Snapshot snapshot = (Snapshot) offer.snapshot(); + Object snapshotObj = offer.snapshot(); + Snapshot snapshot; + if (snapshotObj instanceof org.opendaylight.controller.cluster.raft.Snapshot) { + org.opendaylight.controller.cluster.raft.Snapshot legacy = + (org.opendaylight.controller.cluster.raft.Snapshot)snapshotObj; + snapshot = Snapshot.create(cohort.deserializePreCarbonSnapshot(legacy.getState()), + legacy.getUnAppliedEntries(), legacy.getLastIndex(), legacy.getLastTerm(), + legacy.getLastAppliedIndex(), legacy.getLastAppliedTerm(), + legacy.getElectionTerm(), legacy.getElectionVotedFor(), legacy.getServerConfiguration()); + hasMigratedDataRecovered = true; + } else { + snapshot = (Snapshot) offer.snapshot(); + } for (ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) { if (isMigratedPayload(entry)) { @@ -129,7 +143,8 @@ class RaftActorRecoverySupport { // We may have just transitioned to disabled and have a snapshot containing state data and/or log // entries - we don't want to preserve these, only the server config and election term info. - snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1, + snapshot = Snapshot.create( + EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1, snapshot.getElectionTerm(), snapshot.getElectionVotedFor(), snapshot.getServerConfiguration()); } @@ -145,7 +160,9 @@ class RaftActorRecoverySupport { Stopwatch timer = Stopwatch.createStarted(); // Apply the snapshot to the actors state - cohort.applyRecoverySnapshot(snapshot.getState()); + if (!(snapshot.getState() instanceof EmptyState)) { + cohort.applyRecoverySnapshot(snapshot.getState()); + } if (snapshot.getServerConfiguration() != null) { context.updatePeerIds(snapshot.getServerConfiguration()); @@ -274,7 +291,8 @@ class RaftActorRecoverySupport { // messages. Either way, we persist a snapshot and delete all the messages from the akka journal // to clean out unwanted messages. - Snapshot snapshot = Snapshot.create(new byte[0], Collections.emptyList(), + Snapshot snapshot = Snapshot.create( + EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1, context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(), context.getPeerServerInfo(true)); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java index ad68726371..a02b295dbc 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java @@ -8,6 +8,12 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; +import com.google.common.io.ByteSource; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Optional; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; /** * Interface for a class that participates in raft actor snapshotting. @@ -21,13 +27,31 @@ public interface RaftActorSnapshotCohort { * created. The implementation should send a CaptureSnapshotReply to the given actor. * * @param actorRef the actor to which to respond + * @param installSnapshotStream Optional OutputStream that is present if the snapshot is to also be installed + * on a follower. The implementation must serialize its state to the OutputStream and return the + * installSnapshotStream instance in the CaptureSnapshotReply along with the snapshot State instance. + * The snapshot State is serialized directly to the snapshot store while the OutputStream is used to send + * the state data to follower(s) in chunks. The {@link #deserializeSnapshot} method is used to convert the + * serialized data back to a State instance on the follower end. The serialization for snapshot install is + * passed off so the cost of serialization is not charged to the raft actor's thread. */ - void createSnapshot(ActorRef actorRef); + void createSnapshot(@Nonnull ActorRef actorRef, @Nonnull Optional installSnapshotStream); /** * This method is called to apply a snapshot installed by the leader. * - * @param snapshotBytes a snapshot of the state of the actor + * @param snapshotState a snapshot of the state of the actor */ - void applySnapshot(byte[] snapshotBytes); + void applySnapshot(@Nonnull Snapshot.State snapshotState); + + /** + * This method is called to de-serialize snapshot data that was previously serialized via {@link #createSnapshot} + * to a State instance. + * + * @param snapshotBytes the ByteSource containing the serialized data + * @return the converted snapshot State + * @throws IOException if an error occurs accessing the ByteSource or de-serializing + */ + @Nonnull + Snapshot.State deserializeSnapshot(@Nonnull ByteSource snapshotBytes) throws IOException; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java index 4a17becd5b..f119a1ecc4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java @@ -12,6 +12,7 @@ import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import com.google.common.annotations.VisibleForTesting; import java.util.Collections; +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; @@ -19,6 +20,8 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply; +import org.opendaylight.controller.cluster.raft.persisted.EmptyState; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.slf4j.Logger; import scala.concurrent.duration.Duration; @@ -46,8 +49,13 @@ class RaftActorSnapshotMessageSupport { this.cohort = cohort; this.log = context.getLogger(); - context.getSnapshotManager().setCreateSnapshotRunnable(() -> cohort.createSnapshot(context.getActor())); - context.getSnapshotManager().setApplySnapshotConsumer(cohort::applySnapshot); + context.getSnapshotManager().setCreateSnapshotConsumer( + outputStream -> cohort.createSnapshot(context.getActor(), outputStream)); + context.getSnapshotManager().setSnapshotCohort(cohort); + } + + RaftActorSnapshotCohort getSnapshotCohort() { + return cohort; } boolean handleSnapshotMessage(Object message, ActorRef sender) { @@ -58,7 +66,7 @@ class RaftActorSnapshotMessageSupport { } else if (message instanceof SaveSnapshotFailure) { onSaveSnapshotFailure((SaveSnapshotFailure) message); } else if (message instanceof CaptureSnapshotReply) { - onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot()); + onCaptureSnapshotReply((CaptureSnapshotReply) message); } else if (COMMIT_SNAPSHOT.equals(message)) { context.getSnapshotManager().commit(-1, -1); } else if (message instanceof GetSnapshot) { @@ -70,11 +78,11 @@ class RaftActorSnapshotMessageSupport { return true; } - private void onCaptureSnapshotReply(byte[] snapshotBytes) { - log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), - snapshotBytes.length); + private void onCaptureSnapshotReply(CaptureSnapshotReply reply) { + log.debug("{}: CaptureSnapshotReply received by actor", context.getId()); - context.getSnapshotManager().persist(snapshotBytes, context.getTotalMemory()); + context.getSnapshotManager().persist(reply.getSnapshotState(), reply.getInstallSnapshotStream(), + context.getTotalMemory()); } private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) { @@ -103,15 +111,16 @@ class RaftActorSnapshotMessageSupport { if (context.getPersistenceProvider().isRecoveryApplicable()) { CaptureSnapshot captureSnapshot = context.getSnapshotManager().newCaptureSnapshot( - context.getReplicatedLog().last(), -1, false); + context.getReplicatedLog().last(), -1); ActorRef snapshotReplyActor = context.actorOf(GetSnapshotReplyActor.props(captureSnapshot, ImmutableElectionTerm.copyOf(context.getTermInformation()), sender, snapshotReplyActorTimeout, context.getId(), context.getPeerServerInfo(true))); - cohort.createSnapshot(snapshotReplyActor); + cohort.createSnapshot(snapshotReplyActor, Optional.empty()); } else { - Snapshot snapshot = Snapshot.create(new byte[0], Collections.emptyList(), + Snapshot snapshot = Snapshot.create( + EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1, context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(), context.getPeerServerInfo(true)); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java index fe873340aa..7196fc5f12 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java @@ -14,6 +14,7 @@ import java.util.List; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; /** * Implementation of ReplicatedLog used by the RaftActor. diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java index 2677baff6d..93226ccab2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java @@ -17,7 +17,10 @@ import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPay * * @author Moiz Raja * @author Thomas Pantelis + * + * @deprecated Use {@link org.opendaylight.controller.cluster.raft.persisted.Snapshot} instead. */ +@Deprecated public class Snapshot implements Serializable { private static final long serialVersionUID = -8298574936724056236L; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 79f2ce9b4c..12508aebff 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -10,13 +10,21 @@ package org.opendaylight.controller.cluster.raft; import akka.persistence.SnapshotSelectionCriteria; import com.google.common.annotations.VisibleForTesting; +import com.google.common.io.ByteSource; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; import java.util.List; +import java.util.Optional; import java.util.function.Consumer; +import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.opendaylight.controller.cluster.raft.persisted.EmptyState; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.slf4j.Logger; /** @@ -48,10 +56,10 @@ public class SnapshotManager implements SnapshotState { private CaptureSnapshot captureSnapshot; private long lastSequenceNumber = -1; - private Runnable createSnapshotProcedure; + private Consumer> createSnapshotProcedure; private ApplySnapshot applySnapshot; - private Consumer applySnapshotProcedure; + private RaftActorSnapshotCohort snapshotCohort = NoopRaftActorSnapshotCohort.INSTANCE; /** * Constructs an instance. @@ -89,8 +97,9 @@ public class SnapshotManager implements SnapshotState { } @Override - public void persist(final byte[] snapshotBytes, final long totalMemory) { - currentState.persist(snapshotBytes, totalMemory); + public void persist(final Snapshot.State state, final Optional installSnapshotStream, + final long totalMemory) { + currentState.persist(state, installSnapshotStream, totalMemory); } @Override @@ -108,12 +117,17 @@ public class SnapshotManager implements SnapshotState { return currentState.trimLog(desiredTrimIndex); } - public void setCreateSnapshotRunnable(Runnable createSnapshotProcedure) { + void setCreateSnapshotConsumer(Consumer> createSnapshotProcedure) { this.createSnapshotProcedure = createSnapshotProcedure; } - public void setApplySnapshotConsumer(Consumer applySnapshotProcedure) { - this.applySnapshotProcedure = applySnapshotProcedure; + void setSnapshotCohort(final RaftActorSnapshotCohort snapshotCohort) { + this.snapshotCohort = snapshotCohort; + } + + @Nonnull + public Snapshot.State convertSnapshot(ByteSource snapshotBytes) throws IOException { + return snapshotCohort.deserializeSnapshot(snapshotBytes); } public long getLastSequenceNumber() { @@ -138,11 +152,9 @@ public class SnapshotManager implements SnapshotState { * * @param lastLogEntry the last log entry for the snapshot. * @param replicatedToAllIndex the index of the last entry replicated to all followers. - * @param installSnapshotInitiated true if snapshot is initiated to install on a follower. * @return a new CaptureSnapshot instance. */ - public CaptureSnapshot newCaptureSnapshot(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, - boolean installSnapshotInitiated) { + public CaptureSnapshot newCaptureSnapshot(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { TermInformationReader lastAppliedTermInfoReader = lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(), lastLogEntry, hasFollowers()); @@ -169,7 +181,7 @@ public class SnapshotManager implements SnapshotState { } return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm, - newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, installSnapshotInitiated); + newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries); } private class AbstractSnapshotState implements SnapshotState { @@ -198,7 +210,8 @@ public class SnapshotManager implements SnapshotState { } @Override - public void persist(final byte[] snapshotBytes, final long totalMemory) { + public void persist(final Snapshot.State state, final Optional installSnapshotStream, + final long totalMemory) { log.debug("persist should not be called in state {}", this); } @@ -261,9 +274,11 @@ public class SnapshotManager implements SnapshotState { @SuppressWarnings("checkstyle:IllegalCatch") private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) { - captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, targetFollower != null); + captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex); - if (captureSnapshot.isInstallSnapshotInitiated()) { + OutputStream installSnapshotStream = null; + if (targetFollower != null) { + installSnapshotStream = new ByteArrayOutputStream(); log.info("{}: Initiating snapshot capture {} to install on {}", persistenceId(), captureSnapshot, targetFollower); } else { @@ -277,7 +292,7 @@ public class SnapshotManager implements SnapshotState { SnapshotManager.this.currentState = CREATING; try { - createSnapshotProcedure.run(); + createSnapshotProcedure.accept(Optional.ofNullable(installSnapshotStream)); } catch (Exception e) { SnapshotManager.this.currentState = IDLE; log.error("Error creating snapshot", e); @@ -325,11 +340,12 @@ public class SnapshotManager implements SnapshotState { private class Creating extends AbstractSnapshotState { @Override - public void persist(final byte[] snapshotBytes, final long totalMemory) { + public void persist(final Snapshot.State snapshotState, final Optional installSnapshotStream, + final long totalMemory) { // create a snapshot object from the state provided and save it // when snapshot is saved async, SaveSnapshotSuccess is raised. - Snapshot snapshot = Snapshot.create(snapshotBytes, + Snapshot snapshot = Snapshot.create(snapshotState, captureSnapshot.getUnAppliedEntries(), captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm(), @@ -393,10 +409,20 @@ public class SnapshotManager implements SnapshotState { context.getId(), context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm()); - if (context.getId().equals(currentBehavior.getLeaderId()) - && captureSnapshot.isInstallSnapshotInitiated()) { - // this would be call straight to the leader and won't initiate in serialization - currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot)); + if (installSnapshotStream.isPresent()) { + try { + installSnapshotStream.get().close(); + } catch (IOException e) { + log.warn("Error closing install snapshot OutputStream", e); + } + + if (context.getId().equals(currentBehavior.getLeaderId())) { + ByteSource snapshotBytes = ByteSource.wrap(((ByteArrayOutputStream)installSnapshotStream.get()) + .toByteArray()); + + // this would be call straight to the leader and won't initiate in serialization + currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot, snapshotBytes)); + } } captureSnapshot = null; @@ -431,8 +457,8 @@ public class SnapshotManager implements SnapshotState { context.updatePeerIds(snapshot.getServerConfiguration()); } - if (snapshot.getState().length > 0 ) { - applySnapshotProcedure.accept(snapshot.getState()); + if (!(snapshot.getState() instanceof EmptyState)) { + snapshotCohort.applySnapshot(snapshot.getState()); } applySnapshot.getCallback().onSuccess(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java index e423737706..0a702741d8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java @@ -8,7 +8,10 @@ package org.opendaylight.controller.cluster.raft; +import java.io.OutputStream; +import java.util.Optional; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; /** * Interface for a snapshot phase state. @@ -53,10 +56,12 @@ public interface SnapshotState { /** * Persists a snapshot. * - * @param snapshotBytes the snapshot bytes + * @param snapshotState the snapshot State + * @param installSnapshotStream Optional OutputStream that is present if the snapshot is to also be installed + * on a follower. * @param totalMemory the total memory threshold */ - void persist(byte[] snapshotBytes, long totalMemory); + void persist(Snapshot.State snapshotState, Optional installSnapshotStream, long totalMemory); /** * Commit the snapshot by trimming the log. diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java index 9fb5554abc..9cf2a3f6c1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java @@ -10,7 +10,7 @@ package org.opendaylight.controller.cluster.raft.base.messages; import com.google.common.base.Preconditions; import javax.annotation.Nonnull; -import org.opendaylight.controller.cluster.raft.Snapshot; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; /** * Internal message, issued by follower to its actor. diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java index 14bd3a0af4..2173534a58 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java @@ -17,25 +17,17 @@ public class CaptureSnapshot { private final long lastAppliedTerm; private final long lastIndex; private final long lastTerm; - private final boolean installSnapshotInitiated; private final long replicatedToAllIndex; private final long replicatedToAllTerm; private final List unAppliedEntries; - public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex, long lastAppliedTerm, - long replicatedToAllIndex, long replicatedToAllTerm, List unAppliedEntries) { - this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex, replicatedToAllTerm, - unAppliedEntries, false); - } - public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex, long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm, - List unAppliedEntries, boolean installSnapshotInitiated) { + List unAppliedEntries) { this.lastIndex = lastIndex; this.lastTerm = lastTerm; this.lastAppliedIndex = lastAppliedIndex; this.lastAppliedTerm = lastAppliedTerm; - this.installSnapshotInitiated = installSnapshotInitiated; this.replicatedToAllIndex = replicatedToAllIndex; this.replicatedToAllTerm = replicatedToAllTerm; this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries : @@ -58,10 +50,6 @@ public class CaptureSnapshot { return lastTerm; } - public boolean isInstallSnapshotInitiated() { - return installSnapshotInitiated; - } - public long getReplicatedToAllIndex() { return replicatedToAllIndex; } @@ -79,7 +67,7 @@ public class CaptureSnapshot { StringBuilder builder = new StringBuilder(); builder.append("CaptureSnapshot [lastAppliedIndex=").append(lastAppliedIndex).append(", lastAppliedTerm=") .append(lastAppliedTerm).append(", lastIndex=").append(lastIndex).append(", lastTerm=") - .append(lastTerm).append(", installSnapshotInitiated=").append(installSnapshotInitiated) + .append(lastTerm).append(", installSnapshotInitiated=") .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", replicatedToAllTerm=") .append(replicatedToAllTerm).append(", unAppliedEntries size=") .append(unAppliedEntries.size()).append("]"); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java index 1e573014a9..cc981e5711 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java @@ -7,22 +7,29 @@ */ package org.opendaylight.controller.cluster.raft.base.messages; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import com.google.common.base.Preconditions; +import java.io.OutputStream; +import java.util.Optional; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; public class CaptureSnapshotReply { - private final byte [] snapshot; + private final Snapshot.State snapshotState; + private final Optional installSnapshotStream; - @SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "Stores a reference to an externally mutable byte[] " - + "object but this is OK since this class is merely a DTO and does not process byte[] internally. " - + "Also it would be inefficient to create a copy as the byte[] could be large.") - public CaptureSnapshotReply(byte [] snapshot) { - this.snapshot = snapshot; + public CaptureSnapshotReply(@Nonnull final Snapshot.State snapshotState, + @Nonnull final Optional installSnapshotStream) { + this.snapshotState = Preconditions.checkNotNull(snapshotState); + this.installSnapshotStream = Preconditions.checkNotNull(installSnapshotStream); } - @SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "Exposes a mutable object stored in a field but " - + "this is OK since this class is merely a DTO and does not process the byte[] internally. " - + "Also it would be inefficient to create a return copy as the byte[] could be large.") - public byte [] getSnapshot() { - return snapshot; + @Nonnull + public Snapshot.State getSnapshotState() { + return snapshotState; + } + + @Nonnull + public Optional getInstallSnapshotStream() { + return installSnapshotStream; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java index de33b8c95b..e8d58f253a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java @@ -9,8 +9,9 @@ package org.opendaylight.controller.cluster.raft.base.messages; import com.google.common.base.Preconditions; +import com.google.common.io.ByteSource; import javax.annotation.Nonnull; -import org.opendaylight.controller.cluster.raft.Snapshot; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; /** * Internal message sent from the SnapshotManager to its associated leader when a snapshot capture is complete to @@ -18,13 +19,19 @@ import org.opendaylight.controller.cluster.raft.Snapshot; */ public final class SendInstallSnapshot { private final Snapshot snapshot; + private final ByteSource snapshotBytes; - public SendInstallSnapshot(@Nonnull Snapshot snapshot) { + public SendInstallSnapshot(@Nonnull Snapshot snapshot, @Nonnull ByteSource snapshotBytes) { this.snapshot = Preconditions.checkNotNull(snapshot); + this.snapshotBytes = Preconditions.checkNotNull(snapshotBytes); } @Nonnull public Snapshot getSnapshot() { return snapshot; } + + public ByteSource getSnapshotBytes() { + return snapshotBytes; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 10c1a156f9..548b920fe7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -14,7 +14,9 @@ import akka.actor.Cancellable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.io.ByteSource; import com.google.protobuf.ByteString; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -33,7 +35,6 @@ import org.opendaylight.controller.cluster.raft.PeerInfo; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; @@ -47,6 +48,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import scala.concurrent.duration.FiniteDuration; /** @@ -83,7 +85,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private final Queue trackers = new LinkedList<>(); private Cancellable heartbeatSchedule = null; - private Optional snapshot = Optional.absent(); + private Optional snapshotHolder = Optional.absent(); private int minReplicationCount; protected AbstractLeader(RaftActorContext context, RaftState state, @@ -92,7 +94,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (initializeFromLeader != null) { followerToLog.putAll(initializeFromLeader.followerToLog); - snapshot = initializeFromLeader.snapshot; + snapshotHolder = initializeFromLeader.snapshotHolder; trackers.addAll(initializeFromLeader.trackers); } else { for (PeerInfo peerInfo: context.getPeers()) { @@ -165,17 +167,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } @VisibleForTesting - void setSnapshot(@Nullable Snapshot snapshot) { - if (snapshot != null) { - this.snapshot = Optional.of(new SnapshotHolder(snapshot)); - } else { - this.snapshot = Optional.absent(); - } + void setSnapshot(@Nullable SnapshotHolder snapshotHolder) { + this.snapshotHolder = Optional.fromNullable(snapshotHolder); } @VisibleForTesting boolean hasSnapshot() { - return snapshot.isPresent(); + return snapshotHolder.isPresent(); } @Override @@ -451,8 +449,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { sendHeartBeat(); scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } else if (message instanceof SendInstallSnapshot) { - // received from RaftActor - setSnapshot(((SendInstallSnapshot) message).getSnapshot()); + SendInstallSnapshot sendInstallSnapshot = (SendInstallSnapshot) message; + setSnapshot(new SnapshotHolder(sendInstallSnapshot.getSnapshot(), sendInstallSnapshot.getSnapshotBytes())); sendInstallSnapshot(); } else if (message instanceof Replicate) { replicate((Replicate) message); @@ -497,7 +495,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { + " Setting nextIndex: {}", logName(), reply.getChunkIndex(), followerId, context.getReplicatedLog().getSnapshotIndex() + 1); - long followerMatchIndex = snapshot.get().getLastIncludedIndex(); + long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex(); followerLogInformation.setMatchIndex(followerMatchIndex); followerLogInformation.setNextIndex(followerMatchIndex + 1); followerLogInformation.clearLeaderInstallSnapshotState(); @@ -730,7 +728,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { */ public boolean initiateCaptureSnapshot(String followerId) { FollowerLogInformation followerLogInfo = followerToLog.get(followerId); - if (snapshot.isPresent()) { + if (snapshotHolder.isPresent()) { // If a snapshot is present in the memory, most likely another install is in progress no need to capture // snapshot. This could happen if another follower needs an install when one is going on. final ActorSelection followerActor = context.getPeerActorSelection(followerId); @@ -783,7 +781,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * InstallSnapshot should qualify as a heartbeat too. */ private void sendSnapshotChunk(ActorSelection followerActor, FollowerLogInformation followerLogInfo) { - if (snapshot.isPresent()) { + if (snapshotHolder.isPresent()) { LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState(); if (installSnapshotState == null) { installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(), @@ -792,7 +790,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } // Ensure the snapshot bytes are set - this is a no-op. - installSnapshotState.setSnapshotBytes(snapshot.get().getSnapshotBytes()); + installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes()); byte[] nextSnapshotChunk = installSnapshotState.getNextChunk(); @@ -807,8 +805,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerActor.tell( new InstallSnapshot(currentTerm(), context.getId(), - snapshot.get().getLastIncludedIndex(), - snapshot.get().getLastIncludedTerm(), + snapshotHolder.get().getLastIncludedIndex(), + snapshotHolder.get().getLastIncludedTerm(), nextSnapshotChunk, nextChunkIndex, installSnapshotState.getTotalChunks(), @@ -911,15 +909,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return followerToLog.size(); } - private static class SnapshotHolder { + static class SnapshotHolder { private final long lastIncludedTerm; private final long lastIncludedIndex; private final ByteString snapshotBytes; - SnapshotHolder(Snapshot snapshot) { + SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) { this.lastIncludedTerm = snapshot.getLastAppliedTerm(); this.lastIncludedIndex = snapshot.getLastAppliedIndex(); - this.snapshotBytes = ByteString.copyFrom(snapshot.getState()); + try { + this.snapshotBytes = ByteString.copyFrom(snapshotBytes.read()); + } catch (IOException e) { + throw new RuntimeException("Error reading state", e); + } } long getLastIncludedTerm() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 6f107e9ae6..727d6a3131 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -18,6 +18,8 @@ import akka.cluster.MemberStatus; import akka.japi.Procedure; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; +import com.google.common.io.ByteSource; +import java.io.IOException; import java.util.ArrayList; import java.util.Optional; import java.util.Set; @@ -27,7 +29,6 @@ import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; @@ -39,6 +40,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; /** * The behavior of a RaftActor in the Follower raft state. @@ -528,7 +530,9 @@ public class Follower extends AbstractRaftActorBehavior { if (snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(), installSnapshot.getLastChunkHashCode())) { - Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(), + ByteSource snapshotBytes = ByteSource.wrap(snapshotTracker.getSnapshot()); + Snapshot snapshot = Snapshot.create( + context.getSnapshotManager().convertSnapshot(snapshotBytes), new ArrayList<>(), installSnapshot.getLastIncludedIndex(), installSnapshot.getLastIncludedTerm(), @@ -560,7 +564,7 @@ public class Follower extends AbstractRaftActorBehavior { sender.tell(reply, actor()); } - } catch (SnapshotTracker.InvalidChunkException e) { + } catch (SnapshotTracker.InvalidChunkException | IOException e) { log.debug("{}: Exception in InstallSnapshot of follower", logName(), e); sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/EmptyState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/EmptyState.java new file mode 100644 index 0000000000..40c90fbe94 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/EmptyState.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.persisted; + +/** + * Empty Snapshot State implementation. + * + * @author Thomas Pantelis + */ +public class EmptyState implements Snapshot.State { + private static final long serialVersionUID = 1L; + + public static final EmptyState INSTANCE = new EmptyState(); + + private EmptyState() { + } + + @SuppressWarnings("static-method") + private Object readResolve() { + return INSTANCE; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/Snapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/Snapshot.java new file mode 100644 index 0000000000..1763ad3fbe --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/Snapshot.java @@ -0,0 +1,180 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.persisted; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; + +/** + * Represents a snapshot of the raft data. + * + * @author Thomas Pantelis + */ +public class Snapshot implements Serializable { + + /** + * Implementations of this interface are used as the state payload for a snapshot. + * + * @author Thomas Pantelis + */ + public interface State extends Serializable { + } + + private static final class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + private Snapshot snapshot; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + // For Externalizable + } + + Proxy(final Snapshot snapshot) { + this.snapshot = snapshot; + } + + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + out.writeLong(snapshot.lastIndex); + out.writeLong(snapshot.lastTerm); + out.writeLong(snapshot.lastAppliedIndex); + out.writeLong(snapshot.lastAppliedTerm); + out.writeLong(snapshot.electionTerm); + out.writeObject(snapshot.electionVotedFor); + out.writeObject(snapshot.serverConfig); + + out.writeInt(snapshot.unAppliedEntries.size()); + for (ReplicatedLogEntry e: snapshot.unAppliedEntries) { + out.writeLong(e.getIndex()); + out.writeLong(e.getTerm()); + out.writeObject(e.getData()); + } + + out.writeObject(snapshot.state); + } + + @Override + public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + long lastIndex = in.readLong(); + long lastTerm = in.readLong(); + long lastAppliedIndex = in.readLong(); + long lastAppliedTerm = in.readLong(); + long electionTerm = in.readLong(); + String electionVotedFor = (String) in.readObject(); + ServerConfigurationPayload serverConfig = (ServerConfigurationPayload) in.readObject(); + + int size = in.readInt(); + List unAppliedEntries = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + unAppliedEntries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), + (Payload) in.readObject())); + } + + State state = (State) in.readObject(); + + snapshot = Snapshot.create(state, unAppliedEntries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, + electionTerm, electionVotedFor, serverConfig); + } + + private Object readResolve() { + return snapshot; + } + } + + private static final long serialVersionUID = 1L; + + private final State state; + private final List unAppliedEntries; + private final long lastIndex; + private final long lastTerm; + private final long lastAppliedIndex; + private final long lastAppliedTerm; + private final long electionTerm; + private final String electionVotedFor; + private final ServerConfigurationPayload serverConfig; + + private Snapshot(State state, List unAppliedEntries, long lastIndex, long lastTerm, + long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor, + ServerConfigurationPayload serverConfig) { + this.state = state; + this.unAppliedEntries = unAppliedEntries; + this.lastIndex = lastIndex; + this.lastTerm = lastTerm; + this.lastAppliedIndex = lastAppliedIndex; + this.lastAppliedTerm = lastAppliedTerm; + this.electionTerm = electionTerm; + this.electionVotedFor = electionVotedFor; + this.serverConfig = serverConfig; + } + + public static Snapshot create(State state, List entries, long lastIndex, long lastTerm, + long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor, + ServerConfigurationPayload serverConfig) { + return new Snapshot(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, + electionTerm, electionVotedFor, serverConfig); + } + + public State getState() { + return state; + } + + public List getUnAppliedEntries() { + return unAppliedEntries; + } + + public long getLastTerm() { + return lastTerm; + } + + public long getLastAppliedIndex() { + return lastAppliedIndex; + } + + public long getLastAppliedTerm() { + return lastAppliedTerm; + } + + public long getLastIndex() { + return this.lastIndex; + } + + public long getElectionTerm() { + return electionTerm; + } + + public String getElectionVotedFor() { + return electionVotedFor; + } + + public ServerConfigurationPayload getServerConfiguration() { + return serverConfig; + } + + @SuppressWarnings("static-method") + private Object writeReplace() { + return new Proxy(this); + } + + @Override + public String toString() { + return "Snapshot [lastIndex=" + lastIndex + ", lastTerm=" + lastTerm + ", lastAppliedIndex=" + lastAppliedIndex + + ", lastAppliedTerm=" + lastAppliedTerm + ", unAppliedEntries size=" + unAppliedEntries.size() + + ", state=" + state + ", electionTerm=" + electionTerm + ", electionVotedFor=" + + electionVotedFor + ", ServerConfigPayload=" + serverConfig + "]"; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index 4a053137a5..eb5c9e5e91 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -20,18 +20,21 @@ import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Stopwatch; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Predicate; +import org.apache.commons.lang3.SerializationUtils; import org.junit.After; +import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; @@ -41,6 +44,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftS import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; @@ -149,12 +153,13 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest @Override @SuppressWarnings("checkstyle:IllegalCatch") - public void createSnapshot(ActorRef actorRef) { - try { - actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef); - } catch (Exception e) { - Throwables.propagate(e); + public void createSnapshot(ActorRef actorRef, Optional installSnapshotStream) { + MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState())); + if (installSnapshotStream.isPresent()) { + SerializationUtils.serialize(snapshotState, installSnapshotStream.get()); } + + actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef); } public ActorRef collectorActor() { @@ -291,7 +296,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest assertEquals(prefix + " Snapshot getLastTerm", lastTerm, snapshot.getLastTerm()); assertEquals(prefix + " Snapshot getLastIndex", lastIndex, snapshot.getLastIndex()); - List actualState = (List)MockRaftActor.toObject(snapshot.getState()); + List actualState = ((MockSnapshotState)snapshot.getState()).getState(); assertEquals(String.format("%s Snapshot getState size. Expected %s: . Actual: %s", prefix, expSnapshotState, actualState), expSnapshotState.size(), actualState.size()); for (int i = 0; i < expSnapshotState.size(); i++) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MigratedMessagesTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MigratedMessagesTest.java index 9ce89093fd..460dd4a445 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MigratedMessagesTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MigratedMessagesTest.java @@ -15,20 +15,30 @@ import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; +import java.io.OutputStream; +import java.io.Serializable; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import org.apache.commons.lang3.SerializationUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState; +import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State; import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; @@ -103,7 +113,7 @@ public class MigratedMessagesTest extends AbstractActorTest { doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> { assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor()); assertEquals("getElectionTerm", 5, snapshot.getElectionTerm()); - }); + }, ByteState.empty()); TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled ending"); } @@ -122,7 +132,7 @@ public class MigratedMessagesTest extends AbstractActorTest { doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, false, snapshot -> { assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor()); assertEquals("getElectionTerm", 5, snapshot.getElectionTerm()); - }); + }, ByteState.empty()); TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled ending"); } @@ -145,7 +155,7 @@ public class MigratedMessagesTest extends AbstractActorTest { assertEquals("getLastAppliedTerm", 1, snapshot.getLastAppliedTerm()); assertEquals("getLastIndex", 0, snapshot.getLastIndex()); assertEquals("getLastTerm", 1, snapshot.getLastTerm()); - }); + }, ByteState.empty()); TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries ending"); } @@ -164,12 +174,17 @@ public class MigratedMessagesTest extends AbstractActorTest { RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() { @Override - public void createSnapshot(ActorRef actorRef) { - actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef); + public void createSnapshot(ActorRef actorRef, java.util.Optional installSnapshotStream) { + actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef); } @Override - public void applySnapshot(byte[] snapshotBytes) { + public void applySnapshot(Snapshot.State snapshotState) { + } + + @Override + public State deserializeSnapshot(ByteSource snapshotBytes) { + throw new UnsupportedOperationException(); } }; @@ -204,7 +219,7 @@ public class MigratedMessagesTest extends AbstractActorTest { assertEquals("Unapplied entry term", 1, snapshot.getUnAppliedEntries().get(0).getTerm()); assertEquals("Unapplied entry index", 0, snapshot.getUnAppliedEntries().get(0).getIndex()); assertEquals("Unapplied entry data", expPayload, snapshot.getUnAppliedEntries().get(0).getData()); - }); + }, ByteState.empty()); TEST_LOG.info("testSnapshotAfterStartupWithMigratedReplicatedLogEntry ending"); } @@ -231,14 +246,53 @@ public class MigratedMessagesTest extends AbstractActorTest { assertEquals("getElectionTerm", 1, snapshot.getElectionTerm()); assertEquals("getServerConfiguration", new HashSet<>(expectedServerConfig.getServerConfig()), new HashSet<>(snapshot.getServerConfiguration().getServerConfig())); - }); + }, ByteState.empty()); return actor; } + @Test + public void testSnapshotAfterStartupWithMigratedSnapshot() throws Exception { + TEST_LOG.info("testSnapshotAfterStartupWithMigratedSnapshot starting"); + + String persistenceId = factory.generateActorId("test-actor-"); + + List snapshotData = Arrays.asList(new MockPayload("1")); + final MockSnapshotState snapshotState = new MockSnapshotState(snapshotData); + + org.opendaylight.controller.cluster.raft.Snapshot legacy = org.opendaylight.controller.cluster.raft.Snapshot + .create(SerializationUtils.serialize((Serializable) snapshotData), + Arrays.asList(new SimpleReplicatedLogEntry(6, 2, new MockPayload("payload"))), + 6, 2, 5, 1, 3, "member-1", new ServerConfigurationPayload(Arrays.asList( + new ServerInfo(persistenceId, true), new ServerInfo("2", false)))); + InMemorySnapshotStore.addSnapshot(persistenceId, legacy); + + doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> { + assertEquals("getLastIndex", legacy.getLastIndex(), snapshot.getLastIndex()); + assertEquals("getLastTerm", legacy.getLastTerm(), snapshot.getLastTerm()); + assertEquals("getLastAppliedIndex", legacy.getLastAppliedIndex(), snapshot.getLastAppliedIndex()); + assertEquals("getLastAppliedTerm", legacy.getLastAppliedTerm(), snapshot.getLastAppliedTerm()); + assertEquals("getState", snapshotState, snapshot.getState()); + assertEquals("Unapplied entries size", legacy.getUnAppliedEntries().size(), + snapshot.getUnAppliedEntries().size()); + assertEquals("Unapplied entry term", legacy.getUnAppliedEntries().get(0).getTerm(), + snapshot.getUnAppliedEntries().get(0).getTerm()); + assertEquals("Unapplied entry index", legacy.getUnAppliedEntries().get(0).getIndex(), + snapshot.getUnAppliedEntries().get(0).getIndex()); + assertEquals("Unapplied entry data", legacy.getUnAppliedEntries().get(0).getData(), + snapshot.getUnAppliedEntries().get(0).getData()); + assertEquals("getElectionVotedFor", legacy.getElectionVotedFor(), snapshot.getElectionVotedFor()); + assertEquals("getElectionTerm", legacy.getElectionTerm(), snapshot.getElectionTerm()); + assertEquals("getServerConfiguration", Sets.newHashSet(legacy.getServerConfiguration().getServerConfig()), + Sets.newHashSet(snapshot.getServerConfiguration().getServerConfig())); + }, snapshotState); + + TEST_LOG.info("testSnapshotAfterStartupWithMigratedSnapshot ending"); + } + @SuppressWarnings("checkstyle:IllegalCatch") private TestActorRef doTestSnapshotAfterStartupWithMigratedMessage(String id, boolean persistent, - Consumer snapshotVerifier) { + Consumer snapshotVerifier, final State snapshotState) { InMemorySnapshotStore.addSnapshotSavedLatch(id); InMemoryJournal.addDeleteMessagesCompleteLatch(id); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -246,12 +300,17 @@ public class MigratedMessagesTest extends AbstractActorTest { RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() { @Override - public void createSnapshot(ActorRef actorRef) { - actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef); + public void createSnapshot(ActorRef actorRef, java.util.Optional installSnapshotStream) { + actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef); + } + + @Override + public void applySnapshot(State newState) { } @Override - public void applySnapshot(byte[] snapshotBytes) { + public State deserializeSnapshot(ByteSource snapshotBytes) { + throw new UnsupportedOperationException(); } }; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java index dbced5f4cb..20adaf12be 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java @@ -16,10 +16,10 @@ import akka.actor.Props; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Throwables; +import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.ObjectInputStream; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -27,8 +27,10 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; +import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; @@ -169,38 +171,43 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, } @Override - public void applyRecoverySnapshot(byte[] bytes) { - recoveryCohortDelegate.applyRecoverySnapshot(bytes); - applySnapshotBytes(bytes); + public void applyRecoverySnapshot(Snapshot.State newState) { + recoveryCohortDelegate.applyRecoverySnapshot(newState); + applySnapshotState(newState); } - private void applySnapshotBytes(byte[] bytes) { - if (bytes.length == 0) { - return; - } - - try { - Object data = toObject(bytes); - if (data instanceof List) { - state.clear(); - state.addAll((List) data); - } - } catch (ClassNotFoundException | IOException e) { - Throwables.propagate(e); + private void applySnapshotState(Snapshot.State newState) { + if (newState instanceof MockSnapshotState) { + state.clear(); + state.addAll(((MockSnapshotState)newState).getState()); } } @Override - public void createSnapshot(ActorRef actorRef) { + public void createSnapshot(ActorRef actorRef, java.util.Optional installSnapshotStream) { LOG.info("{}: createSnapshot called", persistenceId()); - snapshotCohortDelegate.createSnapshot(actorRef); + snapshotCohortDelegate.createSnapshot(actorRef, installSnapshotStream); } @Override - public void applySnapshot(byte [] snapshot) { + public void applySnapshot(Snapshot.State newState) { LOG.info("{}: applySnapshot called", persistenceId()); - applySnapshotBytes(snapshot); - snapshotCohortDelegate.applySnapshot(snapshot); + applySnapshotState(newState); + snapshotCohortDelegate.applySnapshot(newState); + } + + @Override + public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) { + try { + return (Snapshot.State) SerializationUtils.deserialize(snapshotBytes.read()); + } catch (IOException e) { + throw new RuntimeException("Error deserializing state", e); + } + } + + @Override + public Snapshot.State deserializePreCarbonSnapshot(byte[] from) { + return new MockSnapshotState(SerializationUtils.deserialize(from)); } @Override @@ -243,23 +250,12 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, } } - public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException { - Object obj = null; - ByteArrayInputStream bis = null; - ObjectInputStream ois = null; - try { - bis = new ByteArrayInputStream(bs); - ois = new ObjectInputStream(bis); - obj = ois.readObject(); - } finally { - if (bis != null) { - bis.close(); - } - if (ois != null) { - ois.close(); - } + public static List fromState(Snapshot.State from) { + if (from instanceof MockSnapshotState) { + return ((MockSnapshotState)from).getState(); } - return obj; + + throw new IllegalStateException("Unexpected snapshot State: " + from); } public ReplicatedLog getReplicatedLog() { @@ -367,4 +363,53 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, super(MockRaftActor.class); } } + + public static class MockSnapshotState implements Snapshot.State { + private static final long serialVersionUID = 1L; + + private final List state; + + public MockSnapshotState(List state) { + this.state = state; + } + + public List getState() { + return state; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (state == null ? 0 : state.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + MockSnapshotState other = (MockSnapshotState) obj; + if (state == null) { + if (other.state != null) { + return false; + } + } else if (!state.equals(other.state)) { + return false; + } + return true; + } + + @Override + public String toString() { + return "MockSnapshotState [state=" + state + "]"; + } + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index cdcaadf815..d92f0729f2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -14,12 +14,17 @@ import akka.actor.ActorSystem; import akka.actor.Props; import akka.japi.Procedure; import com.google.common.base.Throwables; +import com.google.common.io.ByteSource; +import java.io.IOException; +import java.io.OutputStream; import java.io.Serializable; import java.util.HashMap; import java.util.Map; import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.slf4j.Logger; @@ -120,7 +125,23 @@ public class MockRaftActorContext extends RaftActorContextImpl { @Override public SnapshotManager getSnapshotManager() { SnapshotManager snapshotManager = super.getSnapshotManager(); - snapshotManager.setCreateSnapshotRunnable(() -> { }); + snapshotManager.setCreateSnapshotConsumer(out -> { }); + + snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() { + @Override + public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException { + return ByteState.of(snapshotBytes.read()); + } + + @Override + public void createSnapshot(ActorRef actorRef, java.util.Optional installSnapshotStream) { + } + + @Override + public void applySnapshot(State snapshotState) { + } + }); + return snapshotManager; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java index e0abd1726f..828e3eb89e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java @@ -10,9 +10,10 @@ package org.opendaylight.controller.cluster.raft; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.mockito.AdditionalMatchers.aryEq; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.never; @@ -23,8 +24,11 @@ import akka.persistence.RecoveryCompleted; import akka.persistence.SnapshotMetadata; import akka.persistence.SnapshotOffer; import com.google.common.collect.Sets; +import java.io.Serializable; import java.util.Arrays; import java.util.Collections; +import java.util.List; +import org.apache.commons.lang3.SerializationUtils; import org.hamcrest.Description; import org.junit.Before; import org.junit.Test; @@ -36,11 +40,14 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.PersistentDataProvider; +import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState; +import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.slf4j.Logger; @@ -62,6 +69,9 @@ public class RaftActorRecoverySupportTest { @Mock private RaftActorRecoveryCohort mockCohort; + @Mock + private RaftActorSnapshotCohort mockSnapshotCohort; + @Mock PersistentDataProvider mockPersistentProvider; @@ -163,7 +173,44 @@ public class RaftActorRecoverySupportTest { replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2"))); replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3"))); - byte[] snapshotBytes = {1,2,3,4,5}; + ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1, + new MockRaftActorContext.MockPayload("4", 4)); + + ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1, + new MockRaftActorContext.MockPayload("5", 5)); + + long lastAppliedDuringSnapshotCapture = 3; + long lastIndexDuringSnapshotCapture = 5; + long electionTerm = 2; + String electionVotedFor = "member-2"; + + MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1"))); + Snapshot snapshot = Snapshot.create(snapshotState, + Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1, + lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null); + + SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345); + SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot); + + sendMessageToSupport(snapshotOffer); + + assertEquals("Journal log size", 2, context.getReplicatedLog().size()); + assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize()); + assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex()); + assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied()); + assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex()); + assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm()); + assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex()); + assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm()); + assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor()); + assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse()); + + verify(mockCohort).applyRecoverySnapshot(snapshotState); + } + + @Deprecated + @Test + public void testOnSnapshotOfferWithPreCarbonSnapshot() { ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4", 4)); @@ -176,12 +223,21 @@ public class RaftActorRecoverySupportTest { long electionTerm = 2; String electionVotedFor = "member-2"; - Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2), - lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor); + List snapshotData = Arrays.asList(new MockPayload("1")); + final MockSnapshotState snapshotState = new MockSnapshotState(snapshotData); + + org.opendaylight.controller.cluster.raft.Snapshot snapshot = org.opendaylight.controller.cluster.raft.Snapshot + .create(SerializationUtils.serialize((Serializable) snapshotData), + Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1, + lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null); SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345); SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot); + doAnswer(invocation -> new MockSnapshotState(SerializationUtils.deserialize( + invocation.getArgumentAt(0, byte[].class)))) + .when(mockCohort).deserializePreCarbonSnapshot(any(byte[].class)); + sendMessageToSupport(snapshotOffer); assertEquals("Journal log size", 2, context.getReplicatedLog().size()); @@ -195,7 +251,7 @@ public class RaftActorRecoverySupportTest { assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor()); assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse()); - verify(mockCohort).applyRecoverySnapshot(snapshotBytes); + verify(mockCohort).applyRecoverySnapshot(snapshotState); } @Test @@ -255,11 +311,12 @@ public class RaftActorRecoverySupportTest { @Test public void testDataRecoveredWithPersistenceDisabled() { - doNothing().when(mockCohort).applyRecoverySnapshot(aryEq(new byte[0])); + doNothing().when(mockCohort).applyRecoverySnapshot(anyObject()); doReturn(false).when(mockPersistence).isRecoveryApplicable(); doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber(); - Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.emptyList(), 3, 1, 3, 1); + Snapshot snapshot = Snapshot.create(new MockSnapshotState(Arrays.asList(new MockPayload("1"))), + Collections.emptyList(), 3, 1, 3, 1, -1, null, null); SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot); sendMessageToSupport(snapshotOffer); @@ -285,7 +342,7 @@ public class RaftActorRecoverySupportTest { sendMessageToSupport(RecoveryCompleted.getInstance(), true); - verify(mockCohort).applyRecoverySnapshot(aryEq(new byte[0])); + verify(mockCohort, never()).applyRecoverySnapshot(anyObject()); verify(mockCohort, never()).getRestoreFromSnapshot(); verifyNoMoreInteractions(mockCohort); @@ -389,7 +446,8 @@ public class RaftActorRecoverySupportTest { new ServerInfo("follower1", true), new ServerInfo("follower2", true))); - Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.emptyList(), + MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1"))); + Snapshot snapshot = Snapshot.create(snapshotState, Collections.emptyList(), -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload); SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java index a19281ffd8..71412f3e05 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java @@ -26,11 +26,15 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.io.ByteSource; +import java.io.OutputStream; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.SerializationUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -58,6 +62,7 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; @@ -167,8 +172,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { // Leader should install snapshot - capture and verify ApplySnapshot contents ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class); - @SuppressWarnings("unchecked") - List snapshotState = (List) MockRaftActor.toObject(applySnapshot.getSnapshot().getState()); + List snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState()); assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState()); AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); @@ -248,8 +252,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { // Leader should install snapshot - capture and verify ApplySnapshot contents ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class); - @SuppressWarnings("unchecked") - List snapshotState = (List) MockRaftActor.toObject(applySnapshot.getSnapshot().getState()); + List snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState()); assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState()); AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); @@ -1518,12 +1521,19 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { super(id, peerAddresses, config, persistent, collectorActor); snapshotCohortDelegate = new RaftActorSnapshotCohort() { @Override - public void createSnapshot(ActorRef actorRef) { - actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef); + public void createSnapshot(ActorRef actorRef, java.util.Optional installSnapshotStream) { + actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef); } @Override - public void applySnapshot(byte[] snapshotBytes) { + public void applySnapshot( + org.opendaylight.controller.cluster.raft.persisted.Snapshot.State snapshotState) { + } + + @Override + public org.opendaylight.controller.cluster.raft.persisted.Snapshot.State deserializeSnapshot( + ByteSource snapshotBytes) { + throw new UnsupportedOperationException(); } }; } @@ -1564,12 +1574,13 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { @Override @SuppressWarnings("checkstyle:IllegalCatch") - public void createSnapshot(ActorRef actorRef) { - try { - actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef); - } catch (Exception e) { - LOG.error("createSnapshot failed", e); + public void createSnapshot(ActorRef actorRef, java.util.Optional installSnapshotStream) { + MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState())); + if (installSnapshotStream.isPresent()) { + SerializationUtils.serialize(snapshotState, installSnapshotStream.get()); } + + actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef); } static Props props(Map peerAddresses, RaftActorContext fromContext) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java index 45b89b7e01..495cc6d6af 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java @@ -10,15 +10,17 @@ package org.opendaylight.controller.cluster.raft; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.same; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotMetadata; +import java.io.OutputStream; import java.util.Collections; +import java.util.Optional; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -27,6 +29,8 @@ import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,8 +99,8 @@ public class RaftActorSnapshotMessageSupportTest { long lastIndexDuringSnapshotCapture = 2; byte[] snapshotBytes = {1,2,3,4,5}; - Snapshot snapshot = Snapshot.create(snapshotBytes, Collections.emptyList(), - lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1); + Snapshot snapshot = Snapshot.create(ByteState.of(snapshotBytes), Collections.emptyList(), + lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, -1, null, null); ApplySnapshot applySnapshot = new ApplySnapshot(snapshot); sendMessageToSupport(applySnapshot); @@ -106,11 +110,11 @@ public class RaftActorSnapshotMessageSupportTest { @Test public void testOnCaptureSnapshotReply() { + ByteState state = ByteState.of(new byte[]{1,2,3,4,5}); + Optional optionalStream = Optional.of(mock(OutputStream.class)); + sendMessageToSupport(new CaptureSnapshotReply(state, optionalStream)); - byte[] snapshot = {1,2,3,4,5}; - sendMessageToSupport(new CaptureSnapshotReply(snapshot)); - - verify(mockSnapshotManager).persist(same(snapshot), anyLong()); + verify(mockSnapshotManager).persist(eq(state), eq(optionalStream), anyLong()); } @Test diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index edc51092d6..d674afb620 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.raft; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; @@ -63,10 +62,10 @@ import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; +import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; @@ -79,10 +78,13 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries; +import org.opendaylight.controller.cluster.raft.persisted.EmptyState; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; @@ -153,15 +155,14 @@ public class RaftActorTest extends AbstractActorTest { int lastIndexDuringSnapshotCapture = 4; // 4 messages as part of snapshot, which are applied to state - ByteString snapshotBytes = fromObject(Arrays.asList( + MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList( new MockRaftActorContext.MockPayload("A"), new MockRaftActorContext.MockPayload("B"), new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); - Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), - snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1, - lastAppliedDuringSnapshotCapture, 1); + Snapshot snapshot = Snapshot.create(snapshotState, snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1, + lastAppliedDuringSnapshotCapture, 1, -1, null, null); InMemorySnapshotStore.addSnapshot(persistenceId, snapshot); // add more entries after snapshot is taken @@ -292,7 +293,8 @@ public class RaftActorTest extends AbstractActorTest { RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class); mockRaftActor.setRaftActorRecoverySupport(mockSupport ); - Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.emptyList(), 3, 1, 3, 1); + Snapshot snapshot = Snapshot.create(ByteState.of(new byte[]{1}), + Collections.emptyList(), 3, 1, 3, 1, -1, null, null); SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot); mockRaftActor.handleRecover(snapshotOffer); @@ -337,11 +339,8 @@ public class RaftActorTest extends AbstractActorTest { doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class)); mockRaftActor.handleCommand(applySnapshot); - CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null); - doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class)); - mockRaftActor.handleCommand(captureSnapshot); - - CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]); + CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(ByteState.empty(), + java.util.Optional.empty()); doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class)); mockRaftActor.handleCommand(captureSnapshotReply); @@ -362,7 +361,6 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.handleCommand(GetSnapshot.INSTANCE); verify(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class)); - verify(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class)); verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class)); verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class)); verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class)); @@ -592,7 +590,7 @@ public class RaftActorTest extends AbstractActorTest { leaderActor.getRaftActorContext().getSnapshotManager().capture( new SimpleReplicatedLogEntry(6, 1, new MockRaftActorContext.MockPayload("x")), 4); - verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class)); + verify(leaderActor.snapshotCohortDelegate).createSnapshot(anyObject(), anyObject()); assertEquals(8, leaderActor.getReplicatedLog().size()); @@ -611,14 +609,14 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(8, leaderActor.getReplicatedLog().size()); - ByteString snapshotBytes = fromObject(Arrays.asList( + MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList( new MockRaftActorContext.MockPayload("foo-0"), new MockRaftActorContext.MockPayload("foo-1"), new MockRaftActorContext.MockPayload("foo-2"), new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); - leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(), + leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotState, java.util.Optional.empty(), Runtime.getRuntime().totalMemory()); assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); @@ -684,7 +682,7 @@ public class RaftActorTest extends AbstractActorTest { followerActor.getRaftActorContext().getSnapshotManager().capture( new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("D")), 4); - verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class)); + verify(followerActor.snapshotCohortDelegate).createSnapshot(anyObject(), anyObject()); assertEquals(6, followerActor.getReplicatedLog().size()); @@ -711,7 +709,8 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-2"), new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); - followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); + followerActor.onReceiveCommand(new CaptureSnapshotReply(ByteState.of(snapshotBytes.toByteArray()), + java.util.Optional.empty())); assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing()); // The commit is needed to complete the snapshot creation process @@ -802,7 +801,8 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-2"), new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); - leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); + leaderActor.onReceiveCommand(new CaptureSnapshotReply(ByteState.of(snapshotBytes.toByteArray()), + java.util.Optional.empty())); assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, @@ -848,7 +848,8 @@ public class RaftActorTest extends AbstractActorTest { leaderActor.getReplicatedLog().last(), -1, "member1"); // Now send a CaptureSnapshotReply - mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef); + mockActorRef.tell(new CaptureSnapshotReply(ByteState.of(fromObject("foo").toByteArray()), + java.util.Optional.empty()), mockActorRef); // Trimming log in this scenario is a no-op assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex()); @@ -888,7 +889,8 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("duh"), false); // Now send a CaptureSnapshotReply - mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef); + mockActorRef.tell(new CaptureSnapshotReply(ByteState.of(fromObject("foo").toByteArray()), + java.util.Optional.empty()), mockActorRef); // Trimming log in this scenario is a no-op assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex()); @@ -1038,10 +1040,12 @@ public class RaftActorTest extends AbstractActorTest { raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef()); ArgumentCaptor replyActor = ArgumentCaptor.forClass(ActorRef.class); - verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture()); + verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture(), + eq(java.util.Optional.empty())); byte[] stateSnapshot = new byte[]{1,2,3}; - replyActor.getValue().tell(new CaptureSnapshotReply(stateSnapshot), ActorRef.noSender()); + replyActor.getValue().tell(new CaptureSnapshotReply(ByteState.of(stateSnapshot), java.util.Optional.empty()), + ActorRef.noSender()); GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class); @@ -1053,7 +1057,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("getLastAppliedTerm", term, replySnapshot.getLastAppliedTerm()); assertEquals("getLastIndex", 2L, replySnapshot.getLastIndex()); assertEquals("getLastTerm", term, replySnapshot.getLastTerm()); - assertArrayEquals("getState", stateSnapshot, replySnapshot.getState()); + assertEquals("getState", ByteState.of(stateSnapshot), replySnapshot.getState()); assertEquals("getUnAppliedEntries size", 1, replySnapshot.getUnAppliedEntries().size()); assertEquals("UnApplied entry index ", 2L, replySnapshot.getUnAppliedEntries().get(0).getIndex()); @@ -1076,7 +1080,7 @@ public class RaftActorTest extends AbstractActorTest { raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef()); reply = kit.expectMsgClass(GetSnapshotReply.class); - verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(any(ActorRef.class)); + verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(anyObject(), anyObject()); assertEquals("getId", persistenceId, reply.getId()); replySnapshot = SerializationUtils.deserialize(reply.getSnapshot()); @@ -1086,7 +1090,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("getLastAppliedTerm", -1L, replySnapshot.getLastAppliedTerm()); assertEquals("getLastIndex", -1L, replySnapshot.getLastIndex()); assertEquals("getLastTerm", -1L, replySnapshot.getLastTerm()); - assertEquals("getState length", 0, replySnapshot.getState().length); + assertEquals("getState type", EmptyState.INSTANCE, replySnapshot.getState()); assertEquals("getUnAppliedEntries size", 0, replySnapshot.getUnAppliedEntries().size()); TEST_LOG.info("testGetSnapshot ending"); @@ -1106,15 +1110,14 @@ public class RaftActorTest extends AbstractActorTest { int snapshotLastApplied = 3; int snapshotLastIndex = 4; - List state = Arrays.asList( + MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList( new MockRaftActorContext.MockPayload("A"), new MockRaftActorContext.MockPayload("B"), new MockRaftActorContext.MockPayload("C"), - new MockRaftActorContext.MockPayload("D")); - ByteString stateBytes = fromObject(state); + new MockRaftActorContext.MockPayload("D"))); - Snapshot snapshot = Snapshot.create(stateBytes.toByteArray(), snapshotUnappliedEntries, - snapshotLastIndex, 1, snapshotLastApplied, 1, 1, "member-1"); + Snapshot snapshot = Snapshot.create(snapshotState, snapshotUnappliedEntries, + snapshotLastIndex, 1, snapshotLastApplied, 1, 1, "member-1", null); InMemorySnapshotStore.addSnapshotSavedLatch(persistenceId); @@ -1132,24 +1135,24 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("getLastAppliedTerm", snapshot.getLastAppliedTerm(), savedSnapshot.getLastAppliedTerm()); assertEquals("getLastIndex", snapshot.getLastIndex(), savedSnapshot.getLastIndex()); assertEquals("getLastTerm", snapshot.getLastTerm(), savedSnapshot.getLastTerm()); - assertArrayEquals("getState", snapshot.getState(), savedSnapshot.getState()); + assertEquals("getState", snapshot.getState(), savedSnapshot.getState()); assertEquals("getUnAppliedEntries", snapshot.getUnAppliedEntries(), savedSnapshot.getUnAppliedEntries()); - verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).applySnapshot(any(byte[].class)); + verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).applySnapshot(any(Snapshot.State.class)); RaftActorContext context = mockRaftActor.getRaftActorContext(); assertEquals("Journal log size", 1, context.getReplicatedLog().size()); assertEquals("Last index", snapshotLastIndex, context.getReplicatedLog().lastIndex()); assertEquals("Last applied", snapshotLastApplied, context.getLastApplied()); assertEquals("Commit index", snapshotLastApplied, context.getCommitIndex()); - assertEquals("Recovered state", state, mockRaftActor.getState()); + assertEquals("Recovered state", snapshotState.getState(), mockRaftActor.getState()); assertEquals("Current term", 1L, context.getTermInformation().getCurrentTerm()); assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor()); // Test with data persistence disabled - snapshot = Snapshot.create(new byte[0], Collections.emptyList(), - -1, -1, -1, -1, 5, "member-1"); + snapshot = Snapshot.create(EmptyState.INSTANCE, Collections.emptyList(), + -1, -1, -1, -1, 5, "member-1", null); persistenceId = factory.generateActorId("test-actor-"); @@ -1179,8 +1182,8 @@ public class RaftActorTest extends AbstractActorTest { config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); List state = Arrays.asList(new MockRaftActorContext.MockPayload("A")); - Snapshot snapshot = Snapshot.create(fromObject(state).toByteArray(), Arrays.asList(), - 5, 2, 5, 2, 2, "member-1"); + Snapshot snapshot = Snapshot.create(ByteState.of(fromObject(state).toByteArray()), + Arrays.asList(), 5, 2, 5, 2, 2, "member-1", null); InMemoryJournal.addEntry(persistenceId, 1, new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("B"))); @@ -1193,7 +1196,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.waitForRecoveryComplete(); Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - verify(mockRaftActor.snapshotCohortDelegate, never()).applySnapshot(any(byte[].class)); + verify(mockRaftActor.snapshotCohortDelegate, never()).applySnapshot(any(Snapshot.State.class)); RaftActorContext context = mockRaftActor.getRaftActorContext(); assertEquals("Journal log size", 1, context.getReplicatedLog().size()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationSingleNodeTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationSingleNodeTest.java index 657905e822..2a3a6c1a7a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationSingleNodeTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationSingleNodeTest.java @@ -18,6 +18,7 @@ import java.util.List; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; @@ -82,8 +83,7 @@ public class RecoveryIntegrationSingleNodeTest extends AbstractRaftActorIntegrat List persistedSnapshots = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class); assertEquals(1, persistedSnapshots.size()); - @SuppressWarnings("unchecked") - List snapshottedState = (List)MockRaftActor.toObject(persistedSnapshots.get(0).getState()); + List snapshottedState = MockRaftActor.fromState(persistedSnapshots.get(0).getState()); assertEquals("Incorrect Snapshot", Lists.newArrayList(payload0, payload1, payload2, payload3), snapshottedState); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java index 0254c6db6e..e53dfe6714 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java @@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java index 8a915495e4..0dbcb1fbd0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; +import org.apache.commons.lang3.SerializationUtils; import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; @@ -37,6 +38,7 @@ import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; @@ -553,7 +555,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A List unAppliedEntry = persistedSnapshot.getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size()); - int snapshotSize = persistedSnapshot.getState().length; + int snapshotSize = SerializationUtils.serialize(persistedSnapshot.getState()).length; final int expTotalChunks = snapshotSize / SNAPSHOT_CHUNK_SIZE + (snapshotSize % SNAPSHOT_CHUNK_SIZE > 0 ? 1 : 0); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java index fab250bd31..7591c2f7ec 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java @@ -14,6 +14,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -25,7 +27,10 @@ import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.persistence.SnapshotSelectionCriteria; import akka.testkit.TestActorRef; +import java.io.OutputStream; import java.util.Arrays; +import java.util.Optional; +import java.util.function.Consumer; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -38,7 +43,9 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.slf4j.LoggerFactory; @@ -60,7 +67,7 @@ public class SnapshotManagerTest extends AbstractActorTest { private RaftActorBehavior mockRaftActorBehavior; @Mock - private Runnable mockProcedure; + private Consumer> mockProcedure; @Mock private ElectionTerm mockElectionTerm; @@ -95,7 +102,7 @@ public class SnapshotManagerTest extends AbstractActorTest { actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-")); doReturn(actorRef).when(mockRaftActorContext).getActor(); - snapshotManager.setCreateSnapshotRunnable(mockProcedure); + snapshotManager.setCreateSnapshotConsumer(mockProcedure); } @After @@ -108,6 +115,7 @@ public class SnapshotManagerTest extends AbstractActorTest { assertEquals(false, snapshotManager.isCapturing()); } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Test public void testCaptureToInstall() throws Exception { @@ -117,7 +125,9 @@ public class SnapshotManagerTest extends AbstractActorTest { assertEquals(true, snapshotManager.isCapturing()); - verify(mockProcedure).run(); + ArgumentCaptor outputStream = ArgumentCaptor.forClass(Optional.class); + verify(mockProcedure).accept(outputStream.capture()); + assertEquals("isPresent", true, outputStream.getValue().isPresent()); CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot(); @@ -135,6 +145,7 @@ public class SnapshotManagerTest extends AbstractActorTest { actorRef.underlyingActor().clear(); } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testCapture() throws Exception { boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1, @@ -144,7 +155,9 @@ public class SnapshotManagerTest extends AbstractActorTest { assertEquals(true, snapshotManager.isCapturing()); - verify(mockProcedure).run(); + ArgumentCaptor outputStream = ArgumentCaptor.forClass(Optional.class); + verify(mockProcedure).accept(outputStream.capture()); + assertEquals("isPresent", false, outputStream.getValue().isPresent()); CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot(); @@ -164,6 +177,7 @@ public class SnapshotManagerTest extends AbstractActorTest { } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Test public void testCaptureWithNullLastLogEntry() throws Exception { boolean capture = snapshotManager.capture(null, 1); @@ -172,7 +186,9 @@ public class SnapshotManagerTest extends AbstractActorTest { assertEquals(true, snapshotManager.isCapturing()); - verify(mockProcedure).run(); + ArgumentCaptor outputStream = ArgumentCaptor.forClass(Optional.class); + verify(mockProcedure).accept(outputStream.capture()); + assertEquals("isPresent", false, outputStream.getValue().isPresent()); CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot(); @@ -193,7 +209,7 @@ public class SnapshotManagerTest extends AbstractActorTest { @Test public void testCaptureWithCreateProcedureError() throws Exception { - doThrow(new RuntimeException("mock")).when(mockProcedure).run(); + doThrow(new RuntimeException("mock")).when(mockProcedure).accept(anyObject()); boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1, new MockRaftActorContext.MockPayload()), 9); @@ -202,9 +218,10 @@ public class SnapshotManagerTest extends AbstractActorTest { assertEquals(false, snapshotManager.isCapturing()); - verify(mockProcedure).run(); + verify(mockProcedure).accept(anyObject()); } + @SuppressWarnings("unchecked") @Test public void testIllegalCapture() throws Exception { boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1, @@ -212,7 +229,7 @@ public class SnapshotManagerTest extends AbstractActorTest { assertTrue(capture); - verify(mockProcedure).run(); + verify(mockProcedure).accept(anyObject()); reset(mockProcedure); @@ -222,11 +239,11 @@ public class SnapshotManagerTest extends AbstractActorTest { assertFalse(capture); - verify(mockProcedure, never()).run(); + verify(mockProcedure, never()).accept(anyObject()); } @Test - public void testPersistWhenReplicatedToAllIndexMinusOne() { + public void testPersistWhenReplicatedToAllIndexMinusOne() throws Exception { doReturn(7L).when(mockReplicatedLog).getSnapshotIndex(); doReturn(1L).when(mockReplicatedLog).getSnapshotTerm(); @@ -245,8 +262,8 @@ public class SnapshotManagerTest extends AbstractActorTest { // when replicatedToAllIndex = -1 snapshotManager.capture(lastLogEntry, -1); - byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; - snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory()); + ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10}); + snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory()); ArgumentCaptor snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class); verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture()); @@ -257,7 +274,7 @@ public class SnapshotManagerTest extends AbstractActorTest { assertEquals("getLastIndex", 9L, snapshot.getLastIndex()); assertEquals("getLastAppliedTerm", 2L, snapshot.getLastAppliedTerm()); assertEquals("getLastAppliedIndex", 8L, snapshot.getLastAppliedIndex()); - assertArrayEquals("getState", bytes, snapshot.getState()); + assertEquals("getState", snapshotState, snapshot.getState()); assertEquals("getUnAppliedEntries", Arrays.asList(lastLogEntry), snapshot.getUnAppliedEntries()); assertEquals("electionTerm", mockElectionTerm.getCurrentTerm(), snapshot.getElectionTerm()); assertEquals("electionVotedFor", mockElectionTerm.getVotedFor(), snapshot.getElectionVotedFor()); @@ -266,7 +283,7 @@ public class SnapshotManagerTest extends AbstractActorTest { } @Test - public void testPersistWhenReplicatedToAllIndexNotMinus() { + public void testPersistWhenReplicatedToAllIndexNotMinus() throws Exception { doReturn(45L).when(mockReplicatedLog).getSnapshotIndex(); doReturn(6L).when(mockReplicatedLog).getSnapshotTerm(); ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); @@ -277,8 +294,8 @@ public class SnapshotManagerTest extends AbstractActorTest { // when replicatedToAllIndex != -1 snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), 9); - byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; - snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory()); + ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10}); + snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory()); ArgumentCaptor snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class); verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture()); @@ -289,7 +306,7 @@ public class SnapshotManagerTest extends AbstractActorTest { assertEquals("getLastIndex", 9L, snapshot.getLastIndex()); assertEquals("getLastAppliedTerm", 6L, snapshot.getLastAppliedTerm()); assertEquals("getLastAppliedIndex", 9L, snapshot.getLastAppliedIndex()); - assertArrayEquals("getState", bytes, snapshot.getState()); + assertEquals("getState", snapshotState, snapshot.getState()); assertEquals("getUnAppliedEntries size", 0, snapshot.getUnAppliedEntries().size()); verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); @@ -304,7 +321,7 @@ public class SnapshotManagerTest extends AbstractActorTest { // when replicatedToAllIndex = -1 snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1); - snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory()); verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); @@ -330,7 +347,7 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), replicatedToAllIndex); - snapshotManager.persist(new byte[]{}, 2000000L); + snapshotManager.persist(ByteState.empty(), Optional.empty(), 2000000L); verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); @@ -339,9 +356,11 @@ public class SnapshotManagerTest extends AbstractActorTest { verify(mockRaftActorBehavior).setReplicatedToAllIndex(replicatedToAllIndex); } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test - public void testPersistSendInstallSnapshot() { + public void testPersistSendInstallSnapshot() throws Exception { doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize(); + doNothing().when(mockProcedure).accept(anyObject()); // when replicatedToAllIndex = -1 boolean capture = snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6, @@ -349,9 +368,17 @@ public class SnapshotManagerTest extends AbstractActorTest { assertTrue(capture); - byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; + ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10}); - snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory()); + ArgumentCaptor installSnapshotStreamCapture = ArgumentCaptor.forClass(Optional.class); + verify(mockProcedure).accept(installSnapshotStreamCapture.capture()); + + Optional installSnapshotStream = installSnapshotStreamCapture.getValue(); + assertEquals("isPresent", true, installSnapshotStream.isPresent()); + + installSnapshotStream.get().write(snapshotState.getBytes()); + + snapshotManager.persist(snapshotState, installSnapshotStream, Runtime.getRuntime().totalMemory()); assertEquals(true, snapshotManager.isCapturing()); @@ -366,12 +393,13 @@ public class SnapshotManagerTest extends AbstractActorTest { SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue(); - assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().getState())); + assertEquals("state", snapshotState, sendInstallSnapshot.getSnapshot().getState()); + assertArrayEquals("state", snapshotState.getBytes(), sendInstallSnapshot.getSnapshotBytes().read()); } @Test public void testCallingPersistWithoutCaptureWillDoNothing() { - snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory()); verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class)); @@ -385,18 +413,15 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize(); // when replicatedToAllIndex = -1 - snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6, - new MockRaftActorContext.MockPayload()), -1, "follower-1"); + snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1); - snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory()); - snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory()); verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); - - verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class)); } @Test @@ -404,10 +429,9 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber(); // when replicatedToAllIndex = -1 - snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6, - new MockRaftActorContext.MockPayload()), -1, "follower-1"); + snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1); - snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory()); assertEquals(true, snapshotManager.isCapturing()); @@ -433,8 +457,7 @@ public class SnapshotManagerTest extends AbstractActorTest { @Test public void testCommitBeforePersist() { // when replicatedToAllIndex = -1 - snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6, - new MockRaftActorContext.MockPayload()), -1, "follower-1"); + snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1); snapshotManager.commit(100L, 0); @@ -463,10 +486,9 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber(); // when replicatedToAllIndex = -1 - snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6, - new MockRaftActorContext.MockPayload()), -1, "follower-1"); + snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1); - snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory()); snapshotManager.commit(100L, 0); @@ -482,10 +504,9 @@ public class SnapshotManagerTest extends AbstractActorTest { @Test public void testRollback() { // when replicatedToAllIndex = -1 - snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6, - new MockRaftActorContext.MockPayload()), -1, "follower-1"); + snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1); - snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory()); snapshotManager.rollback(); @@ -498,8 +519,7 @@ public class SnapshotManagerTest extends AbstractActorTest { @Test public void testRollbackBeforePersist() { // when replicatedToAllIndex = -1 - snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6, - new MockRaftActorContext.MockPayload()), -1, "follower-1"); + snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1); snapshotManager.rollback(); @@ -516,10 +536,9 @@ public class SnapshotManagerTest extends AbstractActorTest { @Test public void testCallingRollbackMultipleTimesCausesNoHarm() { // when replicatedToAllIndex = -1 - snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6, - new MockRaftActorContext.MockPayload()), -1, "follower-1"); + snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1); - snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory()); snapshotManager.rollback(); @@ -622,8 +641,8 @@ public class SnapshotManagerTest extends AbstractActorTest { @Test public void testTrimLogAfterCaptureToInstall() { - boolean capture = snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 1, - new MockRaftActorContext.MockPayload()), 9, "follower-1"); + boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1, + new MockRaftActorContext.MockPayload()), 9); assertTrue(capture); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotTest.java index 9cf4015108..b4a8c27b7c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotTest.java @@ -25,6 +25,7 @@ import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload * * @author Thomas Pantelis */ +@Deprecated public class SnapshotTest { @Test diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 8cb914c226..487a4fab90 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -29,8 +29,10 @@ import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -44,12 +46,11 @@ import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.MockRaftActor; import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder; +import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; -import org.opendaylight.controller.cluster.raft.RaftActorTest; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; @@ -63,9 +64,12 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State; import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; @@ -809,7 +813,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); - ByteString bsSnapshot = createSnapshot(); + ByteString bsSnapshot = createSnapshot(); int offset = 0; int snapshotLength = bsSnapshot.size(); int chunkSize = 50; @@ -838,7 +842,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastAppliedIndex()); assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm()); - Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState()); + assertEquals("getState type", ByteState.class, snapshot.getState().getClass()); + Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes()); assertEquals("getElectionTerm", 1, snapshot.getElectionTerm()); assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor()); applySnapshot.getCallback().onSuccess(); @@ -1154,7 +1159,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm()); assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex()); assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()), - MockRaftActor.toObject(snapshot.getState())); + MockRaftActor.fromState(snapshot.getState())); } @Test @@ -1208,7 +1213,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm()); assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex()); assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(), - entries.get(2).getData()), MockRaftActor.toObject(snapshot.getState())); + entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState())); assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size()); assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex()); @@ -1283,24 +1288,29 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm()); assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex()); assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()), - MockRaftActor.toObject(snapshot.getState())); + MockRaftActor.fromState(snapshot.getState())); } @SuppressWarnings("checkstyle:IllegalCatch") private RaftActorSnapshotCohort newRaftActorSnapshotCohort(final AtomicReference followerRaftActor) { RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() { @Override - public void createSnapshot(ActorRef actorRef) { + public void createSnapshot(ActorRef actorRef, java.util.Optional installSnapshotStream) { try { - actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject( - followerRaftActor.get().getState()).toByteArray()), actorRef); + actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()), + installSnapshotStream), actorRef); } catch (Exception e) { Throwables.propagate(e); } } @Override - public void applySnapshot(byte[] snapshotBytes) { + public void applySnapshot(State snapshotState) { + } + + @Override + public State deserializeSnapshot(ByteSource snapshotBytes) { + throw new UnsupportedOperationException(); } }; return snapshotCohort; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index fbfdea0249..99b647b000 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -25,6 +25,7 @@ import akka.actor.Terminated; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import com.google.common.collect.ImmutableMap; +import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; import java.util.Arrays; @@ -43,7 +44,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohor import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.RaftVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; @@ -52,6 +52,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; @@ -59,7 +60,9 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; @@ -577,8 +580,9 @@ public class LeaderTest extends AbstractLeaderTest { leader.markFollowerActive(FOLLOWER_ID); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm)); + leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null), ByteSource.wrap(bs.toByteArray()))); LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState( actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); fts.setSnapshotBytes(bs); @@ -700,7 +704,6 @@ public class LeaderTest extends AbstractLeaderTest { CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot(); - assertTrue(cs.isInstallSnapshotInitiated()); assertEquals(3, cs.getLastAppliedIndex()); assertEquals(1, cs.getLastAppliedTerm()); assertEquals(4, cs.getLastIndex()); @@ -763,7 +766,6 @@ public class LeaderTest extends AbstractLeaderTest { CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot(); - assertTrue(cs.isInstallSnapshotInitiated()); assertEquals(3, cs.getLastAppliedIndex()); assertEquals(1, cs.getLastAppliedTerm()); assertEquals(4, cs.getLastIndex()); @@ -810,11 +812,12 @@ public class LeaderTest extends AbstractLeaderTest { leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); leader.getFollower(FOLLOWER_ID).setNextIndex(0); - Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(), - Collections.emptyList(), - lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm); + byte[] bytes = toByteString(leadersSnapshot).toByteArray(); + Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.emptyList(), + lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, + new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes))); assertTrue(raftBehavior instanceof Leader); @@ -861,11 +864,12 @@ public class LeaderTest extends AbstractLeaderTest { leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); leader.getFollower(FOLLOWER_ID).setNextIndex(-1); - Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(), - Collections.emptyList(), - lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm); + byte[] bytes = toByteString(leadersSnapshot).toByteArray(); + Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.emptyList(), + lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, + new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes))); assertTrue(raftBehavior instanceof Leader); @@ -915,8 +919,9 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm)); + leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null), ByteSource.wrap(bs.toByteArray()))); LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState( actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); fts.setSnapshotBytes(bs); @@ -983,11 +988,11 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm); - leader.setSnapshot(snapshot); + Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null); - leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -1057,12 +1062,12 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm); - leader.setSnapshot(snapshot); + Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null); Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -1122,11 +1127,11 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm); - leader.setSnapshot(snapshot); + Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null); - leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/ByteState.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/ByteState.java new file mode 100644 index 0000000000..42411a95c4 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/ByteState.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.persisted; + +import com.google.common.base.Preconditions; +import java.util.Arrays; +import javax.annotation.Nonnull; + +/** + * Snapshot State implementation backed by a byte[]. + * + * @author Thomas Pantelis + */ +public class ByteState implements Snapshot.State { + private static final long serialVersionUID = 1L; + + private final byte[] bytes; + + private ByteState(@Nonnull byte[] bytes) { + this.bytes = Preconditions.checkNotNull(bytes); + } + + public static ByteState of(@Nonnull byte[] bytes) { + return new ByteState(bytes); + } + + public static ByteState empty() { + return new ByteState(new byte[0]); + } + + public byte[] getBytes() { + return bytes; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + Arrays.hashCode(bytes); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + ByteState other = (ByteState) obj; + if (!Arrays.equals(bytes, other.bytes)) { + return false; + } + return true; + } + + @Override + public String toString() { + return "ByteState [bytes=" + Arrays.toString(bytes) + "]"; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/EmptyStateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/EmptyStateTest.java new file mode 100644 index 0000000000..963580cde4 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/EmptyStateTest.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.persisted; + +import static org.junit.Assert.assertSame; + +import org.apache.commons.lang.SerializationUtils; +import org.junit.Test; + +/** + * Unit tests for EmptyState. + * + * @author Thomas Pantelis + * + */ +public class EmptyStateTest { + + @Test + public void testSerialization() { + EmptyState cloned = (EmptyState) SerializationUtils.clone(EmptyState.INSTANCE); + assertSame("cloned", EmptyState.INSTANCE, cloned); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/SnapshotTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/SnapshotTest.java new file mode 100644 index 0000000000..19f0ec132b --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/SnapshotTest.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.persisted; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.commons.lang.SerializationUtils; +import org.junit.Test; +import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; + +/** + * Unit tests for Snapshot. + * + * @author Thomas Pantelis + */ +public class SnapshotTest { + + @Test + public void testSerialization() throws Exception { + testSerialization(new byte[]{1, 2, 3, 4, 5, 6, 7}, Arrays.asList( + new SimpleReplicatedLogEntry(6, 2, new MockPayload("payload")))); + testSerialization(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9}, Collections.emptyList()); + } + + private void testSerialization(byte[] state, List unapplied) throws Exception { + long lastIndex = 6; + long lastTerm = 2; + long lastAppliedIndex = 5; + long lastAppliedTerm = 1; + long electionTerm = 3; + String electionVotedFor = "member-1"; + ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(Arrays.asList( + new ServerInfo("1", true), new ServerInfo("2", false))); + + Snapshot expected = Snapshot.create(ByteState.of(state), unapplied, lastIndex, lastTerm, lastAppliedIndex, + lastAppliedTerm, electionTerm, electionVotedFor, serverConfig); + Snapshot cloned = (Snapshot) SerializationUtils.clone(expected); + + assertEquals("lastIndex", expected.getLastIndex(), cloned.getLastIndex()); + assertEquals("lastTerm", expected.getLastTerm(), cloned.getLastTerm()); + assertEquals("lastAppliedIndex", expected.getLastAppliedIndex(), cloned.getLastAppliedIndex()); + assertEquals("lastAppliedTerm", expected.getLastAppliedTerm(), cloned.getLastAppliedTerm()); + assertEquals("unAppliedEntries", expected.getUnAppliedEntries(), cloned.getUnAppliedEntries()); + assertEquals("electionTerm", expected.getElectionTerm(), cloned.getElectionTerm()); + assertEquals("electionVotedFor", expected.getElectionVotedFor(), cloned.getElectionVotedFor()); + assertEquals("state", expected.getState(), cloned.getState()); + assertEquals("serverConfig", expected.getServerConfiguration().getServerConfig(), + cloned.getServerConfiguration().getServerConfig()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java index 5e0d3407fb..e87f083fb1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java @@ -10,9 +10,13 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import java.io.File; +import java.io.IOException; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeXMLOutput; import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; @@ -86,24 +90,21 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { */ @Override @SuppressWarnings("checkstyle:IllegalCatch") - public void applyRecoverySnapshot(final byte[] snapshotBytes) { - log.debug("{}: Applying recovered snapshot", shardName); - - final ShardDataTreeSnapshot snapshot; - try { - snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes); - } catch (Exception e) { - log.error("{}: failed to deserialize snapshot", shardName, e); - throw Throwables.propagate(e); + public void applyRecoverySnapshot(final Snapshot.State snapshotState) { + if (!(snapshotState instanceof ShardSnapshotState)) { + log.debug("{}: applyRecoverySnapshot ignoring snapshot: {}", snapshotState); } + log.debug("{}: Applying recovered snapshot", shardName); + + ShardDataTreeSnapshot shardSnapshot = ((ShardSnapshotState)snapshotState).getSnapshot(); try { - store.applyRecoverySnapshot(snapshot); + store.applyRecoverySnapshot(shardSnapshot); } catch (Exception e) { - final File f = writeRoot("snapshot", snapshot.getRootNode().orElse(null)); + final File f = writeRoot("snapshot", shardSnapshot.getRootNode().orElse(null)); throw new IllegalStateException(String.format( "%s: Failed to apply recovery snapshot %s. Node data was written to file %s", - shardName, snapshot, f), e); + shardName, shardSnapshot, f), e); } } @@ -111,4 +112,15 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { public byte[] getRestoreFromSnapshot() { return restoreFromSnapshot; } + + @Override + @Deprecated + public State deserializePreCarbonSnapshot(byte[] from) { + try { + return new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(from)); + } catch (IOException e) { + log.error("{}: failed to deserialize snapshot", shardName, e); + throw Throwables.propagate(e); + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java index 6dc3f03081..0627c0a9e5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java @@ -10,7 +10,11 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorContext; import akka.actor.ActorRef; import com.google.common.base.Preconditions; +import com.google.common.io.ByteSource; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Optional; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendType; @@ -18,7 +22,10 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State; import org.slf4j.Logger; /** @@ -56,28 +63,26 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort { } @Override - public void createSnapshot(final ActorRef actorRef) { + public void createSnapshot(final ActorRef actorRef, final Optional installSnapshotStream) { // Forward the request to the snapshot actor - ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeStateSnapshot(), actorRef); + ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeStateSnapshot(), installSnapshotStream, actorRef); } @Override @SuppressWarnings("checkstyle:IllegalCatch") - public void applySnapshot(final byte[] snapshotBytes) { + public void applySnapshot(final Snapshot.State snapshotState) { + if (!(snapshotState instanceof ShardSnapshotState)) { + log.debug("{}: applySnapshot ignoring snapshot: {}", snapshotState); + } + + final ShardDataTreeSnapshot snapshot = ((ShardSnapshotState)snapshotState).getSnapshot(); + // Since this will be done only on Recovery or when this actor is a Follower // we can safely commit everything in here. We not need to worry about event notifications // as they would have already been disabled on the follower log.info("{}: Applying snapshot", logId); - final ShardDataTreeSnapshot snapshot; - try { - snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes); - } catch (IOException e) { - log.error("{}: Failed to deserialize snapshot", logId, e); - return; - } - try { store.applySnapshot(snapshot); } catch (Exception e) { @@ -87,4 +92,11 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort { log.info("{}: Done applying snapshot", logId); } + + @Override + public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException { + try (final InputStream is = snapshotBytes.openStream()) { + return new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(is)); + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java index 00bb424850..e7529fb4bf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java @@ -10,9 +10,15 @@ package org.opendaylight.controller.cluster.datastore.actors; import akka.actor.ActorRef; import akka.actor.Props; import com.google.common.base.Preconditions; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Optional; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This is an offload actor, which is given an isolated snapshot of the data tree. It performs the potentially @@ -21,13 +27,18 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotRep * @author Robert Varga */ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering { + private static final Logger LOG = LoggerFactory.getLogger(ShardSnapshotActor.class); + // Internal message private static final class SerializeSnapshot { private final ShardDataTreeSnapshot snapshot; + private final Optional installSnapshotStream; private final ActorRef replyTo; - SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final ActorRef replyTo) { + SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final Optional installSnapshotStream, + final ActorRef replyTo) { this.snapshot = Preconditions.checkNotNull(snapshot); + this.installSnapshotStream = Preconditions.checkNotNull(installSnapshotStream); this.replyTo = Preconditions.checkNotNull(replyTo); } @@ -35,6 +46,10 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering { return snapshot; } + Optional getInstallSnapshotStream() { + return installSnapshotStream; + } + ActorRef getReplyTo() { return replyTo; } @@ -50,16 +65,39 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering { @Override protected void handleReceive(final Object message) throws Exception { if (message instanceof SerializeSnapshot) { - final SerializeSnapshot request = (SerializeSnapshot) message; - request.getReplyTo().tell(new CaptureSnapshotReply(request.getSnapshot().serialize()), ActorRef.noSender()); + onSerializeSnapshot((SerializeSnapshot) message); } else { unknownMessage(message); } } - public static void requestSnapshot(final ActorRef snapshotActor, ShardDataTreeSnapshot snapshot, - final ActorRef replyTo) { - snapshotActor.tell(new SerializeSnapshot(snapshot, replyTo), ActorRef.noSender()); + private void onSerializeSnapshot(SerializeSnapshot request) { + Optional installSnapshotStream = request.getInstallSnapshotStream(); + if (installSnapshotStream.isPresent()) { + try { + request.getSnapshot().serialize(installSnapshotStream.get()); + } catch (IOException e) { + // TODO - we should communicate the failure in the CaptureSnapshotReply. + LOG.error("Error serializing snapshot", e); + } + } + + request.getReplyTo().tell(new CaptureSnapshotReply(new ShardSnapshotState(request.getSnapshot()), + installSnapshotStream), ActorRef.noSender()); + } + + /** + * Sends a request to a ShardSnapshotActor to process a snapshot and send a CaptureSnapshotReply. + * + * @param snapshotActor the ShardSnapshotActor + * @param snapshot the snapshot to process + * @param installSnapshotStream Optional OutputStream that is present if the snapshot is to also be installed + * on a follower. + * @param replyTo the actor to which to send the CaptureSnapshotReply + */ + public static void requestSnapshot(final ActorRef snapshotActor, final ShardDataTreeSnapshot snapshot, + final Optional installSnapshotStream, final ActorRef replyTo) { + snapshotActor.tell(new SerializeSnapshot(snapshot, installSnapshotStream, replyTo), ActorRef.noSender()); } public static Props props() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractVersionedShardDataTreeSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractVersionedShardDataTreeSnapshot.java index 48d2673421..1d9b58f270 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractVersionedShardDataTreeSnapshot.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractVersionedShardDataTreeSnapshot.java @@ -8,12 +8,12 @@ package org.opendaylight.controller.cluster.datastore.persisted; import com.google.common.base.Verify; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.io.OutputStream; import java.util.Optional; import javax.annotation.Nonnull; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -93,15 +93,11 @@ abstract class AbstractVersionedShardDataTreeSnapshot extends ShardDataTreeSnaps } @Override - public final byte[] serialize() throws IOException { - try (final ByteArrayOutputStream bos = new ByteArrayOutputStream()) { - try (final DataOutputStream dos = new DataOutputStream(bos)) { - final PayloadVersion version = version(); - version.writeTo(dos); - versionedSerialize(dos, version); - } - - return bos.toByteArray(); + public void serialize(final OutputStream os) throws IOException { + try (final DataOutputStream dos = new DataOutputStream(os)) { + final PayloadVersion version = version(); + version.writeTo(dos); + versionedSerialize(dos, version); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PreBoronShardDataTreeSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PreBoronShardDataTreeSnapshot.java index 3aca7f2314..7be0d85c7a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PreBoronShardDataTreeSnapshot.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PreBoronShardDataTreeSnapshot.java @@ -8,6 +8,9 @@ package org.opendaylight.controller.cluster.datastore.persisted; import com.google.common.annotations.Beta; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; import java.util.Optional; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils; @@ -33,7 +36,9 @@ public final class PreBoronShardDataTreeSnapshot extends ShardDataTreeSnapshot { } @Override - public byte[] serialize() { - return SerializationUtils.serializeNormalizedNode(rootNode); + public void serialize(OutputStream os) throws IOException { + try (final DataOutputStream dos = new DataOutputStream(os)) { + SerializationUtils.serializeNormalizedNode(rootNode, dos); + } } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshot.java index 6cb047fc37..7b8382e9c2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshot.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshot.java @@ -12,8 +12,8 @@ import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.util.Optional; -import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; @@ -32,6 +32,7 @@ public abstract class ShardDataTreeSnapshot { // Hidden to prevent subclassing from outside of this package } + @Deprecated public static ShardDataTreeSnapshot deserialize(final byte[] bytes) throws IOException { /** * Unfortunately versions prior to Boron did not include any way to evolve the snapshot format and contained @@ -50,18 +51,7 @@ public abstract class ShardDataTreeSnapshot { try { try (final InputStream is = new ByteArrayInputStream(bytes)) { - try (final DataInputStream dis = new DataInputStream(is)) { - final ShardDataTreeSnapshot ret = AbstractVersionedShardDataTreeSnapshot.deserialize(dis); - - // Make sure we consume all bytes, otherwise something went very wrong - final int bytesLeft = dis.available(); - if (bytesLeft != 0) { - throw new IOException("Deserialization left " + bytesLeft + " in the buffer"); - } - - - return ret; - } + return deserialize(is); } } catch (IOException e) { LOG.debug("Failed to deserialize versioned stream, attempting pre-Lithium ProtoBuf", e); @@ -69,6 +59,21 @@ public abstract class ShardDataTreeSnapshot { } } + public static ShardDataTreeSnapshot deserialize(final InputStream is) throws IOException { + try (final DataInputStream dis = new DataInputStream(is)) { + final ShardDataTreeSnapshot ret = AbstractVersionedShardDataTreeSnapshot.deserialize(dis); + + // Make sure we consume all bytes, otherwise something went very wrong + final int bytesLeft = dis.available(); + if (bytesLeft != 0) { + throw new IOException("Deserialization left " + bytesLeft + " in the buffer"); + } + + + return ret; + } + } + /** * Get the root data node contained in this snapshot. * @@ -76,14 +81,9 @@ public abstract class ShardDataTreeSnapshot { */ public abstract Optional> getRootNode(); - /** - * Serialize this snapshot into a byte array for persistence. - * - * @return Serialized snapshot - * @throws IOException when a serialization problem occurs - */ - public abstract @Nonnull byte[] serialize() throws IOException; + public abstract void serialize(final OutputStream os) throws IOException; + @Deprecated private static boolean isLegacyStream(final byte[] bytes) { if (bytes.length < 2) { // Versioned streams have at least two bytes diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotState.java new file mode 100644 index 0000000000..d4556cd374 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotState.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.persisted; + +import com.google.common.base.Preconditions; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.Externalizable; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.OutputStream; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; + +/** + * Encapsulates the snapshot State for a Shard. + * + * @author Thomas Pantelis + */ +public class ShardSnapshotState implements Snapshot.State { + private static final long serialVersionUID = 1L; + + private static final class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + private ShardSnapshotState snapshotState; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + // For Externalizable + } + + Proxy(final ShardSnapshotState snapshotState) { + this.snapshotState = snapshotState; + } + + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + snapshotState.snapshot.serialize(toOutputStream(out)); + } + + private OutputStream toOutputStream(final ObjectOutput out) { + if (out instanceof OutputStream) { + return (OutputStream) out; + } + + return new OutputStream() { + @Override + public void write(int value) throws IOException { + out.write(value); + } + }; + } + + @Override + public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + snapshotState = new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(toInputStream(in))); + } + + private InputStream toInputStream(final ObjectInput in) { + if (in instanceof InputStream) { + return (InputStream) in; + } + + return new InputStream() { + @Override + public int read() throws IOException { + return in.read(); + } + }; + } + + private Object readResolve() { + return snapshotState; + } + } + + @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but this class " + + "implements writeReplace to delegate serialization to a Proxy class and thus instances of this class " + + "aren't serialized. FindBugs does not recognize this.") + private final ShardDataTreeSnapshot snapshot; + + public ShardSnapshotState(@Nonnull ShardDataTreeSnapshot snapshot) { + this.snapshot = Preconditions.checkNotNull(snapshot); + } + + @Nonnull + public ShardDataTreeSnapshot getSnapshot() { + return snapshot; + } + + @SuppressWarnings("static-method") + private Object writeReplace() { + return new Proxy(this); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 3de3e9ac22..e73fa08970 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -56,9 +56,10 @@ import org.opendaylight.controller.cluster.datastore.modification.MutableComposi import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.TestActorFactory; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; @@ -349,8 +350,8 @@ public abstract class AbstractShardTest extends AbstractActorTest { final NormalizedNode root = readStore(testStore, YangInstanceIdentifier.EMPTY); InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create( - new PreBoronShardDataTreeSnapshot(root).serialize(), - Collections.emptyList(), 0, 1, -1, -1)); + new ShardSnapshotState(new PreBoronShardDataTreeSnapshot(root)), + Collections.emptyList(), 0, 1, -1, -1, 1, null, null)); return testStore; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index cb2284a30c..1b885c467f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -30,6 +30,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; @@ -52,10 +53,12 @@ import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.Snapshot; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; @@ -180,7 +183,6 @@ public class DistributedDataStoreIntegrationTest { @Test public void testReadWriteTransactionWithSingleShard() throws Exception { - System.setProperty("shard.persistent", "true"); new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { try (AbstractDataStore dataStore = setupDistributedDataStore( @@ -1255,8 +1257,9 @@ public class DistributedDataStoreIntegrationTest { AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode); NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); - final Snapshot carsSnapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(), - Collections.emptyList(), 2, 1, 2, 1, 1, "member-1"); + final Snapshot carsSnapshot = Snapshot.create( + new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)), + Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null); NormalizedNode peopleNode = PeopleModel.create(); dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); @@ -1264,8 +1267,9 @@ public class DistributedDataStoreIntegrationTest { AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode); root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); - Snapshot peopleSnapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(), - Collections.emptyList(), 2, 1, 2, 1, 1, "member-1"); + Snapshot peopleSnapshot = Snapshot.create( + new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)), + Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null); restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList( @@ -1290,4 +1294,41 @@ public class DistributedDataStoreIntegrationTest { } }; } + + @Test + @Deprecated + public void testRecoveryFromPreCarbonSnapshot() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + final String name = "testRecoveryFromPreCarbonSnapshot"; + + ContainerNode carsNode = CarsModel.newCarsNode( + CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)), + CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L)))); + + DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); + dataTree.setSchemaContext(SchemaContextHelper.full()); + AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode); + NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); + + final ByteArrayOutputStream bos = new ByteArrayOutputStream(); + new MetadataShardDataTreeSnapshot(root).serialize(bos); + final org.opendaylight.controller.cluster.raft.Snapshot snapshot = + org.opendaylight.controller.cluster.raft.Snapshot.create(bos.toByteArray(), + Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null); + + InMemorySnapshotStore.addSnapshot("member-1-shard-cars-" + name, snapshot); + + try (AbstractDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf", + true, "cars")) { + + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + Optional> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", carsNode, optional.get()); + } + } + }; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index ff44f01402..501da5c7eb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -36,6 +36,7 @@ import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.math.BigInteger; import java.util.Arrays; +import java.util.Collections; import java.util.LinkedList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -56,10 +57,14 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; @@ -131,6 +136,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Before public void setUp() { + InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); + leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS); @@ -153,6 +161,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { JavaTestKit.shutdownActorSystem(leaderSystem); JavaTestKit.shutdownActorSystem(followerSystem); JavaTestKit.shutdownActorSystem(follower2System); + + InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); } private void initDatastoresWithCars(final String type) { @@ -1012,6 +1023,56 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } } + @Test + public void testInstallSnapshot() throws Exception { + final String testName = "testInstallSnapshot"; + final String leaderCarShardName = "member-1-shard-cars-" + testName; + final String followerCarShardName = "member-2-shard-cars-" + testName; + + // Setup a saved snapshot on the leader. The follower will startup with no data and the leader should + // install a snapshot to sync the follower. + + TipProducingDataTree tree = InMemoryDataTreeFactory.getInstance().create(TreeType.CONFIGURATION); + tree.setSchemaContext(SchemaContextHelper.full()); + + ContainerNode carsNode = CarsModel.newCarsNode( + CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)))); + AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode); + + NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.EMPTY); + Snapshot initialSnapshot = Snapshot.create( + new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)), + Collections.emptyList(), 5, 1, 5, 1, 1, null, null); + InMemorySnapshotStore.addSnapshot(leaderCarShardName, initialSnapshot); + + InMemorySnapshotStore.addSnapshotSavedLatch(leaderCarShardName); + InMemorySnapshotStore.addSnapshotSavedLatch(followerCarShardName); + + initDatastoresWithCars(testName); + + Optional> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read( + CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, readOptional.isPresent()); + assertEquals("Node", carsNode, readOptional.get()); + + verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(leaderCarShardName, Snapshot.class), + initialSnapshot, snapshotRoot); + + verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(followerCarShardName, Snapshot.class), + initialSnapshot, snapshotRoot); + } + + private static void verifySnapshot(Snapshot actual, Snapshot expected, NormalizedNode expRoot) { + assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm()); + assertEquals("Snapshot getLastAppliedIndex", expected.getLastAppliedIndex(), actual.getLastAppliedIndex()); + assertEquals("Snapshot getLastTerm", expected.getLastTerm(), actual.getLastTerm()); + assertEquals("Snapshot getLastIndex", expected.getLastIndex(), actual.getLastIndex()); + assertEquals("Snapshot state type", ShardSnapshotState.class, actual.getState().getClass()); + MetadataShardDataTreeSnapshot shardSnapshot = + (MetadataShardDataTreeSnapshot) ((ShardSnapshotState)actual.getState()).getSnapshot(); + assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().get()); + } + private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) { final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build()); DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinatorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinatorTest.java index 262758f4aa..736e291703 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinatorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinatorTest.java @@ -17,7 +17,8 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; -import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; @@ -133,7 +134,7 @@ public class ShardRecoveryCoordinatorTest extends AbstractTest { return shardDataTree.readNode(PeopleModel.BASE_PATH); } - private static byte[] createSnapshot() { + private static ShardSnapshotState createSnapshot() { final TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); dataTree.setSchemaContext(SchemaContextHelper.select(SchemaContextHelper.CARS_YANG, SchemaContextHelper.PEOPLE_YANG)); @@ -147,7 +148,7 @@ public class ShardRecoveryCoordinatorTest extends AbstractTest { modification.ready(); dataTree.commit(dataTree.prepare(modification)); - return new PreBoronShardDataTreeSnapshot(dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get()) - .serialize(); + return new ShardSnapshotState(new MetadataShardDataTreeSnapshot(dataTree.takeSnapshot().readNode( + YangInstanceIdentifier.EMPTY).get())); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index ab07b667b2..2edbff2380 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -77,14 +77,13 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; -import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; @@ -97,6 +96,7 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; @@ -442,8 +442,9 @@ public class ShardTest extends AbstractShardTest { final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY; final NormalizedNode expected = readStore(store, root); - final Snapshot snapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(expected).serialize(), - Collections.emptyList(), 1, 2, 3, 4); + final Snapshot snapshot = Snapshot.create( + new ShardSnapshotState(new MetadataShardDataTreeSnapshot(expected)), + Collections.emptyList(), 1, 2, 3, 4, -1, null, null); shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender()); @@ -1975,8 +1976,8 @@ public class ShardTest extends AbstractShardTest { private void verifySnapshot(final Snapshot snapshot, final NormalizedNode expectedRoot) throws IOException { - final NormalizedNode actual = ShardDataTreeSnapshot.deserialize(snapshot.getState()).getRootNode() - .get(); + final NormalizedNode actual = ((ShardSnapshotState)snapshot.getState()).getSnapshot() + .getRootNode().get(); assertEquals("Root node", expectedRoot, actual); } }; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java index 65a8ac9ce8..5cf81834d8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java @@ -9,16 +9,17 @@ package org.opendaylight.controller.cluster.datastore.actors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; import akka.actor.ActorRef; import akka.testkit.JavaTestKit; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.util.Optional; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; -import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -27,41 +28,42 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; public class ShardSnapshotActorTest extends AbstractActorTest { private static final NormalizedNode DATA = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - private static void testSerializeSnapshot(final String testName, final ShardDataTreeSnapshot snapshot) - throws Exception { + private static void testSerializeSnapshot(final String testName, final ShardDataTreeSnapshot snapshot, + final boolean withInstallSnapshot) throws Exception { new JavaTestKit(getSystem()) { { - final ActorRef snapshotActor = getSystem().actorOf(ShardSnapshotActor.props(), testName); watch(snapshotActor); final NormalizedNode expectedRoot = snapshot.getRootNode().get(); - ShardSnapshotActor.requestSnapshot(snapshotActor, snapshot, getRef()); + ByteArrayOutputStream installSnapshotStream = withInstallSnapshot ? new ByteArrayOutputStream() : null; + ShardSnapshotActor.requestSnapshot(snapshotActor, snapshot, + Optional.ofNullable(installSnapshotStream), getRef()); final CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class); - assertNotNull("getSnapshot is null", reply.getSnapshot()); - - final ShardDataTreeSnapshot actual = ShardDataTreeSnapshot.deserialize(reply.getSnapshot()); - assertNotNull(actual); - assertEquals(snapshot.getClass(), actual.getClass()); + assertNotNull("getSnapshotState is null", reply.getSnapshotState()); + assertEquals("SnapshotState type", ShardSnapshotState.class, reply.getSnapshotState().getClass()); + assertEquals("Snapshot", snapshot, ((ShardSnapshotState)reply.getSnapshotState()).getSnapshot()); - final Optional> maybeNode = actual.getRootNode(); - assertTrue(maybeNode.isPresent()); + if (installSnapshotStream != null) { + final ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize( + new ByteArrayInputStream(installSnapshotStream.toByteArray())); + assertEquals("Deserialized snapshot type", snapshot.getClass(), deserialized.getClass()); - assertEquals("Root node", expectedRoot, maybeNode.get()); + final Optional> maybeNode = deserialized.getRootNode(); + assertEquals("isPresent", true, maybeNode.isPresent()); + assertEquals("Root node", expectedRoot, maybeNode.get()); + } } }; } @Test public void testSerializeBoronSnapshot() throws Exception { - testSerializeSnapshot("testSerializeBoronSnapshot", new MetadataShardDataTreeSnapshot(DATA)); - } - - @Deprecated - @Test - public void testSerializeLegacySnapshot() throws Exception { - testSerializeSnapshot("testSerializeLegacySnapshot", new PreBoronShardDataTreeSnapshot(DATA)); + testSerializeSnapshot("testSerializeBoronSnapshotWithInstallSnapshot", + new MetadataShardDataTreeSnapshot(DATA), true); + testSerializeSnapshot("testSerializeBoronSnapshotWithoutInstallSnapshot", + new MetadataShardDataTreeSnapshot(DATA), false); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotTest.java index 7be91313ac..e909e79a79 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotTest.java @@ -10,6 +10,8 @@ package org.opendaylight.controller.cluster.datastore.persisted; import static org.junit.Assert.assertEquals; import com.google.common.collect.ImmutableMap; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; @@ -37,9 +39,11 @@ public class ShardDataTreeSnapshotTest { .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); MetadataShardDataTreeSnapshot snapshot = new MetadataShardDataTreeSnapshot(expectedNode); - byte[] serialized = snapshot.serialize(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + snapshot.serialize(bos); - ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(serialized); + ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize( + new ByteArrayInputStream(bos.toByteArray())); Optional> actualNode = deserialized.getRootNode(); assertEquals("rootNode present", true, actualNode.isPresent()); @@ -57,9 +61,11 @@ public class ShardDataTreeSnapshotTest { Map>, ShardDataTreeSnapshotMetadata> expMetadata = ImmutableMap.of(TestShardDataTreeSnapshotMetadata.class, new TestShardDataTreeSnapshotMetadata("test")); MetadataShardDataTreeSnapshot snapshot = new MetadataShardDataTreeSnapshot(expectedNode, expMetadata); - byte[] serialized = snapshot.serialize(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + snapshot.serialize(bos); - ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(serialized); + ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize( + new ByteArrayInputStream(bos.toByteArray())); Optional> actualNode = deserialized.getRootNode(); assertEquals("rootNode present", true, actualNode.isPresent()); @@ -69,15 +75,17 @@ public class ShardDataTreeSnapshotTest { } @Test + @Deprecated public void testPreBoronShardDataTreeSnapshot() throws Exception { NormalizedNode expectedNode = ImmutableContainerNodeBuilder.create() .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)) .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); PreBoronShardDataTreeSnapshot snapshot = new PreBoronShardDataTreeSnapshot(expectedNode); - byte[] serialized = snapshot.serialize(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + snapshot.serialize(bos); - ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(serialized); + ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(bos.toByteArray()); Optional> actualNode = deserialized.getRootNode(); assertEquals("rootNode present", true, actualNode.isPresent()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotStateTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotStateTest.java new file mode 100644 index 0000000000..82b9f45565 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotStateTest.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.persisted; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import org.apache.commons.lang.SerializationUtils; +import org.junit.Test; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; + +/** + * Unit tests for ShardSnapshotState. + * + * @author Thomas Pantelis + */ +public class ShardSnapshotStateTest { + + @Test + public void testSerialization() { + NormalizedNode expectedNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)) + .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); + + ShardSnapshotState expected = new ShardSnapshotState(new MetadataShardDataTreeSnapshot(expectedNode)); + ShardSnapshotState cloned = (ShardSnapshotState) SerializationUtils.clone(expected); + + assertNotNull("getSnapshot is null", cloned.getSnapshot()); + assertEquals("getSnapshot type", MetadataShardDataTreeSnapshot.class, cloned.getSnapshot().getClass()); + assertEquals("getRootNode", expectedNode, + ((MetadataShardDataTreeSnapshot)cloned.getSnapshot()).getRootNode().get()); + } +} -- 2.36.6