}
@Override
- public byte[] getRestoreFromSnapshot() {
+ public Snapshot getRestoreFromSnapshot() {
return null;
}
import akka.actor.UntypedActor;
import com.google.common.base.Preconditions;
import java.util.concurrent.TimeoutException;
-import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
LOG.debug("{}: Received CaptureSnapshotReply, sending {}", params.id, snapshot);
- params.replyToActor.tell(new GetSnapshotReply(params.id, SerializationUtils.serialize(snapshot)),
- getSelf());
+ params.replyToActor.tell(new GetSnapshotReply(params.id, snapshot), getSelf());
getSelf().tell(PoisonPill.getInstance(), getSelf());
} else if (message instanceof ReceiveTimeout) {
LOG.warn("{}: Got ReceiveTimeout for inactivity - did not receive CaptureSnapshotReply within {} ms",
void applyCurrentLogRecoveryBatch();
/**
- * Returns the state snapshot to restore from on recovery.
+ * Returns the snapshot to restore from on recovery.
*
- * @return the snapshot bytes or null if there's no snapshot to restore
+ * @return the snapshot or null if there's no snapshot to restore
*/
@Nullable
- byte[] getRestoreFromSnapshot();
+ Snapshot getRestoreFromSnapshot();
/**
* This method is called during recovery to de-serialize a snapshot that was persisted in the pre-Carbon format.
import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotOffer;
import com.google.common.base.Stopwatch;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
import java.util.Collections;
import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
@SuppressWarnings("checkstyle:IllegalCatch")
private void possiblyRestoreFromSnapshot() {
- byte[] restoreFromSnapshot = cohort.getRestoreFromSnapshot();
+ Snapshot restoreFromSnapshot = cohort.getRestoreFromSnapshot();
if (restoreFromSnapshot == null) {
return;
}
return;
}
- try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) {
- Snapshot snapshot = (Snapshot) ois.readObject();
+ log.debug("{}: Restore snapshot: {}", context.getId(), restoreFromSnapshot);
- log.debug("{}: Deserialized restore snapshot: {}", context.getId(), snapshot);
-
- context.getSnapshotManager().apply(new ApplySnapshot(snapshot));
- } catch (RuntimeException | ClassNotFoundException | IOException e) {
- log.error("{}: Error deserializing snapshot restore", context.getId(), e);
- }
+ context.getSnapshotManager().apply(new ApplySnapshot(restoreFromSnapshot));
}
private ReplicatedLog replicatedLog() {
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
context.getPeerServerInfo(true));
- sender.tell(new GetSnapshotReply(context.getId(), SerializationUtils.serialize(snapshot)),
- context.getActor());
+ sender.tell(new GetSnapshotReply(context.getId(), snapshot), context.getActor());
}
}
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
/**
* Reply to GetSnapshot that returns a serialized Snapshot instance.
*/
public class GetSnapshotReply {
private final String id;
- private final byte[] snapshot;
+ private final Snapshot snapshot;
- public GetSnapshotReply(@Nonnull String id, @Nonnull byte[] snapshot) {
+ public GetSnapshotReply(@Nonnull String id, @Nonnull Snapshot snapshot) {
this.id = Preconditions.checkNotNull(id);
this.snapshot = Preconditions.checkNotNull(snapshot);
}
+ "this is OK since this class is merely a DTO and does not process the byte[] internally. "
+ "Also it would be inefficient to create a return copy as the byte[] could be large.")
@Nonnull
- public byte[] getSnapshot() {
+ public Snapshot getSnapshot() {
return snapshot;
}
@Override
public String toString() {
- return "GetSnapshotReply [id=" + id + ", snapshot.length=" + snapshot.length + "]";
+ return "GetSnapshotReply [id=" + id + ", snapshot=" + snapshot + "]";
}
}
protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
private RaftActorRecoverySupport raftActorRecoverySupport;
private RaftActorSnapshotMessageSupport snapshotMessageSupport;
- private final byte[] restoreFromSnapshot;
+ private final Snapshot restoreFromSnapshot;
final CountDownLatch snapshotCommitted = new CountDownLatch(1);
private final Function<Runnable, Void> pauseLeaderFunction;
}
@Override
- public byte[] getRestoreFromSnapshot() {
+ public Snapshot getRestoreFromSnapshot() {
return restoreFromSnapshot;
}
private DataPersistenceProvider dataPersistenceProvider;
private ActorRef roleChangeNotifier;
private RaftActorSnapshotMessageSupport snapshotMessageSupport;
- private byte[] restoreFromSnapshot;
+ private Snapshot restoreFromSnapshot;
private Optional<Boolean> persistent = Optional.absent();
private final Class<A> actorClass;
private Function<Runnable, Void> pauseLeaderFunction;
return self();
}
- public T restoreFromSnapshot(byte[] newRestoreFromSnapshot) {
+ public T restoreFromSnapshot(Snapshot newRestoreFromSnapshot) {
this.restoreFromSnapshot = newRestoreFromSnapshot;
return self();
}
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.commons.lang3.SerializationUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class);
assertEquals("getId", persistenceId, reply.getId());
- Snapshot replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
+ Snapshot replySnapshot = reply.getSnapshot();
assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
assertEquals("getLastAppliedIndex", 1L, replySnapshot.getLastAppliedIndex());
verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(anyObject(), anyObject());
assertEquals("getId", persistenceId, reply.getId());
- replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
+ replySnapshot = reply.getSnapshot();
assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
assertEquals("getLastAppliedIndex", -1L, replySnapshot.getLastAppliedIndex());
InMemorySnapshotStore.addSnapshotSavedLatch(persistenceId);
TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
- .config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props()
+ .config(config).restoreFromSnapshot(snapshot).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
persistenceId = factory.generateActorId("test-actor-");
raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
- .config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot))
+ .config(config).restoreFromSnapshot(snapshot)
.persistent(Optional.of(Boolean.FALSE)).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
mockRaftActor = raftActorRef.underlyingActor();
new MockRaftActorContext.MockPayload("B")));
TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
- .config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props()
+ .config(config).restoreFromSnapshot(snapshot).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
private final ShardDataTree store;
private final String shardName;
private final Logger log;
- private final byte[] restoreFromSnapshot;
+ private final Snapshot restoreFromSnapshot;
private boolean open;
- ShardRecoveryCoordinator(final ShardDataTree store, final byte[] restoreFromSnapshot, final String shardName,
+ ShardRecoveryCoordinator(final ShardDataTree store, final Snapshot restoreFromSnapshot, final String shardName,
final Logger log) {
this.store = Preconditions.checkNotNull(store);
this.shardName = Preconditions.checkNotNull(shardName);
}
@Override
- public byte[] getRestoreFromSnapshot() {
+ public Snapshot getRestoreFromSnapshot() {
return restoreFromSnapshot;
}
@Deprecated
public State deserializePreCarbonSnapshot(byte[] from) {
try {
- return new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(from));
+ return new ShardSnapshotState(ShardDataTreeSnapshot.deserializePreCarbon(from));
} catch (IOException e) {
log.error("{}: failed to deserialize snapshot", shardName, e);
throw Throwables.propagate(e);
import com.google.common.base.Preconditions;
import com.google.common.io.ByteSource;
import java.io.IOException;
-import java.io.InputStream;
+import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.util.Optional;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
@Override
public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException {
- try (final InputStream is = snapshotBytes.openStream()) {
- return new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(is));
+ try (final ObjectInputStream in = new ObjectInputStream(snapshotBytes.openStream())) {
+ return new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(in));
}
}
}
import akka.actor.Props;
import com.google.common.base.Preconditions;
import java.io.IOException;
+import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.Optional;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This is an offload actor, which is given an isolated snapshot of the data tree. It performs the potentially
* @author Robert Varga
*/
public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
- private static final Logger LOG = LoggerFactory.getLogger(ShardSnapshotActor.class);
-
// Internal message
private static final class SerializeSnapshot {
private final ShardDataTreeSnapshot snapshot;
}
}
- private static void onSerializeSnapshot(final SerializeSnapshot request) {
+ private void onSerializeSnapshot(final SerializeSnapshot request) {
Optional<OutputStream> installSnapshotStream = request.getInstallSnapshotStream();
if (installSnapshotStream.isPresent()) {
- try {
- request.getSnapshot().serialize(installSnapshotStream.get());
+ try (ObjectOutputStream out = new ObjectOutputStream(installSnapshotStream.get())) {
+ request.getSnapshot().serialize(out);
} catch (IOException e) {
// TODO - we should communicate the failure in the CaptureSnapshotReply.
LOG.error("Error serializing snapshot", e);
*/
package org.opendaylight.controller.cluster.datastore.messages;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
/**
* Stores a list of DatastoreSnapshot instances.
super(snapshots);
}
- private Object readResolve() {
+ private Object readResolve() throws IOException, ClassNotFoundException {
List<org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot> snapshots =
new ArrayList<>(size());
for (DatastoreSnapshot legacy: this) {
return new org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList(snapshots);
}
- private List<org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot> fromLegacy(
- List<DatastoreSnapshot.ShardSnapshot> from) {
+ private static List<org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot>
+ fromLegacy(List<DatastoreSnapshot.ShardSnapshot> from) throws IOException, ClassNotFoundException {
List<org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot> snapshots =
new ArrayList<>(from.size());
for (DatastoreSnapshot.ShardSnapshot legacy: from) {
snapshots.add(new org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot(
- legacy.getName(), legacy.getSnapshot()));
+ legacy.getName(), deserialize(legacy.getSnapshot())));
}
return snapshots;
}
+
+ private static org.opendaylight.controller.cluster.raft.persisted.Snapshot deserialize(byte[] bytes)
+ throws IOException, ClassNotFoundException {
+ try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+ Snapshot legacy = (Snapshot) ois.readObject();
+
+ org.opendaylight.controller.cluster.raft.persisted.Snapshot.State state = EmptyState.INSTANCE;
+ if (legacy.getState().length > 0) {
+ state = new ShardSnapshotState(ShardDataTreeSnapshot.deserializePreCarbon(legacy.getState()));
+ }
+
+ return org.opendaylight.controller.cluster.raft.persisted.Snapshot.create(
+ state, legacy.getUnAppliedEntries(), legacy.getLastIndex(),
+ legacy.getLastTerm(), legacy.getLastAppliedIndex(), legacy.getLastAppliedTerm(),
+ legacy.getElectionTerm(), legacy.getElectionVotedFor(), legacy.getServerConfiguration());
+ }
+ }
}
import com.google.common.base.Verify;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.ObjectInput;
import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
+import java.io.ObjectOutput;
import java.util.Optional;
import javax.annotation.Nonnull;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
private static final Logger LOG = LoggerFactory.getLogger(AbstractVersionedShardDataTreeSnapshot.class);
@SuppressWarnings("checkstyle:FallThrough")
- static ShardDataTreeSnapshot deserialize(final DataInputStream is) throws IOException {
+ @Deprecated
+ static ShardDataTreeSnapshot deserializePreCarbon(final DataInputStream is) throws IOException {
final PayloadVersion version = PayloadVersion.readFrom(is);
switch (version) {
case BORON:
throw new IOException("Encountered unhandled version" + version);
}
+ @SuppressWarnings("checkstyle:FallThrough")
+ static ShardDataTreeSnapshot versionedDeserialize(final ObjectInput in) throws IOException {
+ final PayloadVersion version = PayloadVersion.readFrom(in);
+ switch (version) {
+ case BORON:
+ // Boron snapshots use Java Serialization
+ try {
+ return (ShardDataTreeSnapshot) in.readObject();
+ } catch (ClassNotFoundException e) {
+ LOG.error("Failed to serialize data tree snapshot", e);
+ throw new IOException("Snapshot failed to deserialize", e);
+ }
+ case TEST_FUTURE_VERSION:
+ case TEST_PAST_VERSION:
+ // These versions are never returned and this code is effectively dead
+ break;
+ default:
+ throw new IOException("Invalid payload version in snapshot");
+ }
+
+ // Not included as default in above switch to ensure we get warnings when new versions are added
+ throw new IOException("Encountered unhandled version" + version);
+ }
+
@Override
public final Optional<NormalizedNode<?, ?>> getRootNode() {
return Optional.of(Verify.verifyNotNull(rootNode(), "Snapshot %s returned non-present root node", getClass()));
@Nonnull
abstract PayloadVersion version();
- private void versionedSerialize(final DataOutputStream dos, final PayloadVersion version) throws IOException {
+ private void versionedSerialize(final ObjectOutput out, final PayloadVersion version) throws IOException {
switch (version) {
case BORON:
// Boron snapshots use Java Serialization
- try (ObjectOutputStream oos = new ObjectOutputStream(dos)) {
- oos.writeObject(this);
- }
+ out.writeObject(this);
return;
case TEST_FUTURE_VERSION:
case TEST_PAST_VERSION:
}
@Override
- public void serialize(final OutputStream os) throws IOException {
- try (DataOutputStream dos = new DataOutputStream(os)) {
- final PayloadVersion version = version();
- version.writeTo(dos);
- versionedSerialize(dos, version);
- }
+ public void serialize(final ObjectOutput out) throws IOException {
+ final PayloadVersion version = version();
+ version.writeTo(out);
+ versionedSerialize(out, version);
}
}
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
/**
* Stores a snapshot of the internal state of a data store.
public class DatastoreSnapshot implements Serializable {
private static final long serialVersionUID = 1L;
+ private static final class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private DatastoreSnapshot datastoreSnapshot;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ // For Externalizable
+ }
+
+ Proxy(final DatastoreSnapshot datastoreSnapshot) {
+ this.datastoreSnapshot = datastoreSnapshot;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(datastoreSnapshot.type);
+ out.writeObject(datastoreSnapshot.shardManagerSnapshot);
+
+ out.writeInt(datastoreSnapshot.shardSnapshots.size());
+ for (ShardSnapshot shardSnapshot: datastoreSnapshot.shardSnapshots) {
+ out.writeObject(shardSnapshot);
+ }
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ String type = (String)in.readObject();
+ byte[] shardManagerSnapshot = (byte[]) in.readObject();
+
+ int size = in.readInt();
+ List<ShardSnapshot> shardSnapshots = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ shardSnapshots.add((ShardSnapshot) in.readObject());
+ }
+
+ datastoreSnapshot = new DatastoreSnapshot(type, shardManagerSnapshot, shardSnapshots);
+ }
+
+ private Object readResolve() {
+ return datastoreSnapshot;
+ }
+ }
+
private final String type;
private final byte[] shardManagerSnapshot;
private final List<ShardSnapshot> shardSnapshots;
return shardSnapshots;
}
+ @SuppressWarnings("static-method")
+ private Object writeReplace() {
+ return new Proxy(this);
+ }
+
public static class ShardSnapshot implements Serializable {
private static final long serialVersionUID = 1L;
+ private static final class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private ShardSnapshot shardSnapshot;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ // For Externalizable
+ }
+
+ Proxy(final ShardSnapshot shardSnapshot) {
+ this.shardSnapshot = shardSnapshot;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(shardSnapshot.name);
+ out.writeObject(shardSnapshot.snapshot);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ shardSnapshot = new ShardSnapshot((String)in.readObject(), (Snapshot) in.readObject());
+ }
+
+ private Object readResolve() {
+ return shardSnapshot;
+ }
+ }
+
private final String name;
- private final byte[] snapshot;
+ private final Snapshot snapshot;
- public ShardSnapshot(@Nonnull String name, @Nonnull byte[] snapshot) {
+ public ShardSnapshot(@Nonnull String name, @Nonnull Snapshot snapshot) {
this.name = Preconditions.checkNotNull(name);
this.snapshot = Preconditions.checkNotNull(snapshot);
}
return name;
}
- @SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "Exposes a mutable object stored in a field but "
- + "this is OK since this class is merely a DTO and does not process byte[] internally. "
- + "Also it would be inefficient to create a return copy as the byte[] could be large.")
@Nonnull
- public byte[] getSnapshot() {
+ public Snapshot getSnapshot() {
return snapshot;
}
+
+ @SuppressWarnings("static-method")
+ private Object writeReplace() {
+ return new Proxy(this);
+ }
}
}
package org.opendaylight.controller.cluster.datastore.persisted;
import com.google.common.annotations.Beta;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
+import java.io.ObjectOutput;
import java.util.Optional;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
}
@Override
- public void serialize(OutputStream os) throws IOException {
- try (final DataOutputStream dos = new DataOutputStream(os)) {
- SerializationUtils.serializeNormalizedNode(rootNode, dos);
- }
+ public void serialize(ObjectOutput out) throws IOException {
+ SerializationUtils.serializeNormalizedNode(rootNode, out);
}
}
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.util.Optional;
import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
}
@Deprecated
- public static ShardDataTreeSnapshot deserialize(final byte[] bytes) throws IOException {
+ public static ShardDataTreeSnapshot deserializePreCarbon(final byte[] bytes) throws IOException {
/**
* Unfortunately versions prior to Boron did not include any way to evolve the snapshot format and contained
* only the raw data stored in the datastore. Furthermore utilities involved do not check if the array is
try {
try (InputStream is = new ByteArrayInputStream(bytes)) {
- return deserialize(is);
+ try (DataInputStream dis = new DataInputStream(is)) {
+ final ShardDataTreeSnapshot ret = AbstractVersionedShardDataTreeSnapshot.deserializePreCarbon(dis);
+
+ // Make sure we consume all bytes, otherwise something went very wrong
+ final int bytesLeft = dis.available();
+ if (bytesLeft != 0) {
+ throw new IOException("Deserialization left " + bytesLeft + " in the buffer");
+ }
+
+
+ return ret;
+ }
}
} catch (IOException e) {
LOG.debug("Failed to deserialize versioned stream, attempting pre-Lithium ProtoBuf", e);
}
}
- public static ShardDataTreeSnapshot deserialize(final InputStream is) throws IOException {
- try (DataInputStream dis = new DataInputStream(is)) {
- final ShardDataTreeSnapshot ret = AbstractVersionedShardDataTreeSnapshot.deserialize(dis);
+ public static ShardDataTreeSnapshot deserialize(final ObjectInput in) throws IOException {
+ final ShardDataTreeSnapshot ret = AbstractVersionedShardDataTreeSnapshot.versionedDeserialize(in);
- // Make sure we consume all bytes, otherwise something went very wrong
- final int bytesLeft = dis.available();
- if (bytesLeft != 0) {
- throw new IOException("Deserialization left " + bytesLeft + " in the buffer");
- }
+ // Make sure we consume all bytes, otherwise something went very wrong
+ final int bytesLeft = in.available();
+ if (bytesLeft != 0) {
+ throw new IOException("Deserialization left " + bytesLeft + " in the buffer");
+ }
- return ret;
- }
+ return ret;
}
/**
*/
public abstract Optional<NormalizedNode<?, ?>> getRootNode();
- public abstract void serialize(OutputStream os) throws IOException;
+ public abstract void serialize(ObjectOutput out) throws IOException;
@Deprecated
private static boolean isLegacyStream(final byte[] bytes) {
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Externalizable;
import java.io.IOException;
-import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import java.io.OutputStream;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
@Override
public void writeExternal(final ObjectOutput out) throws IOException {
- snapshotState.snapshot.serialize(toOutputStream(out));
- }
-
- private static OutputStream toOutputStream(final ObjectOutput out) {
- if (out instanceof OutputStream) {
- return (OutputStream) out;
- }
-
- return new OutputStream() {
- @Override
- public void write(final int value) throws IOException {
- out.write(value);
- }
- };
+ snapshotState.snapshot.serialize(out);
}
@Override
public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
- snapshotState = new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(toInputStream(in)));
- }
-
- private static InputStream toInputStream(final ObjectInput in) {
- if (in instanceof InputStream) {
- return (InputStream) in;
- }
-
- return new InputStream() {
- @Override
- public int read() throws IOException {
- return in.read();
- }
- };
+ snapshotState = new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(in));
}
private Object readResolve() {
*/
package org.opendaylight.controller.cluster.datastore;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import java.io.File;
import java.io.FileOutputStream;
-import java.util.ArrayList;
-import java.util.List;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.Objects;
import org.apache.commons.lang3.SerializationUtils;
import org.junit.After;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
-import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList;
+import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerSnapshot;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
+import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+
/**
* Unit tests for DatastoreSnapshotRestore.
public void test() throws Exception {
assertTrue("Failed to mkdir " + restoreDirectoryPath, restoreDirectoryFile.mkdirs());
- List<ShardSnapshot> shardSnapshots = new ArrayList<>();
- shardSnapshots.add(new ShardSnapshot("cars", new byte[]{1,2}));
- shardSnapshots.add(new ShardSnapshot("people", new byte[]{3,4}));
- final DatastoreSnapshot configSnapshot = new DatastoreSnapshot("config", null, shardSnapshots);
+ final DatastoreSnapshot configSnapshot = new DatastoreSnapshot("config",
+ SerializationUtils.serialize(newShardManagerSnapshot("config-one", "config-two")),
+ Arrays.asList(new DatastoreSnapshot.ShardSnapshot("config-one", newSnapshot(CarsModel.BASE_PATH,
+ CarsModel.newCarsNode(CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima",
+ BigInteger.valueOf(20000L)),CarsModel.newCarEntry("sportage",
+ BigInteger.valueOf(30000L)))))),
+ new DatastoreSnapshot.ShardSnapshot("config-two", newSnapshot(PeopleModel.BASE_PATH,
+ PeopleModel.emptyContainer()))));
- shardSnapshots = new ArrayList<>();
- shardSnapshots.add(new ShardSnapshot("cars", new byte[]{5,6}));
- shardSnapshots.add(new ShardSnapshot("people", new byte[]{7,8}));
- shardSnapshots.add(new ShardSnapshot("bikes", new byte[]{9,0}));
- DatastoreSnapshot operSnapshot = new DatastoreSnapshot("oper", null, shardSnapshots);
+ DatastoreSnapshot operSnapshot = new DatastoreSnapshot("oper",
+ null, Arrays.asList(new DatastoreSnapshot.ShardSnapshot("oper-one", newSnapshot(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME)))));
- DatastoreSnapshotList snapshotList = new DatastoreSnapshotList();
- snapshotList.add(configSnapshot);
- snapshotList.add(operSnapshot);
+ DatastoreSnapshotList snapshotList = new DatastoreSnapshotList(Arrays.asList(configSnapshot, operSnapshot));
try (FileOutputStream fos = new FileOutputStream(backupFile)) {
SerializationUtils.serialize(snapshotList, fos);
DatastoreSnapshotRestore instance = DatastoreSnapshotRestore.instance(restoreDirectoryPath);
- verifySnapshot(configSnapshot, instance.getAndRemove("config"));
- verifySnapshot(operSnapshot, instance.getAndRemove("oper"));
+ assertDatastoreSnapshotEquals(configSnapshot, instance.getAndRemove("config"));
+ assertDatastoreSnapshotEquals(operSnapshot, instance.getAndRemove("oper"));
assertNull("DatastoreSnapshot was not removed", instance.getAndRemove("config"));
assertNull("Expected null DatastoreSnapshot", instance.getAndRemove("oper"));
}
- private static void verifySnapshot(DatastoreSnapshot expected, DatastoreSnapshot actual) {
+ private static void assertDatastoreSnapshotEquals(DatastoreSnapshot expected, DatastoreSnapshot actual) {
assertNotNull("DatastoreSnapshot is null", actual);
assertEquals("getType", expected.getType(), actual.getType());
assertTrue("ShardManager snapshots don't match", Objects.deepEquals(expected.getShardManagerSnapshot(),
for (int i = 0; i < expected.getShardSnapshots().size(); i++) {
assertEquals("ShardSnapshot " + (i + 1) + " name", expected.getShardSnapshots().get(i).getName(),
actual.getShardSnapshots().get(i).getName());
- assertArrayEquals("ShardSnapshot " + (i + 1) + " snapshot",
+ assertSnapshotEquals("ShardSnapshot " + (i + 1) + " snapshot",
expected.getShardSnapshots().get(i).getSnapshot(), actual.getShardSnapshots().get(i).getSnapshot());
}
}
+
+ private static void assertSnapshotEquals(String prefix, Snapshot expected, Snapshot actual) {
+ assertEquals(prefix + " lastIndex", expected.getLastIndex(), actual.getLastIndex());
+ assertEquals(prefix + " lastTerm", expected.getLastTerm(), actual.getLastTerm());
+ assertEquals(prefix + " lastAppliedIndex", expected.getLastAppliedIndex(), actual.getLastAppliedIndex());
+ assertEquals(prefix + " lastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());
+ assertEquals(prefix + " unAppliedEntries", expected.getUnAppliedEntries(), actual.getUnAppliedEntries());
+ assertEquals(prefix + " electionTerm", expected.getElectionTerm(), actual.getElectionTerm());
+ assertEquals(prefix + " electionVotedFor", expected.getElectionVotedFor(), actual.getElectionVotedFor());
+ assertEquals(prefix + " Root node", ((ShardSnapshotState)expected.getState()).getSnapshot().getRootNode(),
+ ((ShardSnapshotState)actual.getState()).getSnapshot().getRootNode());
+ }
+
+ private static ShardManagerSnapshot newShardManagerSnapshot(String... shards) {
+ return ShardManagerSnapshot.forShardList(Arrays.asList(shards));
+ }
+
+ private static Snapshot newSnapshot(YangInstanceIdentifier path, NormalizedNode<?, ?> node)
+ throws Exception {
+ DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
+ dataTree.setSchemaContext(SchemaContextHelper.full());
+ AbstractShardTest.writeToStore(dataTree, path, node);
+ NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
+
+ return Snapshot.create(new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
+ Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1", null);
+ }
}
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.ConfigFactory;
import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.ObjectOutputStream;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1", null);
- restoreFromSnapshot = new DatastoreSnapshot(name, null,
- Arrays.asList(
- new DatastoreSnapshot.ShardSnapshot("cars",
- org.apache.commons.lang3.SerializationUtils.serialize(carsSnapshot)),
- new DatastoreSnapshot.ShardSnapshot("people",
- org.apache.commons.lang3.SerializationUtils.serialize(peopleSnapshot))));
+ restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
+ new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot),
+ new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot)));
try (AbstractDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
true, "cars", "people")) {
AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
+ MetadataShardDataTreeSnapshot shardSnapshot = new MetadataShardDataTreeSnapshot(root);
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
- new MetadataShardDataTreeSnapshot(root).serialize(bos);
+ try (final DataOutputStream dos = new DataOutputStream(bos)) {
+ PayloadVersion.BORON.writeTo(dos);
+ try (ObjectOutputStream oos = new ObjectOutputStream(dos)) {
+ oos.writeObject(shardSnapshot);
+ }
+ }
+
final org.opendaylight.controller.cluster.raft.Snapshot snapshot =
org.opendaylight.controller.cluster.raft.Snapshot.create(bos.toByteArray(),
Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1", null);
import akka.testkit.JavaTestKit;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
import java.util.Optional;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
assertEquals("Snapshot", snapshot, ((ShardSnapshotState)reply.getSnapshotState()).getSnapshot());
if (installSnapshotStream != null) {
- final ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(
- new ByteArrayInputStream(installSnapshotStream.toByteArray()));
+ final ShardDataTreeSnapshot deserialized;
+ try (final ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(
+ installSnapshotStream.toByteArray()))) {
+ deserialized = ShardDataTreeSnapshot.deserialize(in);
+ }
+
assertEquals("Deserialized snapshot type", snapshot.getClass(), deserialized.getClass());
final Optional<NormalizedNode<?, ?>> maybeNode = deserialized.getRootNode();
*/
package org.opendaylight.controller.cluster.datastore.messages;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Optional;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerSnapshot;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
/**
public class DatastoreSnapshotListTest {
@Test
public void testSerialization() throws Exception {
+ NormalizedNode<?, ?> legacyConfigRoot1 = toRootNode(CarsModel.BASE_PATH,
+ CarsModel.newCarsNode(CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima",
+ BigInteger.valueOf(20000L)),CarsModel.newCarEntry("sportage",
+ BigInteger.valueOf(30000L)))));
+
+ NormalizedNode<?, ?> legacyConfigRoot2 = toRootNode(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+
DatastoreSnapshot legacyConfigSnapshot = new DatastoreSnapshot("config",
SerializationUtils.serialize(newLegacyShardManagerSnapshot("config-one", "config-two")),
- Arrays.asList(newLegacyShardSnapshot("config-one", newLegacySnapshot(CarsModel.BASE_PATH,
- CarsModel.newCarsNode(CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima",
- BigInteger.valueOf(20000L)),CarsModel.newCarEntry("sportage",
- BigInteger.valueOf(30000L)))))),
- newLegacyShardSnapshot("config-two", newLegacySnapshot(PeopleModel.BASE_PATH,
- PeopleModel.emptyContainer()))));
+ Arrays.asList(newLegacyShardSnapshot("config-one", newLegacySnapshot(legacyConfigRoot1)),
+ newLegacyShardSnapshot("config-two", newLegacySnapshot(legacyConfigRoot2))));
DatastoreSnapshot legacyOperSnapshot = new DatastoreSnapshot("oper",
- null, Arrays.asList(newLegacyShardSnapshot("oper-one", newLegacySnapshot(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME)))));
+ null, Arrays.asList(newLegacyShardSnapshot("oper-one", newLegacySnapshot(null))));
DatastoreSnapshotList legacy = new DatastoreSnapshotList(Arrays.asList(legacyConfigSnapshot,
legacyOperSnapshot));
SerializationUtils.clone(legacy);
assertEquals("DatastoreSnapshotList size", 2, cloned.size());
- assertDatastoreSnapshotEquals(legacyConfigSnapshot, cloned.get(0));
- assertDatastoreSnapshotEquals(legacyOperSnapshot, cloned.get(1));
+ assertDatastoreSnapshotEquals(legacyConfigSnapshot, cloned.get(0), Optional.of(legacyConfigRoot1),
+ Optional.of(legacyConfigRoot2));
+ assertDatastoreSnapshotEquals(legacyOperSnapshot, cloned.get(1), Optional.empty());
}
private void assertDatastoreSnapshotEquals(DatastoreSnapshot legacy,
- org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot actual) {
+ org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot actual,
+ Optional<NormalizedNode<?, ?>>... shardRoots) throws IOException {
assertEquals("Type", legacy.getType(), actual.getType());
if (legacy.getShardManagerSnapshot() == null) {
actualShardSnapshot = actual.getShardSnapshots().get(i);
assertEquals("Shard name", legacyShardSnapshot.getName(), actualShardSnapshot.getName());
assertSnapshotEquals((Snapshot) SerializationUtils.deserialize(legacyShardSnapshot.getSnapshot()),
- (Snapshot) SerializationUtils.deserialize(actualShardSnapshot.getSnapshot()));
+ shardRoots[i], actualShardSnapshot.getSnapshot());
}
}
- private static void assertSnapshotEquals(Snapshot expected, Snapshot actual) {
+ private static void assertSnapshotEquals(Snapshot expected, Optional<NormalizedNode<?, ?>> expRoot,
+ org.opendaylight.controller.cluster.raft.persisted.Snapshot actual) throws IOException {
assertEquals("lastIndex", expected.getLastIndex(), actual.getLastIndex());
assertEquals("lastTerm", expected.getLastTerm(), actual.getLastTerm());
assertEquals("lastAppliedIndex", expected.getLastAppliedIndex(), actual.getLastAppliedIndex());
assertEquals("unAppliedEntries", expected.getUnAppliedEntries(), actual.getUnAppliedEntries());
assertEquals("electionTerm", expected.getElectionTerm(), actual.getElectionTerm());
assertEquals("electionVotedFor", expected.getElectionVotedFor(), actual.getElectionVotedFor());
- assertArrayEquals("state", expected.getState(), actual.getState());
+
+ if (expRoot.isPresent()) {
+ ShardDataTreeSnapshot actualSnapshot = ((ShardSnapshotState)actual.getState()).getSnapshot();
+ assertEquals("ShardDataTreeSnapshot type", MetadataShardDataTreeSnapshot.class, actualSnapshot.getClass());
+ assertTrue("Expected root node present", actualSnapshot.getRootNode().isPresent());
+ assertEquals("Root node", expRoot.get(), actualSnapshot.getRootNode().get());
+ } else {
+ assertEquals("State type", EmptyState.class, actual.getState().getClass());
+ }
}
private static ShardManagerSnapshot newLegacyShardManagerSnapshot(String... shards) {
return new DatastoreSnapshot.ShardSnapshot(name, SerializationUtils.serialize(snapshot));
}
- private static Snapshot newLegacySnapshot(YangInstanceIdentifier path, NormalizedNode<?, ?> node)
+ private static Snapshot newLegacySnapshot(NormalizedNode<?, ?> root)
throws Exception {
- DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
- dataTree.setSchemaContext(SchemaContextHelper.full());
- AbstractShardTest.writeToStore(dataTree, path, node);
- NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
-
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
- new MetadataShardDataTreeSnapshot(root).serialize(bos);
+ if (root != null) {
+ MetadataShardDataTreeSnapshot snapshot = new MetadataShardDataTreeSnapshot(root);
+ try (final DataOutputStream dos = new DataOutputStream(bos)) {
+ PayloadVersion.BORON.writeTo(dos);
+ try (ObjectOutputStream oos = new ObjectOutputStream(dos)) {
+ oos.writeObject(snapshot);
+ }
+ }
+ }
+
return Snapshot.create(bos.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1,
"member-1", null);
}
+
+ private static NormalizedNode<?, ?> toRootNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node)
+ throws DataValidationFailedException {
+ DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
+ dataTree.setSchemaContext(SchemaContextHelper.full());
+ AbstractShardTest.writeToStore(dataTree, path, node);
+ return AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
+ }
}
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
+import java.io.ObjectInputStream;
import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
import java.util.Map;
import java.util.Optional;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
MetadataShardDataTreeSnapshot snapshot = new MetadataShardDataTreeSnapshot(expectedNode);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- snapshot.serialize(bos);
+ try (final ObjectOutputStream out = new ObjectOutputStream(bos)) {
+ snapshot.serialize(out);
+ }
- ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(
- new ByteArrayInputStream(bos.toByteArray()));
+ ShardDataTreeSnapshot deserialized;
+ try (final ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
+ deserialized = ShardDataTreeSnapshot.deserialize(in);
+ }
Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
assertEquals("rootNode present", true, actualNode.isPresent());
ImmutableMap.of(TestShardDataTreeSnapshotMetadata.class, new TestShardDataTreeSnapshotMetadata("test"));
MetadataShardDataTreeSnapshot snapshot = new MetadataShardDataTreeSnapshot(expectedNode, expMetadata);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- snapshot.serialize(bos);
+ try (final ObjectOutputStream out = new ObjectOutputStream(bos)) {
+ snapshot.serialize(out);
+ }
- ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(
- new ByteArrayInputStream(bos.toByteArray()));
+ ShardDataTreeSnapshot deserialized;
+ try (final ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
+ deserialized = ShardDataTreeSnapshot.deserialize(in);
+ }
Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
assertEquals("rootNode present", true, actualNode.isPresent());
.withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
.withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- PreBoronShardDataTreeSnapshot snapshot = new PreBoronShardDataTreeSnapshot(expectedNode);
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- snapshot.serialize(bos);
+ byte[] serialized = SerializationUtils.serializeNormalizedNode(expectedNode);
- ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(bos.toByteArray());
+ ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserializePreCarbon(serialized);
Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
assertEquals("rootNode present", true, actualNode.isPresent());
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
kit.watch(replyActor);
- byte[] shard1Snapshot = new byte[]{1,2,3};
+ ByteState shard1SnapshotState = ByteState.of(new byte[]{1,2,3});
replyActor.tell(new GetSnapshotReply(ShardIdentifier.create("shard1", MEMBER_1, "config").toString(),
- shard1Snapshot), ActorRef.noSender());
+ Snapshot.create(shard1SnapshotState, Collections.<ReplicatedLogEntry>emptyList(),
+ 2, 1, 2, 1, 1, "member-1", null)), ActorRef.noSender());
- byte[] shard2Snapshot = new byte[]{4,5,6};
+ ByteState shard2SnapshotState = ByteState.of(new byte[]{4,5,6});
replyActor.tell(new GetSnapshotReply(ShardIdentifier.create("shard2", MEMBER_1, "config").toString(),
- shard2Snapshot), ActorRef.noSender());
+ Snapshot.create(shard2SnapshotState, Collections.<ReplicatedLogEntry>emptyList(),
+ 2, 1, 2, 1, 1, "member-1", null)), ActorRef.noSender());
kit.expectNoMsg(FiniteDuration.create(500, TimeUnit.MILLISECONDS));
- byte[] shard3Snapshot = new byte[]{7,8,9};
+ ByteState shard3SnapshotState = ByteState.of(new byte[]{7,8,9});
replyActor.tell(new GetSnapshotReply(ShardIdentifier.create("shard3", MEMBER_1, "config").toString(),
- shard3Snapshot), ActorRef.noSender());
+ Snapshot.create(shard3SnapshotState, Collections.<ReplicatedLogEntry>emptyList(),
+ 2, 1, 2, 1, 1, "member-1", null)), ActorRef.noSender());
DatastoreSnapshot datastoreSnapshot = kit.expectMsgClass(DatastoreSnapshot.class);
List<ShardSnapshot> shardSnapshots = datastoreSnapshot.getShardSnapshots();
assertEquals("ShardSnapshot size", 3, shardSnapshots.size());
assertEquals("ShardSnapshot 1 getName", "shard1", shardSnapshots.get(0).getName());
- assertArrayEquals("ShardSnapshot 1 getSnapshot", shard1Snapshot, shardSnapshots.get(0).getSnapshot());
+ assertEquals("ShardSnapshot 1 getSnapshot", shard1SnapshotState,
+ shardSnapshots.get(0).getSnapshot().getState());
assertEquals("ShardSnapshot 2 getName", "shard2", shardSnapshots.get(1).getName());
- assertArrayEquals("ShardSnapshot 2 getSnapshot", shard2Snapshot, shardSnapshots.get(1).getSnapshot());
+ assertEquals("ShardSnapshot 2 getSnapshot", shard2SnapshotState,
+ shardSnapshots.get(1).getSnapshot().getState());
assertEquals("ShardSnapshot 3 getName", "shard3", shardSnapshots.get(2).getName());
- assertArrayEquals("ShardSnapshot 3 getSnapshot", shard3Snapshot, shardSnapshots.get(2).getSnapshot());
+ assertEquals("ShardSnapshot 3 getSnapshot", shard3SnapshotState,
+ shardSnapshots.get(2).getSnapshot().getState());
kit.expectMsgClass(Terminated.class);
}
kit.watch(replyActor);
replyActor.tell(new GetSnapshotReply(ShardIdentifier.create("shard1", MEMBER_1, "config").toString(),
- new byte[]{1,2,3}), ActorRef.noSender());
+ Snapshot.create(ByteState.of(new byte[]{1,2,3}), Collections.<ReplicatedLogEntry>emptyList(),
+ 2, 1, 2, 1, 1, "member-1", null)), ActorRef.noSender());
replyActor.tell(new Failure(new RuntimeException()), ActorRef.noSender());